From 1c489705bebcc9fe47291b2aaa054c635c06b057 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 9 Sep 2024 11:11:11 -0400 Subject: [PATCH] feat(NODE-6313): add CSOT support to sessions and transactions (#4199) --- package-lock.json | 9 +- package.json | 2 +- src/cmap/connection.ts | 7 + src/cmap/wire_protocol/on_data.ts | 15 +- src/collection.ts | 12 +- src/db.ts | 22 +- src/error.ts | 3 + src/operations/execute_operation.ts | 8 +- src/sessions.ts | 253 ++++++++++++------ src/timeout.ts | 49 +++- src/transactions.ts | 7 +- src/utils.ts | 13 +- ...ient_side_operations_timeout.prose.test.ts | 167 +++++++++++- ...lient_side_operations_timeout.spec.test.ts | 18 +- .../node_csot.test.ts | 150 +++++++++++ .../sessions-inherit-timeoutMS.json | 28 +- .../sessions-inherit-timeoutMS.yml | 19 +- ...sessions-override-operation-timeoutMS.json | 32 ++- .../sessions-override-operation-timeoutMS.yml | 23 +- .../sessions-override-timeoutMS.json | 28 +- .../sessions-override-timeoutMS.yml | 19 +- test/tools/unified-spec-runner/entities.ts | 4 + test/tools/unified-spec-runner/match.ts | 19 +- test/tools/unified-spec-runner/operations.ts | 27 +- 24 files changed, 739 insertions(+), 195 deletions(-) diff --git a/package-lock.json b/package-lock.json index e4c84ca29c..f2f85f68e2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -51,7 +51,7 @@ "mocha": "^10.4.0", "mocha-sinon": "^2.1.2", "mongodb-client-encryption": "^6.1.0", - "mongodb-legacy": "^6.0.1", + "mongodb-legacy": "^6.1.1", "nyc": "^15.1.0", "prettier": "^2.8.8", "semver": "^7.6.0", @@ -7754,10 +7754,11 @@ } }, "node_modules/mongodb-legacy": { - "version": "6.0.1", - "resolved": "https://registry.npmjs.org/mongodb-legacy/-/mongodb-legacy-6.0.1.tgz", - "integrity": "sha512-cbm3UOqAYo6yElNk9CHNp5tQwRl1PCpBpcSkVvlvyg7S+gG6vSt1zrFiEniNaWOwwWnIr/IsAeSo48Nm701B/A==", + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/mongodb-legacy/-/mongodb-legacy-6.1.1.tgz", + "integrity": "sha512-u9Cl8UEzdtf7mhWrAEHHhfU0OCqahaOB5midwtyudWIuEz5t18DJFXfqJq3cbEypVfLkfF3zi6rkolKMU9uPjQ==", "dev": true, + "license": "Apache-2.0", "dependencies": { "mongodb": "^6.0.0" }, diff --git a/package.json b/package.json index c57bbe76bb..1ab28643be 100644 --- a/package.json +++ b/package.json @@ -99,7 +99,7 @@ "mocha": "^10.4.0", "mocha-sinon": "^2.1.2", "mongodb-client-encryption": "^6.1.0", - "mongodb-legacy": "^6.0.1", + "mongodb-legacy": "^6.1.1", "nyc": "^15.1.0", "prettier": "^2.8.8", "semver": "^7.6.0", diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index a5ab7071ef..4d3a974080 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -737,6 +737,13 @@ export class Connection extends TypedEventEmitter { return; } } + } catch (readError) { + if (TimeoutError.is(readError)) { + throw new MongoOperationTimeoutError( + `Timed out during socket read (${readError.duration}ms)` + ); + } + throw readError; } finally { this.dataEvents = null; this.throwIfAborted(); diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index a32c6b1b48..23fd88e282 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -1,7 +1,6 @@ import { type EventEmitter } from 'events'; -import { MongoOperationTimeoutError } from '../../error'; -import { type TimeoutContext, TimeoutError } from '../../timeout'; +import { type TimeoutContext } from '../../timeout'; import { List, promiseWithResolvers } from '../../utils'; /** @@ -91,8 +90,11 @@ export function onData( // Adding event handlers emitter.on('data', eventHandler); emitter.on('error', errorHandler); + + const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead; + timeoutForSocketRead?.throwIfExpired(); // eslint-disable-next-line github/no-then - timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler); + timeoutForSocketRead?.then(undefined, errorHandler); return iterator; @@ -104,12 +106,9 @@ export function onData( function errorHandler(err: Error) { const promise = unconsumedPromises.shift(); - const timeoutError = TimeoutError.is(err) - ? new MongoOperationTimeoutError('Timed out during socket read') - : undefined; - if (promise != null) promise.reject(timeoutError ?? err); - else error = timeoutError ?? err; + if (promise != null) promise.reject(err); + else error = err; void closeHandler(); } diff --git a/src/collection.ts b/src/collection.ts index d7dd7b43ac..3ad150ac76 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -469,10 +469,14 @@ export class Collection { // Intentionally, we do not inherit options from parent for this operation. return await executeOperation( this.client, - new RenameOperation(this as TODO_NODE_3286, newName, { - ...options, - readPreference: ReadPreference.PRIMARY - }) as TODO_NODE_3286 + new RenameOperation( + this as TODO_NODE_3286, + newName, + resolveOptions(undefined, { + ...options, + readPreference: ReadPreference.PRIMARY + }) + ) as TODO_NODE_3286 ); } diff --git a/src/db.ts b/src/db.ts index 07a0c928cc..cbb0eac13f 100644 --- a/src/db.ts +++ b/src/db.ts @@ -275,12 +275,16 @@ export class Db { // Intentionally, we do not inherit options from parent for this operation. return await executeOperation( this.client, - new RunCommandOperation(this, command, { - ...resolveBSONOptions(options), - timeoutMS: options?.timeoutMS ?? this.timeoutMS, - session: options?.session, - readPreference: options?.readPreference - }) + new RunCommandOperation( + this, + command, + resolveOptions(undefined, { + ...resolveBSONOptions(options), + timeoutMS: options?.timeoutMS ?? this.timeoutMS, + session: options?.session, + readPreference: options?.readPreference + }) + ) ); } @@ -385,7 +389,11 @@ export class Db { new RenameOperation( this.collection(fromCollection) as TODO_NODE_3286, toCollection, - { ...options, new_collection: true, readPreference: ReadPreference.primary } + resolveOptions(undefined, { + ...options, + new_collection: true, + readPreference: ReadPreference.primary + }) ) as TODO_NODE_3286 ); } diff --git a/src/error.ts b/src/error.ts index b526ba24f9..4435e90f0a 100644 --- a/src/error.ts +++ b/src/error.ts @@ -124,6 +124,9 @@ function isAggregateError(e: unknown): e is Error & { errors: Error[] } { * mongodb-client-encryption has a dependency on this error, it uses the constructor with a string argument */ export class MongoError extends Error { + get [Symbol.toStringTag]() { + return this.name; + } /** @internal */ [kErrorLabels]: Set; /** diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 695e4ab92a..4ec6d88459 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -59,7 +59,7 @@ type ResultTypeFromOperation = TOperation extends AbstractOperation< export async function executeOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation ->(client: MongoClient, operation: T, timeoutContext?: TimeoutContext): Promise { +>(client: MongoClient, operation: T, timeoutContext?: TimeoutContext | null): Promise { if (!(operation instanceof AbstractOperation)) { // TODO(NODE-3483): Extend MongoRuntimeError throw new MongoRuntimeError('This method requires a valid operation instance'); @@ -82,11 +82,6 @@ export async function executeOperation< } else if (session.client !== client) { throw new MongoInvalidArgumentError('ClientSession must be from the same MongoClient'); } - if (session.explicit && session?.timeoutMS != null && operation.options.timeoutMS != null) { - throw new MongoInvalidArgumentError( - 'Do not specify timeoutMS on operation if already specified on an explicit session' - ); - } const readPreference = operation.readPreference ?? ReadPreference.primary; const inTransaction = !!session?.inTransaction(); @@ -108,6 +103,7 @@ export async function executeOperation< } timeoutContext ??= TimeoutContext.create({ + session, serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS, waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS, timeoutMS: operation.options.timeoutMS diff --git a/src/sessions.ts b/src/sessions.ts index 1fc88b468f..bbd1785275 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -29,6 +29,7 @@ import { ReadConcernLevel } from './read_concern'; import { ReadPreference } from './read_preference'; import { type AsyncDisposable, configureResourceManagement } from './resource_management'; import { _advanceClusterTime, type ClusterTime, TopologyType } from './sdam/common'; +import { TimeoutContext } from './timeout'; import { isTransactionCommand, Transaction, @@ -58,8 +59,11 @@ export interface ClientSessionOptions { snapshot?: boolean; /** The default TransactionOptions to use for transactions started on this session. */ defaultTransactionOptions?: TransactionOptions; - /** @internal - * The value of timeoutMS used for CSOT. Used to override client timeoutMS */ + /** + * @public + * An overriding timeoutMS value to use for a client-side timeout. + * If not provided the session uses the timeoutMS specified on the MongoClient. + */ defaultTimeoutMS?: number; /** @internal */ @@ -98,6 +102,9 @@ export interface EndSessionOptions { error?: AnyError; force?: boolean; forceClear?: boolean; + + /** @internal */ + timeoutMS?: number; } /** @@ -115,7 +122,7 @@ export class ClientSession /** @internal */ sessionPool: ServerSessionPool; hasEnded: boolean; - clientOptions?: MongoOptions; + clientOptions: MongoOptions; supports: { causalConsistency: boolean }; clusterTime?: ClusterTime; operationTime?: Timestamp; @@ -137,6 +144,9 @@ export class ClientSession /** @internal */ timeoutMS?: number; + /** @internal */ + public timeoutContext: TimeoutContext | null = null; + /** * Create a client session. * @internal @@ -149,7 +159,7 @@ export class ClientSession client: MongoClient, sessionPool: ServerSessionPool, options: ClientSessionOptions, - clientOptions?: MongoOptions + clientOptions: MongoOptions ) { super(); @@ -269,8 +279,13 @@ export class ClientSession async endSession(options?: EndSessionOptions): Promise { try { if (this.inTransaction()) { - await this.abortTransaction(); + await this.abortTransaction({ ...options, throwTimeout: true }); } + } catch (error) { + // spec indicates that we should ignore all errors for `endSessions` + if (error.name === 'MongoOperationTimeoutError') throw error; + squashError(error); + } finally { if (!this.hasEnded) { const serverSession = this[kServerSession]; if (serverSession != null) { @@ -286,10 +301,6 @@ export class ClientSession this.hasEnded = true; this.emit('ended', this); } - } catch (error) { - // spec indicates that we should ignore all errors for `endSessions` - squashError(error); - } finally { maybeClearPinnedConnection(this, { force: true, ...options }); } } @@ -441,8 +452,10 @@ export class ClientSession /** * Commits the currently active transaction in this session. + * + * @param options - Optional options, can be used to override `defaultTimeoutMS`. */ - async commitTransaction(): Promise { + async commitTransaction(options?: { timeoutMS?: number }): Promise { if (this.transaction.state === TxnState.NO_TRANSACTION) { throw new MongoTransactionError('No transaction started'); } @@ -492,8 +505,25 @@ export class ClientSession bypassPinningCheck: true }); + const timeoutMS = + typeof options?.timeoutMS === 'number' + ? options.timeoutMS + : typeof this.timeoutMS === 'number' + ? this.timeoutMS + : null; + + const timeoutContext = + this.timeoutContext ?? + (typeof timeoutMS === 'number' + ? TimeoutContext.create({ + serverSelectionTimeoutMS: this.clientOptions.serverSelectionTimeoutMS, + socketTimeoutMS: this.clientOptions.socketTimeoutMS, + timeoutMS + }) + : null); + try { - await executeOperation(this.client, operation); + await executeOperation(this.client, operation, timeoutContext); return; } catch (firstCommitError) { if (firstCommitError instanceof MongoError && isRetryableWriteError(firstCommitError)) { @@ -503,7 +533,7 @@ export class ClientSession this.unpin({ force: true }); try { - await executeOperation(this.client, operation); + await executeOperation(this.client, operation, timeoutContext); return; } catch (retryCommitError) { // If the retry failed, we process that error instead of the original @@ -535,8 +565,13 @@ export class ClientSession /** * Aborts the currently active transaction in this session. + * + * @param options - Optional options, can be used to override `defaultTimeoutMS`. */ - async abortTransaction(): Promise { + async abortTransaction(options?: { timeoutMS?: number }): Promise; + /** @internal */ + async abortTransaction(options?: { timeoutMS?: number; throwTimeout?: true }): Promise; + async abortTransaction(options?: { timeoutMS?: number; throwTimeout?: true }): Promise { if (this.transaction.state === TxnState.NO_TRANSACTION) { throw new MongoTransactionError('No transaction started'); } @@ -581,18 +616,45 @@ export class ClientSession bypassPinningCheck: true }); + const timeoutMS = + typeof options?.timeoutMS === 'number' + ? options.timeoutMS + : this.timeoutContext?.csotEnabled() + ? this.timeoutContext.timeoutMS // refresh timeoutMS for abort operation + : typeof this.timeoutMS === 'number' + ? this.timeoutMS + : null; + + const timeoutContext = + timeoutMS != null + ? TimeoutContext.create({ + timeoutMS, + serverSelectionTimeoutMS: this.clientOptions.serverSelectionTimeoutMS, + socketTimeoutMS: this.clientOptions.socketTimeoutMS + }) + : null; + try { - await executeOperation(this.client, operation); + await executeOperation(this.client, operation, timeoutContext); this.unpin(); return; } catch (firstAbortError) { this.unpin(); + if (firstAbortError.name === 'MongoRuntimeError') throw firstAbortError; + if (options?.throwTimeout && firstAbortError.name === 'MongoOperationTimeoutError') { + throw firstAbortError; + } + if (firstAbortError instanceof MongoError && isRetryableWriteError(firstAbortError)) { try { - await executeOperation(this.client, operation); + await executeOperation(this.client, operation, timeoutContext); return; } catch (secondAbortError) { + if (secondAbortError.name === 'MongoRuntimeError') throw secondAbortError; + if (options?.throwTimeout && secondAbortError.name === 'MongoOperationTimeoutError') { + throw secondAbortError; + } // we do not retry the retry } } @@ -647,96 +709,119 @@ export class ClientSession */ async withTransaction( fn: WithTransactionCallback, - options?: TransactionOptions + options?: TransactionOptions & { + /** + * Configures a timeoutMS expiry for the entire withTransactionCallback. + * + * @remarks + * - The remaining timeout will not be applied to callback operations that do not use the ClientSession. + * - Overriding timeoutMS for operations executed using the explicit session inside the provided callback will result in a client-side error. + */ + timeoutMS?: number; + } ): Promise { const MAX_TIMEOUT = 120000; - const startTime = now(); - - let committed = false; - let result: any; - while (!committed) { - this.startTransaction(options); // may throw on error + const timeoutMS = options?.timeoutMS ?? this.timeoutMS ?? null; + this.timeoutContext = + timeoutMS != null + ? TimeoutContext.create({ + timeoutMS, + serverSelectionTimeoutMS: this.clientOptions.serverSelectionTimeoutMS, + socketTimeoutMS: this.clientOptions.socketTimeoutMS + }) + : null; - try { - const promise = fn(this); - if (!isPromiseLike(promise)) { - throw new MongoInvalidArgumentError( - 'Function provided to `withTransaction` must return a Promise' - ); - } + const startTime = this.timeoutContext?.csotEnabled() ? this.timeoutContext.start : now(); - result = await promise; + let committed = false; + let result: any; - if ( - this.transaction.state === TxnState.NO_TRANSACTION || - this.transaction.state === TxnState.TRANSACTION_COMMITTED || - this.transaction.state === TxnState.TRANSACTION_ABORTED - ) { - // Assume callback intentionally ended the transaction - return result; - } - } catch (fnError) { - if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) { - await this.abortTransaction(); - throw fnError; - } + try { + while (!committed) { + this.startTransaction(options); // may throw on error - if ( - this.transaction.state === TxnState.STARTING_TRANSACTION || - this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS - ) { - await this.abortTransaction(); - } + try { + const promise = fn(this); + if (!isPromiseLike(promise)) { + throw new MongoInvalidArgumentError( + 'Function provided to `withTransaction` must return a Promise' + ); + } - if ( - fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && - now() - startTime < MAX_TIMEOUT - ) { - continue; - } + result = await promise; - throw fnError; - } + if ( + this.transaction.state === TxnState.NO_TRANSACTION || + this.transaction.state === TxnState.TRANSACTION_COMMITTED || + this.transaction.state === TxnState.TRANSACTION_ABORTED + ) { + // Assume callback intentionally ended the transaction + return result; + } + } catch (fnError) { + if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) { + await this.abortTransaction(); + throw fnError; + } - while (!committed) { - try { - /* - * We will rely on ClientSession.commitTransaction() to - * apply a majority write concern if commitTransaction is - * being retried (see: DRIVERS-601) - */ - await this.commitTransaction(); - committed = true; - } catch (commitError) { - /* - * Note: a maxTimeMS error will have the MaxTimeMSExpired - * code (50) and can be reported as a top-level error or - * inside writeConcernError, ex. - * { ok:0, code: 50, codeName: 'MaxTimeMSExpired' } - * { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } } - */ if ( - !isMaxTimeMSExpiredError(commitError) && - commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) && - now() - startTime < MAX_TIMEOUT + this.transaction.state === TxnState.STARTING_TRANSACTION || + this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS ) { - continue; + await this.abortTransaction(); } if ( - commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && - now() - startTime < MAX_TIMEOUT + fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && + (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) ) { - break; + continue; } - throw commitError; + throw fnError; + } + + while (!committed) { + try { + /* + * We will rely on ClientSession.commitTransaction() to + * apply a majority write concern if commitTransaction is + * being retried (see: DRIVERS-601) + */ + await this.commitTransaction(); + committed = true; + } catch (commitError) { + /* + * Note: a maxTimeMS error will have the MaxTimeMSExpired + * code (50) and can be reported as a top-level error or + * inside writeConcernError, ex. + * { ok:0, code: 50, codeName: 'MaxTimeMSExpired' } + * { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } } + */ + if ( + !isMaxTimeMSExpiredError(commitError) && + commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) && + (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) + ) { + continue; + } + + if ( + commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && + (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) + ) { + break; + } + + throw commitError; + } } } + return result; + } finally { + this.timeoutContext = null; } - - return result; } } diff --git a/src/timeout.ts b/src/timeout.ts index 4007ffa50a..6e2f92b5de 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -1,16 +1,19 @@ import { clearTimeout, setTimeout } from 'timers'; import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error'; +import { type ClientSession } from './sessions'; import { csotMin, noop } from './utils'; /** @internal */ export class TimeoutError extends Error { + duration: number; override get name(): 'TimeoutError' { return 'TimeoutError'; } - constructor(message: string, options?: { cause?: Error }) { + constructor(message: string, options: { cause?: Error; duration: number }) { super(message, options); + this.duration = options.duration; } static is(error: unknown): error is TimeoutError { @@ -52,12 +55,19 @@ export class Timeout extends Promise { } /** Create a new timeout that expires in `duration` ms */ - private constructor(executor: Executor = () => null, duration: number, unref = true) { - let reject!: Reject; + private constructor( + executor: Executor = () => null, + options?: { duration: number; unref?: true; rejection?: Error } + ) { + const duration = options?.duration ?? 0; + const unref = !!options?.unref; + const rejection = options?.rejection; + if (duration < 0) { throw new MongoInvalidArgumentError('Cannot create a Timeout with a negative duration'); } + let reject!: Reject; super((_, promiseReject) => { reject = promiseReject; @@ -67,16 +77,20 @@ export class Timeout extends Promise { this.duration = duration; this.start = Math.trunc(performance.now()); - if (this.duration > 0) { + if (rejection == null && this.duration > 0) { this.id = setTimeout(() => { this.ended = Math.trunc(performance.now()); this.timedOut = true; - reject(new TimeoutError(`Expired after ${duration}ms`)); + reject(new TimeoutError(`Expired after ${duration}ms`, { duration })); }, this.duration); if (typeof this.id.unref === 'function' && unref) { // Ensure we do not keep the Node.js event loop running this.id.unref(); } + } else if (rejection != null) { + this.ended = Math.trunc(performance.now()); + this.timedOut = true; + reject(rejection); } } @@ -90,11 +104,11 @@ export class Timeout extends Promise { } throwIfExpired(): void { - if (this.timedOut) throw new TimeoutError('Timed out'); + if (this.timedOut) throw new TimeoutError('Timed out', { duration: this.duration }); } - public static expires(durationMS: number, unref?: boolean): Timeout { - return new Timeout(undefined, durationMS, unref); + public static expires(duration: number, unref?: true): Timeout { + return new Timeout(undefined, { duration, unref }); } static is(timeout: unknown): timeout is Timeout { @@ -108,10 +122,16 @@ export class Timeout extends Promise { typeof timeout.then === 'function' ); } + + static override reject(rejection?: Error): Timeout { + return new Timeout(undefined, { duration: 0, unref: true, rejection }); + } } /** @internal */ -export type TimeoutContextOptions = LegacyTimeoutContextOptions | CSOTTimeoutContextOptions; +export type TimeoutContextOptions = (LegacyTimeoutContextOptions | CSOTTimeoutContextOptions) & { + session?: ClientSession; +}; /** @internal */ export type LegacyTimeoutContextOptions = { @@ -152,6 +172,7 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions /** @internal */ export abstract class TimeoutContext { static create(options: TimeoutContextOptions): TimeoutContext { + if (options.session?.timeoutContext != null) return options.session?.timeoutContext; if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options); else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options); else throw new MongoRuntimeError('Unrecognized options'); @@ -184,7 +205,7 @@ export class CSOTTimeoutContext extends TimeoutContext { private _serverSelectionTimeout?: Timeout | null; private _connectionCheckoutTimeout?: Timeout | null; public minRoundTripTime = 0; - private start: number; + public start: number; constructor(options: CSOTTimeoutContextOptions) { super(); @@ -218,8 +239,8 @@ export class CSOTTimeoutContext extends TimeoutContext { if (typeof this._serverSelectionTimeout !== 'object' || this._serverSelectionTimeout?.cleared) { const { remainingTimeMS, serverSelectionTimeoutMS } = this; if (remainingTimeMS <= 0) - throw new MongoOperationTimeoutError( - `Timed out in server selection after ${this.timeoutMS}ms` + return Timeout.reject( + new MongoOperationTimeoutError(`Timed out in server selection after ${this.timeoutMS}ms`) ); const usingServerSelectionTimeoutMS = serverSelectionTimeoutMS !== 0 && @@ -259,14 +280,14 @@ export class CSOTTimeoutContext extends TimeoutContext { const { remainingTimeMS } = this; if (!Number.isFinite(remainingTimeMS)) return null; if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS); - throw new MongoOperationTimeoutError('Timed out before socket write'); + return Timeout.reject(new MongoOperationTimeoutError('Timed out before socket write')); } get timeoutForSocketRead(): Timeout | null { const { remainingTimeMS } = this; if (!Number.isFinite(remainingTimeMS)) return null; if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS); - throw new MongoOperationTimeoutError('Timed out before socket read'); + return Timeout.reject(new MongoOperationTimeoutError('Timed out before socket read')); } } diff --git a/src/transactions.ts b/src/transactions.ts index 7ee8660b97..2fbb0ca42b 100644 --- a/src/transactions.ts +++ b/src/transactions.ts @@ -61,7 +61,7 @@ const COMMITTED_STATES: Set = new Set([ * Configuration options for a transaction. * @public */ -export interface TransactionOptions extends CommandOperationOptions { +export interface TransactionOptions extends Omit { // TODO(NODE-3344): These options use the proper class forms of these settings, it should accept the basic enum values too /** A default read concern for commands in this transaction */ readConcern?: ReadConcernLike; @@ -69,7 +69,10 @@ export interface TransactionOptions extends CommandOperationOptions { writeConcern?: WriteConcern; /** A default read preference for commands in this transaction */ readPreference?: ReadPreferenceLike; - /** Specifies the maximum amount of time to allow a commit action on a transaction to run in milliseconds */ + /** + * Specifies the maximum amount of time to allow a commit action on a transaction to run in milliseconds + * @deprecated This option is deprecated in favor of `timeoutMS` or `defaultTimeoutMS`. + */ maxCommitTimeMS?: number; } diff --git a/src/utils.ts b/src/utils.ts index 5648b226ef..bf635b5628 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -501,6 +501,10 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean { /** * Merge inherited properties from parent into options, prioritizing values from options, * then values from parent. + * + * @param parent - An optional owning class of the operation being run. ex. Db/Collection/MongoClient. + * @param options - The options passed to the operation method. + * * @internal */ export function resolveOptions( @@ -528,9 +532,14 @@ export function resolveOptions( result.readPreference = readPreference; } - const timeoutMS = options?.timeoutMS; + const isConvenientTransaction = session?.explicit && session?.timeoutContext != null; + if (isConvenientTransaction && options?.timeoutMS != null) { + throw new MongoInvalidArgumentError( + 'An operation cannot be given a timeoutMS setting when inside a withTransaction call that has a timeoutMS setting' + ); + } - result.timeoutMS = timeoutMS ?? parent?.timeoutMS; + result.timeoutMS = options?.timeoutMS ?? parent?.timeoutMS; return result; } diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 729bed4219..406aa53ed6 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -1,6 +1,7 @@ /* Specification prose tests */ import { expect } from 'chai'; +import * as semver from 'semver'; import * as sinon from 'sinon'; import { @@ -9,6 +10,7 @@ import { MongoServerSelectionError, now } from '../../mongodb'; +import { type FailPoint } from '../../tools/utils'; // TODO(NODE-5824): Implement CSOT prose tests describe('CSOT spec prose tests', function () { @@ -595,7 +597,10 @@ describe('CSOT spec prose tests', function () { 'TODO(DRIVERS-2347): Requires this ticket to be implemented before we can assert on connection CSOT behaviour'; }); - context.skip('9. endSession', () => { + describe('9. endSession', () => { + const metadata: MongoDBMetadataUI = { + requires: { mongodb: '>=4.4', topology: ['replicaset', 'sharded'] } + }; /** * This test MUST only be run against replica sets and sharded clusters with server version 4.4 or higher. It MUST be * run three times: once with the timeout specified via the MongoClient `timeoutMS` option, once with the timeout @@ -625,12 +630,92 @@ describe('CSOT spec prose tests', function () { * 1. Using `session`, execute `session.end_session` * - Expect this to fail with a timeout error after no more than 15ms. */ + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['abortTransaction'], + blockConnection: true, + blockTimeMS: 200 + } + }; + + beforeEach(async function () { + const internalClient = this.configuration.newClient(); + // End in-progress transactions otherwise "drop" will hang + await internalClient.db('admin').command({ killAllSessions: [] }); + await internalClient + .db('endSession_db') + .collection('endSession_coll') + .drop() + .catch(() => null); + await internalClient.db('endSession_db').createCollection('endSession_coll'); + await internalClient.db('admin').command(failpoint); + await internalClient.close(); + }); + + let client: MongoClient; + + afterEach(async function () { + const internalClient = this.configuration.newClient(); + await internalClient.db('admin').command({ ...failpoint, mode: 'off' }); + await internalClient.close(); + await client?.close(); + }); + + describe('when timeoutMS is provided to the client', () => { + it('throws a timeout error from endSession', metadata, async function () { + client = this.configuration.newClient({ timeoutMS: 150, monitorCommands: true }); + const coll = client.db('endSession_db').collection('endSession_coll'); + const session = client.startSession(); + session.startTransaction(); + await coll.insertOne({ x: 1 }, { session }); + const start = performance.now(); + const error = await session.endSession().catch(error => error); + const end = performance.now(); + expect(end - start).to.be.within(100, 170); + expect(error).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + + describe('when defaultTimeoutMS is provided to startSession', () => { + it('throws a timeout error from endSession', metadata, async function () { + client = this.configuration.newClient(); + const coll = client.db('endSession_db').collection('endSession_coll'); + const session = client.startSession({ defaultTimeoutMS: 150 }); + session.startTransaction(); + await coll.insertOne({ x: 1 }, { session }); + const start = performance.now(); + const error = await session.endSession().catch(error => error); + const end = performance.now(); + expect(end - start).to.be.within(100, 170); + expect(error).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + + describe('when timeoutMS is provided to endSession', () => { + it('throws a timeout error from endSession', metadata, async function () { + client = this.configuration.newClient(); + const coll = client.db('endSession_db').collection('endSession_coll'); + const session = client.startSession(); + session.startTransaction(); + await coll.insertOne({ x: 1 }, { session }); + const start = performance.now(); + const error = await session.endSession({ timeoutMS: 150 }).catch(error => error); + const end = performance.now(); + expect(end - start).to.be.within(100, 170); + expect(error).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); }); - context.skip('10. Convenient Transactions', () => { + describe('10. Convenient Transactions', () => { /** Tests in this section MUST only run against replica sets and sharded clusters with server versions 4.4 or higher. */ + const metadata: MongoDBMetadataUI = { + requires: { topology: ['replicaset', 'sharded'], mongodb: '>=4.4' } + }; - context('timeoutMS is refreshed for abortTransaction if the callback fails', () => { + describe('when an operation fails inside withTransaction callback', () => { /** * 1. Using `internalClient`, drop the `db.coll` collection. * 1. Using `internalClient`, set the following fail point: @@ -641,7 +726,7 @@ describe('CSOT spec prose tests', function () { * data: { * failCommands: ["insert", "abortTransaction"], * blockConnection: true, - * blockTimeMS: 15 + * blockTimeMS: 200 * } * } * ``` @@ -658,6 +743,80 @@ describe('CSOT spec prose tests', function () { * 1. `command_started` and `command_failed` events for an `insert` command. * 1. `command_started` and `command_failed` events for an `abortTransaction` command. */ + + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { + failCommands: ['insert', 'abortTransaction'], + blockConnection: true, + blockTimeMS: 200 + } + }; + + beforeEach(async function () { + if (!semver.satisfies(this.configuration.version, '>=4.4')) { + this.skipReason = 'Requires server version 4.4+'; + this.skip(); + } + const internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .collection('coll') + .drop() + .catch(() => null); + await internalClient.db('admin').command(failpoint); + await internalClient.close(); + }); + + let client: MongoClient; + + afterEach(async function () { + if (semver.satisfies(this.configuration.version, '>=4.4')) { + const internalClient = this.configuration.newClient(); + await internalClient + .db('admin') + .command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient.close(); + } + await client?.close(); + }); + + it('timeoutMS is refreshed for abortTransaction', metadata, async function () { + if ( + this.configuration.topologyType === 'ReplicaSetWithPrimary' && + semver.satisfies(this.configuration.version, '<=4.4') + ) { + this.skipReason = '4.4 replicaset fail point does not blockConnection for requested time'; + this.skip(); + } + + const commandsFailed = []; + const commandsStarted = []; + + client = this.configuration + .newClient({ timeoutMS: 150, monitorCommands: true }) + .on('commandStarted', e => commandsStarted.push(e.commandName)) + .on('commandFailed', e => commandsFailed.push(e.commandName)); + + const coll = client.db('db').collection('coll'); + + const session = client.startSession(); + + const withTransactionError = await session + .withTransaction(async session => { + await coll.insertOne({ x: 1 }, { session }); + }) + .catch(error => error); + + try { + expect(withTransactionError).to.be.instanceOf(MongoOperationTimeoutError); + expect(commandsStarted, 'commands started').to.deep.equal(['insert', 'abortTransaction']); + expect(commandsFailed, 'commands failed').to.deep.equal(['insert', 'abortTransaction']); + } finally { + await session.endSession(); + } + }); }); }); }); diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts index e4c9eb3027..a178cecc5d 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts @@ -1,4 +1,5 @@ import { join } from 'path'; +import * as semver from 'semver'; import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; @@ -8,7 +9,10 @@ const enabled = [ 'override-database-timeoutMS', 'override-operation-timeoutMS', 'retryability-legacy-timeouts', - 'retryability-timeoutMS' + 'retryability-timeoutMS', + 'sessions-override-operation-timeoutMS', + 'sessions-override-timeoutMS', + 'sessions-inherit-timeoutMS' ]; const cursorOperations = [ @@ -43,5 +47,15 @@ describe('CSOT spec tests', function () { 'TODO(NODE-6274): update test runner to check errorResponse field of MongoBulkWriteError in isTimeoutError assertion'; } } - runUnifiedSuite(specs); + runUnifiedSuite(specs, (test, configuration) => { + const sessionCSOTTests = ['timeoutMS applied to withTransaction']; + if ( + sessionCSOTTests.includes(test.description) && + configuration.topologyType === 'ReplicaSetWithPrimary' && + semver.satisfies(configuration.version, '<=4.4') + ) { + return '4.4 replicaset fail point does not blockConnection for requested time'; + } + return false; + }); }); diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index d7d4a4ede5..cc767c1d80 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -12,6 +12,7 @@ import { type FindCursor, LEGACY_HELLO_COMMAND, type MongoClient, + MongoInvalidArgumentError, MongoOperationTimeoutError, MongoServerError } from '../../mongodb'; @@ -320,4 +321,153 @@ describe('CSOT driver tests', { requires: { mongodb: '>=4.4' } }, () => { }); }); }); + + describe('when using an explicit session', () => { + const metadata: MongoDBMetadataUI = { + requires: { topology: ['replicaset'], mongodb: '>=4.4' } + }; + + describe('created for a withTransaction callback', () => { + describe('passing a timeoutMS and a session with a timeoutContext', () => { + let client: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient({ timeoutMS: 123 }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('throws a validation error from the operation', metadata, async () => { + // Drivers MUST raise a validation error if an explicit session with a timeout is used and + // the timeoutMS option is set at the operation level for operations executed as part of a withTransaction callback. + + const coll = client.db('db').collection('coll'); + + const session = client.startSession(); + + let insertError: Error | null = null; + const withTransactionError = await session + .withTransaction(async session => { + insertError = await coll + .insertOne({ x: 1 }, { session, timeoutMS: 1234 }) + .catch(error => error); + throw insertError; + }) + .catch(error => error); + + expect(insertError).to.be.instanceOf(MongoInvalidArgumentError); + expect(withTransactionError).to.be.instanceOf(MongoInvalidArgumentError); + }); + }); + }); + + describe('created manually', () => { + describe('passing a timeoutMS and a session with an inherited timeoutMS', () => { + let client: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient({ timeoutMS: 123 }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('does not throw a validation error', metadata, async () => { + const coll = client.db('db').collection('coll'); + const session = client.startSession(); + session.startTransaction(); + await coll.insertOne({ x: 1 }, { session, timeoutMS: 1234 }); + await session.abortTransaction(); // this uses the inherited timeoutMS, not the insert + }); + }); + }); + }); + + describe('Convenient Transactions', () => { + /** Tests in this section MUST only run against replica sets and sharded clusters with server versions 4.4 or higher. */ + const metadata: MongoDBMetadataUI = { + requires: { topology: ['replicaset', 'sharded'], mongodb: '>=5.0' } + }; + + describe('when an operation fails inside withTransaction callback', () => { + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { + failCommands: ['insert', 'abortTransaction'], + blockConnection: true, + blockTimeMS: 600 + } + }; + + beforeEach(async function () { + if (!semver.satisfies(this.configuration.version, '>=4.4')) { + this.skipReason = 'Requires server version 4.4+'; + this.skip(); + } + const internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .collection('coll') + .drop() + .catch(() => null); + await internalClient.db('admin').command(failpoint); + await internalClient.close(); + }); + + let client: MongoClient; + + afterEach(async function () { + if (semver.satisfies(this.configuration.version, '>=4.4')) { + const internalClient = this.configuration.newClient(); + await internalClient + .db('admin') + .command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient.close(); + } + await client?.close(); + }); + + it( + 'timeoutMS is refreshed for abortTransaction and the timeout error is thrown from the operation', + metadata, + async function () { + const commandsFailed = []; + const commandsStarted = []; + + client = this.configuration + .newClient({ timeoutMS: 500, monitorCommands: true }) + .on('commandStarted', e => commandsStarted.push(e.commandName)) + .on('commandFailed', e => commandsFailed.push(e.commandName)); + + const coll = client.db('db').collection('coll'); + + const session = client.startSession(); + + let insertError: Error | null = null; + const withTransactionError = await session + .withTransaction(async session => { + insertError = await coll.insertOne({ x: 1 }, { session }).catch(error => error); + throw insertError; + }) + .catch(error => error); + + try { + expect(insertError).to.be.instanceOf(MongoOperationTimeoutError); + expect(withTransactionError).to.be.instanceOf(MongoOperationTimeoutError); + expect(commandsStarted, 'commands started').to.deep.equal([ + 'insert', + 'abortTransaction' + ]); + expect(commandsFailed, 'commands failed').to.deep.equal(['insert', 'abortTransaction']); + } finally { + await session.endSession(); + } + } + ); + }); + }); }); diff --git a/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.json b/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.json index abbc321732..13ea91c794 100644 --- a/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.json +++ b/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.json @@ -21,7 +21,7 @@ "client": { "id": "client", "uriOptions": { - "timeoutMS": 50 + "timeoutMS": 500 }, "useMultipleMongoses": false, "observeEvents": [ @@ -78,7 +78,7 @@ "commitTransaction" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -165,7 +165,7 @@ "abortTransaction" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -249,7 +249,7 @@ "insert" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -302,6 +302,26 @@ "commandFailedEvent": { "commandName": "insert" } + }, + { + "commandStartedEvent": { + "commandName": "abortTransaction", + "databaseName": "admin", + "command": { + "abortTransaction": 1, + "maxTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "abortTransaction" + } } ] } diff --git a/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.yml b/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.yml index 184ef7eb9e..c79384e5f0 100644 --- a/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.yml +++ b/test/spec/client-side-operations-timeout/sessions-inherit-timeoutMS.yml @@ -13,7 +13,7 @@ createEntities: - client: id: &client client uriOptions: - timeoutMS: 50 + timeoutMS: 500 useMultipleMongoses: false observeEvents: - commandStartedEvent @@ -52,7 +52,7 @@ tests: data: failCommands: ["commitTransaction"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: startTransaction object: *session - name: insertOne @@ -95,7 +95,7 @@ tests: data: failCommands: ["abortTransaction"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: startTransaction object: *session - name: insertOne @@ -136,7 +136,7 @@ tests: data: failCommands: ["insert"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: withTransaction object: *session arguments: @@ -153,9 +153,6 @@ tests: expectEvents: - client: *client events: - # Because the insert expects an error and gets an error, it technically succeeds, so withTransaction will - # try to run commitTransaction. This will fail client-side, though, because the timeout has already expired, - # so no command is sent. - commandStartedEvent: commandName: insert databaseName: *databaseName @@ -166,3 +163,11 @@ tests: maxTimeMS: { $$type: ["int", "long"] } - commandFailedEvent: commandName: insert + - commandStartedEvent: + commandName: abortTransaction + databaseName: admin + command: + abortTransaction: 1 + maxTimeMS: { $$type: [ "int", "long" ] } + - commandFailedEvent: + commandName: abortTransaction diff --git a/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.json b/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.json index 0254b184a1..441c698328 100644 --- a/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.json +++ b/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.json @@ -75,7 +75,7 @@ "commitTransaction" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -98,7 +98,7 @@ "name": "commitTransaction", "object": "session", "arguments": { - "timeoutMS": 50 + "timeoutMS": 500 }, "expectError": { "isTimeoutError": true @@ -165,7 +165,7 @@ "abortTransaction" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -188,7 +188,7 @@ "name": "abortTransaction", "object": "session", "arguments": { - "timeoutMS": 50 + "timeoutMS": 500 } } ], @@ -252,7 +252,7 @@ "insert" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -261,7 +261,7 @@ "name": "withTransaction", "object": "session", "arguments": { - "timeoutMS": 50, + "timeoutMS": 500, "callback": [ { "name": "insertOne", @@ -306,6 +306,26 @@ "commandFailedEvent": { "commandName": "insert" } + }, + { + "commandStartedEvent": { + "commandName": "abortTransaction", + "databaseName": "admin", + "command": { + "abortTransaction": 1, + "maxTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "abortTransaction" + } } ] } diff --git a/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.yml b/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.yml index 8a80a65720..bee91dc4cb 100644 --- a/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.yml +++ b/test/spec/client-side-operations-timeout/sessions-override-operation-timeoutMS.yml @@ -50,7 +50,7 @@ tests: data: failCommands: ["commitTransaction"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: startTransaction object: *session - name: insertOne @@ -61,7 +61,7 @@ tests: - name: commitTransaction object: *session arguments: - timeoutMS: 50 + timeoutMS: 500 expectError: isTimeoutError: true expectEvents: @@ -95,7 +95,7 @@ tests: data: failCommands: ["abortTransaction"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: startTransaction object: *session - name: insertOne @@ -106,7 +106,7 @@ tests: - name: abortTransaction object: *session arguments: - timeoutMS: 50 + timeoutMS: 500 expectEvents: - client: *client events: @@ -138,11 +138,11 @@ tests: data: failCommands: ["insert"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: withTransaction object: *session arguments: - timeoutMS: 50 + timeoutMS: 500 callback: - name: insertOne object: *collection @@ -156,9 +156,6 @@ tests: expectEvents: - client: *client events: - # Because the insert expects an error and gets an error, it technically succeeds, so withTransaction will - # try to run commitTransaction. This will fail client-side, though, because the timeout has already expired, - # so no command is sent. - commandStartedEvent: commandName: insert databaseName: *databaseName @@ -169,3 +166,11 @@ tests: maxTimeMS: { $$type: ["int", "long"] } - commandFailedEvent: commandName: insert + - commandStartedEvent: + commandName: abortTransaction + databaseName: admin + command: + abortTransaction: 1 + maxTimeMS: { $$type: ["int", "long"] } + - commandFailedEvent: + commandName: abortTransaction diff --git a/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.json b/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.json index c46ae4dd50..d90152e909 100644 --- a/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.json +++ b/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.json @@ -47,7 +47,7 @@ "id": "session", "client": "client", "sessionOptions": { - "defaultTimeoutMS": 50 + "defaultTimeoutMS": 500 } } } @@ -78,7 +78,7 @@ "commitTransaction" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -165,7 +165,7 @@ "abortTransaction" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -249,7 +249,7 @@ "insert" ], "blockConnection": true, - "blockTimeMS": 60 + "blockTimeMS": 600 } } } @@ -302,6 +302,26 @@ "commandFailedEvent": { "commandName": "insert" } + }, + { + "commandStartedEvent": { + "commandName": "abortTransaction", + "databaseName": "admin", + "command": { + "abortTransaction": 1, + "maxTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + } + }, + { + "commandFailedEvent": { + "commandName": "abortTransaction" + } } ] } diff --git a/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.yml b/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.yml index 61aaab4d97..73aaf9ff2a 100644 --- a/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.yml +++ b/test/spec/client-side-operations-timeout/sessions-override-timeoutMS.yml @@ -29,7 +29,7 @@ createEntities: id: &session session client: *client sessionOptions: - defaultTimeoutMS: 50 + defaultTimeoutMS: 500 initialData: - collectionName: *collectionName @@ -52,7 +52,7 @@ tests: data: failCommands: ["commitTransaction"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: startTransaction object: *session - name: insertOne @@ -95,7 +95,7 @@ tests: data: failCommands: ["abortTransaction"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: startTransaction object: *session - name: insertOne @@ -136,7 +136,7 @@ tests: data: failCommands: ["insert"] blockConnection: true - blockTimeMS: 60 + blockTimeMS: 600 - name: withTransaction object: *session arguments: @@ -153,9 +153,6 @@ tests: expectEvents: - client: *client events: - # Because the insert expects an error and gets an error, it technically succeeds, so withTransaction will - # try to run commitTransaction. This will fail client-side, though, because the timeout has already expired, - # so no command is sent. - commandStartedEvent: commandName: insert databaseName: *databaseName @@ -166,3 +163,11 @@ tests: maxTimeMS: { $$type: ["int", "long"] } - commandFailedEvent: commandName: insert + - commandStartedEvent: + commandName: abortTransaction + databaseName: admin + command: + abortTransaction: 1 + maxTimeMS: { $$type: [ "int", "long" ] } + - commandFailedEvent: + commandName: abortTransaction diff --git a/test/tools/unified-spec-runner/entities.ts b/test/tools/unified-spec-runner/entities.ts index 5b7a606280..54247f35c7 100644 --- a/test/tools/unified-spec-runner/entities.ts +++ b/test/tools/unified-spec-runner/entities.ts @@ -619,6 +619,10 @@ export class EntitiesMap extends Map { const options = Object.create(null); + if (entity.session.sessionOptions?.defaultTimeoutMS != null) { + options.defaultTimeoutMS = entity.session.sessionOptions?.defaultTimeoutMS; + } + if (entity.session.sessionOptions?.causalConsistency) { options.causalConsistency = entity.session.sessionOptions?.causalConsistency; } diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index 5c4ea000de..8cdcc765fc 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -501,6 +501,13 @@ function compareCommandFailedEvents( } } +function expectInstanceOf any>( + instance: any, + ctor: T +): asserts instance is InstanceType { + expect(instance).to.be.instanceOf(ctor); +} + function compareEvents( actual: CommandEvent[] | CmapEvent[] | SdamEvent[], expected: (ExpectedCommandEvent & ExpectedCmapEvent & ExpectedSdamEvent)[], @@ -515,9 +522,7 @@ function compareEvents( if (expectedEvent.commandStartedEvent) { const path = `${rootPrefix}.commandStartedEvent`; - if (!(actualEvent instanceof CommandStartedEvent)) { - expect.fail(`expected ${path} to be instanceof CommandStartedEvent`); - } + expectInstanceOf(actualEvent, CommandStartedEvent); compareCommandStartedEvents(actualEvent, expectedEvent.commandStartedEvent, entities, path); if (expectedEvent.commandStartedEvent.hasServerConnectionId) { expect(actualEvent).property('serverConnectionId').to.be.a('bigint'); @@ -526,9 +531,7 @@ function compareEvents( } } else if (expectedEvent.commandSucceededEvent) { const path = `${rootPrefix}.commandSucceededEvent`; - if (!(actualEvent instanceof CommandSucceededEvent)) { - expect.fail(`expected ${path} to be instanceof CommandSucceededEvent`); - } + expectInstanceOf(actualEvent, CommandSucceededEvent); compareCommandSucceededEvents( actualEvent, expectedEvent.commandSucceededEvent, @@ -542,9 +545,7 @@ function compareEvents( } } else if (expectedEvent.commandFailedEvent) { const path = `${rootPrefix}.commandFailedEvent`; - if (!(actualEvent instanceof CommandFailedEvent)) { - expect.fail(`expected ${path} to be instanceof CommandFailedEvent`); - } + expectInstanceOf(actualEvent, CommandFailedEvent); compareCommandFailedEvents(actualEvent, expectedEvent.commandFailedEvent, entities, path); if (expectedEvent.commandFailedEvent.hasServerConnectionId) { expect(actualEvent).property('serverConnectionId').to.be.a('bigint'); diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts index 0d7fc18970..d43f541aae 100644 --- a/test/tools/unified-spec-runner/operations.ts +++ b/test/tools/unified-spec-runner/operations.ts @@ -19,6 +19,7 @@ import { ServerType, type TopologyDescription, type TopologyType, + type TransactionOptions, WriteConcern } from '../../mongodb'; import { sleep } from '../../tools/utils'; @@ -49,11 +50,6 @@ operations.set('createEntities', async ({ entities, operation, testConfig }) => await EntitiesMap.createEntities(testConfig, null, operation.arguments.entities!, entities); }); -operations.set('abortTransaction', async ({ entities, operation }) => { - const session = entities.getEntity('session', operation.object); - return session.abortTransaction(); -}); - operations.set('aggregate', async ({ entities, operation }) => { const dbOrCollection = entities.get(operation.object) as Db | Collection; if (!(dbOrCollection instanceof Db || dbOrCollection instanceof Collection)) { @@ -231,7 +227,12 @@ operations.set('close', async ({ entities, operation }) => { operations.set('commitTransaction', async ({ entities, operation }) => { const session = entities.getEntity('session', operation.object); - return session.commitTransaction(); + return await session.commitTransaction({ timeoutMS: operation.arguments?.timeoutMS }); +}); + +operations.set('abortTransaction', async ({ entities, operation }) => { + const session = entities.getEntity('session', operation.object); + return await session.abortTransaction({ timeoutMS: operation.arguments?.timeoutMS }); }); operations.set('createChangeStream', async ({ entities, operation }) => { @@ -361,7 +362,7 @@ operations.set('insertOne', async ({ entities, operation }) => { // Looping exposes the fact that we can generate _ids for inserted // documents and we don't want the original operation to get modified // and use the same _id for each insert. - return collection.insertOne({ ...document }, opts); + return await collection.insertOne({ ...document }, opts); }); operations.set('insertMany', async ({ entities, operation }) => { @@ -708,13 +709,17 @@ operations.set('waitForThread', async ({ entities, operation }) => { operations.set('withTransaction', async ({ entities, operation, client, testConfig }) => { const session = entities.getEntity('session', operation.object); - const options = { + const options: TransactionOptions = { readConcern: ReadConcern.fromOptions(operation.arguments), writeConcern: WriteConcern.fromOptions(operation.arguments), readPreference: ReadPreference.fromOptions(operation.arguments), - maxCommitTimeMS: operation.arguments!.maxCommitTimeMS + maxCommitTimeMS: operation.arguments?.maxCommitTimeMS }; + if (typeof operation.arguments?.timeoutMS === 'number') { + options.timeoutMS = operation.arguments.timeoutMS; + } + await session.withTransaction(async () => { for (const callbackOperation of operation.arguments!.callback) { await executeOperationAndCheck(callbackOperation, entities, client, testConfig, true); @@ -935,7 +940,7 @@ export async function executeOperationAndCheck( rethrow = false ): Promise { const opFunc = operations.get(operation.name); - expect(opFunc, `Unknown operation: ${operation.name}`).to.exist; + if (opFunc == null) expect.fail(`Unknown operation: ${operation.name}`); if (operation.arguments && operation.arguments.session) { // The session could need to be either pulled from the entity map or in the case where @@ -949,7 +954,7 @@ export async function executeOperationAndCheck( let result; try { - result = await opFunc!({ entities, operation, client, testConfig }); + result = await opFunc({ entities, operation, client, testConfig }); } catch (error) { if (operation.expectError) { expectErrorCheck(error, operation.expectError, entities);