From 2e93ce7210de3764a0db386092a55725a65cb47b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 2 Oct 2024 17:31:59 -0400 Subject: [PATCH] feat(NODE-6274): add CSOT support to bulkWrite (#4250) Co-authored-by: Bailey Pearson --- src/bulk/common.ts | 18 ++- ...ient_side_operations_timeout.prose.test.ts | 142 +++++++++++++++++- ...lient_side_operations_timeout.spec.test.ts | 7 - test/tools/unified-spec-runner/match.ts | 14 +- 4 files changed, 159 insertions(+), 22 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index dc0bcfb513f..22012207a09 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -501,7 +501,7 @@ export function mergeBatchResults( async function executeCommands( bulkOperation: BulkOperationBase, - options: BulkWriteOptions + options: BulkWriteOptions & { timeoutContext?: TimeoutContext | null } ): Promise { if (bulkOperation.s.batches.length === 0) { return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered); @@ -552,7 +552,11 @@ async function executeCommands( let thrownError = null; let result; try { - result = await executeOperation(bulkOperation.s.collection.client, operation); + result = await executeOperation( + bulkOperation.s.collection.client, + operation, + finalOptions.timeoutContext + ); } catch (error) { thrownError = error; } @@ -866,7 +870,11 @@ export class BulkWriteShimOperation extends AbstractOperation { return 'bulkWrite' as const; } - async execute(_server: Server, session: ClientSession | undefined): Promise { + async execute( + _server: Server, + session: ClientSession | undefined, + timeoutContext: TimeoutContext + ): Promise { if (this.options.session == null) { // An implicit session could have been created by 'executeOperation' // So if we stick it on finalOptions here, each bulk operation @@ -874,7 +882,7 @@ export class BulkWriteShimOperation extends AbstractOperation { // an explicit session would be this.options.session = session; } - return await executeCommands(this.bulkOperation, this.options); + return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext }); } } @@ -1203,7 +1211,7 @@ export abstract class BulkOperationBase { const finalOptions = { ...this.s.options, ...options }; const operation = new BulkWriteShimOperation(this, finalOptions); - return await executeOperation(this.s.collection.client, operation); + return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext); } /** diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 0d36998fd96..e276c9bbafd 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -7,6 +7,7 @@ import * as sinon from 'sinon'; import { type CommandStartedEvent } from '../../../mongodb'; import { type CommandSucceededEvent, + MongoBulkWriteError, MongoClient, MongoOperationTimeoutError, MongoServerSelectionError, @@ -28,7 +29,7 @@ describe('CSOT spec prose tests', function () { await client?.close(); }); - context.skip('1. Multi-batch writes', () => { + describe('1. Multi-batch writes', { requires: { topology: 'single', mongodb: '>=4.4' } }, () => { /** * This test MUST only run against standalones on server versions 4.4 and higher. * The `insertMany` call takes an exceedingly long time on replicasets and sharded @@ -55,6 +56,46 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a timeout error. * 1. Verify that two `insert` commands were executed against `db.coll` as part of the `insertMany` call. */ + + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { + times: 2 + }, + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 1010 + } + }; + + beforeEach(async function () { + await internalClient + .db('db') + .collection('coll') + .drop() + .catch(() => null); + await internalClient.db('admin').command(failpoint); + + client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true }); + }); + + it('performs two inserts which fail to complete before 2000 ms', async () => { + const inserts = []; + client.on('commandStarted', ev => inserts.push(ev)); + + const a = new Uint8Array(1000000 - 22); + const oneMBDocs = Array.from({ length: 50 }, (_, _id) => ({ _id, a })); + const error = await client + .db('db') + .collection<{ _id: number; a: Uint8Array }>('coll') + .insertMany(oneMBDocs) + .catch(error => error); + + expect(error).to.be.instanceOf(MongoBulkWriteError); + expect(error.errorResponse).to.be.instanceOf(MongoOperationTimeoutError); + expect(inserts.map(ev => ev.commandName)).to.deep.equal(['insert', 'insert']); + }); }); context.skip('2. maxTimeMS is not set for commands sent to mongocryptd', () => { @@ -901,4 +942,103 @@ describe('CSOT spec prose tests', function () { }); }); }); + + describe.skip( + '11. Multi-batch bulkWrites', + { requires: { mongodb: '>=8.0', serverless: 'forbid' } }, + function () { + /** + * ### 11. Multi-batch bulkWrites + * + * This test MUST only run against server versions 8.0+. This test must be skipped on Atlas Serverless. + * + * 1. Using `internalClient`, drop the `db.coll` collection. + * + * 2. Using `internalClient`, set the following fail point: + * + * @example + * ```javascript + * { + * configureFailPoint: "failCommand", + * mode: { + * times: 2 + * }, + * data: { + * failCommands: ["bulkWrite"], + * blockConnection: true, + * blockTimeMS: 1010 + * } + * } + * ``` + * + * 3. Using `internalClient`, perform a `hello` command and record the `maxBsonObjectSize` and `maxMessageSizeBytes` values + * in the response. + * + * 4. Create a new MongoClient (referred to as `client`) with `timeoutMS=2000`. + * + * 5. Create a list of write models (referred to as `models`) with the following write model repeated + * (`maxMessageSizeBytes / maxBsonObjectSize + 1`) times: + * + * @example + * ```json + * InsertOne { + * "namespace": "db.coll", + * "document": { "a": "b".repeat(maxBsonObjectSize - 500) } + * } + * ``` + * + * 6. Call `bulkWrite` on `client` with `models`. + * + * - Expect this to fail with a timeout error. + * + * 7. Verify that two `bulkWrite` commands were executed as part of the `MongoClient.bulkWrite` call. + */ + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { + times: 2 + }, + data: { + failCommands: ['bulkWrite'], + blockConnection: true, + blockTimeMS: 1010 + } + }; + + let maxBsonObjectSize: number; + let maxMessageSizeBytes: number; + + beforeEach(async function () { + await internalClient + .db('db') + .collection('coll') + .drop() + .catch(() => null); + await internalClient.db('admin').command(failpoint); + + const hello = await internalClient.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + maxMessageSizeBytes = hello.maxMessageSizeBytes; + + client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true }); + }); + + it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () { + const writes = []; + client.on('commandStarted', ev => writes.push(ev)); + + const length = maxMessageSizeBytes / maxBsonObjectSize + 1; + const models = Array.from({ length }, () => ({ + namespace: 'db.coll', + name: 'insertOne' as const, + document: { a: 'b'.repeat(maxBsonObjectSize - 500) } + })); + + const error = await client.bulkWrite(models).catch(error => error); + + expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError); + expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']); + }).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up'; + } + ); }); diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts index 99914fa08e7..c2e08cfc80a 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts @@ -5,7 +5,6 @@ import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; const skippedSpecs = { - bulkWrite: 'TODO(NODE-6274)', 'change-streams': 'TODO(NODE-6035)', 'convenient-transactions': 'TODO(NODE-5687)', 'deprecated-options': 'TODO(NODE-5689)', @@ -19,18 +18,12 @@ const skippedSpecs = { }; const skippedTests = { - 'timeoutMS can be configured on a MongoClient - insertMany on collection': 'TODO(NODE-6274)', - 'timeoutMS can be configured on a MongoClient - bulkWrite on collection': 'TODO(NODE-6274)', 'timeoutMS can be configured on a MongoClient - createChangeStream on client': 'TODO(NODE-6305)', 'timeoutMS applies to whole operation, not individual attempts - createChangeStream on client': 'TODO(NODE-6305)', 'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(NODE-6305)', 'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure': 'TODO(NODE-6305)', - 'timeoutMS applies to whole operation, not individual attempts - insertMany on collection': - 'TODO(NODE-6274)', - 'timeoutMS applies to whole operation, not individual attempts - bulkWrite on collection': - 'TODO(NODE-6274)', 'command is not sent if RTT is greater than timeoutMS': 'TODO(DRIVERS-2965)', 'Non=tailable cursor iteration timeoutMS is refreshed for getMore if timeoutMode is iteration - failure': 'TODO(DRIVERS-2965)', diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index 662746b4591..931ba1c9ecc 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -788,15 +788,11 @@ export function expectErrorCheck( if (expected.isTimeoutError === false) { expect(error).to.not.be.instanceof(MongoOperationTimeoutError); } else if (expected.isTimeoutError === true) { - expect(error).to.be.instanceof(MongoOperationTimeoutError); - } - - // TODO(NODE-6274): Check for MongoBulkWriteErrors that have a MongoOperationTimeoutError in their - // errorResponse field - if (expected.isTimeoutError === false) { - expect(error).to.not.be.instanceof(MongoOperationTimeoutError); - } else if (expected.isTimeoutError === true) { - expect(error).to.be.instanceof(MongoOperationTimeoutError); + if ('errorResponse' in error) { + expect(error.errorResponse).to.be.instanceof(MongoOperationTimeoutError); + } else { + expect(error).to.be.instanceof(MongoOperationTimeoutError); + } } if (expected.errorContains != null) {