Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-5682): set maxTimeMS on commands and preempt I/O #4174

Merged
merged 24 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a58ce34
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
W-A-James Apr 11, 2024
7b4a1fb
test(NODE-6120): Implement Unified test runner changes for CSOT (#4121)
W-A-James Jun 10, 2024
3f4313e
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
W-A-James Apr 11, 2024
4aa6575
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
W-A-James Jun 21, 2024
898b93f
refactor(NODE-6230): executeOperation to use iterative retry mechanis…
W-A-James Jun 27, 2024
6125789
WIP maxTimeMS
nbbeeken Jul 11, 2024
749ad6d
chore: rm stack trace
nbbeeken Jul 11, 2024
e7dec00
pass timeoutMS to runCommand
W-A-James Jul 15, 2024
8d83bf5
wrap errors correctly
W-A-James Jul 15, 2024
74ae021
throw MongoOperationTimeoutError instead of TimeoutError
W-A-James Jul 15, 2024
59eb1b1
pass timeoutContext through to Connection.command
W-A-James Jul 15, 2024
ad90766
remove unused field
W-A-James Jul 15, 2024
6f9925c
don't ejsonify logs
W-A-James Jul 15, 2024
a432320
pass timeoutMS through to listDatabases
W-A-James Jul 15, 2024
f897155
Remove Error.captureStackTrace calls
W-A-James Jul 15, 2024
edf63e4
unskip tests
W-A-James Jul 16, 2024
7c79517
fix: connect ignores timeoutMS setting from client
nbbeeken Jul 23, 2024
2fb3226
chore: unset timeoutMS not 0
nbbeeken Jul 23, 2024
9ee5112
test: skip more connect tests
nbbeeken Jul 23, 2024
f5f9bc9
chore: add ticket
nbbeeken Jul 23, 2024
2fdeab8
comments 1
nbbeeken Jul 24, 2024
520fc51
chore: move socket timeouts into method
nbbeeken Jul 24, 2024
89ddaae
chore: move start
nbbeeken Jul 25, 2024
5e975d7
chore: linttttttttttttttttttttttttttttttttttttttttttttttt
nbbeeken Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ export class Admin {
* @param options - Optional settings for the command
*/
async listDatabases(options?: ListDatabasesOptions): Promise<ListDatabasesResult> {
return await executeOperation(this.s.db.client, new ListDatabasesOperation(this.s.db, options));
return await executeOperation(
this.s.db.client,
new ListDatabasesOperation(this.s.db, { timeoutMS: this.s.db.timeoutMS, ...options })
);
}

/**
Expand Down
66 changes: 58 additions & 8 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
MongoMissingDependencyError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoOperationTimeoutError,
MongoParseError,
MongoServerError,
MongoUnexpectedServerResponseError
Expand All @@ -29,7 +30,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { type TimeoutContext, TimeoutError } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -416,6 +417,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused why this logic is necessary - if the remaining timeoutMS is less than minRoundTripTime, we're supposed to throw and exit. So maxTimeMS should never be negative here - right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, but this is a check for greater than 0, so it confirms that maxTimeMS does not equal 0. The checks for if we should throw are colocated with the write/read steps.

}

const message = this.supportsOpMsg
? new OpMsgRequest(db, cmd, commandOptions)
: new OpQueryRequest(db, cmd, commandOptions);
Expand All @@ -430,7 +436,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
): AsyncGenerator<MongoDBResponse> {
this.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
if (options.timeoutContext?.csotEnabled()) {
this.socket.setTimeout(0);
} else if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -439,7 +447,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
try {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel
zlibCompressionLevel: this.description.zlibCompressionLevel,
timeoutContext: options.timeoutContext
});

if (options.noResponse) {
Expand All @@ -449,7 +458,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

this.throwIfAborted();

for await (const response of this.readMany()) {
if (
options.timeoutContext?.csotEnabled() &&
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new MongoOperationTimeoutError(
'Server roundtrip time is greater than the time remaining'
);
}

for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
this.socket.setTimeout(0);
const bson = response.parse();

Expand Down Expand Up @@ -622,7 +641,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*/
private async writeCommand(
command: WriteProtocolMessageType,
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
options: {
agreedCompressor?: CompressorName;
zlibCompressionLevel?: number;
timeoutContext?: TimeoutContext;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
Expand All @@ -634,8 +657,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const buffer = Buffer.concat(await finalCommand.toBin());

if (options.timeoutContext?.csotEnabled()) {
if (
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new MongoOperationTimeoutError(
'Server roundtrip time is greater than the time remaining'
);
}
}

if (this.socket.write(buffer)) return;
return await once(this.socket, 'drain');

const drainEvent = once<void>(this.socket, 'drain');
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
if (timeout) {
try {
return await Promise.race([drainEvent, timeout]);
} catch (error) {
if (TimeoutError.is(error)) {
throw new MongoOperationTimeoutError('Timed out at socket write');
}
throw error;
}
}
return await drainEvent;
}

/**
Expand All @@ -647,9 +694,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
private async *readMany(options: {
timeoutContext?: TimeoutContext;
}): AsyncGenerator<OpMsgResponse | OpReply> {
try {
this.dataEvents = onData(this.messageStream);
this.dataEvents = onData(this.messageStream, options);

for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;
Expand Down
17 changes: 14 additions & 3 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { type EventEmitter } from 'events';

import { MongoOperationTimeoutError } from '../../error';
import { type TimeoutContext, TimeoutError } from '../../timeout';
import { List, promiseWithResolvers } from '../../utils';

/**
Expand All @@ -18,7 +20,10 @@ type PendingPromises = Omit<
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
* It will reject upon an error event.
*/
export function onData(emitter: EventEmitter) {
export function onData(
emitter: EventEmitter,
{ timeoutContext }: { timeoutContext?: TimeoutContext }
) {
// Setup pending events and pending promise lists
/**
* When the caller has not yet called .next(), we store the
Expand Down Expand Up @@ -86,6 +91,8 @@ export function onData(emitter: EventEmitter) {
// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);
// eslint-disable-next-line github/no-then
timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler);

return iterator;

Expand All @@ -97,8 +104,12 @@ export function onData(emitter: EventEmitter) {

function errorHandler(err: Error) {
const promise = unconsumedPromises.shift();
if (promise != null) promise.reject(err);
else error = err;
const timeoutError = TimeoutError.is(err)
? new MongoOperationTimeoutError('Timed out during socket read')
: undefined;

if (promise != null) promise.reject(timeoutError ?? err);
else error = timeoutError ?? err;
void closeHandler();
}

Expand Down
2 changes: 1 addition & 1 deletion src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ export class Db {
this.client,
new RunCommandOperation(this, command, {
...resolveBSONOptions(options),
timeoutMS: options?.timeoutMS,
timeoutMS: options?.timeoutMS ?? this.timeoutMS,
session: options?.session,
readPreference: options?.readPreference
})
Expand Down
17 changes: 10 additions & 7 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,29 +460,28 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

const timeoutMS = this.client.s.options.timeoutMS;
// TODO(NODE-6223): auto connect cannot use timeoutMS
// const timeoutMS = this.client.s.options.timeoutMS;
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
const readPreference = options.readPreference ?? ReadPreference.primary;

const timeoutContext = TimeoutContext.create({
timeoutMS,
timeoutMS: undefined,
serverSelectionTimeoutMS,
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
});

const selectServerOptions = {
operationName: 'ping',
...options,
timeoutContext
};

try {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
const server = await this.selectServer(
readPreferenceServerSelector(readPreference),
selectServerOptions
);

const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
if (!skipPingOnConnect && server && this.s.credentials) {
if (!skipPingOnConnect && this.s.credentials) {
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
stateTransition(this, STATE_CONNECTED);
this.emit(Topology.OPEN, this);
Expand Down Expand Up @@ -623,7 +622,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

try {
timeout?.throwIfExpired();
return await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
}
return server;
} catch (error) {
if (TimeoutError.is(error)) {
// Timeout
Expand Down
45 changes: 38 additions & 7 deletions src/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { clearTimeout, setTimeout } from 'timers';

import { MongoInvalidArgumentError, MongoRuntimeError } from './error';
import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error';
import { csotMin, noop } from './utils';

/** @internal */
Expand Down Expand Up @@ -51,7 +51,7 @@ export class Timeout extends Promise<never> {
}

/** Create a new timeout that expires in `duration` ms */
private constructor(executor: Executor = () => null, duration: number, unref = false) {
private constructor(executor: Executor = () => null, duration: number, unref = true) {
let reject!: Reject;

if (duration < 0) {
Expand Down Expand Up @@ -150,6 +150,11 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions

/** @internal */
export abstract class TimeoutContext {
start: number;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
constructor() {
this.start = Math.trunc(performance.now());
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
}

static create(options: TimeoutContextOptions): TimeoutContext {
if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options);
else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options);
Expand All @@ -164,6 +169,10 @@ export abstract class TimeoutContext {

abstract get clearConnectionCheckoutTimeout(): boolean;

abstract get timeoutForSocketWrite(): Timeout | null;

abstract get timeoutForSocketRead(): Timeout | null;

abstract csotEnabled(): this is CSOTTimeoutContext;
}

Expand All @@ -176,10 +185,9 @@ export class CSOTTimeoutContext extends TimeoutContext {
clearConnectionCheckoutTimeout: boolean;
clearServerSelectionTimeout: boolean;

private _maxTimeMS?: number;

private _serverSelectionTimeout?: Timeout | null;
private _connectionCheckoutTimeout?: Timeout | null;
public minRoundTripTime = 0;

constructor(options: CSOTTimeoutContextOptions) {
super();
Expand All @@ -194,11 +202,12 @@ export class CSOTTimeoutContext extends TimeoutContext {
}

get maxTimeMS(): number {
return this._maxTimeMS ?? -1;
return this.remainingTimeMS - this.minRoundTripTime;
}

set maxTimeMS(v: number) {
this._maxTimeMS = v;
get remainingTimeMS() {
const timePassed = Math.trunc(performance.now()) - this.start;
return this.timeoutMS <= 0 ? Infinity : this.timeoutMS - timePassed;
}

csotEnabled(): this is CSOTTimeoutContext {
Expand Down Expand Up @@ -239,6 +248,20 @@ export class CSOTTimeoutContext extends TimeoutContext {
}
return this._connectionCheckoutTimeout;
}

get timeoutForSocketWrite(): Timeout | null {
const { remainingTimeMS } = this;
if (!Number.isFinite(remainingTimeMS)) return null;
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
throw new MongoOperationTimeoutError('Timed out before socket write');
}

get timeoutForSocketRead(): Timeout | null {
const { remainingTimeMS } = this;
if (!Number.isFinite(remainingTimeMS)) return null;
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
throw new MongoOperationTimeoutError('Timed out before socket read');
}
}

/** @internal */
Expand Down Expand Up @@ -269,4 +292,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
return Timeout.expires(this.options.waitQueueTimeoutMS);
return null;
}

get timeoutForSocketWrite(): Timeout | null {
return null;
}

get timeoutForSocketRead(): Timeout | null {
return null;
}
}
Loading