Skip to content

Commit

Permalink
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
Browse files Browse the repository at this point in the history
Co-authored-by: Warren James <[email protected]>
  • Loading branch information
2 people authored and baileympearson committed Oct 1, 2024
1 parent c0dd1ca commit 8d1a2d3
Show file tree
Hide file tree
Showing 16 changed files with 200 additions and 77 deletions.
5 changes: 4 additions & 1 deletion src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ export class Admin {
* @param options - Optional settings for the command
*/
async listDatabases(options?: ListDatabasesOptions): Promise<ListDatabasesResult> {
return await executeOperation(this.s.db.client, new ListDatabasesOperation(this.s.db, options));
return await executeOperation(
this.s.db.client,
new ListDatabasesOperation(this.s.db, { timeoutMS: this.s.db.timeoutMS, ...options })
);
}

/**
Expand Down
66 changes: 58 additions & 8 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
MongoMissingDependencyError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoOperationTimeoutError,
MongoParseError,
MongoServerError,
MongoUnexpectedServerResponseError
Expand All @@ -30,7 +31,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { type TimeoutContext, TimeoutError } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -419,6 +420,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}

const message = this.supportsOpMsg
? new OpMsgRequest(db, cmd, commandOptions)
: new OpQueryRequest(db, cmd, commandOptions);
Expand All @@ -433,7 +439,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
): AsyncGenerator<MongoDBResponse> {
this.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
if (options.timeoutContext?.csotEnabled()) {
this.socket.setTimeout(0);
} else if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
Expand All @@ -442,7 +450,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
try {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel
zlibCompressionLevel: this.description.zlibCompressionLevel,
timeoutContext: options.timeoutContext
});

if (options.noResponse || message.moreToCome) {
Expand All @@ -452,7 +461,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

this.throwIfAborted();

for await (const response of this.readMany()) {
if (
options.timeoutContext?.csotEnabled() &&
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new MongoOperationTimeoutError(
'Server roundtrip time is greater than the time remaining'
);
}

for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
this.socket.setTimeout(0);
const bson = response.parse();

Expand Down Expand Up @@ -629,7 +648,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*/
private async writeCommand(
command: WriteProtocolMessageType,
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
options: {
agreedCompressor?: CompressorName;
zlibCompressionLevel?: number;
timeoutContext?: TimeoutContext;
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
Expand All @@ -641,8 +664,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const buffer = Buffer.concat(await finalCommand.toBin());

if (options.timeoutContext?.csotEnabled()) {
if (
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new MongoOperationTimeoutError(
'Server roundtrip time is greater than the time remaining'
);
}
}

if (this.socket.write(buffer)) return;
return await once(this.socket, 'drain');

const drainEvent = once<void>(this.socket, 'drain');
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
if (timeout) {
try {
return await Promise.race([drainEvent, timeout]);
} catch (error) {
if (TimeoutError.is(error)) {
throw new MongoOperationTimeoutError('Timed out at socket write');
}
throw error;
}
}
return await drainEvent;
}

/**
Expand All @@ -654,10 +701,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
private async *readMany(options: {
timeoutContext?: TimeoutContext;
}): AsyncGenerator<OpMsgResponse | OpReply> {
try {
this.dataEvents = onData(this.messageStream);
this.dataEvents = onData(this.messageStream, options);
this.messageStream.resume();

for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;
Expand Down
17 changes: 14 additions & 3 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { type EventEmitter } from 'events';

import { MongoOperationTimeoutError } from '../../error';
import { type TimeoutContext, TimeoutError } from '../../timeout';
import { List, promiseWithResolvers } from '../../utils';

/**
Expand All @@ -18,7 +20,10 @@ type PendingPromises = Omit<
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
* It will reject upon an error event.
*/
export function onData(emitter: EventEmitter) {
export function onData(
emitter: EventEmitter,
{ timeoutContext }: { timeoutContext?: TimeoutContext }
) {
// Setup pending events and pending promise lists
/**
* When the caller has not yet called .next(), we store the
Expand Down Expand Up @@ -86,6 +91,8 @@ export function onData(emitter: EventEmitter) {
// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);
// eslint-disable-next-line github/no-then
timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler);

return iterator;

Expand All @@ -97,8 +104,12 @@ export function onData(emitter: EventEmitter) {

function errorHandler(err: Error) {
const promise = unconsumedPromises.shift();
if (promise != null) promise.reject(err);
else error = err;
const timeoutError = TimeoutError.is(err)
? new MongoOperationTimeoutError('Timed out during socket read')
: undefined;

if (promise != null) promise.reject(timeoutError ?? err);
else error = timeoutError ?? err;
void closeHandler();
}

Expand Down
2 changes: 1 addition & 1 deletion src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ export class Db {
this.client,
new RunCommandOperation(this, command, {
...resolveBSONOptions(options),
timeoutMS: options?.timeoutMS,
timeoutMS: options?.timeoutMS ?? this.timeoutMS,
session: options?.session,
readPreference: options?.readPreference
})
Expand Down
17 changes: 10 additions & 7 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,29 +460,28 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

const timeoutMS = this.client.s.options.timeoutMS;
// TODO(NODE-6223): auto connect cannot use timeoutMS
// const timeoutMS = this.client.s.options.timeoutMS;
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
const readPreference = options.readPreference ?? ReadPreference.primary;

const timeoutContext = TimeoutContext.create({
timeoutMS,
timeoutMS: undefined,
serverSelectionTimeoutMS,
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
});

const selectServerOptions = {
operationName: 'ping',
...options,
timeoutContext
};

try {
const server = await this.selectServer(
readPreferenceServerSelector(readPreference),
selectServerOptions
);

const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
if (!skipPingOnConnect && server && this.s.credentials) {
if (!skipPingOnConnect && this.s.credentials) {
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
stateTransition(this, STATE_CONNECTED);
this.emit(Topology.OPEN, this);
Expand Down Expand Up @@ -623,7 +622,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

try {
timeout?.throwIfExpired();
return await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
}
return server;
} catch (error) {
if (TimeoutError.is(error)) {
// Timeout
Expand Down
43 changes: 36 additions & 7 deletions src/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { clearTimeout, setTimeout } from 'timers';

import { MongoInvalidArgumentError, MongoRuntimeError } from './error';
import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error';
import { csotMin, noop } from './utils';

/** @internal */
Expand Down Expand Up @@ -51,7 +51,7 @@ export class Timeout extends Promise<never> {
}

/** Create a new timeout that expires in `duration` ms */
private constructor(executor: Executor = () => null, duration: number, unref = false) {
private constructor(executor: Executor = () => null, duration: number, unref = true) {
let reject!: Reject;

if (duration < 0) {
Expand Down Expand Up @@ -163,6 +163,10 @@ export abstract class TimeoutContext {

abstract get clearConnectionCheckoutTimeout(): boolean;

abstract get timeoutForSocketWrite(): Timeout | null;

abstract get timeoutForSocketRead(): Timeout | null;

abstract csotEnabled(): this is CSOTTimeoutContext;
}

Expand All @@ -175,13 +179,15 @@ export class CSOTTimeoutContext extends TimeoutContext {
clearConnectionCheckoutTimeout: boolean;
clearServerSelectionTimeout: boolean;

private _maxTimeMS?: number;

private _serverSelectionTimeout?: Timeout | null;
private _connectionCheckoutTimeout?: Timeout | null;
public minRoundTripTime = 0;
private start: number;

constructor(options: CSOTTimeoutContextOptions) {
super();
this.start = Math.trunc(performance.now());

this.timeoutMS = options.timeoutMS;

this.serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
Expand All @@ -193,11 +199,12 @@ export class CSOTTimeoutContext extends TimeoutContext {
}

get maxTimeMS(): number {
return this._maxTimeMS ?? -1;
return this.remainingTimeMS - this.minRoundTripTime;
}

set maxTimeMS(v: number) {
this._maxTimeMS = v;
get remainingTimeMS() {
const timePassed = Math.trunc(performance.now()) - this.start;
return this.timeoutMS <= 0 ? Infinity : this.timeoutMS - timePassed;
}

csotEnabled(): this is CSOTTimeoutContext {
Expand Down Expand Up @@ -238,6 +245,20 @@ export class CSOTTimeoutContext extends TimeoutContext {
}
return this._connectionCheckoutTimeout;
}

get timeoutForSocketWrite(): Timeout | null {
const { remainingTimeMS } = this;
if (!Number.isFinite(remainingTimeMS)) return null;
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
throw new MongoOperationTimeoutError('Timed out before socket write');
}

get timeoutForSocketRead(): Timeout | null {
const { remainingTimeMS } = this;
if (!Number.isFinite(remainingTimeMS)) return null;
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
throw new MongoOperationTimeoutError('Timed out before socket read');
}
}

/** @internal */
Expand Down Expand Up @@ -268,4 +289,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
return Timeout.expires(this.options.waitQueueTimeoutMS);
return null;
}

get timeoutForSocketWrite(): Timeout | null {
return null;
}

get timeoutForSocketRead(): Timeout | null {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ describe('CSOT spec prose tests', function () {
clock.restore();
});

it('serverSelectionTimeoutMS honored if timeoutMS is not set', async function () {
it.skip('serverSelectionTimeoutMS honored if timeoutMS is not set', async function () {
/**
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?serverSelectionTimeoutMS=10`.
* 1. Using `client`, execute the command `{ ping: 1 }` against the `admin` database.
Expand Down Expand Up @@ -416,10 +416,11 @@ describe('CSOT spec prose tests', function () {

await clock.tickAsync(11);
expect(await maybeError).to.be.instanceof(MongoServerSelectionError);
});
}).skipReason =
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
});

it("timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", async function () {
it.skip("timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", async function () {
/**
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20`.
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
Expand All @@ -440,9 +441,10 @@ describe('CSOT spec prose tests', function () {

expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
expect(end - start).to.be.lte(15);
});
}).skipReason =
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';

it("serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", async function () {
it.skip("serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", async function () {
/**
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=20&serverSelectionTimeoutMS=10`.
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
Expand All @@ -462,9 +464,10 @@ describe('CSOT spec prose tests', function () {

expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
expect(end - start).to.be.lte(15);
});
}).skipReason =
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';

it('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', async function () {
it.skip('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', async function () {
/**
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=0&serverSelectionTimeoutMS=10`.
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
Expand All @@ -484,7 +487,8 @@ describe('CSOT spec prose tests', function () {

expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
expect(end - start).to.be.lte(15);
});
}).skipReason =
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';

it.skip("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", async function () {
/**
Expand Down
Loading

0 comments on commit 8d1a2d3

Please sign in to comment.