diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 507615e9f03..acde2b1c2af 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -422,9 +422,9 @@ export class Connection extends TypedEventEmitter { ...options }; - if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) { - const { maxTimeMS } = options.timeoutContext; - if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS; + if (!options.omitMaxTimeMS) { + const maxTimeMS = options.timeoutContext?.maxTimeMS; + if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS; } const message = this.supportsOpMsg @@ -716,6 +716,8 @@ export class Connection extends TypedEventEmitter { throw new MongoOperationTimeoutError('Timed out at socket write'); } throw error; + } finally { + timeout.clear(); } } return await drainEvent; diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index 64c636f41f1..f6732618330 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -116,6 +116,7 @@ export function onData( emitter.off('data', eventHandler); emitter.off('error', errorHandler); finished = true; + timeoutForSocketRead?.clear(); const doneResult = { value: undefined, done: finished } as const; for (const promise of unconsumedPromises) { diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index d0f386923ad..74049fce788 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { type AsyncDisposable, configureResourceManagement } from '../resource_management'; import type { Server } from '../sdam/server'; import { ClientSession, maybeClearPinnedConnection } from '../sessions'; -import { TimeoutContext } from '../timeout'; +import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout'; import { type MongoDBNamespace, squashError } from '../utils'; /** @@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { timeoutMS?: number; /** @internal TODO(NODE-5688): make this public */ timeoutMode?: CursorTimeoutMode; + + /** + * @internal + * + * A timeout context to govern the total time the cursor can live. If provided, the cursor + * cannot be used in ITERATION mode. 
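+   *
+   * For illustration, a hedged sketch based on the tests added in this PR (the
+   * Symbol owner marks a context that the cursor itself does not own):
+   *
+   * ```ts
+   * const context = new CursorTimeoutContext(
+   *   TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 }),
+   *   Symbol()
+   * );
+   * const cursor = collection.find({}, { timeoutContext: context, timeoutMS: 1000 });
+   * ```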
+ */ + timeoutContext?: CursorTimeoutContext; } /** @internal */ @@ -171,7 +179,7 @@ export abstract class AbstractCursor< /** @internal */ protected readonly cursorOptions: InternalAbstractCursorOptions; /** @internal */ - protected timeoutContext?: TimeoutContext; + protected timeoutContext?: CursorTimeoutContext; /** @event */ static readonly CLOSE = 'close' as const; @@ -205,22 +213,14 @@ export abstract class AbstractCursor< }; this.cursorOptions.timeoutMS = options.timeoutMS; if (this.cursorOptions.timeoutMS != null) { - if (options.timeoutMode == null) { - if (options.tailable) { - this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION; - } else { - this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME; - } - } else { - if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { - throw new MongoInvalidArgumentError( - "Cannot set tailable cursor's timeoutMode to LIFETIME" - ); - } - this.cursorOptions.timeoutMode = options.timeoutMode; + if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { + throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME"); } + this.cursorOptions.timeoutMode = + options.timeoutMode ?? + (options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME); } else { - if (options.timeoutMode != null) + if (options.timeoutMode != null && options.timeoutContext == null) throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS'); } this.cursorOptions.omitMaxTimeMS = @@ -264,6 +264,17 @@ export abstract class AbstractCursor< utf8: options?.enableUtf8Validation === false ? false : true } }; + + if ( + options.timeoutContext != null && + options.timeoutMS != null && + this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME + ) { + throw new MongoAPIError( + `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.` + ); + } + this.timeoutContext = options.timeoutContext; } /** @@ -721,6 +732,9 @@ export abstract class AbstractCursor< * if the resultant data has already been retrieved by this cursor. 
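+   *
+   * This change also makes rewind throw if the cursor does not own its timeout
+   * context. A sketch of the failure mode, where `externallyOwnedContext` stands
+   * for any CursorTimeoutContext owned elsewhere (mirrors the test added below):
+   *
+   * ```ts
+   * const cursor = coll.find({}, { timeoutContext: externallyOwnedContext, timeoutMS: 1000 });
+   * cursor.rewind(); // throws MongoAPIError
+   * ```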
*/ rewind(): void { + if (this.timeoutContext && this.timeoutContext.owner !== this) { + throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`); + } if (!this.initialized) { return; } @@ -790,10 +804,13 @@ export abstract class AbstractCursor< */ private async cursorInit(): Promise { if (this.cursorOptions.timeoutMS != null) { - this.timeoutContext = TimeoutContext.create({ - serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, - timeoutMS: this.cursorOptions.timeoutMS - }); + this.timeoutContext ??= new CursorTimeoutContext( + TimeoutContext.create({ + serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, + timeoutMS: this.cursorOptions.timeoutMS + }), + this + ); } try { const state = await this._initialize(this.cursorSession); @@ -872,6 +889,20 @@ export abstract class AbstractCursor< private async cleanup(timeoutMS?: number, error?: Error) { this.isClosed = true; const session = this.cursorSession; + const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => { + if (timeoutMS != null) { + this.timeoutContext?.clear(); + return new CursorTimeoutContext( + TimeoutContext.create({ + serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, + timeoutMS + }), + this + ); + } else { + return this.timeoutContext?.refreshed(); + } + }; try { if ( !this.isKilled && @@ -884,23 +915,13 @@ export abstract class AbstractCursor< this.isKilled = true; const cursorId = this.cursorId; this.cursorId = Long.ZERO; - let timeoutContext: TimeoutContext | undefined; - if (timeoutMS != null) { - this.timeoutContext?.clear(); - timeoutContext = TimeoutContext.create({ - serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, - timeoutMS - }); - } else { - this.timeoutContext?.refresh(); - timeoutContext = this.timeoutContext; - } + await executeOperation( this.cursorClient, new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, { session }), - timeoutContext + timeoutContextForKillCursors() ); } } catch (error) { @@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable { } configureResourceManagement(AbstractCursor.prototype); + +/** + * @internal + * The cursor timeout context is a wrapper around a timeout context + * that keeps track of the "owner" of the cursor. For timeout contexts + * instantiated inside a cursor, the owner will be the cursor. + * + * All timeout behavior is exactly the same as the wrapped timeout context's. 
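+ *
+ * A sketch of the intended usage, mirroring the client bulk write executor in
+ * this diff (a Symbol owner denotes a shared context that no single cursor owns):
+ *
+ * ```ts
+ * const cursorContext = new CursorTimeoutContext(context, Symbol());
+ * const cursor = new ClientBulkWriteCursor(client, commandBuilder, {
+ *   ...options,
+ *   timeoutContext: cursorContext
+ * });
+ * ```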
+ */ +export class CursorTimeoutContext extends TimeoutContext { + constructor( + public timeoutContext: TimeoutContext, + public owner: symbol | AbstractCursor + ) { + super(); + } + override get serverSelectionTimeout(): Timeout | null { + return this.timeoutContext.serverSelectionTimeout; + } + override get connectionCheckoutTimeout(): Timeout | null { + return this.timeoutContext.connectionCheckoutTimeout; + } + override get clearServerSelectionTimeout(): boolean { + return this.timeoutContext.clearServerSelectionTimeout; + } + override get clearConnectionCheckoutTimeout(): boolean { + return this.timeoutContext.clearConnectionCheckoutTimeout; + } + override get timeoutForSocketWrite(): Timeout | null { + return this.timeoutContext.timeoutForSocketWrite; + } + override get timeoutForSocketRead(): Timeout | null { + return this.timeoutContext.timeoutForSocketRead; + } + override csotEnabled(): this is CSOTTimeoutContext { + return this.timeoutContext.csotEnabled(); + } + override refresh(): void { + return this.timeoutContext.refresh(); + } + override clear(): void { + return this.timeoutContext.clear(); + } + override get maxTimeMS(): number | null { + return this.timeoutContext.maxTimeMS; + } + + override refreshed(): CursorTimeoutContext { + return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner); + } +} diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts index 06f34dfc52f..3add4b86c2f 100644 --- a/src/cursor/client_bulk_write_cursor.ts +++ b/src/cursor/client_bulk_write_cursor.ts @@ -36,7 +36,7 @@ export class ClientBulkWriteCursor extends AbstractCursor { constructor( client: MongoClient, commandBuilder: ClientBulkWriteCommandBuilder, - options: ClientBulkWriteOptions = {} + options: ClientBulkWriteCursorOptions = {} ) { super(client, new MongoDBNamespace('admin', '$cmd'), options); @@ -78,7 +78,11 @@ export class ClientBulkWriteCursor extends AbstractCursor { session }); - const response = await executeOperation(this.client, clientBulkWriteOperation); + const response = await executeOperation( + this.client, + clientBulkWriteOperation, + this.timeoutContext + ); this.cursorResponse = response; return { server: clientBulkWriteOperation.server, session, response }; diff --git a/src/index.ts b/src/index.ts index 7f948f30ed4..82bbeb2aec7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -358,6 +358,7 @@ export type { CursorStreamOptions } from './cursor/abstract_cursor'; export type { + CursorTimeoutContext, InitialCursorResponse, InternalAbstractCursorOptions } from './cursor/abstract_cursor'; diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 5baf1ed6b6e..fe6b7b4ec10 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,5 +1,8 @@ +import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor'; import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; import { type MongoClient } from '../../mongo_client'; +import { TimeoutContext } from '../../timeout'; +import { resolveTimeoutOptions } from '../../utils'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; import { ClientBulkWriteOperation } from './client_bulk_write'; @@ -56,10 +59,13 @@ export class ClientBulkWriteExecutor { pkFactory ); // Unacknowledged writes need to execute all batches and return { ok: 1} + const resolvedOptions = 
      resolveTimeoutOptions(this.client, this.options);
+    const context = TimeoutContext.create(resolvedOptions);
+
     if (this.options.writeConcern?.w === 0) {
       while (commandBuilder.hasNextBatch()) {
         const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
-        await executeOperation(this.client, operation);
+        await executeOperation(this.client, operation, context);
       }
       return { ok: 1 };
     } else {
@@ -67,7 +73,13 @@
       // For each command we will create and exhaust a cursor for the results.
       let currentBatchOffset = 0;
       while (commandBuilder.hasNextBatch()) {
-        const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
+        const cursorContext = new CursorTimeoutContext(context, Symbol());
+        const options = {
+          ...this.options,
+          timeoutContext: cursorContext,
+          ...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
+        };
+        const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
         const docs = await cursor.toArray();
         const operations = cursor.operations;
         resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
diff --git a/src/operations/find.ts b/src/operations/find.ts
index 641255553a0..348467acf75 100644
--- a/src/operations/find.ts
+++ b/src/operations/find.ts
@@ -1,6 +1,6 @@
 import type { Document } from '../bson';
 import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
-import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
+import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor';
 import { MongoInvalidArgumentError } from '../error';
 import { ReadConcern } from '../read_concern';
 import type { Server } from '../sdam/server';
@@ -17,7 +17,8 @@ import { Aspect, defineAspects, type Hint } from './operation';
  */
 // eslint-disable-next-line @typescript-eslint/no-unused-vars
 export interface FindOptions<TSchema extends Document = Document>
-  extends Omit<CommandOperationOptions, 'writeConcern' | 'explain'> {
+  extends Omit<CommandOperationOptions, 'writeConcern' | 'explain'>,
+    AbstractCursorOptions {
   /** Sets the limit of documents returned in the query. */
   limit?: number;
   /** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */
diff --git a/src/sdam/server.ts b/src/sdam/server.ts
index 08325086d53..280ba756a7a 100644
--- a/src/sdam/server.ts
+++ b/src/sdam/server.ts
@@ -106,7 +106,7 @@ export type ServerEvents = {
   EventEmitterWithState;
 
 /** @internal */
-export type ServerCommandOptions = Omit & {
+export type ServerCommandOptions = Omit & {
   timeoutContext: TimeoutContext;
 };
diff --git a/src/timeout.ts b/src/timeout.ts
index f694b5f4f4f..355bdc7e99c 100644
--- a/src/timeout.ts
+++ b/src/timeout.ts
@@ -171,13 +171,15 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions
 
 /** @internal */
 export abstract class TimeoutContext {
-  static create(options: TimeoutContextOptions): TimeoutContext {
+  static create(options: Partial<TimeoutContextOptions>): TimeoutContext {
     if (options.session?.timeoutContext != null) return options.session?.timeoutContext;
     if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options);
     else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options);
     else throw new MongoRuntimeError('Unrecognized options');
   }
 
+  abstract get maxTimeMS(): number | null;
+
   abstract get serverSelectionTimeout(): Timeout | null;
 
   abstract get connectionCheckoutTimeout(): Timeout | null;
@@ -195,6 +197,9 @@ export abstract class TimeoutContext {
   abstract refresh(): void;
 
   abstract clear(): void;
+
+  /** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */
+  abstract refreshed(): TimeoutContext;
 }
 
 /** @internal */
@@ -317,6 +322,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
       throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`);
     return remainingTimeMS;
   }
+
+  override refreshed(): CSOTTimeoutContext {
+    return new CSOTTimeoutContext(this);
+  }
 }
 
 /** @internal */
@@ -363,4 +372,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
   clear(): void {
     return;
   }
+
+  get maxTimeMS() {
+    return null;
+  }
+
+  override refreshed(): LegacyTimeoutContext {
+    return new LegacyTimeoutContext(this.options);
+  }
 }
diff --git a/src/utils.ts b/src/utils.ts
index 04174813c9c..cf8aef14e6e 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -35,6 +35,7 @@ import { ServerType } from './sdam/common';
 import type { Server } from './sdam/server';
 import type { Topology } from './sdam/topology';
 import type { ClientSession } from './sessions';
+import { type TimeoutContextOptions } from './timeout';
 import { WriteConcern } from './write_concern';
 
 /**
@@ -514,6 +515,21 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean {
   return keys.length > 0 && keys[0][0] === '$';
 }
 
+export function resolveTimeoutOptions<T extends Partial<TimeoutContextOptions>>(
+  client: MongoClient,
+  options?: T
+): Pick<
+  MongoClient['s']['options'],
+  'serverSelectionTimeoutMS' | 'socketTimeoutMS' | 'waitQueueTimeoutMS' | 'timeoutMS'
+> &
+  T {
+  const { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS } =
+    client.s.options;
+  return Object.assign(
+    { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS },
+    options
+  );
+}
 /**
  * Merge inherited properties from parent into options, prioritizing values from options,
  * then values from parent.
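A quick sketch of how the two additions above compose, assuming a connected `client` (this mirrors the ClientBulkWriteExecutor change earlier in this diff): `resolveTimeoutOptions` back-fills per-operation options with the client's configured timeouts, and the result satisfies the now-`Partial`-typed `TimeoutContext.create`.

```ts
// mirrors ClientBulkWriteExecutor: per-call options win, client defaults
// (serverSelectionTimeoutMS, socketTimeoutMS, ...) fill in the gaps
const resolvedOptions = resolveTimeoutOptions(client, { timeoutMS: 500 });

// timeoutMS is non-null here, so this should yield a CSOT timeout context
const context = TimeoutContext.create(resolvedOptions);
```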
diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 1b8c34633b4..a45c85c771a 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -18,7 +18,8 @@ import { ObjectId, promiseWithResolvers } from '../../mongodb'; -import { type FailPoint } from '../../tools/utils'; +import { type FailPoint, makeMultiBatchWrite } from '../../tools/utils'; +import { filterForCommands } from '../shared'; // TODO(NODE-5824): Implement CSOT prose tests describe('CSOT spec prose tests', function () { @@ -1104,9 +1105,9 @@ describe('CSOT spec prose tests', function () { }); }); - describe.skip( + describe( '11. Multi-batch bulkWrites', - { requires: { mongodb: '>=8.0', serverless: 'forbid' } }, + { requires: { mongodb: '>=8.0', serverless: 'forbid', topology: 'single' } }, function () { /** * ### 11. Multi-batch bulkWrites @@ -1166,9 +1167,6 @@ describe('CSOT spec prose tests', function () { } }; - let maxBsonObjectSize: number; - let maxMessageSizeBytes: number; - beforeEach(async function () { await internalClient .db('db') @@ -1177,29 +1175,20 @@ describe('CSOT spec prose tests', function () { .catch(() => null); await internalClient.db('admin').command(failpoint); - const hello = await internalClient.db('admin').command({ hello: 1 }); - maxBsonObjectSize = hello.maxBsonObjectSize; - maxMessageSizeBytes = hello.maxMessageSizeBytes; - client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true }); }); - it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () { + it('performs two bulkWrites which fail to complete before 2000 ms', async function () { const writes = []; - client.on('commandStarted', ev => writes.push(ev)); + client.on('commandStarted', filterForCommands('bulkWrite', writes)); - const length = maxMessageSizeBytes / maxBsonObjectSize + 1; - const models = Array.from({ length }, () => ({ - namespace: 'db.coll', - name: 'insertOne' as const, - document: { a: 'b'.repeat(maxBsonObjectSize - 500) } - })); + const models = await makeMultiBatchWrite(this.configuration); const error = await client.bulkWrite(models).catch(error => error); expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError); - expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']); - }).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up'; + expect(writes.length).to.equal(2); + }); } ); }); diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index b2011ee2e73..2f7e2469015 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -26,7 +26,7 @@ import { MongoServerError, ObjectId } from '../../mongodb'; -import { type FailPoint } from '../../tools/utils'; +import { type FailPoint, waitUntilPoolsFilled } from '../../tools/utils'; const metadata = { requires: { mongodb: '>=4.4' } }; @@ -271,12 +271,16 @@ describe('CSOT driver tests', metadata, () => { .stub(Connection.prototype, 'readMany') .callsFake(async function* (...args) { const realIterator = readManyStub.wrappedMethod.call(this, ...args); - const cmd = 
commandSpy.lastCall.args.at(1); - if ('giveMeWriteErrors' in cmd) { - await realIterator.next().catch(() => null); // dismiss response - yield { parse: () => writeErrorsReply }; - } else { - yield (await realIterator.next()).value; + try { + const cmd = commandSpy.lastCall.args.at(1); + if ('giveMeWriteErrors' in cmd) { + await realIterator.next().catch(() => null); // dismiss response + yield { parse: () => writeErrorsReply }; + } else { + yield (await realIterator.next()).value; + } + } finally { + realIterator.return(); } }); }); @@ -362,7 +366,7 @@ describe('CSOT driver tests', metadata, () => { }; beforeEach(async function () { - internalClient = this.configuration.newClient(); + internalClient = this.configuration.newClient({}); await internalClient .db('db') .dropCollection('coll') @@ -378,7 +382,11 @@ describe('CSOT driver tests', metadata, () => { await internalClient.db().admin().command(failpoint); - client = this.configuration.newClient(undefined, { monitorCommands: true }); + client = this.configuration.newClient(undefined, { monitorCommands: true, minPoolSize: 10 }); + + // wait for a handful of connections to have been established + await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5); + commandStarted = []; commandSucceeded = []; client.on('commandStarted', ev => commandStarted.push(ev)); @@ -492,7 +500,13 @@ describe('CSOT driver tests', metadata, () => { await internalClient.db().admin().command(failpoint); - client = this.configuration.newClient(undefined, { monitorCommands: true }); + client = this.configuration.newClient(undefined, { + monitorCommands: true, + minPoolSize: 10 + }); + // wait for a handful of connections to have been established + await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5); + commandStarted = []; commandSucceeded = []; client.on('commandStarted', ev => commandStarted.push(ev)); diff --git a/test/integration/collection-management/collection_db_management.test.ts b/test/integration/collection-management/collection_db_management.test.ts index f5c4c55cf05..0cb90b3b592 100644 --- a/test/integration/collection-management/collection_db_management.test.ts +++ b/test/integration/collection-management/collection_db_management.test.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { Collection, type Db, type MongoClient } from '../../mongodb'; +import { Collection, type Db, type MongoClient, ObjectId } from '../../mongodb'; describe('Collection Management and Db Management', function () { let client: MongoClient; @@ -16,7 +16,7 @@ describe('Collection Management and Db Management', function () { }); it('returns a collection object after calling createCollection', async function () { - const collection = await db.createCollection('collection'); + const collection = await db.createCollection(new ObjectId().toHexString()); expect(collection).to.be.instanceOf(Collection); }); diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts new file mode 100644 index 00000000000..9137731b179 --- /dev/null +++ b/test/integration/crud/client_bulk_write.test.ts @@ -0,0 +1,349 @@ +import { expect } from 'chai'; +import { setTimeout } from 'timers/promises'; + +import { + type CommandStartedEvent, + type Connection, + type ConnectionPool, + type MongoClient, + MongoOperationTimeoutError, + TimeoutContext +} from '../../mongodb'; +import { + clearFailPoint, + configureFailPoint, + makeMultiBatchWrite, + makeMultiResponseBatchModelArray +} from '../../tools/utils'; +import { 
filterForCommands } from '../shared'; + +const metadata: MongoDBMetadataUI = { + requires: { + mongodb: '>=8.0', + serverless: 'forbid' + } +}; + +describe('Client Bulk Write', function () { + let client: MongoClient; + + afterEach(async function () { + await client?.close(); + await clearFailPoint(this.configuration); + }); + + describe('CSOT enabled', function () { + describe('when timeoutMS is set on the client', function () { + beforeEach(async function () { + client = this.configuration.newClient({}, { timeoutMS: 300 }); + await client.connect(); + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] } + }); + }); + + it('timeoutMS is used as the timeout for the bulk write', metadata, async function () { + const timeoutError = await client + .bulkWrite([ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ]) + .catch(e => e); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + + describe('when timeoutMS is set on the bulkWrite operation', function () { + beforeEach(async function () { + client = this.configuration.newClient({}); + + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] } + }); + }); + + it('timeoutMS is used as the timeout for the bulk write', metadata, async function () { + const timeoutError = await client + .bulkWrite( + [ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ], + { timeoutMS: 300 } + ) + .catch(e => e); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + + describe('when timeoutMS is set on both the client and operation options', function () { + beforeEach(async function () { + client = this.configuration.newClient({}, { timeoutMS: 1500 }); + + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] } + }); + }); + + it('bulk write options take precedence over the client options', metadata, async function () { + const timeoutError = await client + .bulkWrite( + [ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ], + { timeoutMS: 300 } + ) + .catch(e => e); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + + describe( + 'unacknowledged writes', + { + requires: { + mongodb: '>=8.0', + topology: 'single' + } + }, + function () { + let connection: Connection; + let pool: ConnectionPool; + + beforeEach(async function () { + client = this.configuration.newClient({}, { maxPoolSize: 1, waitQueueTimeoutMS: 2000 }); + + await client.connect(); + + pool = Array.from(client.topology.s.servers.values())[0].pool; + connection = await pool.checkOut({ + timeoutContext: TimeoutContext.create({ + serverSelectionTimeoutMS: 30000, + waitQueueTimeoutMS: 1000 + }) + }); + }); + + afterEach(async function () { + pool = Array.from(client.topology.s.servers.values())[0].pool; + pool.checkIn(connection); + await client.close(); + }); + + it('respects timeoutMS for a single batch', async function () { + const timeoutError = client + .bulkWrite( + [ + { + name: 'insertOne', + namespace: 'foo.bar', + document: { age: 10 } + } + ], + { timeoutMS: 200, 
writeConcern: { w: 0 } }
+          )
+          .catch(e => e);
+
+        await setTimeout(250);
+
+        const error = await timeoutError;
+        expect(error).to.be.instanceOf(MongoOperationTimeoutError);
+      });
+
+      it(
+        'timeoutMS applies to all batches',
+        {
+          requires: {
+            mongodb: '>=8.0',
+            topology: 'single'
+          }
+        },
+        async function () {
+          const models = await makeMultiBatchWrite(this.configuration);
+          const timeoutError = client
+            .bulkWrite(models, {
+              timeoutMS: 400,
+              writeConcern: { w: 0 }
+            })
+            .catch(e => e);
+
+          await setTimeout(210);
+
+          pool.checkIn(connection);
+          connection = await pool.checkOut({
+            timeoutContext: TimeoutContext.create({
+              serverSelectionTimeoutMS: 30000,
+              waitQueueTimeoutMS: 1000
+            })
+          });
+
+          await setTimeout(210);
+
+          const error = await timeoutError;
+          expect(error).to.be.instanceOf(MongoOperationTimeoutError);
+        }
+      );
+    }
+  );
+
+  describe('acknowledged writes', metadata, function () {
+    describe('when a bulk write command times out', function () {
+      beforeEach(async function () {
+        client = this.configuration.newClient({}, { timeoutMS: 1500 });
+
+        await client.connect();
+
+        await configureFailPoint(this.configuration, {
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: { blockConnection: true, blockTimeMS: 1000, failCommands: ['bulkWrite'] }
+        });
+      });
+
+      it('the operation times out', metadata, async function () {
+        const timeoutError = await client
+          .bulkWrite(
+            [
+              {
+                name: 'insertOne',
+                namespace: 'foo.bar',
+                document: { age: 10 }
+              }
+            ],
+            { timeoutMS: 300 }
+          )
+          .catch(e => e);
+        expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError);
+      });
+    });
+
+    describe('when the timeout is reached while iterating the result cursor', function () {
+      const commands: CommandStartedEvent[] = [];
+
+      beforeEach(async function () {
+        client = this.configuration.newClient({}, { monitorCommands: true });
+        client.on('commandStarted', filterForCommands('getMore', commands));
+        await client.connect();
+
+        await configureFailPoint(this.configuration, {
+          configureFailPoint: 'failCommand',
+          mode: { times: 1 },
+          data: { blockConnection: true, blockTimeMS: 1400, failCommands: ['getMore'] }
+        });
+      });
+
+      it('the bulk write operation times out', metadata, async function () {
+        const models = await makeMultiResponseBatchModelArray(this.configuration);
+        const timeoutError = await client
+          .bulkWrite(models, {
+            verboseResults: true,
+            timeoutMS: 1500
+          })
+          .catch(e => e);
+
+        expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError);
+        expect(commands).to.have.lengthOf(1);
+      });
+    });
+
+    describe('if the cursor encounters an error and a killCursors is sent', function () {
+      const commands: CommandStartedEvent[] = [];
+
+      beforeEach(async function () {
+        client = this.configuration.newClient({}, { monitorCommands: true });
+
+        client.on('commandStarted', filterForCommands(['killCursors'], commands));
+        await client.connect();
+
+        await configureFailPoint(this.configuration, {
+          configureFailPoint: 'failCommand',
+          mode: { times: 2 },
+          data: {
+            blockConnection: true,
+            blockTimeMS: 3000,
+            failCommands: ['getMore', 'killCursors']
+          }
+        });
+      });
+
+      it(
+        'timeoutMS is refreshed to the timeoutMS passed to the bulk write for the killCursors command',
+        metadata,
+        async function () {
+          const models = await makeMultiResponseBatchModelArray(this.configuration);
+          const timeoutError = await client
+            .bulkWrite(models, { ordered: true, timeoutMS: 2800, verboseResults: true })
+            .catch(e => e);
+
+          
expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); + + const [ + { + command: { maxTimeMS } + } + ] = commands; + expect(maxTimeMS).to.be.greaterThan(1000); + } + ); + }); + + describe('when the bulk write is executed in multiple batches', function () { + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + await client.connect(); + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { blockConnection: true, blockTimeMS: 1010, failCommands: ['bulkWrite'] } + }); + }); + + it( + 'timeoutMS applies to the duration of all batches', + { + requires: { + ...metadata.requires, + topology: 'single' + } + }, + async function () { + const models = await makeMultiBatchWrite(this.configuration); + const timeoutError = await client + .bulkWrite(models, { + timeoutMS: 2000 + }) + .catch(e => e); + expect(timeoutError, timeoutError.stack).to.be.instanceOf(MongoOperationTimeoutError); + + expect(commands.length, 'Test must execute two batches.').to.equal(2); + } + ); + }); + }); + }); +}); diff --git a/test/integration/crud/find_cursor_methods.test.js b/test/integration/crud/find_cursor_methods.test.js index 42eeda3e816..21a6649bf0b 100644 --- a/test/integration/crud/find_cursor_methods.test.js +++ b/test/integration/crud/find_cursor_methods.test.js @@ -1,7 +1,13 @@ 'use strict'; const { expect } = require('chai'); const { filterForCommands } = require('../shared'); -const { promiseWithResolvers, MongoCursorExhaustedError } = require('../../mongodb'); +const { + promiseWithResolvers, + MongoCursorExhaustedError, + CursorTimeoutContext, + TimeoutContext, + MongoAPIError +} = require('../../mongodb'); describe('Find Cursor', function () { let client; @@ -246,23 +252,45 @@ describe('Find Cursor', function () { }); context('#rewind', function () { - it('should rewind a cursor', function (done) { + it('should rewind a cursor', async function () { const coll = client.db().collection('abstract_cursor'); const cursor = coll.find({}); - this.defer(() => cursor.close()); - cursor.toArray((err, docs) => { - expect(err).to.not.exist; - expect(docs).to.have.length(6); + try { + let docs = await cursor.toArray(); + expect(docs).to.have.lengthOf(6); cursor.rewind(); - cursor.toArray((err, docs) => { - expect(err).to.not.exist; - expect(docs).to.have.length(6); + docs = await cursor.toArray(); + expect(docs).to.have.lengthOf(6); + } finally { + await cursor.close(); + } + }); - done(); - }); - }); + it('throws if the cursor does not own its timeoutContext', async function () { + const coll = client.db().collection('abstract_cursor'); + const cursor = coll.find( + {}, + { + timeoutContext: new CursorTimeoutContext( + TimeoutContext.create({ + timeoutMS: 1000, + serverSelectionTimeoutMS: 1000 + }), + Symbol() + ) + } + ); + + try { + cursor.rewind(); + expect.fail(`rewind should have thrown.`); + } catch (error) { + expect(error).to.be.instanceOf(MongoAPIError); + } finally { + await cursor.close(); + } }); it('should end an implicit session on rewind', { diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index a5e7fba13dd..136e72a3499 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -7,12 +7,17 @@ import { inspect } 
from 'util'; import { AbstractCursor, type Collection, + CursorTimeoutContext, + CursorTimeoutMode, type FindCursor, MongoAPIError, type MongoClient, MongoCursorExhaustedError, - MongoServerError + MongoOperationTimeoutError, + MongoServerError, + TimeoutContext } from '../../mongodb'; +import { type FailPoint } from '../../tools/utils'; describe('class AbstractCursor', function () { describe('regression tests NODE-5372', function () { @@ -395,4 +400,114 @@ describe('class AbstractCursor', function () { expect(nextSpy.callCount).to.be.lessThan(numDocuments); }); }); + + describe('externally provided timeout contexts', function () { + let client: MongoClient; + let collection: Collection; + let context: CursorTimeoutContext; + + beforeEach(async function () { + client = this.configuration.newClient(); + + collection = client.db('abstract_cursor_integration').collection('test'); + + context = new CursorTimeoutContext( + TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 }), + Symbol() + ); + + await collection.insertMany([{ a: 1 }, { b: 2 }, { c: 3 }]); + }); + + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); + + describe('when timeoutMode != LIFETIME', function () { + it('an error is thrown', function () { + expect(() => + collection.find( + {}, + { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.ITERATION } + ) + ).to.throw( + `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME` + ); + }); + }); + + describe('when timeoutMode is omitted', function () { + it('stores timeoutContext as the timeoutContext on the cursor', function () { + const cursor = collection.find({}, { timeoutContext: context, timeoutMS: 1000 }); + + // @ts-expect-error Private access. + expect(cursor.timeoutContext).to.equal(context); + }); + }); + + describe('when timeoutMode is LIFETIME', function () { + it('stores timeoutContext as the timeoutContext on the cursor', function () { + const cursor = collection.find( + {}, + { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.LIFETIME } + ); + + // @ts-expect-error Private access. + expect(cursor.timeoutContext).to.equal(context); + }); + }); + + describe('when the cursor is initialized', function () { + it('the provided timeoutContext is not overwritten', async function () { + const cursor = collection.find( + {}, + { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.LIFETIME } + ); + + await cursor.toArray(); + + // @ts-expect-error Private access. + expect(cursor.timeoutContext).to.equal(context); + }); + }); + + describe('when the cursor refreshes the timeout for killCursors', function () { + it( + 'the provided timeoutContext is not modified', + { + requires: { + mongodb: '>=4.4' + } + }, + async function () { + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + blockConnection: true, + blockTimeMS: 5000 + } + } as FailPoint); + + const cursor = collection.find( + {}, + { + timeoutContext: context, + timeoutMS: 1000, + timeoutMode: CursorTimeoutMode.LIFETIME, + batchSize: 1 + } + ); + + const error = await cursor.toArray().catch(e => e); + + expect(error).to.be.instanceof(MongoOperationTimeoutError); + // @ts-expect-error We know we have a CSOT timeout context but TS does not. 
+ expect(context.timeoutContext.remainingTimeMS).to.be.lessThan(0); + } + ); + }); + }); }); diff --git a/test/integration/server-selection/server_selection.prose.operation_count.test.ts b/test/integration/server-selection/server_selection.prose.operation_count.test.ts index fec6d24e61c..b4a7d9bf47b 100644 --- a/test/integration/server-selection/server_selection.prose.operation_count.test.ts +++ b/test/integration/server-selection/server_selection.prose.operation_count.test.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import { on } from 'events'; import { type Collection, @@ -7,7 +6,7 @@ import { HostAddress, type MongoClient } from '../../mongodb'; -import { sleep } from '../../tools/utils'; +import { waitUntilPoolsFilled } from '../../tools/utils'; const failPoint = { configureFailPoint: 'failCommand', @@ -28,17 +27,6 @@ async function runTaskGroup(collection: Collection, count: 10 | 100 | 1000) { } } -async function ensurePoolIsFull(client: MongoClient): Promise { - let connectionCount = 0; - - for await (const _event of on(client, 'connectionCreated')) { - connectionCount++; - if (connectionCount === POOL_SIZE * 2) { - break; - } - } -} - // Step 1: Configure a sharded cluster with two mongoses. Use a 4.2.9 or newer server version. const TEST_METADATA: MongoDBMetadataUI = { requires: { mongodb: '>=4.2.9', topology: 'sharded' } }; @@ -75,15 +63,8 @@ describe('operationCount-based Selection Within Latency Window - Prose Test', fu client.on('commandStarted', updateCount); - const poolIsFullPromise = ensurePoolIsFull(client); - - await client.connect(); - // Step 4: Using CMAP events, ensure the client's connection pools for both mongoses have been saturated - const poolIsFull = Promise.race([poolIsFullPromise, sleep(30 * 1000)]); - if (!poolIsFull) { - throw new Error('Timed out waiting for connection pool to fill to minPoolSize'); - } + await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), POOL_SIZE * 2); seeds = client.topology.s.seedlist.map(address => address.toString()); diff --git a/test/tools/runner/config.ts b/test/tools/runner/config.ts index 1d637486226..16024638fba 100644 --- a/test/tools/runner/config.ts +++ b/test/tools/runner/config.ts @@ -7,6 +7,7 @@ import { type AuthMechanism, HostAddress, MongoClient, + type MongoClientOptions, type ServerApi, TopologyType, type WriteConcernSettings @@ -82,7 +83,7 @@ export class TestConfiguration { auth?: { username: string; password: string; authSource?: string }; proxyURIParams?: ProxyParams; }; - serverApi: ServerApi; + serverApi?: ServerApi; activeResources: number; isSrv: boolean; serverlessCredentials: { username: string | undefined; password: string | undefined }; @@ -171,13 +172,34 @@ export class TestConfiguration { return this.options.replicaSet; } + /** + * Returns a `hello`, executed against `uri`. 
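+   *
+   * Useful for size-dependent assertions; a sketch, as called from a test's
+   * context (see makeMultiBatchWrite in test/tools/utils.ts below):
+   *
+   * ```ts
+   * const { maxBsonObjectSize, maxMessageSizeBytes } = await this.configuration.hello();
+   * ```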
+   */
+  async hello(uri = this.uri) {
+    const client = this.newClient(uri);
+    try {
+      await client.connect();
+      const { maxBsonObjectSize, maxMessageSizeBytes, maxWriteBatchSize, ...rest } = await client
+        .db('admin')
+        .command({ hello: 1 });
+      return {
+        maxBsonObjectSize,
+        maxMessageSizeBytes,
+        maxWriteBatchSize,
+        ...rest
+      };
+    } finally {
+      await client.close();
+    }
+  }
+
   isOIDC(uri: string, env: string): boolean {
     if (!uri) return false;
     return uri.indexOf('MONGODB-OIDC') > -1 && uri.indexOf(`ENVIRONMENT:${env}`) > -1;
   }
 
-  newClient(urlOrQueryOptions?: string | Record<string, any>, serverOptions?: Record<string, any>) {
-    serverOptions = Object.assign({}, getEnvironmentalOptions(), serverOptions);
+  newClient(urlOrQueryOptions?: string | Record<string, any>, serverOptions?: MongoClientOptions) {
+    serverOptions = Object.assign({}, getEnvironmentalOptions(), serverOptions);
 
     // Support MongoClient constructor form (url, options) for `newClient`.
     if (typeof urlOrQueryOptions === 'string') {
diff --git a/test/tools/utils.ts b/test/tools/utils.ts
index 3cb50d2cd51..8ebc5e8f532 100644
--- a/test/tools/utils.ts
+++ b/test/tools/utils.ts
@@ -1,5 +1,5 @@
 import * as child_process from 'node:child_process';
-import { once } from 'node:events';
+import { on, once } from 'node:events';
 import * as fs from 'node:fs/promises';
 import * as path from 'node:path';
 
@@ -11,6 +11,7 @@ import { setTimeout } from 'timers';
 import { inspect, promisify } from 'util';
 
 import {
+  type AnyClientBulkWriteModel,
   type Document,
   type HostAddress,
   MongoClient,
@@ -18,6 +19,7 @@ import {
   Topology,
   type TopologyOptions
 } from '../mongodb';
+import { type TestConfiguration } from './runner/config';
 import { runUnifiedSuite } from './unified-spec-runner/runner';
 import {
   type CollectionData,
@@ -568,3 +570,98 @@ export async function itInNodeProcess(
     }
   });
 }
+
+/**
+ * Connects the client and waits until `client` has emitted `count` connectionCreated events.
+ *
+ * **This will hang if the client does not have a maxPoolSize set!**
+ *
+ * This is useful when you want to ensure that the client has pools that are full of connections.
+ *
+ * This does not guarantee that all pools that the client has are completely full unless
+ * count = number of servers to which the client is connected * maxPoolSize. But it can
+ * serve as a way to ensure that some connections have been established and are in the pools.
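+ *
+ * @example A sketch of the usage in the CSOT tests in this diff:
+ * ```ts
+ * // wait for at least 5 connections to be established, or abort after 30s
+ * await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5);
+ * ```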
+ */
+export async function waitUntilPoolsFilled(
+  client: MongoClient,
+  signal: AbortSignal,
+  count: number = client.s.options.maxPoolSize
+): Promise<void> {
+  let connectionCount = 0;
+
+  async function wait$() {
+    for await (const _event of on(client, 'connectionCreated', { signal })) {
+      connectionCount++;
+      if (connectionCount >= count) {
+        break;
+      }
+    }
+  }
+
+  await Promise.all([wait$(), client.connect()]);
+}
+
+export async function configureFailPoint(configuration: TestConfiguration, failPoint: FailPoint) {
+  const utilClient = configuration.newClient();
+  await utilClient.connect();
+
+  try {
+    await utilClient.db('admin').command(failPoint);
+  } finally {
+    await utilClient.close();
+  }
+}
+
+export async function clearFailPoint(configuration: TestConfiguration) {
+  const utilClient = configuration.newClient();
+  await utilClient.connect();
+
+  try {
+    await utilClient.db('admin').command({
+      configureFailPoint: 'failCommand',
+      mode: 'off'
+    });
+  } finally {
+    await utilClient.close();
+  }
+}
+
+export async function makeMultiBatchWrite(
+  configuration: TestConfiguration
+): Promise<AnyClientBulkWriteModel[]> {
+  const { maxBsonObjectSize, maxMessageSizeBytes } = await configuration.hello();
+
+  const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
+  const models = Array.from({ length }, () => ({
+    namespace: 'db.coll',
+    name: 'insertOne' as const,
+    document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
+  }));
+
+  return models;
+}
+
+export async function makeMultiResponseBatchModelArray(
+  configuration: TestConfiguration
+): Promise<AnyClientBulkWriteModel[]> {
+  const { maxBsonObjectSize } = await configuration.hello();
+  const namespace = `foo.${new BSON.ObjectId().toHexString()}`;
+  const models: AnyClientBulkWriteModel[] = [
+    {
+      name: 'updateOne',
+      namespace,
+      update: { $set: { age: 1 } },
+      upsert: true,
+      filter: { _id: 'a'.repeat(maxBsonObjectSize / 2) }
+    },
+    {
+      name: 'updateOne',
+      namespace,
+      update: { $set: { age: 1 } },
+      upsert: true,
+      filter: { _id: 'b'.repeat(maxBsonObjectSize / 2) }
+    }
+  ];
+
+  return models;
+}
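For reviewers, a condensed sketch of how the helpers above drive the multi-batch CSOT tests in this diff (client and failpoint wiring elided; see client_bulk_write.test.ts):

```ts
// models sized against hello()'s limits force the driver to split the
// bulkWrite into two batches; with each batch blocked by the failpoint,
// the shared timeout budget expires before the second batch completes
const models = await makeMultiBatchWrite(this.configuration);
const error = await client.bulkWrite(models, { timeoutMS: 2000 }).catch(e => e);
expect(error).to.be.instanceOf(MongoOperationTimeoutError);
```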