El conocimiento es el nuevo dinero.
Aprender es la nueva manera en la que inviertes
Acceso Cursos

Integración de NestJS con Amazon SQS: Una guía completa

En esta guía, exploraremos el proceso de integración de NestJS, un potente framework Node.js, con Amazon Simple Queue Service (SQS), un servicio de colas de mensajes totalmente gestionado proporcionado por Amazon Web Services (AWS).

· 5 min de lectura
Integración de NestJS con Amazon SQS: Una guía completa

En el vertiginoso mundo actual de las aplicaciones web, el procesamiento eficaz de los trabajos es primordial.

Ya se trate de enviar correos electrónicos, procesar datos o realizar tareas en segundo plano, un sistema de cola de trabajos sólido puede optimizar el rendimiento de su aplicación.

¿Por qué NestJS y Amazon SQS?


NestJS, un marco Node.js progresivo, ha ganado una inmensa popularidad para crear aplicaciones del lado del servidor eficientes y escalables.

Por otro lado, Amazon SQS proporciona una solución fiable y escalable para desacoplar los componentes de una aplicación en la nube. Combinando estas tecnologías, podemos aprovechar las ventajas de ambas para crear un sistema de procesamiento de trabajos sin fisuras.

Pasos para la integración
Paso 1: Configuración del entorno de desarrollo


Antes de sumergirnos en el código, asegurémonos de que nuestro entorno de desarrollo está listo. Necesitarás Node.js instalado, así como NestJS, AWS SDK y la biblioteca NestJS-SQS.

He aquí cómo instalar las dependencias necesarias:

# Install NestJS and Dependencies
npm install --save @nestjs/core @nestjs/common aws-sdk @ssut/nestjs-sqs

Paso 2: ProducerModule - Envío de trabajos a SQS


En NestJS, el ProducerModule juega un papel crucial en el envío de trabajos a SQS. Configura el servicio AWS SQS y proporciona ProducerService para interactuar con él.

Veamos cómo configurar el ProducerModule:

import { Module } from '@nestjs/common';
import { AWS } from 'aws-sdk';
import { ProducerService } from './producer.service';
import { TypeOrmModule } from '@nestjs/typeorm';
import { QueueJob } from './entities/queue-job.entity';
import { QueueJobService } from './queue-job.service';

@Module({
  imports: [TypeOrmModule.forFeature([QueueJob])],
  providers: [
    {
      provide: AWS.SQS, // Provide the SQS service from the AWS SDK
      useFactory: (configService: ConfigService) => {
        const region = configService.get<string>('sqs.region');
        const accessKeyId = configService.get<string>('sqs.accessKeyId');
        const secretAccessKey = configService.get<string>(
          'sqs.secretAccessKey',
        );
        const sqsConfig = {
          region,
          accessKeyId,
          secretAccessKey,
        };
        return new AWS.SQS(sqsConfig);
      },
      inject: [ConfigService],
    },
    ProducerService,
    QueueJobService,
  ],
  exports: [    AWS.SQS,
    ProducerService,
    QueueJobService,
    TypeOrmModule.forFeature([QueueJob]),],
})
export class ProducerModule {}

Paso 3: Entidad QueueJob - Seguimiento del estado del trabajo


Para asegurar un seguimiento robusto del estado de los trabajos en una base de datos, creamos la entidad QueueJob.

Esta entidad nos ayuda a monitorizar el progreso de cada trabajo.

He aquí un ejemplo simplificado de la entidad QueueJob:

@Entity()
export class QueueJob extends BaseEntity {
    @Column({ unique: true })
    message_id: string;


    @Column('json')
    message: any;

    @Column('json')
    entity: any;

    @Column()
    queue: string;


    @Column()
    job_type: string;

    @Column({default:0})
    @Index()
    status: number;

    @Column({default:0})
    counter: number;

    @Column('json', { nullable: true })
    error: any;

