From 595bf9fca4c6fa2f563a45a0b99d5e2c4e14e06b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 25 Sep 2024 16:56:06 -0400 Subject: [PATCH 1/3] refactor(NODE-6398): bulkWrite internals to use async/await --- src/bulk/common.ts | 196 +++++++++++-------------- src/bulk/unordered.ts | 7 +- test/integration/crud/crud_api.test.ts | 36 ++--- 3 files changed, 101 insertions(+), 138 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index c133a57d22..374a0d5783 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -1,5 +1,3 @@ -import { promisify } from 'util'; - import { type BSONSerializeOptions, type Document, EJSON, resolveBSONOptions } from '../bson'; import type { Collection } from '../collection'; import { @@ -7,6 +5,7 @@ import { MongoBatchReExecutionError, MONGODB_ERROR_CODES, MongoInvalidArgumentError, + MongoRuntimeError, MongoServerError, MongoWriteConcernError } from '../error'; @@ -22,7 +21,6 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { applyRetryableWrites, - type Callback, getTopology, hasAtomicOperators, maybeAddIdToDocuments, @@ -500,86 +498,46 @@ export function mergeBatchResults( } } -function executeCommands( +async function executeCommands( bulkOperation: BulkOperationBase, - options: BulkWriteOptions, - callback: Callback -) { + options: BulkWriteOptions +): Promise { if (bulkOperation.s.batches.length === 0) { - return callback( - undefined, - new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) - ); + return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); } - const batch = bulkOperation.s.batches.shift() as Batch; + for (const batch of bulkOperation.s.batches) { + const finalOptions = resolveOptions(bulkOperation, { + ...options, + ordered: bulkOperation.isOrdered + }); - function resultHandler(err?: AnyError, result?: Document) { - // Error is a driver related error not a bulk op error, return early - if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) { - return callback( - new MongoBulkWriteError( - err, - new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) - ) - ); + if (finalOptions.bypassDocumentValidation !== true) { + delete finalOptions.bypassDocumentValidation; } - if (err instanceof MongoWriteConcernError) { - return handleMongoWriteConcernError( - batch, - bulkOperation.s.bulkResult, - bulkOperation.isOrdered, - err, - callback - ); + // Is the bypassDocumentValidation options specific + if (bulkOperation.s.bypassDocumentValidation === true) { + finalOptions.bypassDocumentValidation = true; } - // Merge the results together - mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result); - const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); - if (bulkOperation.handleWriteError(callback, writeResult)) return; - - // Execute the next command in line - executeCommands(bulkOperation, options, callback); - } - - const finalOptions = resolveOptions(bulkOperation, { - ...options, - ordered: bulkOperation.isOrdered - }); - - if (finalOptions.bypassDocumentValidation !== true) { - delete finalOptions.bypassDocumentValidation; - } - - // Set an operationIf if provided - if (bulkOperation.operationId) { - resultHandler.operationId = bulkOperation.operationId; - } - - // Is the bypassDocumentValidation options specific - if (bulkOperation.s.bypassDocumentValidation === true) { - finalOptions.bypassDocumentValidation = true; - } - - // Is the checkKeys option disabled - if (bulkOperation.s.checkKeys === false) { - finalOptions.checkKeys = false; - } - - if (finalOptions.retryWrites) { - if (isUpdateBatch(batch)) { - finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi); + // Is the checkKeys option disabled + if (bulkOperation.s.checkKeys === false) { + finalOptions.checkKeys = false; } - if (isDeleteBatch(batch)) { - finalOptions.retryWrites = - finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); + if (finalOptions.retryWrites) { + if (isUpdateBatch(batch)) { + finalOptions.retryWrites = + finalOptions.retryWrites && !batch.operations.some(op => op.multi); + } + + if (isDeleteBatch(batch)) { + finalOptions.retryWrites = + finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0); + } } - } - try { const operation = isInsertBatch(batch) ? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions) : isUpdateBatch(batch) @@ -588,38 +546,61 @@ function executeCommands( ? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions) : null; - if (operation != null) { - executeOperation(bulkOperation.s.collection.client, operation).then( - result => resultHandler(undefined, result), - error => resultHandler(error) - ); + if (operation == null) throw new MongoRuntimeError(`Unknown batchType: ${batch.batchType}`); + + let thrownError = null; + let result; + try { + result = await executeOperation(bulkOperation.s.collection.client, operation); + } catch (error) { + thrownError = error; + } + + if (thrownError != null) { + if (!(thrownError instanceof MongoWriteConcernError)) { + // Error is a driver related error not a bulk op error, return early + throw new MongoBulkWriteError( + thrownError, + new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) + ); + } + + if (thrownError instanceof MongoWriteConcernError) { + handleMongoWriteConcernError( + batch, + bulkOperation.s.bulkResult, + bulkOperation.isOrdered, + thrownError + ); + } } - } catch (err) { - // Force top level error - err.ok = 0; - // Merge top level error and return - mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined); - callback(); + + mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); + const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); + bulkOperation.handleWriteError(writeResult); } + + bulkOperation.s.batches.length = 0; + + const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); + bulkOperation.handleWriteError(writeResult); + return writeResult; } function handleMongoWriteConcernError( batch: Batch, bulkResult: BulkResult, isOrdered: boolean, - err: MongoWriteConcernError, - callback: Callback -) { + err: MongoWriteConcernError +): never { mergeBatchResults(batch, bulkResult, undefined, err.result); - callback( - new MongoBulkWriteError( - { - message: err.result.writeConcernError.errmsg, - code: err.result.writeConcernError.code - }, - new BulkWriteResult(bulkResult, isOrdered) - ) + throw new MongoBulkWriteError( + { + message: err.result.writeConcernError.errmsg, + code: err.result.writeConcernError.code + }, + new BulkWriteResult(bulkResult, isOrdered) ); } @@ -875,8 +856,6 @@ export interface BulkWriteOptions extends CommandOperationOptions { let?: Document; } -const executeCommandsAsync = promisify(executeCommands); - /** * TODO(NODE-4063) * BulkWrites merge complexity is implemented in executeCommands @@ -895,7 +874,7 @@ export class BulkWriteShimOperation extends AbstractOperation { return 'bulkWrite' as const; } - execute(_server: Server, session: ClientSession | undefined): Promise { + async execute(_server: Server, session: ClientSession | undefined): Promise { if (this.options.session == null) { // An implicit session could have been created by 'executeOperation' // So if we stick it on finalOptions here, each bulk operation @@ -903,7 +882,7 @@ export class BulkWriteShimOperation extends AbstractOperation { // an explicit session would be this.options.session = session; } - return executeCommandsAsync(this.bulkOperation, this.options); + return await executeCommands(this.bulkOperation, this.options); } } @@ -1239,33 +1218,26 @@ export abstract class BulkOperationBase { * Handles the write error before executing commands * @internal */ - handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean { + handleWriteError(writeResult: BulkWriteResult): void { if (this.s.bulkResult.writeErrors.length > 0) { const msg = this.s.bulkResult.writeErrors[0].errmsg ? this.s.bulkResult.writeErrors[0].errmsg : 'write operation failed'; - callback( - new MongoBulkWriteError( - { - message: msg, - code: this.s.bulkResult.writeErrors[0].code, - writeErrors: this.s.bulkResult.writeErrors - }, - writeResult - ) + throw new MongoBulkWriteError( + { + message: msg, + code: this.s.bulkResult.writeErrors[0].code, + writeErrors: this.s.bulkResult.writeErrors + }, + writeResult ); - - return true; } const writeConcernError = writeResult.getWriteConcernError(); if (writeConcernError) { - callback(new MongoBulkWriteError(writeConcernError, writeResult)); - return true; + throw new MongoBulkWriteError(writeConcernError, writeResult); } - - return false; } abstract addToOperationsList( diff --git a/src/bulk/unordered.ts b/src/bulk/unordered.ts index 3b7e7f2f35..97a134613b 100644 --- a/src/bulk/unordered.ts +++ b/src/bulk/unordered.ts @@ -4,7 +4,6 @@ import type { Collection } from '../collection'; import { MongoInvalidArgumentError } from '../error'; import type { DeleteStatement } from '../operations/delete'; import type { UpdateStatement } from '../operations/update'; -import { type Callback } from '../utils'; import { Batch, BatchType, @@ -20,12 +19,12 @@ export class UnorderedBulkOperation extends BulkOperationBase { super(collection, options, false); } - override handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean { + override handleWriteError(writeResult: BulkWriteResult): void { if (this.s.batches.length) { - return false; + return; } - return super.handleWriteError(callback, writeResult); + return super.handleWriteError(writeResult); } addToOperationsList( diff --git a/test/integration/crud/crud_api.test.ts b/test/integration/crud/crud_api.test.ts index a391e1c448..4a4a5f3ac8 100644 --- a/test/integration/crud/crud_api.test.ts +++ b/test/integration/crud/crud_api.test.ts @@ -1093,32 +1093,24 @@ describe('CRUD API', function () { } }); - it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - test: async function () { - const ops = []; - // Create a set of operations that go over the 1000 limit causing two messages - let i = 0; - for (; i < 1005; i++) { - ops.push({ insertOne: { _id: i, a: i } }); - } + it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', async function () { + const ops = []; + // Create a set of operations that go over the 1000 limit causing two messages + let i = 0; + for (; i < 1005; i++) { + ops.push({ insertOne: { _id: i, a: i } }); + } - ops.push({ insertOne: { _id: 0, a: i } }); + ops.push({ insertOne: { _id: 0, a: i } }); - const db = client.db(); + const db = client.db(); - const error = await db - .collection('t20_1') - .bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } }) - .catch(error => error); + const error = await db + .collection('t20_1') + .bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } }) + .catch(error => error); - expect(error).to.be.instanceOf(MongoError); - } + expect(error).to.be.instanceOf(MongoError); }); it('should correctly throw error on illegal callback when ordered bulkWrite encounters error', { From f425e3cab621f2c4588f2b9cfccee7db269e2180 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 8 Oct 2024 14:12:46 -0400 Subject: [PATCH 2/3] chore: comments --- src/bulk/common.ts | 42 +++++++++----------------- test/integration/crud/crud_api.test.ts | 35 ++++++++++++--------- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 374a0d5783..a62d62a4a5 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -557,22 +557,27 @@ async function executeCommands( } if (thrownError != null) { - if (!(thrownError instanceof MongoWriteConcernError)) { + if (thrownError instanceof MongoWriteConcernError) { + mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); + const writeResult = new BulkWriteResult( + bulkOperation.s.bulkResult, + bulkOperation.isOrdered + ); + + throw new MongoBulkWriteError( + { + message: thrownError.result.writeConcernError.errmsg, + code: thrownError.result.writeConcernError.code + }, + writeResult + ); + } else { // Error is a driver related error not a bulk op error, return early throw new MongoBulkWriteError( thrownError, new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered) ); } - - if (thrownError instanceof MongoWriteConcernError) { - handleMongoWriteConcernError( - batch, - bulkOperation.s.bulkResult, - bulkOperation.isOrdered, - thrownError - ); - } } mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result); @@ -587,23 +592,6 @@ async function executeCommands( return writeResult; } -function handleMongoWriteConcernError( - batch: Batch, - bulkResult: BulkResult, - isOrdered: boolean, - err: MongoWriteConcernError -): never { - mergeBatchResults(batch, bulkResult, undefined, err.result); - - throw new MongoBulkWriteError( - { - message: err.result.writeConcernError.errmsg, - code: err.result.writeConcernError.code - }, - new BulkWriteResult(bulkResult, isOrdered) - ); -} - /** * An error indicating an unsuccessful Bulk Write * @public diff --git a/test/integration/crud/crud_api.test.ts b/test/integration/crud/crud_api.test.ts index 4a4a5f3ac8..f8ef24a022 100644 --- a/test/integration/crud/crud_api.test.ts +++ b/test/integration/crud/crud_api.test.ts @@ -7,6 +7,7 @@ import { Collection, CommandFailedEvent, CommandSucceededEvent, + MongoBulkWriteError, type MongoClient, MongoError, MongoServerError, @@ -1093,24 +1094,30 @@ describe('CRUD API', function () { } }); - it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', async function () { - const ops = []; - // Create a set of operations that go over the 1000 limit causing two messages - let i = 0; - for (; i < 1005; i++) { - ops.push({ insertOne: { _id: i, a: i } }); - } + describe('when performing a multi-batch unordered bulk write that has a duplicate key', function () { + it('throws a MongoBulkWriteError indicating the duplicate key document failed', async function () { + const ops = []; + // Create a set of operations that go over the 1000 limit causing two messages + let i = 0; + for (; i < 1005; i++) { + ops.push({ insertOne: { _id: i, a: i } }); + } - ops.push({ insertOne: { _id: 0, a: i } }); + ops[500] = { insertOne: { _id: 0, a: i } }; - const db = client.db(); + const db = client.db(); - const error = await db - .collection('t20_1') - .bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } }) - .catch(error => error); + const error = await db + .collection('t20_1') + .bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } }) + .catch(error => error); - expect(error).to.be.instanceOf(MongoError); + expect(error).to.be.instanceOf(MongoBulkWriteError); + // 1004 because one of them is duplicate key + // but since it is unordered we continued to write + expect(error).to.have.property('insertedCount', 1004); + expect(error.writeErrors[0]).to.have.nested.property('err.index', 500); + }); }); it('should correctly throw error on illegal callback when ordered bulkWrite encounters error', { From 601d1e3df2d16a80a01fb71919450528926def36 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 9 Oct 2024 10:59:27 -0400 Subject: [PATCH 3/3] chore: L I N T --- test/integration/crud/crud_api.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration/crud/crud_api.test.ts b/test/integration/crud/crud_api.test.ts index f8ef24a022..94610462a2 100644 --- a/test/integration/crud/crud_api.test.ts +++ b/test/integration/crud/crud_api.test.ts @@ -9,7 +9,6 @@ import { CommandSucceededEvent, MongoBulkWriteError, type MongoClient, - MongoError, MongoServerError, ObjectId, ReturnDocument