1import { Consumer } from 'sqs-consumer';
2import * as AWS from 'aws-sdk';
3import * as SQS from 'aws-sdk/clients/sqs';
4
5import logService from '../services/log-service';
/** Local shorthand for the message type exposed by the AWS SDK's SQS client typings. */
type SQSMessage = SQS.Types.Message;
7
/**
 * Configures the AWS SDK from environment variables and attaches
 * `sqs-consumer` listeners to two queues:
 *
 * - `SQS_RECURRING_URI`  -> {@link SqsConsumerService.handleScheduledTransaction}
 * - `SQS_DOWNGRADED_URI` -> {@link SqsConsumerService.handleDowngradedSubscription}
 *
 * Nothing is polled until {@link SqsConsumerService.startListeners} is called.
 */
class SqsConsumerService {
  /** SQS client created in the constructor, after the global AWS config is updated. */
  public queue: SQS;
  // Both consumers stay undefined until startListeners() has run.
  private subscriptionsConsumer?: Consumer;
  private scheduledTransactionsConsumer?: Consumer;

  /**
   * Updates the global AWS SDK configuration from `AWS_ACCESS_KEY_ID`,
   * `AWS_SECRET_ACCESS_KEY` and `AWS_REGION`, then creates the SQS client.
   */
  constructor() {
    AWS.config.update({
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
      region: process.env.AWS_REGION
    });
    this.queue = new AWS.SQS();
    logService.log(`notice`, `AWS Connection Configured`);
  }

  /**
   * Handles one message from the scheduled-transactions queue.
   *
   * Currently it only parses and logs the payload. Resolving marks the
   * message as handled; a rejection (for example a body that is missing or
   * not valid JSON) is how the message is sent back to the queue.
   *
   * @param message Raw SQS message whose `Body` is expected to be JSON.
   * @throws Rejects with an `Error` when the body is missing or malformed.
   */
  async handleScheduledTransaction(message: SQSMessage): Promise<void> {
    const msgObject = SqsConsumerService.parseBody(message);
    logService.log(`info`, 'new message', msgObject);
  }

  /**
   * Handles one message from the downgraded-subscriptions queue.
   *
   * Currently it only parses and logs the payload. Resolving marks the
   * message as handled; a rejection (for example a body that is missing or
   * not valid JSON) is how the message is sent back to the queue.
   *
   * @param message Raw SQS message whose `Body` is expected to be JSON.
   * @throws Rejects with an `Error` when the body is missing or malformed.
   */
  async handleDowngradedSubscription(message: SQSMessage): Promise<void> {
    const msgObject = SqsConsumerService.parseBody(message);
    logService.log(`info`, 'new message', msgObject);
  }

  /**
   * Creates both queue consumers and starts polling.
   *
   * Both consumers are built before either is started, so a missing queue URL
   * fails the call without leaving one consumer already running.
   *
   * @throws Error when `SQS_RECURRING_URI` or `SQS_DOWNGRADED_URI` is not set.
   */
  startListeners(): void {
    // Arrow wrappers keep `this` bound when the consumer library invokes the handlers.
    const scheduledTransactions = this.createConsumer(
      'SQS_RECURRING_URI',
      (message) => this.handleScheduledTransaction(message)
    );
    const subscriptions = this.createConsumer(
      'SQS_DOWNGRADED_URI',
      (message) => this.handleDowngradedSubscription(message)
    );

    this.scheduledTransactionsConsumer = scheduledTransactions;
    this.subscriptionsConsumer = subscriptions;

    scheduledTransactions.start();
    subscriptions.start();

    // Logged only once both consumers really are attached and started.
    logService.log(`notice`, `SQS Consumers attached`);
  }

  /**
   * Builds (but does not start) a consumer for the queue whose URL is stored
   * in the given environment variable, and attaches logging listeners.
   */
  private createConsumer(
    queueUrlEnvVar: string,
    handleMessage: (message: SQSMessage) => Promise<void>
  ): Consumer {
    const queueUrl = process.env[queueUrlEnvVar];
    if (!queueUrl) {
      throw new Error(`Missing required environment variable ${queueUrlEnvVar}`);
    }

    const consumer = Consumer.create({
      queueUrl,
      // Reuse the client configured in the constructor instead of letting the
      // library construct its own with its own defaults.
      sqs: this.queue,
      handleMessage
    });

    // Never throw from these listeners: an exception raised inside an
    // EventEmitter listener propagates into the emitter's call site (the
    // consumer's internals), not to whoever called startListeners().
    // NOTE(review): assumes logService accepts an `error` level — confirm.
    consumer.on('error', (err: Error) => {
      logService.log(`error`, `SQS consumer error (${queueUrlEnvVar})`, err);
    });
    // NOTE(review): `processing_error` is the event sqs-consumer documents for
    // a failed handleMessage — confirm against the installed version.
    consumer.on('processing_error', (err: Error) => {
      logService.log(`error`, `SQS message processing failed (${queueUrlEnvVar})`, err);
    });

    return consumer;
  }

  /**
   * Parses a message body as JSON. Throws when the body is absent or is not
   * valid JSON, so the calling handler rejects.
   */
  private static parseBody(message: SQSMessage) {
    if (message.Body === undefined) {
      throw new Error('SQS message has no Body');
    }
    return JSON.parse(message.Body);
  }
}
64
// Module-level singleton: importing this module runs the SqsConsumerService
// constructor once, which updates the global AWS config and logs a notice.
const sqsConsumerService = new SqsConsumerService();

export default sqsConsumerService;
// Created on 3/7/2019