Skip to content

Commit

Permalink
Install timeout throughout operation layer
Browse files Browse the repository at this point in the history
update with timeout

start prose test impl

add timeout to find.execute

start implementing prose tests

don't construct Timeout when not needed

ensure that timeoutMS is passed down correctly

start working on unit tests

continue prose test implementation

revert spec test changes

revert spec test changes

revert spec test changes

support timeout on run_command

continue prose test implementation

prose test changes

WIP - server selection changes

revert unneeded connection changes

add serverSelectionTimeout to run_command

use correct timeout

reorder operations

formatting

skip some CSOT tests that cannot be made to pass here

Improve timeout messages

silence eslint test issues

bump timeout values

misc changes

rename timeout

make getter internal

rename timeout

remove unneeded change for this PR

clear server selection timeout after checkout and remove command execution timeout

move Timeout.min to independent helper function

move Timeout.min to independent helper function

update timeout propagation

clean up

cleanup

test cleanup

clean up

simplify calculation

cleanup

clarify branching timeout behaviour

operationTimeout -> timeout

ensure timeouts are properly cleared

don't race if given infinite timeout

default clearTimeout to false

remove clearTimeout variable

conditionally clear timeout on early return

fix unit tests

bump test timeout value

replace test with sinon fake timer test

Update src/sdam/topology.ts

Co-authored-by: Neal Beeken <[email protected]>

clean up logic

Update test to assert on current behaviour

fix autoconnect

add test

remove .only

fix test

ensure test only runs when failcommand is available

do not run on 4.2

fix csot test?

add failpoint to correct commands

fix timeout signature
  • Loading branch information
W-A-James committed Jun 5, 2024
1 parent f790cc1 commit e3b7a63
Show file tree
Hide file tree
Showing 21 changed files with 575 additions and 176 deletions.
3 changes: 2 additions & 1 deletion src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ export class Admin {
new RunAdminCommandOperation(command, {
...resolveBSONOptions(options),
session: options?.session,
readPreference: options?.readPreference
readPreference: options?.readPreference,
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
})
);
}
Expand Down
4 changes: 4 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type Timeout } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -92,6 +93,9 @@ export interface CommandOptions extends BSONSerializeOptions {
writeConcern?: WriteConcern;

directConnection?: boolean;

/** @internal */
timeout?: Timeout;
}

/** @public */
Expand Down
51 changes: 38 additions & 13 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import {
MongoInvalidArgumentError,
MongoMissingCredentialsError,
MongoNetworkError,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerError
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Timeout, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -102,7 +103,6 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
export interface WaitQueueMember {
resolve: (conn: Connection) => void;
reject: (err: AnyError) => void;
timeout: Timeout;
[kCancelled]?: boolean;
}

Expand Down Expand Up @@ -354,35 +354,57 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
async checkOut(): Promise<Connection> {
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

const timeout = Timeout.expires(waitQueueTimeoutMS);
let timeout: Timeout | null = null;
if (options?.timeout) {
// CSOT enabled
// Determine if we're using the timeout passed in or a new timeout
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
// This check determines whether or not Topology.selectServer used the configured
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
if (
options.timeout.duration === serverSelectionTimeoutMS ||
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
) {
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
// here
timeout = options.timeout;
} else {
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
}
}
} else {
timeout = Timeout.expires(waitQueueTimeoutMS);
}

const waitQueueMember: WaitQueueMember = {
resolve,
reject,
timeout
reject
};

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

try {
return await Promise.race([promise, waitQueueMember.timeout]);
timeout?.throwIfExpired();
return await (timeout ? Promise.race([promise, timeout]) : promise);
} catch (error) {
if (TimeoutError.is(error)) {
waitQueueMember[kCancelled] = true;

waitQueueMember.timeout.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
Expand All @@ -393,9 +415,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
: 'Timed out while checking out a connection from connection pool',
this.address
);
if (options?.timeout) {
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
cause: timeoutError
});
}
throw timeoutError;
}
throw error;
} finally {
if (timeout !== options?.timeout) timeout?.clear();
}
}

Expand Down Expand Up @@ -761,7 +790,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
waitQueueMember.timeout.clear();
this[kWaitQueue].shift();
waitQueueMember.reject(error);
continue;
Expand All @@ -782,7 +810,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
waitQueueMember.timeout.clear();

this[kWaitQueue].shift();
waitQueueMember.resolve(connection);
Expand Down Expand Up @@ -820,8 +847,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
waitQueueMember.resolve(connection);
}