    @Column({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
    created_at: Date;
  
   
    @Column({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
    updated_at: Date;


    // Add other relevant fields as needed
  }

Paso 4: ProducerService - Envío de trabajos a SQS


El ProducerService es responsable de enviar los trabajos a AWS SQS. Prepara los detalles del trabajo, y los atributos del mensaje, y los envía a la cola. Exploremos el método de envío dentro del ProducerService:

export interface SQSMessage {
    QueueUrl: string;
    MessageBody: string;
    MessageGroupId?: string;
    MessageDeduplicationId?: string;
    DelaySeconds?: number;
    
  }

export interface Job {
  DataType: string;
  value: string;
}
export interface MessageAttributes {
  job: Job;
}

export interface MessageBody {
  messageId:string;
  message: any;
  date: string;
  MessageAttributes: MessageAttributes;
}

@Injectable()
export class ProducerService {
  constructor(
    private readonly configService: ConfigService,
    private readonly queueJobService: QueueJobService,
  ) {}

  send(message: any, jobType: string, messageGroupId: string = 'general') {
    if (!JOB_TYPES.includes(jobType)) {
      throw new BadRequestException('Invalid job type');
    }
    const region = this.configService.get<string>('sqs.region');
    const accessKeyId = this.configService.get<string>('sqs.accessKeyId');
    const secretAccessKey = this.configService.get<string>(
      'sqs.secretAccessKey',
    );
    const isFifo: boolean = JSON.parse(this.configService.get('sqs.isFifo'));

    const messageId = uuidv4();
    let sqsMessage: SQSMessage = {
      QueueUrl: this.configService.get<string>('sqs.url'),
      MessageBody: JSON.stringify({
        messageId,
        message,
        MessageAttributes: {
          job: {
            DataType: 'string',
            value: jobType,
          },
        },
      } as MessageBody),
    };
    if (isFifo == true) {
      sqsMessage = {
        ...sqsMessage,
        MessageGroupId: messageGroupId,
        MessageDeduplicationId: messageId,
      };
    }
    const sqs = new AWS.SQS({
      region,
      accessKeyId,
      secretAccessKey,
    });
    console.log('sqsMessage:', sqsMessage);

    const input: Partial<QueueJob> = {
      message_id: messageId,
      message: sqsMessage,
      entity: message,
      job_type: jobType,
      queue: this.configService.get<string>('sqs.queue_name'),
    };
    return defer(() => this.queueJobService.save(input)).pipe(
      switchMap(() => {
        return from(sqs.sendMessage(sqsMessage).promise()).pipe(
          tap(() => {
            return true;
          }),
          catchError((error) => {
            throw new InternalServerErrorException(error);
          }),
        );
      }),
    );
  }
}

Paso 5: ConsumerModule - Manejo de trabajos desde SQS


En NestJS, el ConsumerModule está diseñado para manejar los trabajos entrantes de la cola SQS. Configura los consumidores de AWS SQS y define cómo procesar los mensajes.

He aquí cómo configurar el ConsumerModule:

@Module({
  imports: [
    ProducerModule,
    SqsModule.registerAsync({
      imports: [ConfigModule], // Import the ConfigModule to use the ConfigService
      useFactory: async (configService: ConfigService) => {
        const accessKeyId = configService.get<string>('sqs.accessKeyId');
        const secretAccessKey = configService.get<string>(
          'sqs.secretAccessKey',
        );

        // Retrieve the required configuration values using ConfigService
        return {
          consumers: [
            {
              name: configService.get<string>('sqs.queue_name'), // name of the queue
              queueUrl: configService.get<string>('sqs.url'), // url of the queue
              region: configService.get<string>('sqs.region'), // using the same region for the producer
              batchSize: 10, // number of messages to receive at once
              // visibilityTimeout:10,
              // waitTimeSeconds:300,
              terminateGracefully: true, // gracefully shutdown when SIGINT/SIGTERM is received
              sqs: new SQSClient({
                region: configService.get<string>('sqs.region'),
                credentials: {
                  accessKeyId: accessKeyId,
                  secretAccessKey: secretAccessKey,
                },
              }),
            },
          ],
          producers: [],
        };
      },
      inject: [ConfigService],
    }),
  ],
  controllers: [],
  providers: [
  ],
  exports: [ConsumerService, JobHandlerFactory],
})
export class ConsumerModule {}

Paso 6: ConsumerService - Procesamiento de mensajes de trabajo


El ConsumerService es una parte fundamental de la integración. Procesa los mensajes de trabajo, comprueba los diferentes tipos de trabajo y delega las tareas en consecuencia.

El método handleMessage en el ConsumerService es donde ocurre la magia:

@Injectable()
export class ConsumerService {
  constructor(
    private readonly queueJobService: QueueJobService,
    private readonly jobHandlerFactory: JobHandlerFactory,
  ) {}

  @SqsMessageHandler(/** name: */ config.sqs.queue_name, /** batch: */ false)
  async handleMessage(message: Message) {
    const msgBody: MessageBody = JSON.parse(message.Body) as MessageBody;
    console.log('Consumer  Start ....:', msgBody.messageId);

    if (!JOB_TYPES.includes(msgBody.MessageAttributes.job.value)) {
      Logger.error('Invalid job type ' + msgBody.MessageAttributes.job.value);
      throw new InternalServerErrorException(
        'Invalid job type ' + msgBody.MessageAttributes.job.value,
      );
    }

    try {
      //Todo
      // handle the message here
   
    } catch (error) {
      console.log('consumer error', JSON.stringify(error));
      //keep the message in sqs
      Logger.error(error.message);
      throw new InternalServerErrorException(error);
   
    }
  }

Paso 7: Gestión de errores y mecanismos de reintento


Ningún sistema de procesamiento de trabajos está completo sin mecanismos sólidos de gestión de errores y reintentos

En esta sección, discutiremos estrategias para el manejo de errores, incluyendo tiempos de espera y errores de procesamiento. También proporcionaremos fragmentos de código para su referencia.

@SqsConsumerEventHandler(
    /** name: */ config.sqs.queue_name,
    /** eventName: */ 'timeout_error',
  )
  async onTimeoutError(error: Error, message: Message) {
    const msgBody: MessageBody = JSON.parse(message.Body) as MessageBody;

    Logger.error(error.message);
    return this.queueJobService
      .findOneBy({ message_id: msgBody.messageId })
      .pipe(
        switchMap((queueJob) => {
          if (queueJob) {
            return this.queueJobService.update(queueJob.id, {
              counter: queueJob.counter + 1,
              status: QueueJobStatus.TIMEOUT,
              error: JSON.parse(JSON.stringify(error)),
              updated_at: () => 'CURRENT_TIMESTAMP',
            });
          }
          return of(true);
        }),
      )
      .subscribe((res) => {
        Logger.error('Timeout Error messageId : ' + msgBody.messageId);
      });
  }

  @SqsConsumerEventHandler(
    /** name: */ config.sqs.queue_name,
    /** eventName: */ 'processing_error',
  )
  async onProcessingError(error: Error, message: Message) {
    const msgBody: MessageBody = JSON.parse(message.Body) as MessageBody;
    Logger.error(error.message);
    return this.queueJobService
      .findOneBy({ message_id: msgBody.messageId })
      .pipe(
        switchMap((queueJob) => {
          if (queueJob) {
            return this.queueJobService.update(queueJob.id, {
              counter: queueJob.counter + 1,
              status: QueueJobStatus.FAILED,
              error: JSON.parse(JSON.stringify(error)),
              updated_at: () => 'CURRENT_TIMESTAMP',
            });
          }
          return of(true);
        }),
      )
      .subscribe();
  }

Paso 8: Seguimiento del progreso de los trabajos y limpieza


Para garantizar la integridad de su sistema de procesamiento de trabajos, debe realizar un seguimiento del progreso de los trabajos y realizar limpiezas periódicas. El QueueJobService es responsable de estas tareas.

Compartiremos fragmentos de código y las mejores prácticas para el seguimiento del progreso y la programación de la limpieza.

@Injectable()
export class QueueJobService extends BaseService<QueueJob> {
  private readonly logger = new Logger(QueueJobService.name);
  constructor(
    @InjectRepository(QueueJob)
    protected readonly repository: Repository<QueueJob>,
  ) {
    super(repository);
  }

Paso 9: Pruebas y despliegue


Antes de desplegar su integración NestJS-SQS en producción, es crucial realizar pruebas exhaustivas. Proporcionaremos instrucciones sobre la ejecución de pruebas y la creación de la aplicación para el despliegue en producción.

# Run tests
npm test

# Build for production
npm build

Fuente

Conclusión


La integración de NestJS con Amazon SQS aporta la potencia de un sólido sistema de procesamiento de trabajos a sus aplicaciones. Con la eficiencia de NestJS y la escalabilidad de AWS SQS, puede optimizar su flujo de trabajo de procesamiento de trabajos. Esta completa guía le ha llevado a través de los pasos necesarios, desde la configuración de su entorno de desarrollo hasta la gestión de mensajes de trabajo y el seguimiento del progreso del trabajo.

Siéntase libre de explorar el código completo y llevar sus capacidades de procesamiento de trabajos al siguiente nivel. Si aprovechas estas tecnologías, podrás crear aplicaciones eficientes y escalables.

¡Feliz programación!