From 9277dbb03371dbfcb8906f0265a373f657238e42 Mon Sep 17 00:00:00 2001 From: Le Roux Bodenstein Date: Fri, 23 Aug 2024 15:17:45 +0100 Subject: [PATCH] ImportWriter unit tests --- .../src/import/import-writer.spec.ts | 225 ++++++++++++++++++ .../src/import/import-writer.ts | 30 ++- 2 files changed, 243 insertions(+), 12 deletions(-) diff --git a/packages/compass-import-export/src/import/import-writer.spec.ts b/packages/compass-import-export/src/import/import-writer.spec.ts index e69de29bb2d..3cc77653ed2 100644 --- a/packages/compass-import-export/src/import/import-writer.spec.ts +++ b/packages/compass-import-export/src/import/import-writer.spec.ts @@ -0,0 +1,225 @@ +import sinon from 'sinon'; +import { expect } from 'chai'; + +import { ImportWriter } from './import-writer'; + +const BATCH_SIZE = 1000; + +type FakeError = Error & { + result: { + getWriteErrors?: () => Error[]; + }; +}; + +function getDataService({ + isFLE, + throwErrors, +}: { + isFLE: boolean; + throwErrors: boolean; +}) { + return { + bulkWrite: (ns: string, docs: any[] /*, options: any*/) => { + return new Promise((resolve, reject) => { + if (isFLE && docs.length !== 1) { + const error: any = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + return reject(error); + } + + if (throwErrors) { + const error = new Error('fake bulkWrite error'); + (error as FakeError).result = { + getWriteErrors: () => { + const errors: Error[] = []; + for (let i = 0; i < docs.length; ++i) { + const writeError = new Error(`Fake error for doc ${i}`); + delete writeError.stack; + errors.push(writeError); + } + return errors; + }, + }; + delete error.stack; // slows down tests due to excess output + return reject(error); + } + + resolve({ + insertedCount: docs.length, + matchedCount: 0, + modifiedCount: 0, + deletedCount: 0, + upsertedCount: 0, + ok: 1, + }); + }); + }, + + insertOne: () => { + if (throwErrors) { + const error = new Error('fake insertOne error'); + delete error.stack; // slows down tests due to excess output + return Promise.reject(error); + } + + return Promise.resolve({ acknowledged: true }); + }, + }; +} + +function getExpectedNumBatches( + numDocs: number, + isFLE: boolean, + stopOnErrors: boolean +) { + if (stopOnErrors) { + return 1; + } + + if (isFLE) { + // one attempted batch at the batch size (followed by insertOne() on retry), then subsequent batches are all size 1. + return numDocs > BATCH_SIZE ? 1 + numDocs - BATCH_SIZE : 1; + } + + return Math.ceil(numDocs / BATCH_SIZE); +} + +function getExpectedDocsInBatch( + batchNum: number, + numDocs: number, + isFLE: boolean +) { + if (batchNum === 1) { + return Math.min(numDocs, BATCH_SIZE); + } + + if (isFLE && batchNum > 1) { + return 1; + } + + const numBatches = getExpectedNumBatches(numDocs, isFLE, false); + + return batchNum < numBatches + ? BATCH_SIZE + : numDocs - (batchNum - 1) * BATCH_SIZE; +} + +describe('ImportWriter', function () { + const docs: { i: number }[] = []; + for (let i = 0; i < BATCH_SIZE * 2 + 1; ++i) { + docs.push({ i }); + } + + for (const isFLE of [true, false]) { + it(`inserts documents ${isFLE ? 'one by one' : 'in batches'} to ${ + isFLE ? 'FLE2' : 'regular' + } collection`, async function () { + const numBatches = getExpectedNumBatches(docs.length, isFLE, false); + + const dataService = getDataService({ isFLE, throwErrors: false }); + + const bulkWriteSpy = sinon.spy(dataService, 'bulkWrite'); + const insertOneSpy = sinon.spy(dataService, 'insertOne'); + + const writer = new ImportWriter(dataService as any, 'db.col', false); + + for (const doc of docs) { + await writer.write(doc); + } + + await writer.finish(); + + expect(bulkWriteSpy.callCount).to.equal(numBatches); + for (const [index, args] of bulkWriteSpy.args.entries()) { + const [, _docs] = args; + const expected = getExpectedDocsInBatch(index + 1, docs.length, isFLE); + expect(_docs.length).to.equal(expected); + } + if (isFLE) { + expect(insertOneSpy.callCount).to.equal(BATCH_SIZE); + } else { + expect(insertOneSpy.callCount).to.equal(0); + } + }); + + for (const stopOnErrors of [true, false]) { + it(`${stopOnErrors ? 'stops' : 'does not stop'} on the first error for ${ + isFLE ? 'FLE2' : 'regular' + } collection if stopOnErrors is ${stopOnErrors}`, async function () { + const dataService = getDataService({ isFLE, throwErrors: true }); + + const bulkWriteSpy = sinon.spy(dataService, 'bulkWrite'); + const insertOneSpy = sinon.spy(dataService, 'insertOne'); + + const writer = new ImportWriter( + dataService as any, + 'db.col', + stopOnErrors + ); + + // It always throws, it just depends if it finished the batch or not and + // whether it threw the first database error itself or a wrapped error + // that wraps all the errors in the batch + try { + for (const doc of docs) { + await writer.write(doc); + } + + await writer.finish(); + } catch (err: any) { + if (stopOnErrors) { + if (isFLE) { + expect(err.message).to.equal('fake insertOne error'); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(1); + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + // stop after the first insertOne call + expect(writer.docsErrored).to.equal(1); + } else { + expect(err.message).to.equal('fake bulkWrite error'); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(0); + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + // stop after the first bulkWrite call. in this case the whole + // first batch failed which is why there are so many docsErrored + // (see our mocks above) + expect(writer.docsErrored).to.equal(1000); + } + } else { + if (isFLE) { + expect(err.message).to.equal( + 'Something went wrong while writing data to a collection' + ); + expect(err.writeErrors).to.have.length(1000); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(1000); + + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + expect(writer.docsErrored).to.equal(1000); + } else { + expect(err.message).to.equal( + 'Something went wrong while writing data to a collection' + ); + expect(err.writeErrors).to.have.length(1000); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(0); + + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + expect(writer.docsErrored).to.equal(1000); + } + } + + return; + } + + expect.fail('expected to throw regardless'); + }); + } + } +}); diff --git a/packages/compass-import-export/src/import/import-writer.ts b/packages/compass-import-export/src/import/import-writer.ts index 9f5ceb59eb0..0ebea11be2b 100644 --- a/packages/compass-import-export/src/import/import-writer.ts +++ b/packages/compass-import-export/src/import/import-writer.ts @@ -47,7 +47,7 @@ function writeErrorToJSError({ code, index, }: WriteError): ImportWriterProgressError { - const op = err.op; + const op = err?.op; const e: ImportWriterProgressError = new Error(errmsg) as any; e.index = index; @@ -112,6 +112,8 @@ export class ImportWriter { async _executeBatch() { const documents = this.batch; + this.docsProcessed += documents.length; + this.batch = []; let bulkWriteResult: PartialBulkWriteResult; @@ -136,12 +138,8 @@ export class ImportWriter { if (bulkWriteError.code === 6371202) { this.BATCH_SIZE = 1; - bulkWriteResult = await this._insertOneByOne(); + bulkWriteResult = await this._insertOneByOne(documents); } else { - if (this.stopOnErrors) { - throw bulkWriteError; - } - // If we are writing with `ordered: false`, bulkWrite will throw and // will not return any result, but server might write some docs and bulk // result can still be accessed on the error instance @@ -150,7 +148,15 @@ export class ImportWriter { // when the operation ends in error, instead of relying on // `_mergeBulkOpResult` default argument substitution, we need to keep // this OR expression here - bulkWriteResult = (bulkWriteError as MongoBulkWriteError).result || {}; + bulkWriteResult = ((bulkWriteError as MongoBulkWriteError).result || + {}) as PartialBulkWriteResult; + + if (this.stopOnErrors) { + this.docsWritten += bulkWriteResult.insertedCount || 0; + this.docsErrored += + (bulkWriteResult.getWriteErrors?.() || []).length || 0; + throw bulkWriteError; + } } } @@ -161,7 +167,6 @@ export class ImportWriter { ); this.docsWritten += bulkOpResult.insertedCount; - this.docsProcessed += documents.length; this.docsErrored += bulkOpResult.numWriteErrors; this._batchCounter++; @@ -170,9 +175,9 @@ export class ImportWriter { } } - async _insertOneByOne(): Promise { - const documents = this.batch; - + async _insertOneByOne( + documents: Document[] + ): Promise { let insertedCount = 0; const errors: WriteError[] = []; @@ -182,11 +187,12 @@ export class ImportWriter { insertedCount += 1; } catch (insertOneByOneError: any) { if (this.stopOnErrors) { + this.docsWritten += insertedCount; + this.docsErrored += 1; throw insertOneByOneError; } errors.push(insertOneByOneError as WriteError); - this.docsErrored += 1; } }