Skip to content

Commit

Permalink
WIP maxTimeMS
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jul 11, 2024
1 parent e5b80ec commit dca1a9c
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 75 deletions.
5 changes: 5 additions & 0 deletions src/cmap/command_monitoring_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export class CommandStartedEvent {
command: WriteProtocolMessageType,
serverConnectionId: bigint | null
) {
Error.captureStackTrace(this);
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
const { address, connectionId, serviceId } = extractConnectionDetails(connection);
Expand Down Expand Up @@ -110,6 +111,7 @@ export class CommandSucceededEvent {
started: number,
serverConnectionId: bigint | null
) {
Error.captureStackTrace(this);
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
const { address, connectionId, serviceId } = extractConnectionDetails(connection);
Expand All @@ -130,6 +132,8 @@ export class CommandSucceededEvent {
}
}

Error.stackTraceLimit = 1000;

/**
* An event indicating the failure of a given command
* @public
Expand Down Expand Up @@ -168,6 +172,7 @@ export class CommandFailedEvent {
started: number,
serverConnectionId: bigint | null
) {
Error.captureStackTrace(this);
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
const { address, connectionId, serviceId } = extractConnectionDetails(connection);
Expand Down
58 changes: 50 additions & 8 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { Timeout, type TimeoutContext, TimeoutError } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -416,6 +416,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}

const message = this.supportsOpMsg
? new OpMsgRequest(db, cmd, commandOptions)
: new OpQueryRequest(db, cmd, commandOptions);
Expand All @@ -430,7 +435,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
): AsyncGenerator<MongoDBResponse> {
this.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
if (options.timeoutContext?.csotEnabled()) {
this.socket.setTimeout(0);
} else if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
Expand All @@ -439,7 +446,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
try {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel
zlibCompressionLevel: this.description.zlibCompressionLevel,
timeoutContext: options.timeoutContext
});

if (options.noResponse) {
Expand All @@ -449,7 +457,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

this.throwIfAborted();

for await (const response of this.readMany()) {
if (
options.timeoutContext?.csotEnabled() &&
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new TimeoutError('Server roundtrip time is greater than the time remaining');
}

for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
this.socket.setTimeout(0);
const bson = response.parse();

Expand Down Expand Up @@ -622,7 +638,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*/
private async writeCommand(
command: WriteProtocolMessageType,
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
options: {
agreedCompressor?: CompressorName;
zlibCompressionLevel?: number;
timeoutContext?: TimeoutContext;
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
Expand All @@ -634,8 +654,23 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const buffer = Buffer.concat(await finalCommand.toBin());

if (options.timeoutContext?.csotEnabled()) {
if (
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new TimeoutError('Server roundtrip time is greater than the time remaining');
}
}

if (this.socket.write(buffer)) return;
return await once(this.socket, 'drain');

const drainEvent = once<void>(this.socket, 'drain');
if (options.timeoutContext?.csotEnabled()) {
const timeout = Timeout.expires(options.timeoutContext.remainingTimeMS);
return await Promise.race([drainEvent, timeout]);
}
return await drainEvent;
}

/**
Expand All @@ -647,9 +682,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
private async *readMany(options: {
timeoutContext?: TimeoutContext;
}): AsyncGenerator<OpMsgResponse | OpReply> {
try {
this.dataEvents = onData(this.messageStream);
const timeoutMS = options.timeoutContext?.csotEnabled()
? options.timeoutContext.remainingTimeMS
: 0;

this.dataEvents = onData(this.messageStream, { timeoutMS });

for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;
Expand Down
7 changes: 6 additions & 1 deletion src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { type EventEmitter } from 'events';

import { Timeout } from '../../timeout';
import { List, promiseWithResolvers } from '../../utils';

/**
Expand All @@ -18,7 +19,7 @@ type PendingPromises = Omit<
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
* It will reject upon an error event.
*/
export function onData(emitter: EventEmitter) {
export function onData(emitter: EventEmitter, { timeoutMS }: { timeoutMS: number }) {
// Setup pending events and pending promise lists
/**
* When the caller has not yet called .next(), we store the
Expand Down Expand Up @@ -87,6 +88,10 @@ export function onData(emitter: EventEmitter) {
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);

if (timeoutMS > 0 && Number.isFinite(timeoutMS))
// eslint-disable-next-line github/no-then
Timeout.expires(timeoutMS).then(undefined, errorHandler);

return iterator;

function eventHandler(value: Buffer) {
Expand Down
45 changes: 23 additions & 22 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,29 +460,26 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

const timeoutMS = this.client.s.options.timeoutMS;
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
const readPreference = options.readPreference ?? ReadPreference.primary;

const timeoutContext = TimeoutContext.create({
timeoutMS,
serverSelectionTimeoutMS,
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
});

const selectServerOptions = {
operationName: 'ping',
...options,
timeoutContext
};
try {
const server = await this.selectServer(
readPreferenceServerSelector(readPreference),
selectServerOptions
);

const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
if (!skipPingOnConnect && server && this.s.credentials) {
if (!skipPingOnConnect && this.s.credentials) {
const timeoutMS = this.client.s.options.timeoutMS;
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
const readPreference = options.readPreference ?? ReadPreference.primary;
const timeoutContext = TimeoutContext.create({
timeoutMS,
serverSelectionTimeoutMS,
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
});
const selectServerOptions = {
operationName: 'ping',
...options,
timeoutContext
};
const server = await this.selectServer(
readPreferenceServerSelector(readPreference),
selectServerOptions
);
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
stateTransition(this, STATE_CONNECTED);
this.emit(Topology.OPEN, this);
Expand Down Expand Up @@ -623,7 +620,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

try {
timeout?.throwIfExpired();
return await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
}
return server;
} catch (error) {
if (TimeoutError.is(error)) {
// Timeout
Expand Down
15 changes: 11 additions & 4 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class Timeout extends Promise<never> {
}

/** Create a new timeout that expires in `duration` ms */
private constructor(executor: Executor = () => null, duration: number, unref = false) {
private constructor(executor: Executor = () => null, duration: number, unref = true) {
let reject!: Reject;

if (duration < 0) {
Expand Down Expand Up @@ -150,6 +150,11 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions

/** @internal */
export abstract class TimeoutContext {
start: number;
constructor() {
this.start = Math.trunc(performance.now());
}

static create(options: TimeoutContextOptions): TimeoutContext {
if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options);
else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options);
Expand Down Expand Up @@ -180,6 +185,7 @@ export class CSOTTimeoutContext extends TimeoutContext {

private _serverSelectionTimeout?: Timeout | null;
private _connectionCheckoutTimeout?: Timeout | null;
public minRoundTripTime = 0;

constructor(options: CSOTTimeoutContextOptions) {
super();
Expand All @@ -194,11 +200,12 @@ export class CSOTTimeoutContext extends TimeoutContext {
}

get maxTimeMS(): number {
return this._maxTimeMS ?? -1;
return this.remainingTimeMS - this.minRoundTripTime;
}

set maxTimeMS(v: number) {
this._maxTimeMS = v;
get remainingTimeMS() {
const timePassed = Math.trunc(performance.now()) - this.start;
return this.timeoutMS <= 0 ? Infinity : this.timeoutMS - timePassed;
}

csotEnabled(): this is CSOTTimeoutContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,40 @@ import { join } from 'path';
import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

// TODO(NODE-5823): Implement unified runner operations and options support for CSOT
describe.skip('CSOT spec tests', function () {
runUnifiedSuite(loadSpecTests(join('client-side-operations-timeout')));
const enabled = [
'override-collection-timeoutMS',
'override-database-timeoutMS',
'override-operation-timeoutMS'
];

const cursorOperations = [
'aggregate',
'countDocuments',
'listIndexes',
'createChangeStream',
'listCollections',
'listCollectionNames'
];

describe('CSOT spec tests', function () {
const specs = loadSpecTests(join('client-side-operations-timeout'));
for (const spec of specs) {
for (const test of spec.tests) {
// not one of the test suites listed in kickoff
if (!enabled.includes(spec.name)) test.skipReason = 'Not working yet';

// Cursor operation
if (test.operations.find(operation => cursorOperations.includes(operation.name)))
test.skipReason = 'Not working yet';

// runCommand only uses options directly passed to it
if (
test.operations.find(
operation => operation.name === 'runCommand' && operation.arguments.timeoutMS == null
)
)
test.skipReason = 'Not working yet';
}
}
runUnifiedSuite(specs);
});
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ describe('CSOT driver tests', () => {
afterEach(async () => {
await cursor?.close();
await session?.endSession();
await session.endSession();
});

it('throws an error', async () => {
Expand Down
22 changes: 6 additions & 16 deletions test/integration/node-specific/db.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,12 @@ describe('Db', function () {
});
});

it('shouldCorrectlyHandleFailedConnection', {
metadata: {
requires: { topology: ['single', 'replicaset', 'sharded'] }
},

test: function (done) {
var configuration = this.configuration;
var fs_client = configuration.newClient('mongodb://127.0.0.1:25117/test', {
serverSelectionTimeoutMS: 10
});

fs_client.connect(function (err) {
test.ok(err != null);
done();
});
}
it('should correctly handle failed connection', async function () {
const client = this.configuration.newClient('mongodb://iLoveJS', {
serverSelectionTimeoutMS: 10
});
const error = await client.connect().catch(error => error);
expect(error).to.be.instanceOf(Error);
});

it('shouldCorrectlyGetErrorDroppingNonExistingDb', {
Expand Down
Loading

0 comments on commit dca1a9c

Please sign in to comment.