Loopback from DLX to Healthy Queue (AWS SQS)
JS
S
JavaScriptSnippet to Loopback from DLX to a normal Queue for processing after poisoned message clear up. // const moveFromDlxToQueueLib = await import('../migrations/move-from-dlx-to-queue'); // moveFromDlxToNormal = new moveFromDlxToQueueLib.default(); // moveFromDlxToNormal.start();
1import { Consumer } from 'sqs-consumer';
2import * as AWS from 'aws-sdk';
3import * as SQS from 'aws-sdk/clients/sqs';
4import clone from 'clone';
5import logService from '../services/log';
6
7let queueNormal: any;
8
9class DlxToNormal {
10 public queueNormal: SQS;
11 public sqsParamsBluePrint: any;
12 public queueDlxConsumer: Consumer;
13
14 constructor() {
15 AWS.config.update({
16 accessKeyId: process.env.AWS_ACCESS_KEY_ID,
17 secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
18 region: process.env.AWS_REGION,
19 });
20 queueNormal = new AWS.SQS();
21 logService.log(`notice`, `AWS Connection Configured`);
22 logService.log(`notice`, `Attempting to retrieve messages..`, { queue: process.env.MIGRATION_QUEUE_DLX });
23
24 this.sqsParamsBluePrint = {
25 QueueUrl: process.env.MIGRATION_QUEUE,
26 MessageBody: '',
27 MessageAttributes: {},
28 };
29 }
30
31 async handleDlx(message: any): Promise<void> {
32 console.log('handleDlx', message.Body);
33 const sendGoodRequest: any = (msg: any) => {
34 logService.log('info', 'SQS Sender send migration to SQS');
35 return new Promise((resolve, reject) => {
36 const sqsParamsBluePrint = {
37 QueueUrl: process.env.MIGRATION_QUEUE,
38 MessageBody: '',
39 MessageAttributes: {},
40 };
41 const params = clone(sqsParamsBluePrint);
42 const attributesMap: SQS.MessageBodyAttributeMap = {
43 type: {
44 DataType: 'String',
45 StringValue: 'normal',
46 },
47 tenantId: {
48 DataType: 'String',
49 StringValue: process.env.TENANT_ID,
50 },
51 };
52 params.MessageBody = JSON.stringify(msg);
53 params.MessageAttributes = attributesMap;
54 queueNormal.sendMessage(params, (err: Error) => {
55 if (err) {
56 logService.log('error', 'message failed', { err });
57 reject(err);
58 } else {
59 logService.log('info', 'message sent');
60 resolve(true);
61 }
62 });
63 });
64 };
65 return new Promise(async (resolve, reject) => {
66 try {
67 await sendGoodRequest(JSON.parse(message.Body));
68 resolve();
69 } catch (err) {
70 logService.log(`error`, 'failed to process SQS message', err);
71 reject(err);
72 }
73 });
74 }
75
76 async start() {
77 try {
78 this.queueDlxConsumer = Consumer.create({
79 queueUrl: process.env.MIGRATION_QUEUE_DLX,
80 handleMessage: this.handleDlx,
81 });
82 this.queueDlxConsumer.on('error', (err: Error) => {
83 if (err) throw err;
84 });
85 this.queueDlxConsumer.start();
86 } catch (err) {
87 throw err;
88 }
89 }
90}
91
92export default DlxToNormal;
93Created on 10/12/2020