Loopback from DLX to Healthy Queue (AWS SQS)

JS
S
JavaScript

Snippet to loop messages back from the DLX (dead-letter queue) to the normal queue for reprocessing after the poisoned message has been cleared up. Usage: // const moveFromDlxToQueueLib = await import('../migrations/move-from-dlx-to-queue'); // moveFromDlxToNormal = new moveFromDlxToQueueLib.default(); // moveFromDlxToNormal.start();

1import { Consumer } from 'sqs-consumer';
2import * as AWS from 'aws-sdk';
3import * as SQS from 'aws-sdk/clients/sqs';
4import clone from 'clone';
5import logService from '../services/log';
6
/**
 * Re-publishes messages from the dead-letter queue (`MIGRATION_QUEUE_DLX`)
 * onto the normal queue (`MIGRATION_QUEUE`).
 *
 * Construct an instance, then call `start()`; the consumer then feeds every
 * received DLX message through `handleDlx`.
 *
 * Configuration is read from the environment: `AWS_ACCESS_KEY_ID`,
 * `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, `MIGRATION_QUEUE`,
 * `MIGRATION_QUEUE_DLX` and `TENANT_ID`.
 */
class DlxToNormal {
  /** SQS client used to publish onto the normal queue. */
  public queueNormal: SQS;
  /** Template for `sendMessage` params; cloned per message so it is never mutated. */
  public sqsParamsBluePrint: any;
  /** Consumer polling the dead-letter queue. Only assigned once `start()` runs. */
  public queueDlxConsumer!: Consumer;

  /**
   * Applies the AWS credentials/region from the environment, creates the SQS
   * client and prepares the `sendMessage` parameter template.
   */
  constructor() {
    AWS.config.update({
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
      region: process.env.AWS_REGION,
    });
    this.queueNormal = new AWS.SQS();
    logService.log(`notice`, `AWS Connection Configured`);
    logService.log(`notice`, `Attempting to retrieve messages..`, { queue: process.env.MIGRATION_QUEUE_DLX });

    this.sqsParamsBluePrint = {
      QueueUrl: process.env.MIGRATION_QUEUE,
      MessageBody: '',
      MessageAttributes: {},
    };

    // `handleDlx` is handed to the consumer as a bare function reference, so
    // bind it once here to keep `this` pointing at this instance.
    this.handleDlx = this.handleDlx.bind(this);
  }

  /**
   * Handles one message received from the dead-letter queue by parsing its
   * JSON body and re-sending it to the normal queue.
   *
   * @param message Message as delivered by the consumer; only `Body` is read
   *   and it must contain JSON text.
   * @returns Resolves once the message has been accepted by the normal queue.
   * @throws Re-throws (after logging) a JSON parse failure or the error
   *   reported by `sendMessage`.
   *   NOTE(review): presumably the consumer leaves a message on the DLX when
   *   this rejects — confirm against the sqs-consumer documentation.
   */
  async handleDlx(message: any): Promise<void> {
    logService.log('info', 'handleDlx', { body: message.Body });
    try {
      await this.sendToNormalQueue(JSON.parse(message.Body));
    } catch (err) {
      logService.log(`error`, 'failed to process SQS message', err);
      throw err;
    }
  }

  /**
   * Starts consuming the dead-letter queue, routing every message to
   * `handleDlx`.
   */
  async start(): Promise<void> {
    this.queueDlxConsumer = Consumer.create({
      queueUrl: process.env.MIGRATION_QUEUE_DLX,
      handleMessage: this.handleDlx,
    });
    this.queueDlxConsumer.on('error', (err: Error) => {
      if (err) {
        logService.log('error', 'DLX consumer error', { err });
        // Kept from the original implementation: consumer-level errors are
        // rethrown from the listener rather than swallowed.
        throw err;
      }
    });
    this.queueDlxConsumer.start();
  }

  /**
   * Publishes `msg` (serialised as JSON) to the normal queue with the `type`
   * and `tenantId` message attributes.
   *
   * @param msg Parsed payload taken from the dead-letter message.
   * @returns Resolves with `true` once `sendMessage` reports success; rejects
   *   with the error passed to the `sendMessage` callback otherwise.
   */
  private sendToNormalQueue(msg: unknown): Promise<true> {
    logService.log('info', 'SQS Sender send migration to SQS');

    // Work on a copy so the shared template stays pristine between messages.
    const params = clone(this.sqsParamsBluePrint);
    const attributesMap: SQS.MessageBodyAttributeMap = {
      type: {
        DataType: 'String',
        StringValue: 'normal',
      },
      tenantId: {
        DataType: 'String',
        StringValue: process.env.TENANT_ID,
      },
    };
    params.MessageBody = JSON.stringify(msg);
    params.MessageAttributes = attributesMap;

    // Adapt the callback-style `sendMessage` API to a promise.
    return new Promise<true>((resolve, reject) => {
      this.queueNormal.sendMessage(params, (err: Error | null) => {
        if (err) {
          logService.log('error', 'message failed', { err });
          reject(err);
        } else {
          logService.log('info', 'message sent');
          resolve(true);
        }
      });
    });
  }
}

export default DlxToNormal;
93

Created on 10/12/2020