Skip to content

Commit

Permalink
fixing runtime errors around error handling and batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Baalmart committed Oct 9, 2024
1 parent c58bbda commit a15161c
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions src/device-registry/bin/jobs/v3-store-readings-job.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ async function processDocument(doc) {

// Update Reading
const filter = { site_id: doc.site_id, time: docTime.toDate() };
const updateDoc = { ...doc, time: docTime.toDate() };
delete updateDoc._id;
const { _id, ...docWithoutId } = doc;
const updateDoc = { ...docWithoutId, time: docTime.toDate() };
await ReadingModel("airqo").updateOne(filter, updateDoc, {

Check warning on line 107 in src/device-registry/bin/jobs/v3-store-readings-job.js

View check run for this annotation

Codecov / codecov/patch

src/device-registry/bin/jobs/v3-store-readings-job.js#L104-L107

Added lines #L104 - L107 were not covered by tests
upsert: true,
});
Expand Down Expand Up @@ -190,16 +190,27 @@ const fetchAndStoreDataIntoReadingsModel = async () => {
async (bail) => {
try {

Check warning on line 191 in src/device-registry/bin/jobs/v3-store-readings-job.js

View check run for this annotation

Codecov / codecov/patch

src/device-registry/bin/jobs/v3-store-readings-job.js#L191

Added line #L191 was not covered by tests
// Process each document in the current batch
for (const doc of batch) {
if (doc.device_id) activeDeviceIds.add(doc.device_id);
if (doc.site_id) activeSiteIds.add(doc.site_id);
await processDocument(doc); // Call processDocument directly
}
await Promise.all(

Check warning on line 193 in src/device-registry/bin/jobs/v3-store-readings-job.js

View check run for this annotation

Codecov / codecov/patch

src/device-registry/bin/jobs/v3-store-readings-job.js#L193

Added line #L193 was not covered by tests
batch.map(async (doc) => {
if (doc.device_id) activeDeviceIds.add(doc.device_id);
if (doc.site_id) activeSiteIds.add(doc.site_id);
await processDocument(doc);

Check warning on line 197 in src/device-registry/bin/jobs/v3-store-readings-job.js

View check run for this annotation

Codecov / codecov/patch

src/device-registry/bin/jobs/v3-store-readings-job.js#L195-L197

Added lines #L195 - L197 were not covered by tests
})
);
} catch (error) {
logger.error(`🐛🐛 Error processing batch: ${error.message}`);
// Use bail() if the error is not recoverable
if (error.isFatal) {
bail(error);
logObject("the error inside processing of batches", error);
if (error.name === "MongoError" && error.code !== 11000) {
logger.error(

Check warning on line 203 in src/device-registry/bin/jobs/v3-store-readings-job.js

View check run for this annotation

Codecov / codecov/patch

src/device-registry/bin/jobs/v3-store-readings-job.js#L201-L203

Added lines #L201 - L203 were not covered by tests
`🐛🐛 MongoError -- fetchAndStoreDataIntoReadingsModel -- ${stringify(
error
)}`
);
throw error; // Retry the operation
} else if (error.code === 11000) {

Check warning on line 209 in src/device-registry/bin/jobs/v3-store-readings-job.js

View check run for this annotation

Codecov / codecov/patch

src/device-registry/bin/jobs/v3-store-readings-job.js#L208-L209

Added lines #L208 - L209 were not covered by tests
// Ignore duplicate key errors
console.warn(

Check warning on line 211 in src/device-registry/bin/jobs/v3-store-readings-job.js

View check run for this annotation

Codecov / codecov/patch

src/device-registry/bin/jobs/v3-store-readings-job.js#L211

Added line #L211 was not covered by tests
`🙀🙀 Duplicate key error for document: ${stringify(doc)}`
);
}
}
},
Expand Down

0 comments on commit a15161c

Please sign in to comment.