diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index d0f386923ad..72f1e33fd3c 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { type AsyncDisposable, configureResourceManagement } from '../resource_management'; import type { Server } from '../sdam/server'; import { ClientSession, maybeClearPinnedConnection } from '../sessions'; -import { TimeoutContext } from '../timeout'; +import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout'; import { type MongoDBNamespace, squashError } from '../utils'; /** @@ -119,6 +119,9 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { timeoutMS?: number; /** @internal TODO(NODE-5688): make this public */ timeoutMode?: CursorTimeoutMode; + + /** @internal */ + timeoutContext?: CursorTimeoutContext; } /** @internal */ @@ -140,6 +143,46 @@ export type AbstractCursorEvents = { [AbstractCursor.CLOSE](): void; }; +export class CursorTimeoutContext extends TimeoutContext { + constructor( + public timeoutContext: TimeoutContext, + public owner: AbstractCursor | null = null + ) { + super(); + } + + override get serverSelectionTimeout(): Timeout | null { + return this.timeoutContext.serverSelectionTimeout; + } + override get connectionCheckoutTimeout(): Timeout | null { + return this.timeoutContext.connectionCheckoutTimeout; + } + override get clearServerSelectionTimeout(): boolean { + return this.timeoutContext.clearServerSelectionTimeout; + } + override get clearConnectionCheckoutTimeout(): boolean { + return this.timeoutContext.clearConnectionCheckoutTimeout; + } + override get timeoutForSocketWrite(): Timeout | null { + return this.timeoutContext.timeoutForSocketWrite; + } + override get timeoutForSocketRead(): Timeout | null { + return this.timeoutContext.timeoutForSocketRead; + } + override csotEnabled(): this is CSOTTimeoutContext { + return this.timeoutContext.csotEnabled(); + } + override refresh(): void { + return this.timeoutContext.refresh(); + } + override clear(): void { + return this.timeoutContext.clear(); + } + override refreshed(): TimeoutContext { + return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner); + } +} + /** @public */ export abstract class AbstractCursor< TSchema = any, @@ -171,7 +214,7 @@ export abstract class AbstractCursor< /** @internal */ protected readonly cursorOptions: InternalAbstractCursorOptions; /** @internal */ - protected timeoutContext?: TimeoutContext; + protected timeoutContext?: CursorTimeoutContext; /** @event */ static readonly CLOSE = 'close' as const; @@ -264,6 +307,8 @@ export abstract class AbstractCursor< utf8: options?.enableUtf8Validation === false ? false : true } }; + + this.timeoutContext = options.timeoutContext; } /** @@ -790,10 +835,13 @@ export abstract class AbstractCursor< */ private async cursorInit(): Promise { if (this.cursorOptions.timeoutMS != null) { - this.timeoutContext = TimeoutContext.create({ - serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, - timeoutMS: this.cursorOptions.timeoutMS - }); + this.timeoutContext = new CursorTimeoutContext( + TimeoutContext.create({ + serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, + timeoutMS: this.cursorOptions.timeoutMS + }), + this + ); } try { const state = await this._initialize(this.cursorSession); diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts index 06f34dfc52f..0bdc04345e8 100644 --- a/src/cursor/client_bulk_write_cursor.ts +++ b/src/cursor/client_bulk_write_cursor.ts @@ -36,7 +36,7 @@ export class ClientBulkWriteCursor extends AbstractCursor { constructor( client: MongoClient, commandBuilder: ClientBulkWriteCommandBuilder, - options: ClientBulkWriteOptions = {} + options: ClientBulkWriteOptions & AbstractCursorOptions = {} ) { super(client, new MongoDBNamespace('admin', '$cmd'), options); diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 092e9418b3a..9906301745a 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -493,7 +493,7 @@ export class MongoClient extends TypedEventEmitter implements models: AnyClientBulkWriteModel[], options?: ClientBulkWriteOptions ): Promise { - return await new ClientBulkWriteExecutor(this, models, options).execute(); + return await new ClientBulkWriteExecutor(this, models, resolveOptions(this, options)).execute(); } /** diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 5baf1ed6b6e..20e1e6c32e0 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,5 +1,7 @@ +import { CursorTimeoutContext } from '../../cursor/abstract_cursor'; import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; import { type MongoClient } from '../../mongo_client'; +import { TimeoutContext } from '../../timeout'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; import { ClientBulkWriteOperation } from './client_bulk_write'; @@ -55,11 +57,18 @@ export class ClientBulkWriteExecutor { this.options, pkFactory ); + + const timeoutContext = TimeoutContext.create({ + ...this.options, + serverSelectionTimeoutMS: this.client.s.options.serverSelectionTimeoutMS, + waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS, + socketTimeoutMS: this.client.s.options.socketTimeoutMS + }); // Unacknowledged writes need to execute all batches and return { ok: 1} if (this.options.writeConcern?.w === 0) { while (commandBuilder.hasNextBatch()) { const operation = new ClientBulkWriteOperation(commandBuilder, this.options); - await executeOperation(this.client, operation); + await executeOperation(this.client, operation, timeoutContext); } return { ok: 1 }; } else { @@ -67,7 +76,10 @@ export class ClientBulkWriteExecutor { // For each command will will create and exhaust a cursor for the results. let currentBatchOffset = 0; while (commandBuilder.hasNextBatch()) { - const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options); + const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, { + ...this.options, + timeoutContext: new CursorTimeoutContext(timeoutContext) + }); const docs = await cursor.toArray(); const operations = cursor.operations; resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); diff --git a/src/timeout.ts b/src/timeout.ts index f7fb3d0daa5..caa3b30dde3 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -195,6 +195,8 @@ export abstract class TimeoutContext { abstract refresh(): void; abstract clear(): void; + + abstract refreshed(): TimeoutContext; } /** @internal */ @@ -305,6 +307,10 @@ export class CSOTTimeoutContext extends TimeoutContext { this._serverSelectionTimeout?.clear(); this._connectionCheckoutTimeout?.clear(); } + + override refreshed(): CSOTTimeoutContext { + return new CSOTTimeoutContext(this); + } } /** @internal */ @@ -351,4 +357,8 @@ export class LegacyTimeoutContext extends TimeoutContext { clear(): void { return; } + + override refreshed(): LegacyTimeoutContext { + return new LegacyTimeoutContext(this.options); + } }