diff --git a/src/admin.ts b/src/admin.ts index a71ac4be1dc..e030384eafc 100644 --- a/src/admin.ts +++ b/src/admin.ts @@ -78,7 +78,8 @@ export class Admin { new RunAdminCommandOperation(command, { ...resolveBSONOptions(options), session: options?.session, - readPreference: options?.readPreference + readPreference: options?.readPreference, + timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS }) ); } diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index c6420d8306e..115da60db29 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -29,6 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { ServerType } from '../sdam/common'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; +import { type Timeout } from '../timeout'; import { BufferPool, calculateDurationInMs, @@ -92,6 +93,9 @@ export interface CommandOptions extends BSONSerializeOptions { writeConcern?: WriteConcern; directConnection?: boolean; + + /** @internal */ + timeout?: Timeout; } /** @public */ diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index f91e1361f65..83d36adaf6b 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -21,13 +21,14 @@ import { MongoInvalidArgumentError, MongoMissingCredentialsError, MongoNetworkError, + MongoOperationTimeoutError, MongoRuntimeError, MongoServerError } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; import { Timeout, TimeoutError } from '../timeout'; -import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils'; +import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit void; reject: (err: AnyError) => void; - timeout: Timeout; [kCancelled]?: boolean; } @@ -354,35 +354,57 @@ export class ConnectionPool extends TypedEventEmitter { * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or * explicitly destroyed by the new owner. */ - async checkOut(): Promise { + async checkOut(options?: { timeout?: Timeout }): Promise { this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) ); const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; + const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS; const { promise, resolve, reject } = promiseWithResolvers(); - const timeout = Timeout.expires(waitQueueTimeoutMS); + let timeout: Timeout | null = null; + if (options?.timeout) { + // CSOT enabled + // Determine if we're using the timeout passed in or a new timeout + if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { + // This check determines whether or not Topology.selectServer used the configured + // `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout + if ( + options.timeout.duration === serverSelectionTimeoutMS || + csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS + ) { + // server selection used `timeoutMS`, so we should use the existing timeout as the timeout + // here + timeout = options.timeout; + } else { + // server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with + // the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut + // cumulatively don't spend more than `serverSelectionTimeoutMS` blocking + timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed); + } + } + } else { + timeout = Timeout.expires(waitQueueTimeoutMS); + } const waitQueueMember: WaitQueueMember = { resolve, - reject, - timeout + reject }; this[kWaitQueue].push(waitQueueMember); process.nextTick(() => this.processWaitQueue()); try { - return await Promise.race([promise, waitQueueMember.timeout]); + timeout?.throwIfExpired(); + return await (timeout ? Promise.race([promise, timeout]) : promise); } catch (error) { if (TimeoutError.is(error)) { waitQueueMember[kCancelled] = true; - waitQueueMember.timeout.clear(); - this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, 'timeout') @@ -393,9 +415,16 @@ export class ConnectionPool extends TypedEventEmitter { : 'Timed out while checking out a connection from connection pool', this.address ); + if (options?.timeout) { + throw new MongoOperationTimeoutError('Timed out during connection checkout', { + cause: timeoutError + }); + } throw timeoutError; } throw error; + } finally { + if (timeout !== options?.timeout) timeout?.clear(); } } @@ -761,7 +790,6 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, reason, error) ); - waitQueueMember.timeout.clear(); this[kWaitQueue].shift(); waitQueueMember.reject(error); continue; @@ -782,7 +810,6 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECKED_OUT, new ConnectionCheckedOutEvent(this, connection) ); - waitQueueMember.timeout.clear(); this[kWaitQueue].shift(); waitQueueMember.resolve(connection); @@ -820,8 +847,6 @@ export class ConnectionPool extends TypedEventEmitter { ); waitQueueMember.resolve(connection); } - - waitQueueMember.timeout.clear(); } process.nextTick(() => this.processWaitQueue()); }); diff --git a/src/collection.ts b/src/collection.ts index 459ecd34226..28bee8b0a2a 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -254,6 +254,11 @@ export class Collection { this.s.collectionHint = normalizeHintField(v); } + /** @internal */ + get timeoutMS(): number | undefined { + return this.s.options.timeoutMS; + } + /** * Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field, * one will be added to each of the documents missing it by the driver, mutating the document. This behavior diff --git a/src/db.ts b/src/db.ts index 0dcd24dbb1d..538c8d2c9b4 100644 --- a/src/db.ts +++ b/src/db.ts @@ -222,6 +222,11 @@ export class Db { return this.s.namespace.toString(); } + /** @internal */ + get timeoutMS(): number | undefined { + return this.s.options?.timeoutMS; + } + /** * Create a new collection on a server with the specified options. Use this to create capped collections. * More information about command options available at https://www.mongodb.com/docs/manual/reference/command/create/ @@ -272,6 +277,7 @@ export class Db { this.client, new RunCommandOperation(this, command, { ...resolveBSONOptions(options), + timeoutMS: options?.timeoutMS, session: options?.session, readPreference: options?.readPreference }) diff --git a/src/error.ts b/src/error.ts index 294062e3d1c..e26a96f40df 100644 --- a/src/error.ts +++ b/src/error.ts @@ -759,6 +759,15 @@ export class MongoUnexpectedServerResponseError extends MongoRuntimeError { } } +/** + * @internal + */ +export class MongoOperationTimeoutError extends MongoRuntimeError { + override get name(): string { + return 'MongoOperationTimeoutError'; + } +} + /** * An error thrown when the user attempts to add options to a cursor that has already been * initialized diff --git a/src/index.ts b/src/index.ts index daeae592d21..5c27aacd732 100644 --- a/src/index.ts +++ b/src/index.ts @@ -63,6 +63,7 @@ export { MongoNetworkTimeoutError, MongoNotConnectedError, MongoOIDCError, + MongoOperationTimeoutError, MongoParseError, MongoRuntimeError, MongoServerClosedError, diff --git a/src/operations/command.ts b/src/operations/command.ts index fbcafe3f8b5..156e0024762 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -64,6 +64,7 @@ export interface OperationParent { writeConcern?: WriteConcern; readPreference?: ReadPreference; bsonOptions?: BSONSerializeOptions; + timeoutMS?: number; } /** @internal */ @@ -117,6 +118,7 @@ export abstract class CommandOperation extends AbstractOperation { const options = { ...this.options, ...this.bsonOptions, + timeout: this.timeout, readPreference: this.readPreference, session }; diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 4faf4fd95ad..46a94e56ec2 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -27,6 +27,7 @@ import { } from '../sdam/server_selection'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; +import { Timeout } from '../timeout'; import { squashError, supportsRetryableWrites } from '../utils'; import { AbstractOperation, Aspect } from './operation'; @@ -152,9 +153,13 @@ export async function executeOperation< selector = readPreference; } + const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined; + operation.timeout = timeout; + const server = await topology.selectServer(selector, { session, - operationName: operation.commandName + operationName: operation.commandName, + timeout }); if (session == null) { @@ -265,6 +270,7 @@ async function retryOperation< // select a new server, and attempt to retry the operation const server = await topology.selectServer(selector, { session, + timeout: operation.timeout, operationName: operation.commandName, previousServer }); diff --git a/src/operations/find.ts b/src/operations/find.ts index cdf1a711767..c5cf0da140c 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -112,7 +112,8 @@ export class FindOperation extends CommandOperation { ...this.options, ...this.bsonOptions, documentsReturnedIn: 'firstBatch', - session + session, + timeout: this.timeout }, undefined ); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index e71baa44a9e..114980893af 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '.. import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; +import { type Timeout } from '../timeout'; import type { MongoDBNamespace } from '../utils'; export const Aspect = { @@ -61,6 +62,11 @@ export abstract class AbstractOperation { options: OperationOptions; + /** @internal */ + timeout?: Timeout; + /** @internal */ + timeoutMS?: number; + [kSession]: ClientSession | undefined; constructor(options: OperationOptions = {}) { @@ -76,6 +82,8 @@ export abstract class AbstractOperation { this.options = options; this.bypassPinningCheck = !!options.bypassPinningCheck; this.trySecondaryWrite = false; + + this.timeoutMS = options.timeoutMS; } /** Must match the first key of the command object sent to the server. diff --git a/src/operations/run_command.ts b/src/operations/run_command.ts index 9042407b3e2..79198d9be7f 100644 --- a/src/operations/run_command.ts +++ b/src/operations/run_command.ts @@ -13,6 +13,8 @@ export type RunCommandOptions = { session?: ClientSession; /** The read preference */ readPreference?: ReadPreferenceLike; + /** @internal */ + timeoutMS?: number; } & BSONSerializeOptions; /** @internal */ @@ -31,6 +33,7 @@ export class RunCommandOperation extends AbstractOperation { const res: TODO_NODE_3286 = await server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, + timeout: this.timeout, session }); return res; @@ -58,7 +61,8 @@ export class RunAdminCommandOperation extends AbstractOperation const res: TODO_NODE_3286 = await server.command(this.ns, this.command, { ...this.options, readPreference: this.readPreference, - session + session, + timeout: this.timeout }); return res; } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 8ea91815c60..f7e40910d17 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -310,7 +310,7 @@ export class Server extends TypedEventEmitter { this.incrementOperationCount(); if (conn == null) { try { - conn = await this.pool.checkOut(); + conn = await this.pool.checkOut(options); if (this.loadBalanced && isPinnableCommand(cmd, session)) { session?.pin(conn); } @@ -333,6 +333,7 @@ export class Server extends TypedEventEmitter { operationError.code === MONGODB_ERROR_CODES.Reauthenticate ) { await this.pool.reauthenticate(conn); + // TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer try { return await conn.command(ns, cmd, finalOptions, responseType); } catch (commandError) { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 73b0e92a09a..4c9d71d807d 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -24,6 +24,7 @@ import { type MongoDriverError, MongoError, MongoErrorLabel, + MongoOperationTimeoutError, MongoRuntimeError, MongoServerSelectionError, MongoTopologyClosedError @@ -37,6 +38,7 @@ import { Timeout, TimeoutError } from '../timeout'; import type { Transaction } from '../transactions'; import { type Callback, + csotMin, type EventEmitterWithState, HostAddress, List, @@ -107,7 +109,6 @@ export interface ServerSelectionRequest { resolve: (server: Server) => void; reject: (error: MongoError) => void; [kCancelled]?: boolean; - timeout: Timeout; operationName: string; waitingLogged: boolean; previousServer?: ServerDescription; @@ -457,8 +458,14 @@ export class Topology extends TypedEventEmitter { } } + const timeoutMS = this.client.options.timeoutMS; + const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined; const readPreference = options.readPreference ?? ReadPreference.primary; - const selectServerOptions = { operationName: 'ping', ...options }; + const selectServerOptions = { + operationName: 'ping', + timeout, + ...options + }; try { const server = await this.selectServer( readPreferenceServerSelector(readPreference), @@ -467,7 +474,7 @@ export class Topology extends TypedEventEmitter { const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { - await server.command(ns('admin.$cmd'), { ping: 1 }, {}); + await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout }); stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); @@ -556,6 +563,25 @@ export class Topology extends TypedEventEmitter { new ServerSelectionStartedEvent(selector, this.description, options.operationName) ); } + const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0; + let timeout: Timeout | null; + if (options.timeout) { + // CSOT Enabled + if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) { + if ( + options.timeout.duration === serverSelectionTimeoutMS || + csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS + ) { + timeout = options.timeout; + } else { + timeout = Timeout.expires(serverSelectionTimeoutMS); + } + } else { + timeout = null; + } + } else { + timeout = Timeout.expires(serverSelectionTimeoutMS); + } const isSharded = this.description.type === TopologyType.Sharded; const session = options.session; @@ -578,11 +604,12 @@ export class Topology extends TypedEventEmitter { ) ); } + if (timeout !== options.timeout) timeout?.clear(); return transaction.server; } const { promise: serverPromise, resolve, reject } = promiseWithResolvers(); - const timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0); + const waitQueueMember: ServerSelectionRequest = { serverSelector, topologyDescription: this.description, @@ -590,7 +617,6 @@ export class Topology extends TypedEventEmitter { transaction, resolve, reject, - timeout, startTime: now(), operationName: options.operationName, waitingLogged: false, @@ -601,14 +627,14 @@ export class Topology extends TypedEventEmitter { processWaitQueue(this); try { - return await Promise.race([serverPromise, waitQueueMember.timeout]); + timeout?.throwIfExpired(); + return await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise); } catch (error) { if (TimeoutError.is(error)) { // Timeout waitQueueMember[kCancelled] = true; - timeout.clear(); const timeoutError = new MongoServerSelectionError( - `Server selection timed out after ${options.serverSelectionTimeoutMS} ms`, + `Server selection timed out after ${timeout?.duration} ms`, this.description ); if ( @@ -628,10 +654,17 @@ export class Topology extends TypedEventEmitter { ); } + if (options.timeout) { + throw new MongoOperationTimeoutError('Timed out during server selection', { + cause: timeoutError + }); + } throw timeoutError; } // Other server selection error throw error; + } finally { + if (timeout !== options.timeout) timeout?.clear(); } } /** @@ -889,8 +922,6 @@ function drainWaitQueue(queue: List, drainError: MongoDr continue; } - waitQueueMember.timeout.clear(); - if (!waitQueueMember[kCancelled]) { if ( waitQueueMember.mongoLogger?.willLog( @@ -944,7 +975,6 @@ function processWaitQueue(topology: Topology) { ) : serverDescriptions; } catch (selectorError) { - waitQueueMember.timeout.clear(); if ( topology.client.mongoLogger?.willLog( MongoLoggableComponent.SERVER_SELECTION, @@ -1032,8 +1062,6 @@ function processWaitQueue(topology: Topology) { transaction.pinServer(selectedServer); } - waitQueueMember.timeout.clear(); - if ( topology.client.mongoLogger?.willLog( MongoLoggableComponent.SERVER_SELECTION, diff --git a/src/timeout.ts b/src/timeout.ts index 7d0af2383d3..02b87394e7d 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -40,6 +40,16 @@ export class Timeout extends Promise { public duration: number; public timedOut = false; + get remainingTime(): number { + if (this.timedOut) return 0; + if (this.duration === 0) return Infinity; + return this.start + this.duration - Math.trunc(performance.now()); + } + + get timeElapsed(): number { + return Math.trunc(performance.now()) - this.start; + } + /** Create a new timeout that expires in `duration` ms */ private constructor(executor: Executor = () => null, duration: number, unref = false) { let reject!: Reject; @@ -78,6 +88,10 @@ export class Timeout extends Promise { this.id = undefined; } + throwIfExpired(): void { + if (this.timedOut) throw new TimeoutError('Timed out'); + } + public static expires(durationMS: number, unref?: boolean): Timeout { return new Timeout(undefined, durationMS, unref); } diff --git a/src/utils.ts b/src/utils.ts index 2ede778258d..159448edb64 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -528,6 +528,10 @@ export function resolveOptions( result.readPreference = readPreference; } + const timeoutMS = options?.timeoutMS; + + result.timeoutMS = timeoutMS ?? parent?.timeoutMS; + return result; } @@ -1363,6 +1367,12 @@ export async function fileIsAccessible(fileName: string, mode?: number) { } } +export function csotMin(duration1: number, duration2: number): number { + if (duration1 === 0) return duration2; + if (duration2 === 0) return duration1; + return Math.min(duration1, duration2); +} + export function noop() { return; } diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 1ed88f34d86..903ea9c3bb4 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -1,8 +1,30 @@ /* Specification prose tests */ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { + MongoClient, + MongoOperationTimeoutError, + MongoServerSelectionError, + now +} from '../../mongodb'; + // TODO(NODE-5824): Implement CSOT prose tests -describe.skip('CSOT spec prose tests', () => { - context('1. Multi-batch writes', () => { +describe('CSOT spec prose tests', function () { + let internalClient: MongoClient; + let client: MongoClient; + + beforeEach(async function () { + internalClient = this.configuration.newClient(); + }); + + afterEach(async function () { + await internalClient?.close(); + await client?.close(); + }); + + context.skip('1. Multi-batch writes', () => { /** * This test MUST only run against standalones on server versions 4.4 and higher. * The `insertMany` call takes an exceedingly long time on replicasets and sharded @@ -31,7 +53,7 @@ describe.skip('CSOT spec prose tests', () => { */ }); - context('2. maxTimeMS is not set for commands sent to mongocryptd', () => { + context.skip('2. maxTimeMS is not set for commands sent to mongocryptd', () => { /** * This test MUST only be run against enterprise server versions 4.2 and higher. * @@ -42,7 +64,7 @@ describe.skip('CSOT spec prose tests', () => { */ }); - context('3. ClientEncryption', () => { + context.skip('3. ClientEncryption', () => { /** * Each test under this category MUST only be run against server versions 4.4 and higher. In these tests, * `LOCAL_MASTERKEY` refers to the following base64: @@ -132,7 +154,7 @@ describe.skip('CSOT spec prose tests', () => { }); }); - context('4. Background Connection Pooling', () => { + context.skip('4. Background Connection Pooling', () => { /** * The tests in this section MUST only be run if the server version is 4.4 or higher and the URI has authentication * fields (i.e. a username and password). Each test in this section requires drivers to create a MongoClient and then wait @@ -192,7 +214,7 @@ describe.skip('CSOT spec prose tests', () => { }); }); - context('5. Blocking Iteration Methods', () => { + context.skip('5. Blocking Iteration Methods', () => { /** * Tests in this section MUST only be run against server versions 4.4 and higher and only apply to drivers that have a * blocking method for cursor iteration that executes `getMore` commands in a loop until a document is available or an @@ -251,7 +273,7 @@ describe.skip('CSOT spec prose tests', () => { }); }); - context('6. GridFS - Upload', () => { + context.skip('6. GridFS - Upload', () => { /** Tests in this section MUST only be run against server versions 4.4 and higher. */ context('uploads via openUploadStream can be timed out', () => { @@ -306,7 +328,7 @@ describe.skip('CSOT spec prose tests', () => { }); }); - context('7. GridFS - Download', () => { + context.skip('7. GridFS - Download', () => { /** * This test MUST only be run against server versions 4.4 and higher. * 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. @@ -351,96 +373,225 @@ describe.skip('CSOT spec prose tests', () => { }); context('8. Server Selection', () => { - context('serverSelectionTimeoutMS honored if timeoutMS is not set', () => { - /** - * 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?serverSelectionTimeoutMS=10`. - * 1. Using `client`, execute the command `{ ping: 1 }` against the `admin` database. - * - Expect this to fail with a server selection timeout error after no more than 15ms. - */ - }); + context('using sinon timer', function () { + let clock: sinon.SinonFakeTimers; + + beforeEach(function () { + clock = sinon.useFakeTimers(); + }); + + afterEach(function () { + clock.restore(); + }); - context( - "timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", - () => { + it('serverSelectionTimeoutMS honored if timeoutMS is not set', async function () { /** - * 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20`. - * 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database. + * 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?serverSelectionTimeoutMS=10`. + * 1. Using `client`, execute the command `{ ping: 1 }` against the `admin` database. * - Expect this to fail with a server selection timeout error after no more than 15ms. */ - } - ); - context( - "serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", - () => { - /** - * 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=20&serverSelectionTimeoutMS=10`. - * 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database. - * - Expect this to fail with a server selection timeout error after no more than 15ms. + /** NOTE: This is the original implementation of this test, but it was flaky, so was + * replaced by the current implementation using sinon fake timers + * ```ts + * client = new MongoClient('mongodb://invalid/?serverSelectionTimeoutMS=10'); + * const admin = client.db('test').admin(); + * const start = performance.now(); + * const maybeError = await admin.ping().then( + * () => null, + * e => e + * ); + * const end = performance.now(); + * + * expect(maybeError).to.be.instanceof(MongoServerSelectionError); + * expect(end - start).to.be.lte(15) + * ``` */ - } - ); + client = new MongoClient('mongodb://invalid/?serverSelectionTimeoutMS=10'); + const admin = client.db('test').admin(); + const maybeError = admin.ping().then( + () => null, + e => e + ); + + await clock.tickAsync(11); + expect(await maybeError).to.be.instanceof(MongoServerSelectionError); + }); + }); + + it("timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", async function () { + /** + * 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20`. + * 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database. + * - Expect this to fail with a server selection timeout error after no more than 15ms. + */ + client = new MongoClient('mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20'); + const start = now(); + + const maybeError = await client + .db('test') + .admin() + .ping() + .then( + () => null, + e => e + ); + const end = now(); + + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); + expect(end - start).to.be.lte(15); + }); + + it("serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", async function () { + /** + * 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=20&serverSelectionTimeoutMS=10`. + * 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database. + * - Expect this to fail with a server selection timeout error after no more than 15ms. + */ + client = new MongoClient('mongodb://invalid/?timeoutMS=20&serverSelectionTimeoutMS=10'); + const start = now(); + const maybeError = await client + .db('test') + .admin() + .ping() + .then( + () => null, + e => e + ); + const end = now(); + + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); + expect(end - start).to.be.lte(15); + }); - context('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', () => { + it('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', async function () { /** * 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=0&serverSelectionTimeoutMS=10`. * 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database. * - Expect this to fail with a server selection timeout error after no more than 15ms. */ + client = new MongoClient('mongodb://invalid/?timeoutMS=0&serverSelectionTimeoutMS=10'); + const start = now(); + const maybeError = await client + .db('test') + .admin() + .ping() + .then( + () => null, + e => e + ); + const end = now(); + + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); + expect(end - start).to.be.lte(15); }); - context( - "timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", - () => { - /** - * This test MUST only be run if the server version is 4.4 or higher and the URI has authentication fields (i.e. a - * username and password). - * 1. Using `internalClient`, set the following fail point: - * ```js - * { - * configureFailPoint: failCommand, - * mode: { times: 1 }, - * data: { - * failCommands: ["saslContinue"], - * blockConnection: true, - * blockTimeMS: 15 - * } - * } - * ``` - * 1. Create a new MongoClient (referred to as `client`) with `timeoutMS=10` and `serverSelectionTimeoutMS=20`. - * 1. Using `client`, insert the document `{ x: 1 }` into collection `db.coll`. - * - Expect this to fail with a timeout error after no more than 15ms. - */ - } - ); + it.skip("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", async function () { + /** + * This test MUST only be run if the server version is 4.4 or higher and the URI has authentication fields (i.e. a + * username and password). + * 1. Using `internalClient`, set the following fail point: + * ```js + * { + * configureFailPoint: failCommand, + * mode: { times: 1 }, + * data: { + * failCommands: ["saslContinue"], + * blockConnection: true, + * blockTimeMS: 15 + * } + * } + * ``` + * 1. Create a new MongoClient (referred to as `client`) with `timeoutMS=10` and `serverSelectionTimeoutMS=20`. + * 1. Using `client`, insert the document `{ x: 1 }` into collection `db.coll`. + * - Expect this to fail with a timeout error after no more than 15ms. + */ + await internalClient + .db('db') + .admin() + .command({ + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['saslContinue'], + blockConnection: true, + blockTimeMS: 15 + } + }); - context( - "serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", - () => { - /** - * This test MUST only be run if the server version is 4.4 or higher and the URI has authentication fields (i.e. a - * username and password). - * 1. Using `internalClient`, set the following fail point: - * ```js - * { - * configureFailPoint: failCommand, - * mode: { times: 1 }, - * data: { - * failCommands: ["saslContinue"], - * blockConnection: true, - * blockTimeMS: 15 - * } - * } - * ``` - * 1. Create a new MongoClient (referred to as `client`) with `timeoutMS=20` and `serverSelectionTimeoutMS=10`. - * 1. Using `client`, insert the document `{ x: 1 }` into collection `db.coll`. - * - Expect this to fail with a timeout error after no more than 15ms. - */ - } - ); + client = this.configuration.newClient({ + serverSelectionTimeoutMS: 20, + timeoutMS: 10 + }); + const start = now(); + const maybeError = await client + .db('db') + .collection('coll') + .insertOne({ x: 1 }) + .then( + () => null, + e => e + ); + const end = now(); + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); + expect(end - start).to.be.lte(15); + }).skipReason = + 'TODO(DRIVERS-2347): Requires this ticket to be implemented before we can assert on connection CSOT behaviour'; + + it.skip("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", async function () { + /** + * This test MUST only be run if the server version is 4.4 or higher and the URI has authentication fields (i.e. a + * username and password). + * 1. Using `internalClient`, set the following fail point: + * ```js + * { + * configureFailPoint: failCommand, + * mode: { times: 1 }, + * data: { + * failCommands: ["saslContinue"], + * blockConnection: true, + * blockTimeMS: 15 + * } + * } + * ``` + * 1. Create a new MongoClient (referred to as `client`) with `timeoutMS=20` and `serverSelectionTimeoutMS=10`. + * 1. Using `client`, insert the document `{ x: 1 }` into collection `db.coll`. + * - Expect this to fail with a timeout error after no more than 15ms. + */ + await internalClient + .db('db') + .admin() + .command({ + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['saslContinue'], + blockConnection: true, + blockTimeMS: 15 + } + }); + + client = this.configuration.newClient({ + serverSelectionTimeoutMS: 10, + timeoutMS: 20 + }); + const start = now(); + const maybeError = await client + .db('db') + .collection('coll') + .insertOne({ x: 1 }) + .then( + () => null, + e => e + ); + const end = now(); + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); + expect(end - start).to.be.lte(15); + }).skipReason = + 'TODO(DRIVERS-2347): Requires this ticket to be implemented before we can assert on connection CSOT behaviour'; }); - context('9. endSession', () => { + context.skip('9. endSession', () => { /** * This test MUST only be run against replica sets and sharded clusters with server version 4.4 or higher. It MUST be * run three times: once with the timeout specified via the MongoClient `timeoutMS` option, once with the timeout @@ -472,7 +623,7 @@ describe.skip('CSOT spec prose tests', () => { */ }); - context('10. Convenient Transactions', () => { + context.skip('10. Convenient Transactions', () => { /** Tests in this section MUST only run against replica sets and sharded clusters with server versions 4.4 or higher. */ context('timeoutMS is refreshed for abortTransaction if the callback fails', () => { diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts index cf9c5f736ff..c1426d8db1d 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts @@ -1,51 +1,105 @@ -/* eslint-disable @typescript-eslint/no-empty-function */ /** * The following tests are described in CSOTs spec prose tests as "unit" tests * The tests enumerated in this section could not be expressed in either spec or prose format. * Drivers SHOULD implement these if it is possible to do so using the driver's existing test infrastructure. */ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { ConnectionPool, type MongoClient, Timeout, Topology } from '../../mongodb'; + // TODO(NODE-5824): Implement CSOT prose tests -describe.skip('CSOT spec unit tests', () => { - context('Operations should ignore waitQueueTimeoutMS if timeoutMS is also set.', () => {}); - - context( - 'If timeoutMS is set for an operation, the remaining timeoutMS value should apply to connection checkout after a server has been selected.', - () => {} - ); - - context( - 'If timeoutMS is not set for an operation, waitQueueTimeoutMS should apply to connection checkout after a server has been selected.', - () => {} - ); - - context( - 'If a new connection is required to execute an operation, min(remaining computedServerSelectionTimeout, connectTimeoutMS) should apply to socket establishment.', - () => {} - ); - - context( - 'For drivers that have control over OCSP behavior, min(remaining computedServerSelectionTimeout, 5 seconds) should apply to HTTP requests against OCSP responders.', - () => {} - ); - - context( - 'If timeoutMS is unset, operations fail after two non-consecutive socket timeouts.', - () => {} - ); - - context( - 'The remaining timeoutMS value should apply to HTTP requests against KMS servers for CSFLE.', - () => {} - ); - - context( - 'The remaining timeoutMS value should apply to commands sent to mongocryptd as part of automatic encryption.', - () => {} - ); - - context( - 'When doing minPoolSize maintenance, connectTimeoutMS is used as the timeout for socket establishment.', - () => {} - ); +describe('CSOT spec unit tests', function () { + let client: MongoClient; + + afterEach(async function () { + sinon.restore(); + await client?.close(); + }); + + context('Server Selection and Connection Checkout', function () { + it('Operations should ignore waitQueueTimeoutMS if timeoutMS is also set.', async function () { + client = this.configuration.newClient({ waitQueueTimeoutMS: 999999, timeoutMS: 10000 }); + sinon.spy(Timeout, 'expires'); + + await client.db('db').collection('collection').insertOne({ x: 1 }); + + expect(Timeout.expires).to.have.been.calledWith(10000); + expect(Timeout.expires).to.not.have.been.calledWith(999999); + }); + + it('If timeoutMS is set for an operation, the remaining timeoutMS value should apply to connection checkout after a server has been selected.', async function () { + client = this.configuration.newClient({ timeoutMS: 1000 }); + // Spy on connection checkout and pull options argument + const checkoutSpy = sinon.spy(ConnectionPool.prototype, 'checkOut'); + const selectServerSpy = sinon.spy(Topology.prototype, 'selectServer'); + const expiresSpy = sinon.spy(Timeout, 'expires'); + + await client.db('db').collection('collection').insertOne({ x: 1 }); + + expect(checkoutSpy).to.have.been.calledOnce; + expect(checkoutSpy.firstCall.args[0].timeout).to.exist; + // Check that we passed through the timeout + expect(checkoutSpy.firstCall.args[0].timeout).to.equal( + selectServerSpy.lastCall.lastArg.timeout + ); + + // Check that no more Timeouts are constructed after we enter checkout + expect(!expiresSpy.calledAfter(checkoutSpy)); + }); + + it('If timeoutMS is not set for an operation, waitQueueTimeoutMS should apply to connection checkout after a server has been selected.', async function () { + client = this.configuration.newClient({ waitQueueTimeoutMS: 123456 }); + + const checkoutSpy = sinon.spy(ConnectionPool.prototype, 'checkOut'); + const selectServerSpy = sinon.spy(Topology.prototype, 'selectServer'); + const expiresSpy = sinon.spy(Timeout, 'expires'); + + await client.db('db').collection('collection').insertOne({ x: 1 }); + expect(checkoutSpy).to.have.been.calledAfter(selectServerSpy); + + expect(expiresSpy).to.have.been.calledWith(123456); + }); + + /* eslint-disable @typescript-eslint/no-empty-function */ + context.skip( + 'If a new connection is required to execute an operation, min(remaining computedServerSelectionTimeout, connectTimeoutMS) should apply to socket establishment.', + () => {} + ).skipReason = + 'TODO(DRIVERS-2347): Requires this ticket to be implemented before we can assert on connection CSOT behaviour'; + + context( + 'For drivers that have control over OCSP behavior, min(remaining computedServerSelectionTimeout, 5 seconds) should apply to HTTP requests against OCSP responders.', + () => {} + ); + }); + + context.skip('Socket timeouts', function () { + context( + 'If timeoutMS is unset, operations fail after two non-consecutive socket timeouts.', + () => {} + ); + }).skipReason = + 'TODO(NODE-5682): Add CSOT support for socket read/write at the connection layer for CRUD APIs'; + + context.skip('Client side encryption', function () { + context( + 'The remaining timeoutMS value should apply to HTTP requests against KMS servers for CSFLE.', + () => {} + ); + + context( + 'The remaining timeoutMS value should apply to commands sent to mongocryptd as part of automatic encryption.', + () => {} + ); + }).skipReason = 'TODO(NODE-5686): Add CSOT support to client side encryption'; + + context.skip('Background Connection Pooling', function () { + context( + 'When doing minPoolSize maintenance, connectTimeoutMS is used as the timeout for socket establishment.', + () => {} + ); + }).skipReason = 'TODO(NODE-6091): Implement CSOT logic for Background Connection Pooling'; + /* eslint-enable @typescript-eslint/no-empty-function */ }); diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index b6a936afbb9..5636eb00db7 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -7,7 +7,9 @@ import { type Collection, type Db, type FindCursor, - type MongoClient + LEGACY_HELLO_COMMAND, + type MongoClient, + MongoOperationTimeoutError } from '../../mongodb'; describe('CSOT driver tests', () => { @@ -94,4 +96,75 @@ describe('CSOT driver tests', () => { }); }); }); + + describe('autoconnect', () => { + let client: MongoClient; + + afterEach(async function () { + await client?.close(); + client = undefined; + }); + + describe('when failing autoconnect with timeoutMS defined', () => { + let configClient: MongoClient; + + beforeEach(async function () { + configClient = this.configuration.newClient(); + const result = await configClient + .db() + .admin() + .command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + blockConnection: true, + blockTimeMS: 10 + } + }); + expect(result).to.have.property('ok', 1); + }); + + afterEach(async function () { + const result = await configClient + .db() + .admin() + .command({ + configureFailPoint: 'failCommand', + mode: 'off', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + blockConnection: true, + blockTimeMS: 10 + } + }); + expect(result).to.have.property('ok', 1); + await configClient.close(); + }); + + it('throws a MongoOperationTimeoutError', { + metadata: { requires: { mongodb: '>=4.4' } }, + test: async function () { + const commandsStarted = []; + client = this.configuration.newClient(undefined, { timeoutMS: 1, monitorCommands: true }); + + client.on('commandStarted', ev => commandsStarted.push(ev)); + + const maybeError = await client + .db('test') + .collection('test') + .insertOne({ a: 19 }) + .then( + () => null, + e => e + ); + + expect(maybeError).to.exist; + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); + + expect(commandsStarted).to.have.length(0); // Ensure that we fail before we start the insertOne + } + }); + }); + }); }); diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 69102e1f150..18048befab4 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -5,7 +5,7 @@ const { WaitQueueTimeoutError } = require('../../mongodb'); const mock = require('../../tools/mongodb-mock/index'); const sinon = require('sinon'); const { expect } = require('chai'); -const { setImmediate } = require('timers'); +const { setImmediate } = require('timers/promises'); const { ns, isHello } = require('../../mongodb'); const { createTimerSandbox } = require('../timer_sandbox'); const { topologyWithPlaceholderClient } = require('../../tools/utils'); @@ -26,6 +26,9 @@ describe('Connection Pool', function () { options: { extendedMetadata: {} } + }, + s: { + serverSelectionTimeoutMS: 0 } } }; @@ -98,7 +101,7 @@ describe('Connection Pool', function () { pool.checkIn(conn); }); - it('should clear timed out wait queue members if no connections are available', function (done) { + it('should clear timed out wait queue members if no connections are available', async function () { mockMongod.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -114,23 +117,15 @@ describe('Connection Pool', function () { pool.ready(); - pool.checkOut().then(conn => { - expect(conn).to.exist; - pool.checkOut().then(expect.fail, err => { - expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); - - // We can only process the wait queue with `checkIn` and `checkOut`, so we - // force the pool here to think there are no available connections, even though - // we are checking the connection back in. This simulates a slow leak where - // incoming requests outpace the ability of the queue to fully process cancelled - // wait queue members - sinon.stub(pool, 'availableConnectionCount').get(() => 0); - pool.checkIn(conn); - - setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0)); - done(); - }); - }, expect.fail); + const conn = await pool.checkOut(); + const err = await pool.checkOut().catch(e => e); + expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); + sinon.stub(pool, 'availableConnectionCount').get(() => 0); + pool.checkIn(conn); + + await setImmediate(); + + expect(pool).property('waitQueueSize').to.equal(0); }); describe('minPoolSize population', function () { diff --git a/test/unit/index.test.ts b/test/unit/index.test.ts index 6509568c018..90eb355ad83 100644 --- a/test/unit/index.test.ts +++ b/test/unit/index.test.ts @@ -106,6 +106,7 @@ const EXPECTED_EXPORTS = [ 'MongoTailableCursorError', 'MongoTopologyClosedError', 'MongoTransactionError', + 'MongoOperationTimeoutError', 'MongoUnexpectedServerResponseError', 'MongoWriteConcernError', 'ObjectId',