From 1b0d6282a1b65e8a5b8b39a53a826be9ad0ca9c3 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 10 Oct 2024 11:58:10 -0600 Subject: [PATCH 1/6] client bulk write + comments --- src/cmap/connection.ts | 2 + src/cmap/wire_protocol/on_data.ts | 1 + src/cursor/abstract_cursor.ts | 2 +- src/cursor/client_bulk_write_cursor.ts | 8 +- src/operations/client_bulk_write/executor.ts | 16 +- src/sdam/server.ts | 2 +- src/timeout.ts | 2 +- src/utils.ts | 13 + ...ient_side_operations_timeout.prose.test.ts | 29 +- .../node_csot.test.ts | 16 +- .../collection_db_management.test.ts | 4 +- .../crud/client_bulk_write.test.ts | 383 ++++++++++++++++++ test/tools/runner/config.ts | 28 +- test/tools/utils.ts | 67 +++ 14 files changed, 535 insertions(+), 38 deletions(-) create mode 100644 test/integration/crud/client_bulk_write.test.ts diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index a43d6106c7..acde2b1c2a 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -716,6 +716,8 @@ export class Connection extends TypedEventEmitter { throw new MongoOperationTimeoutError('Timed out at socket write'); } throw error; + } finally { + timeout.clear(); } } return await drainEvent; diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index 64c636f41f..f673261833 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -116,6 +116,7 @@ export function onData( emitter.off('data', eventHandler); emitter.off('error', errorHandler); finished = true; + timeoutForSocketRead?.clear(); const doneResult = { value: undefined, done: finished } as const; for (const promise of unconsumedPromises) { diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 255a977a5f..96d28d0558 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -243,7 +243,7 @@ export abstract class AbstractCursor< options.timeoutMode ?? (options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME); } else { - if (options.timeoutMode != null) + if (options.timeoutMode != null && options.timeoutContext == null) throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS'); } diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts index 3a4e7eb99a..72c73caad6 100644 --- a/src/cursor/client_bulk_write_cursor.ts +++ b/src/cursor/client_bulk_write_cursor.ts @@ -35,7 +35,7 @@ export class ClientBulkWriteCursor extends AbstractCursor { constructor( client: MongoClient, commandBuilder: ClientBulkWriteCommandBuilder, - options: ClientBulkWriteOptions = {} + options: ClientBulkWriteCursorOptions = {} ) { super(client, new MongoDBNamespace('admin', '$cmd'), options); @@ -72,7 +72,11 @@ export class ClientBulkWriteCursor extends AbstractCursor { session }); - const response = await executeOperation(this.client, clientBulkWriteOperation); + const response = await executeOperation( + this.client, + clientBulkWriteOperation, + this.timeoutContext + ); this.cursorResponse = response; return { server: clientBulkWriteOperation.server, session, response }; diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 93acaac216..6aac96aa63 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,3 +1,4 @@ +import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor'; import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; import { MongoClientBulkWriteError, @@ -5,6 +6,8 @@ import { MongoServerError } from '../../error'; import { type MongoClient } from '../../mongo_client'; +import { TimeoutContext } from '../../timeout'; +import { resolveTimeoutOptions } from '../../utils'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; import { ClientBulkWriteOperation } from './client_bulk_write'; @@ -70,17 +73,26 @@ export class ClientBulkWriteExecutor { pkFactory ); // Unacknowledged writes need to execute all batches and return { ok: 1} + const resolvedOptions = resolveTimeoutOptions(this.client, this.options); + const context = TimeoutContext.create(resolvedOptions); + if (this.options.writeConcern?.w === 0) { while (commandBuilder.hasNextBatch()) { const operation = new ClientBulkWriteOperation(commandBuilder, this.options); - await executeOperation(this.client, operation); + await executeOperation(this.client, operation, context); } return { ok: 1 }; } else { const resultsMerger = new ClientBulkWriteResultsMerger(this.options); // For each command will will create and exhaust a cursor for the results. while (commandBuilder.hasNextBatch()) { - const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options); + const cursorContext = new CursorTimeoutContext(context, Symbol()); + const options = { + ...this.options, + timeoutContext: cursorContext, + ...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME }) + }; + const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options); try { await resultsMerger.merge(cursor); } catch (error) { diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 7ab2d9a043..35a6f1de69 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -106,7 +106,7 @@ export type ServerEvents = { EventEmitterWithState; /** @internal */ -export type ServerCommandOptions = Omit & { +export type ServerCommandOptions = Omit & { timeoutContext: TimeoutContext; }; diff --git a/src/timeout.ts b/src/timeout.ts index 9041ce4b88..355bdc7e99 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -171,7 +171,7 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions /** @internal */ export abstract class TimeoutContext { - static create(options: TimeoutContextOptions): TimeoutContext { + static create(options: Partial): TimeoutContext { if (options.session?.timeoutContext != null) return options.session?.timeoutContext; if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options); else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options); diff --git a/src/utils.ts b/src/utils.ts index 04174813c9..15b3bab90f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -35,6 +35,7 @@ import { ServerType } from './sdam/common'; import type { Server } from './sdam/server'; import type { Topology } from './sdam/topology'; import type { ClientSession } from './sessions'; +import { type TimeoutContextOptions } from './timeout'; import { WriteConcern } from './write_concern'; /** @@ -514,6 +515,18 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean { return keys.length > 0 && keys[0][0] === '$'; } +export function resolveTimeoutOptions>( + client: MongoClient, + options: T +): T & + Pick< + MongoClient['s']['options'], + 'timeoutMS' | 'serverSelectionTimeoutMS' | 'waitQueueTimeoutMS' | 'socketTimeoutMS' + > { + const { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS } = + client.s.options; + return { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS, ...options }; +} /** * Merge inherited properties from parent into options, prioritizing values from options, * then values from parent. diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 80da92e10a..458447a437 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -21,7 +21,8 @@ import { promiseWithResolvers, squashError } from '../../mongodb'; -import { type FailPoint } from '../../tools/utils'; +import { type FailPoint, makeMultiBatchWrite } from '../../tools/utils'; +import { filterForCommands } from '../shared'; // TODO(NODE-5824): Implement CSOT prose tests describe('CSOT spec prose tests', function () { @@ -1183,9 +1184,9 @@ describe('CSOT spec prose tests', function () { }); }); - describe.skip( + describe( '11. Multi-batch bulkWrites', - { requires: { mongodb: '>=8.0', serverless: 'forbid' } }, + { requires: { mongodb: '>=8.0', serverless: 'forbid', topology: 'single' } }, function () { /** * ### 11. Multi-batch bulkWrites @@ -1245,9 +1246,6 @@ describe('CSOT spec prose tests', function () { } }; - let maxBsonObjectSize: number; - let maxMessageSizeBytes: number; - beforeEach(async function () { await internalClient .db('db') @@ -1256,29 +1254,20 @@ describe('CSOT spec prose tests', function () { .catch(() => null); await internalClient.db('admin').command(failpoint); - const hello = await internalClient.db('admin').command({ hello: 1 }); - maxBsonObjectSize = hello.maxBsonObjectSize; - maxMessageSizeBytes = hello.maxMessageSizeBytes; - client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true }); }); - it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () { + it('performs two bulkWrites which fail to complete before 2000 ms', async function () { const writes = []; - client.on('commandStarted', ev => writes.push(ev)); + client.on('commandStarted', filterForCommands('bulkWrite', writes)); - const length = maxMessageSizeBytes / maxBsonObjectSize + 1; - const models = Array.from({ length }, () => ({ - namespace: 'db.coll', - name: 'insertOne' as const, - document: { a: 'b'.repeat(maxBsonObjectSize - 500) } - })); + const models = await makeMultiBatchWrite(this.configuration); const error = await client.bulkWrite(models).catch(error => error); expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError); - expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']); - }).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up'; + expect(writes).to.have.lengthOf(2); + }); } ); }); diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index b1516454cc..4eb557ff42 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -279,12 +279,16 @@ describe('CSOT driver tests', metadata, () => { .stub(Connection.prototype, 'readMany') .callsFake(async function* (...args) { const realIterator = readManyStub.wrappedMethod.call(this, ...args); - const cmd = commandSpy.lastCall.args.at(1); - if ('giveMeWriteErrors' in cmd) { - await realIterator.next().catch(() => null); // dismiss response - yield { parse: () => writeErrorsReply }; - } else { - yield (await realIterator.next()).value; + try { + const cmd = commandSpy.lastCall.args.at(1); + if ('giveMeWriteErrors' in cmd) { + await realIterator.next().catch(() => null); // dismiss response + yield { parse: () => writeErrorsReply }; + } else { + yield (await realIterator.next()).value; + } + } finally { + realIterator.return(); } }); }); diff --git a/test/integration/collection-management/collection_db_management.test.ts b/test/integration/collection-management/collection_db_management.test.ts index f5c4c55cf0..0cb90b3b59 100644 --- a/test/integration/collection-management/collection_db_management.test.ts +++ b/test/integration/collection-management/collection_db_management.test.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { Collection, type Db, type MongoClient } from '../../mongodb'; +import { Collection, type Db, type MongoClient, ObjectId } from '../../mongodb'; describe('Collection Management and Db Management', function () { let client: MongoClient; @@ -16,7 +16,7 @@ describe('Collection Management and Db Management', function () { }); it('returns a collection object after calling createCollection', async function () { - const collection = await db.createCollection('collection'); + const collection = await db.createCollection(new ObjectId().toHexString()); expect(collection).to.be.instanceOf(Collection); }); diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts new file mode 100644 index 0000000000..8b0f22f2a0 --- /dev/null +++ b/test/integration/crud/client_bulk_write.test.ts @@ -0,0 +1,383 @@ +import { expect } from 'chai'; +import { setTimeout } from 'timers/promises'; + +import { + type CommandStartedEvent, + type Connection, + type ConnectionPool, + type MongoClient, + MongoError, + MongoOperationTimeoutError, + now, + TimeoutContext +} from '../../mongodb'; +import { + clearFailPoint, + configureFailPoint, + makeMultiBatchWrite, + makeMultiResponseBatchModelArray +} from '../../tools/utils'; +import { filterForCommands } from '../shared'; + +const metadata: MongoDBMetadataUI = { + requires: { + mongodb: '>=8.0', + serverless: 'forbid' + } +}; + +describe('Client Bulk Write', function () { + let client: MongoClient; + + afterEach(async function () { + await client?.close(); + await clearFailPoint(this.configuration); + }); + + describe('CSOT enabled', function () { + describe('when timeoutMS is set on the client', function () { + beforeEach(async function () { + client = this.configuration.newClient({}, { timeoutMS: 300 }); + await client.connect(); + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] } + }); + }); + + it('timeoutMS is used as the timeout for the bulk write', metadata, async function () { + const start = now(); + const timeoutError = await client + .bulkWrite([ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ]) + .catch(e => e); + const end = now(); + expect(timeoutError).to.be.instanceOf(MongoError); + expect(end - start).to.be.within(300 - 100, 300 + 100); + }); + }); + + describe('when timeoutMS is set on the bulkWrite operation', function () { + beforeEach(async function () { + client = this.configuration.newClient({}); + + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] } + }); + }); + + it('timeoutMS is used as the timeout for the bulk write', metadata, async function () { + const start = now(); + const timeoutError = await client + .bulkWrite( + [ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ], + { timeoutMS: 300 } + ) + .catch(e => e); + const end = now(); + expect(timeoutError).to.be.instanceOf(MongoError); + expect(end - start).to.be.within(300 - 100, 300 + 100); + }); + }); + + describe('when timeoutMS is set on both the client and operation options', function () { + beforeEach(async function () { + client = this.configuration.newClient({}, { timeoutMS: 1500 }); + + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] } + }); + }); + + it('bulk write options take precedence over the client options', metadata, async function () { + const start = now(); + const timeoutError = await client + .bulkWrite( + [ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ], + { timeoutMS: 300 } + ) + .catch(e => e); + const end = now(); + expect(timeoutError).to.be.instanceOf(MongoError); + expect(end - start).to.be.within(300 - 100, 300 + 100); + }); + }); + + describe( + 'unacknowledged writes', + { + requires: { + mongodb: '>=8.0', + topology: 'single' + } + }, + function () { + let connection: Connection; + let pool: ConnectionPool; + + beforeEach(async function () { + client = this.configuration.newClient({}, { maxPoolSize: 1, waitQueueTimeoutMS: 2000 }); + + await client.connect(); + + pool = Array.from(client.topology.s.servers.values())[0].pool; + connection = await pool.checkOut({ + timeoutContext: TimeoutContext.create({ + serverSelectionTimeoutMS: 30000, + waitQueueTimeoutMS: 1000 + }) + }); + }); + + afterEach(async function () { + pool = Array.from(client.topology.s.servers.values())[0].pool; + pool.checkIn(connection); + await client.close(); + }); + + it('a single batch bulk write does not take longer than timeoutMS', async function () { + const start = now(); + let end; + const timeoutError = client + .bulkWrite( + [ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ], + { timeoutMS: 200, writeConcern: { w: 0 } } + ) + .catch(e => e) + .then(e => { + end = now(); + return e; + }); + + await setTimeout(250); + + expect(await timeoutError).to.be.instanceOf(MongoError); + expect(end - start).to.be.within(200 - 100, 200 + 100); + }); + + it( + 'timeoutMS applies to all batches', + { + requires: { + mongodb: '>=8.0', + topology: 'single' + } + }, + async function () { + const models = await makeMultiBatchWrite(this.configuration); + const start = now(); + let end; + const timeoutError = client + .bulkWrite(models, { + timeoutMS: 400, + writeConcern: { w: 0 } + }) + .catch(e => e) + .then(r => { + end = now(); + return r; + }); + + await setTimeout(210); + + pool.checkIn(connection); + connection = await pool.checkOut({ + timeoutContext: TimeoutContext.create({ + serverSelectionTimeoutMS: 30000, + waitQueueTimeoutMS: 1000 + }) + }); + + await setTimeout(210); + + expect(await timeoutError).to.be.instanceOf(MongoError); + expect(end - start).to.be.within(400 - 100, 400 + 100); + } + ); + } + ); + + describe('acknowledged writes', metadata, function () { + describe('when a bulk write command times out', function () { + beforeEach(async function () { + client = this.configuration.newClient({}, { timeoutMS: 1500 }); + + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] } + }); + }); + + it('the operation times out', metadata, async function () { + const start = now(); + const timeoutError = await client + .bulkWrite( + [ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ], + { timeoutMS: 300 } + ) + .catch(e => e); + const end = now(); + expect(timeoutError).to.be.instanceOf(MongoError); + expect(end - start).to.be.within(300 - 100, 300 + 100); + }); + }); + + describe('when the timeout is reached while iterating the result cursor', function () { + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + client.on('commandStarted', filterForCommands(['getMore'], commands)); + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1400, failCommands: ['getMore'] } + }); + }); + + it('the bulk write operation times out', metadata, async function () { + const models = await makeMultiResponseBatchModelArray(this.configuration); + const start = now(); + const timeoutError = await client + .bulkWrite(models, { + verboseResults: true, + timeoutMS: 1500 + }) + .catch(e => e); + + const end = now(); + expect(timeoutError).to.be.instanceOf(MongoError); + + // DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS. + expect(end - start).to.be.within(2000 - 100, 2000 + 100); + expect(commands).to.have.lengthOf(1); + }); + }); + + describe('if the cursor encounters and error and a killCursors is sent', function () { + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + + client.on('commandStarted', filterForCommands(['killCursors'], commands)); + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { + blockConnection: true, + blockTimeMS: 3000, + failCommands: ['getMore', 'killCursors'] + } + }); + }); + + it( + 'timeoutMS is refreshed to the timeoutMS passed to the bulk write for the killCursors command', + metadata, + async function () { + const models = await makeMultiResponseBatchModelArray(this.configuration); + const timeoutError = await client + .bulkWrite(models, { ordered: true, timeoutMS: 2800, verboseResults: true }) + .catch(e => e); + + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); + + const [ + { + command: { maxTimeMS } + } + ] = commands; + expect(maxTimeMS).to.be.greaterThan(1000); + } + ); + }); + + describe('when the bulk write is executed in multiple batches', function () { + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { blockConnection: true, blockTimeMS: 1010, failCommands: ['bulkWrite'] } + }); + }); + + it( + 'timeoutMS applies to the duration of all batches', + { + requires: { + ...metadata.requires, + topology: 'single' + } + }, + async function () { + const models = await makeMultiBatchWrite(this.configuration); + const start = now(); + const timeoutError = await client + .bulkWrite(models, { + timeoutMS: 2000 + }) + .catch(e => e); + + const end = now(); + expect(timeoutError).to.be.instanceOf(MongoError); + expect(end - start).to.be.within(2000 - 100, 2000 + 100); + expect(commands.length, 'Test must execute two batches.').to.equal(2); + } + ); + }); + }); + }); +}); diff --git a/test/tools/runner/config.ts b/test/tools/runner/config.ts index 1d63748622..16024638fb 100644 --- a/test/tools/runner/config.ts +++ b/test/tools/runner/config.ts @@ -7,6 +7,7 @@ import { type AuthMechanism, HostAddress, MongoClient, + type MongoClientOptions, type ServerApi, TopologyType, type WriteConcernSettings @@ -82,7 +83,7 @@ export class TestConfiguration { auth?: { username: string; password: string; authSource?: string }; proxyURIParams?: ProxyParams; }; - serverApi: ServerApi; + serverApi?: ServerApi; activeResources: number; isSrv: boolean; serverlessCredentials: { username: string | undefined; password: string | undefined }; @@ -171,13 +172,34 @@ export class TestConfiguration { return this.options.replicaSet; } + /** + * Returns a `hello`, executed against `uri`. + */ + async hello(uri = this.uri) { + const client = this.newClient(uri); + try { + await client.connect(); + const { maxBsonObjectSize, maxMessageSizeBytes, maxWriteBatchSize, ...rest } = await client + .db('admin') + .command({ hello: 1 }); + return { + maxBsonObjectSize, + maxMessageSizeBytes, + maxWriteBatchSize, + ...rest + }; + } finally { + await client.close(); + } + } + isOIDC(uri: string, env: string): boolean { if (!uri) return false; return uri.indexOf('MONGODB-OIDC') > -1 && uri.indexOf(`ENVIRONMENT:${env}`) > -1; } - newClient(urlOrQueryOptions?: string | Record, serverOptions?: Record) { - serverOptions = Object.assign({}, getEnvironmentalOptions(), serverOptions); + newClient(urlOrQueryOptions?: string | Record, serverOptions?: MongoClientOptions) { + serverOptions = Object.assign({}, getEnvironmentalOptions(), serverOptions); // Support MongoClient constructor form (url, options) for `newClient`. if (typeof urlOrQueryOptions === 'string') { diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 8614bd7d64..8ebc5e8f53 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -11,6 +11,7 @@ import { setTimeout } from 'timers'; import { inspect, promisify } from 'util'; import { + type AnyClientBulkWriteModel, type Document, type HostAddress, MongoClient, @@ -18,6 +19,7 @@ import { Topology, type TopologyOptions } from '../mongodb'; +import { type TestConfiguration } from './runner/config'; import { runUnifiedSuite } from './unified-spec-runner/runner'; import { type CollectionData, @@ -598,3 +600,68 @@ export async function waitUntilPoolsFilled( await Promise.all([wait$(), client.connect()]); } + +export async function configureFailPoint(configuration: TestConfiguration, failPoint: FailPoint) { + const utilClient = configuration.newClient(); + await utilClient.connect(); + + try { + await utilClient.db('admin').command(failPoint); + } finally { + await utilClient.close(); + } +} + +export async function clearFailPoint(configuration: TestConfiguration) { + const utilClient = configuration.newClient(); + await utilClient.connect(); + + try { + await utilClient.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'off' + }); + } finally { + await utilClient.close(); + } +} + +export async function makeMultiBatchWrite( + configuration: TestConfiguration +): Promise { + const { maxBsonObjectSize, maxMessageSizeBytes } = await configuration.hello(); + + const length = maxMessageSizeBytes / maxBsonObjectSize + 1; + const models = Array.from({ length }, () => ({ + namespace: 'db.coll', + name: 'insertOne' as const, + document: { a: 'b'.repeat(maxBsonObjectSize - 500) } + })); + + return models; +} + +export async function makeMultiResponseBatchModelArray( + configuration: TestConfiguration +): Promise { + const { maxBsonObjectSize } = await configuration.hello(); + const namespace = `foo.${new BSON.ObjectId().toHexString()}`; + const models: AnyClientBulkWriteModel[] = [ + { + name: 'updateOne', + namespace, + update: { $set: { age: 1 } }, + upsert: true, + filter: { _id: 'a'.repeat(maxBsonObjectSize / 2) } + }, + { + name: 'updateOne', + namespace, + update: { $set: { age: 1 } }, + upsert: true, + filter: { _id: 'b'.repeat(maxBsonObjectSize / 2) } + } + ]; + + return models; +} From d22db741773d624dfee5e5778c1e75810186e9d6 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 10 Oct 2024 14:46:53 -0600 Subject: [PATCH 2/6] comments --- src/timeout.ts | 2 +- test/integration/crud/client_bulk_write.test.ts | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/timeout.ts b/src/timeout.ts index 355bdc7e99..9041ce4b88 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -171,7 +171,7 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions /** @internal */ export abstract class TimeoutContext { - static create(options: Partial): TimeoutContext { + static create(options: TimeoutContextOptions): TimeoutContext { if (options.session?.timeoutContext != null) return options.session?.timeoutContext; if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options); else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options); diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts index 8b0f22f2a0..6818d763e0 100644 --- a/test/integration/crud/client_bulk_write.test.ts +++ b/test/integration/crud/client_bulk_write.test.ts @@ -6,7 +6,6 @@ import { type Connection, type ConnectionPool, type MongoClient, - MongoError, MongoOperationTimeoutError, now, TimeoutContext @@ -58,7 +57,7 @@ describe('Client Bulk Write', function () { ]) .catch(e => e); const end = now(); - expect(timeoutError).to.be.instanceOf(MongoError); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); }); @@ -91,7 +90,7 @@ describe('Client Bulk Write', function () { ) .catch(e => e); const end = now(); - expect(timeoutError).to.be.instanceOf(MongoError); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); }); @@ -124,7 +123,7 @@ describe('Client Bulk Write', function () { ) .catch(e => e); const end = now(); - expect(timeoutError).to.be.instanceOf(MongoError); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); }); @@ -183,7 +182,7 @@ describe('Client Bulk Write', function () { await setTimeout(250); - expect(await timeoutError).to.be.instanceOf(MongoError); + expect(await timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(200 - 100, 200 + 100); }); @@ -222,7 +221,7 @@ describe('Client Bulk Write', function () { await setTimeout(210); - expect(await timeoutError).to.be.instanceOf(MongoError); + expect(await timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(400 - 100, 400 + 100); } ); @@ -258,7 +257,7 @@ describe('Client Bulk Write', function () { ) .catch(e => e); const end = now(); - expect(timeoutError).to.be.instanceOf(MongoError); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); }); @@ -289,7 +288,7 @@ describe('Client Bulk Write', function () { .catch(e => e); const end = now(); - expect(timeoutError).to.be.instanceOf(MongoError); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); // DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS. expect(end - start).to.be.within(2000 - 100, 2000 + 100); @@ -372,7 +371,7 @@ describe('Client Bulk Write', function () { .catch(e => e); const end = now(); - expect(timeoutError).to.be.instanceOf(MongoError); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(2000 - 100, 2000 + 100); expect(commands.length, 'Test must execute two batches.').to.equal(2); } From 69d99c2cf556386e408cd68cf322ef58a8107458 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 11 Oct 2024 08:55:45 -0600 Subject: [PATCH 3/6] make test more leniant --- test/integration/crud/client_bulk_write.test.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts index 6818d763e0..959a0541d9 100644 --- a/test/integration/crud/client_bulk_write.test.ts +++ b/test/integration/crud/client_bulk_write.test.ts @@ -14,7 +14,8 @@ import { clearFailPoint, configureFailPoint, makeMultiBatchWrite, - makeMultiResponseBatchModelArray + makeMultiResponseBatchModelArray, + waitUntilPoolsFilled } from '../../tools/utils'; import { filterForCommands } from '../shared'; @@ -266,7 +267,7 @@ describe('Client Bulk Write', function () { const commands: CommandStartedEvent[] = []; beforeEach(async function () { - client = this.configuration.newClient({}, { monitorCommands: true }); + client = this.configuration.newClient({}, { monitorCommands: true, minPoolSize: 5 }); client.on('commandStarted', filterForCommands(['getMore'], commands)); await client.connect(); @@ -291,7 +292,9 @@ describe('Client Bulk Write', function () { expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); // DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS. - expect(end - start).to.be.within(2000 - 100, 2000 + 100); + // The amount of time killCursors takes is wildly variable and can take up to almost + // 500ms sometimes. + expect(end - start).to.be.within(1500, 1500 + 600); expect(commands).to.have.lengthOf(1); }); }); From 473b8d65a2507f0f876c8f3f1c69bc889f039608 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 11 Oct 2024 09:16:06 -0600 Subject: [PATCH 4/6] lint --- test/integration/crud/client_bulk_write.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts index 959a0541d9..e566e2a8ef 100644 --- a/test/integration/crud/client_bulk_write.test.ts +++ b/test/integration/crud/client_bulk_write.test.ts @@ -14,8 +14,7 @@ import { clearFailPoint, configureFailPoint, makeMultiBatchWrite, - makeMultiResponseBatchModelArray, - waitUntilPoolsFilled + makeMultiResponseBatchModelArray } from '../../tools/utils'; import { filterForCommands } from '../shared'; From 5f34f43015f09b842ead9e80d86a320b19e4d277 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 11 Oct 2024 09:51:40 -0600 Subject: [PATCH 5/6] bump threshold again --- test/integration/crud/client_bulk_write.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts index e566e2a8ef..1251b821d3 100644 --- a/test/integration/crud/client_bulk_write.test.ts +++ b/test/integration/crud/client_bulk_write.test.ts @@ -292,8 +292,8 @@ describe('Client Bulk Write', function () { // DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS. // The amount of time killCursors takes is wildly variable and can take up to almost - // 500ms sometimes. - expect(end - start).to.be.within(1500, 1500 + 600); + // 600-700ms sometimes. + expect(end - start).to.be.within(1500, 1500 + 800); expect(commands).to.have.lengthOf(1); }); }); From b92ddc1d9eaf9f223014d89fcc20c44916775421 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 14 Oct 2024 08:06:45 -0600 Subject: [PATCH 6/6] Update test/integration/crud/client_bulk_write.test.ts Co-authored-by: Warren James --- test/integration/crud/client_bulk_write.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts index 1251b821d3..6177077b63 100644 --- a/test/integration/crud/client_bulk_write.test.ts +++ b/test/integration/crud/client_bulk_write.test.ts @@ -298,7 +298,7 @@ describe('Client Bulk Write', function () { }); }); - describe('if the cursor encounters and error and a killCursors is sent', function () { + describe('if the cursor encounters an error and a killCursors is sent', function () { const commands: CommandStartedEvent[] = []; beforeEach(async function () {