SQS Queue Consumer (TypeScript)

JS
S
JavaScript

Simple SQS Consumer (TypeScript)

1import { Consumer } from 'sqs-consumer';
2import * as AWS from 'aws-sdk';
3import * as SQS from 'aws-sdk/clients/sqs';
4
5import logService from '../services/log-service';
/** Local shorthand for the AWS SDK's SQS message type (`SQS.Types.Message`). */
type SQSMessage = SQS.Types.Message;
7
/**
 * Configures the AWS SDK from environment variables and runs one
 * `sqs-consumer` poller per queue: scheduled (recurring) transactions and
 * downgraded subscriptions.
 *
 * Environment variables read: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
 * `AWS_REGION`, `SQS_RECURRING_URI`, `SQS_DOWNGRADED_URI`.
 */
class SqsConsumerService {
  /** SQS client created right after the SDK configuration is updated. */
  public queue: SQS;

  // Both consumers only exist once startListeners() has run, hence optional.
  private subscriptionsConsumer?: Consumer;
  private scheduledTransactionsConsumer?: Consumer;

  /** Applies credentials and region from the environment and creates the SQS client. */
  constructor() {
    AWS.config.update({
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
      region: process.env.AWS_REGION
    });
    this.queue = new AWS.SQS();
    logService.log(`notice`, `AWS Connection Configured`);
  }

  /**
   * Returns the value of a required environment variable.
   *
   * @param name - Name of the environment variable.
   * @throws Error when the variable is unset or empty.
   */
  private static requireEnv(name: string): string {
    const value = process.env[name];
    if (!value) {
      throw new Error(`Missing required environment variable ${name}`);
    }
    return value;
  }

  /**
   * Parses the JSON body of an SQS message.
   *
   * @param message - Message as delivered by the consumer.
   * @returns The parsed payload; its shape is not validated here.
   * @throws Error when the message has no body; SyntaxError when the body is
   *   not valid JSON.
   */
  private static parseBody(message: SQSMessage): unknown {
    // `Body` is optional in the SDK typings, so guard before parsing.
    if (message.Body == null) {
      throw new Error('SQS message has no Body');
    }
    return JSON.parse(message.Body);
  }

  /**
   * Handles one message from the scheduled-transactions queue.
   * A rejection (thrown error) sends the message back to the queue.
   *
   * @param message - Message received from SQS.
   */
  async handleScheduledTransaction(message: SQSMessage): Promise<void> {
    const msgObject = SqsConsumerService.parseBody(message);
    logService.log(`info`, 'new message', msgObject);
  }

  /**
   * Handles one message from the downgraded-subscriptions queue.
   * A rejection (thrown error) sends the message back to the queue.
   *
   * @param message - Message received from SQS.
   */
  async handleDowngradedSubscription(message: SQSMessage): Promise<void> {
    const msgObject = SqsConsumerService.parseBody(message);
    logService.log(`info`, 'new message', msgObject);
  }

  /**
   * Creates a consumer for one queue, attaches error reporting and starts it.
   *
   * @param queueUrl - URL of the queue to poll.
   * @param handleMessage - Callback invoked for each received message.
   * @returns The started consumer.
   */
  private createConsumer(
    queueUrl: string,
    handleMessage: (message: SQSMessage) => Promise<void>
  ): Consumer {
    const consumer = Consumer.create({ queueUrl, handleMessage });

    // Log instead of rethrowing: an exception thrown from an event listener
    // cannot be caught by any caller and would escape the consumer's own
    // error path, so a transient SQS failure must not be turned into one.
    // NOTE(review): assumes logService accepts an `error` level — confirm.
    const report = (event: string) => (err: Error): void => {
      logService.log(`error`, `SQS consumer ${event}`, {
        queueUrl,
        message: err.message,
        stack: err.stack
      });
    };
    consumer.on('error', report('error'));
    // Handler rejections are emitted separately from transport errors.
    consumer.on('processing_error', report('processing_error'));

    consumer.start();
    return consumer;
  }

  /**
   * Starts polling both queues. Calling it again while consumers are already
   * attached is a no-op, so running consumers are never orphaned.
   *
   * @throws Error when `SQS_RECURRING_URI` or `SQS_DOWNGRADED_URI` is not set.
   */
  startListeners(): void {
    if (this.scheduledTransactionsConsumer || this.subscriptionsConsumer) {
      return;
    }

    // Resolve both URLs before starting anything so a missing variable cannot
    // leave one consumer polling while the other failed to start.
    const recurringUrl = SqsConsumerService.requireEnv('SQS_RECURRING_URI');
    const downgradedUrl = SqsConsumerService.requireEnv('SQS_DOWNGRADED_URI');

    // Arrow wrappers keep `this` bound when the consumer invokes the handlers.
    this.scheduledTransactionsConsumer = this.createConsumer(
      recurringUrl,
      (message: SQSMessage) => this.handleScheduledTransaction(message)
    );
    this.subscriptionsConsumer = this.createConsumer(
      downgradedUrl,
      (message: SQSMessage) => this.handleDowngradedSubscription(message)
    );

    logService.log(`notice`, `SQS Consumers attached`);
  }
}
64
// Shared module-level instance: it is constructed when this module is first
// evaluated, so the AWS configuration in the constructor runs at import time.
const sqsConsumerService = new SqsConsumerService();

export default sqsConsumerService;
66

Created on 3/7/2019