Recursive Mongoose Queries for Large Collections

JS
S
JavaScript

Uses a recursive function to query a large collection in segments, paging through documents in MongoDB insertion order (ascending `_id`). Example: query the first 400 documents, then the next 400, and so on.

/** `_id` of the most recently visited document; the next batch resumes after it. */
let lastTelemetryId;

/**
 * Handles a single telemetry document returned by the batch query.
 *
 * - Documents that are missing, or not flagged `deleted`, are left untouched.
 * - `ping` and `healthz` documents are kept.
 * - `k8s-probe` documents are removed right away.
 * - Every other type is removed after looking up the Pneumatics document
 *   that references it.
 *
 * @param {Object} telemetryDoc - Lean telemetry document.
 * @returns {Promise<boolean>} Resolves to `true` when the document was removed.
 */
function processTelemetryDoc(telemetryDoc) {
  if (!telemetryDoc || !telemetryDoc.deleted) {
    return Promise.resolve(false);
  }

  console.debug('==> Telemetry found... ', telemetryDoc._id, telemetryDoc.type);

  if (telemetryDoc.type === 'ping' || telemetryDoc.type === 'healthz') {
    return Promise.resolve(false);
  }

  const removeDoc = () =>
    Telemetry.remove({ _id: telemetryDoc._id })
      .exec()
      .then(() => true);

  if (telemetryDoc.type === 'k8s-probe') {
    return removeDoc();
  }

  return Pneumatics.findOne({ resources: telemetryDoc._id })
    .exec()
    .then((_telemetrySet) => {
      if (_telemetrySet) {
        // do stuff
      }
      return removeDoc();
    });
}

/**
 * Processes soft-deleted telemetry documents created during `year`, one batch
 * at a time, then calls itself for the next batch until a query comes back
 * empty.
 *
 * Paging is keyed on `_id`: each batch asks for documents whose `_id` is
 * greater than the last one visited, sorted ascending so no document is
 * skipped between batches.
 *
 * Relies on `iterations`, `limit`, `moment`, `Telemetry` and `Pneumatics`
 * being defined elsewhere in the file (not visible in this snippet).
 *
 * Exits the process with code 0 once every batch has been handled, or with
 * code 1 on the first error.
 *
 * @param {number} [year=2017] - Calendar year (UTC) whose documents are scanned.
 * @returns {Promise<void>} Settles when the whole run has finished.
 */
function recursiveRunner(year = 2017) {
  console.debug(`========it: ${iterations}==== recursiveRunner 100 == limit ${limit}  == lastTelemetryId ${lastTelemetryId} =============`);
  const startTime = moment();
  iterations += 100;
  let docsDeleted = 0;

  // Build the range directly in UTC. Deriving it from the local "now" and
  // converting afterwards can land in the wrong month near month boundaries.
  const rangeStart = moment.utc([year]).startOf('year').toISOString();
  const rangeEnd = moment.utc([year]).endOf('year').toISOString();

  const query = {
    createdAt: { $gte: rangeStart, $lte: rangeEnd },
  };
  if (lastTelemetryId) {
    query._id = { $gt: lastTelemetryId };
  }

  return Telemetry.findDeleted(query)
    .sort({ _id: 1 }) // `$gt` paging is only correct on an `_id`-ordered result
    .limit(limit)
    .lean()
    .exec()
    .then((docs) =>
      // Handle documents strictly one after another, using plain promises so
      // the code does not depend on Bluebird's `.each`.
      docs
        .reduce(
          (chain, telemetryDoc) =>
            chain.then(() => {
              if (telemetryDoc) {
                lastTelemetryId = telemetryDoc._id;
              }
              return processTelemetryDoc(telemetryDoc).then((wasDeleted) => {
                if (wasDeleted) {
                  docsDeleted++;
                }
              });
            }),
          Promise.resolve()
        )
        .then(() => docs)
    )
    .then(function logResult(resources) {
      const endTime = moment();
      console.debug(`===========${iterations}================`);
      console.debug(`Telemetrys found: ${resources.length}`);
      console.debug(`Docs deleted: ${docsDeleted}`);
      console.debug(`Duration: ${endTime.diff(startTime, 'seconds')}s / ${endTime.diff(startTime, 'milliseconds')}ms`);

      if (resources.length === 0) {
        // Nothing left past the cursor, so the whole range has been processed.
        process.exit(0);
        return undefined;
      }

      console.debug(`======== iteration finished =========`);
      return recursiveRunner(year); // recursive: next batch
    })
    .catch(function handleError(err) {
      console.error(`Something went wrong while updating telemetry sets`, err);
      process.exit(1);
    });
}

recursiveRunner();

Created on 2/5/2019