waitQueueMember.timeout.clear();
}
process.nextTick(() => this.processWaitQueue());
});
Expand Down
5 changes: 5 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ export class Collection<TSchema extends Document = Document> {
this.s.collectionHint = normalizeHintField(v);
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options.timeoutMS;
}

/**
* Inserts a single document into MongoDB. If documents passed in do not contain the **_id** field,
* one will be added to each of the documents missing it by the driver, mutating the document. This behavior
Expand Down
6 changes: 6 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ export class Db {
return this.s.namespace.toString();
}

/** @internal */
get timeoutMS(): number | undefined {
return this.s.options?.timeoutMS;
}

/**
* Create a new collection on a server with the specified options. Use this to create capped collections.
* More information about command options available at https://www.mongodb.com/docs/manual/reference/command/create/
Expand Down Expand Up @@ -272,6 +277,7 @@ export class Db {
this.client,
new RunCommandOperation(this, command, {
...resolveBSONOptions(options),
timeoutMS: options?.timeoutMS,
session: options?.session,
readPreference: options?.readPreference
})
Expand Down
9 changes: 9 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,15 @@ export class MongoUnexpectedServerResponseError extends MongoRuntimeError {
}
}

/**
* @internal
*/
export class MongoOperationTimeoutError extends MongoRuntimeError {
override get name(): string {
return 'MongoOperationTimeoutError';
}
}

/**
* An error thrown when the user attempts to add options to a cursor that has already been
* initialized
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export {
MongoNetworkTimeoutError,
MongoNotConnectedError,
MongoOIDCError,
MongoOperationTimeoutError,
MongoParseError,
MongoRuntimeError,
MongoServerClosedError,
Expand Down
2 changes: 2 additions & 0 deletions src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface OperationParent {
writeConcern?: WriteConcern;
readPreference?: ReadPreference;
bsonOptions?: BSONSerializeOptions;
timeoutMS?: number;
}

/** @internal */
Expand Down Expand Up @@ -117,6 +118,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
const options = {
...this.options,
...this.bsonOptions,
timeout: this.timeout,
readPreference: this.readPreference,
session
};
Expand Down
8 changes: 7 additions & 1 deletion src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { Timeout } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

Expand Down Expand Up @@ -152,9 +153,13 @@ export async function executeOperation<
selector = readPreference;
}

const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
operation.timeout = timeout;

const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName
operationName: operation.commandName,
timeout
});

if (session == null) {
Expand Down Expand Up @@ -265,6 +270,7 @@ async function retryOperation<
// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
timeout: operation.timeout,
operationName: operation.commandName,
previousServer
});
Expand Down
3 changes: 2 additions & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ export class FindOperation extends CommandOperation<Document> {
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
session,
timeout: this.timeout
},
undefined
);
Expand Down
8 changes: 8 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Timeout } from '../timeout';
import type { MongoDBNamespace } from '../utils';

export const Aspect = {
Expand Down Expand Up @@ -61,6 +62,11 @@ export abstract class AbstractOperation<TResult = any> {

options: OperationOptions;

/** @internal */
timeout?: Timeout;
/** @internal */
timeoutMS?: number;

[kSession]: ClientSession | undefined;

constructor(options: OperationOptions = {}) {
Expand All @@ -76,6 +82,8 @@ export abstract class AbstractOperation<TResult = any> {
this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;

this.timeoutMS = options.timeoutMS;
}

/** Must match the first key of the command object sent to the server.
Expand Down
6 changes: 5 additions & 1 deletion src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export type RunCommandOptions = {
session?: ClientSession;
/** The read preference */
readPreference?: ReadPreferenceLike;
/** @internal */
timeoutMS?: number;
} & BSONSerializeOptions;

/** @internal */
Expand All @@ -31,6 +33,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
timeout: this.timeout,
session
});
return res;
Expand Down Expand Up @@ -58,7 +61,8 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
const res: TODO_NODE_3286 = await server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
session,
timeout: this.timeout
});
return res;
}
Expand Down
3 changes: 2 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.incrementOperationCount();
if (conn == null) {
try {
conn = await this.pool.checkOut();
conn = await this.pool.checkOut(options);
if (this.loadBalanced && isPinnableCommand(cmd, session)) {
session?.pin(conn);
}
Expand All @@ -333,6 +333,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
) {
await this.pool.reauthenticate(conn);
// TODO(NODE-5682): Implement CSOT support for socket read/write at the connection layer
try {
return await conn.command(ns, cmd, finalOptions, responseType);
} catch (commandError) {
Expand Down
Loading

0 comments on commit e3b7a63

Please sign in to comment.