Skip to content

Commit

Permalink
refactor(NODE-6411): AbstractCursor accepts an external timeout conte…
Browse files Browse the repository at this point in the history
…xt (#4264)
  • Loading branch information
baileympearson authored and dariakp committed Nov 6, 2024
1 parent 3cb8187 commit 3980489
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 75 deletions.
6 changes: 3 additions & 3 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
if (!options.omitMaxTimeMS) {
const maxTimeMS = options.timeoutContext?.maxTimeMS;
if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}

const message = this.supportsOpMsg
Expand Down
134 changes: 103 additions & 31 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { TimeoutContext } from '../timeout';
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
import { type MongoDBNamespace, squashError } from '../utils';

/**
Expand Down Expand Up @@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
timeoutMS?: number;
/** @internal TODO(NODE-5688): make this public */
timeoutMode?: CursorTimeoutMode;

/**
* @internal
*
* A timeout context to govern the total time the cursor can live. If provided, the cursor
* cannot be used in ITERATION mode.
*/
timeoutContext?: CursorTimeoutContext;
}

/** @internal */
Expand Down Expand Up @@ -171,7 +179,7 @@ export abstract class AbstractCursor<
/** @internal */
protected readonly cursorOptions: InternalAbstractCursorOptions;
/** @internal */
protected timeoutContext?: TimeoutContext;
protected timeoutContext?: CursorTimeoutContext;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -205,20 +213,12 @@ export abstract class AbstractCursor<
};
this.cursorOptions.timeoutMS = options.timeoutMS;
if (this.cursorOptions.timeoutMS != null) {
if (options.timeoutMode == null) {
if (options.tailable) {
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
} else {
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
}
} else {
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError(
"Cannot set tailable cursor's timeoutMode to LIFETIME"
);
}
this.cursorOptions.timeoutMode = options.timeoutMode;
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
}
this.cursorOptions.timeoutMode =
options.timeoutMode ??
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
} else {
if (options.timeoutMode != null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
Expand Down Expand Up @@ -264,6 +264,17 @@ export abstract class AbstractCursor<
utf8: options?.enableUtf8Validation === false ? false : true
}
};

if (
options.timeoutContext != null &&
options.timeoutMS != null &&
this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME
) {
throw new MongoAPIError(
`cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.`
);
}
this.timeoutContext = options.timeoutContext;
}

/**
Expand Down Expand Up @@ -721,6 +732,9 @@ export abstract class AbstractCursor<
* if the resultant data has already been retrieved by this cursor.
*/
rewind(): void {
if (this.timeoutContext && this.timeoutContext.owner !== this) {
throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`);
}
if (!this.initialized) {
return;
}
Expand Down Expand Up @@ -790,10 +804,13 @@ export abstract class AbstractCursor<
*/
private async cursorInit(): Promise<void> {
if (this.cursorOptions.timeoutMS != null) {
this.timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
});
this.timeoutContext ??= new CursorTimeoutContext(
TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
}),
this
);
}
try {
const state = await this._initialize(this.cursorSession);
Expand Down Expand Up @@ -872,6 +889,20 @@ export abstract class AbstractCursor<
private async cleanup(timeoutMS?: number, error?: Error) {
this.isClosed = true;
const session = this.cursorSession;
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
if (timeoutMS != null) {
this.timeoutContext?.clear();
return new CursorTimeoutContext(
TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
}),
this
);
} else {
return this.timeoutContext?.refreshed();
}
};
try {
if (
!this.isKilled &&
Expand All @@ -884,23 +915,13 @@ export abstract class AbstractCursor<
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;
let timeoutContext: TimeoutContext | undefined;
if (timeoutMS != null) {
this.timeoutContext?.clear();
timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
});
} else {
this.timeoutContext?.refresh();
timeoutContext = this.timeoutContext;
}

await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
}),
timeoutContext
timeoutContextForKillCursors()
);
}
} catch (error) {
Expand Down Expand Up @@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable {
}

configureResourceManagement(AbstractCursor.prototype);

/**
* @internal
* The cursor timeout context is a wrapper around a timeout context
* that keeps track of the "owner" of the cursor. For timeout contexts
* instantiated inside a cursor, the owner will be the cursor.
*
* All timeout behavior is exactly the same as the wrapped timeout context's.
*/
export class CursorTimeoutContext extends TimeoutContext {
constructor(
public timeoutContext: TimeoutContext,
public owner: symbol | AbstractCursor
) {
super();
}
override get serverSelectionTimeout(): Timeout | null {
return this.timeoutContext.serverSelectionTimeout;
}
override get connectionCheckoutTimeout(): Timeout | null {
return this.timeoutContext.connectionCheckoutTimeout;
}
override get clearServerSelectionTimeout(): boolean {
return this.timeoutContext.clearServerSelectionTimeout;
}
override get clearConnectionCheckoutTimeout(): boolean {
return this.timeoutContext.clearConnectionCheckoutTimeout;
}
override get timeoutForSocketWrite(): Timeout | null {
return this.timeoutContext.timeoutForSocketWrite;
}
override get timeoutForSocketRead(): Timeout | null {
return this.timeoutContext.timeoutForSocketRead;
}
override csotEnabled(): this is CSOTTimeoutContext {
return this.timeoutContext.csotEnabled();
}
override refresh(): void {
return this.timeoutContext.refresh();
}
override clear(): void {
return this.timeoutContext.clear();
}
override get maxTimeMS(): number | null {
return this.timeoutContext.maxTimeMS;
}

override refreshed(): CursorTimeoutContext {
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ export type {
CursorStreamOptions
} from './cursor/abstract_cursor';
export type {
CursorTimeoutContext,
InitialCursorResponse,
InternalAbstractCursorOptions
} from './cursor/abstract_cursor';
Expand Down
5 changes: 3 additions & 2 deletions src/operations/find.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Document } from '../bson';
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { MongoInvalidArgumentError } from '../error';
import { type ExplainOptions } from '../explain';
import { ReadConcern } from '../read_concern';
Expand All @@ -18,7 +18,8 @@ import { Aspect, defineAspects, type Hint } from './operation';
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface FindOptions<TSchema extends Document = Document>
extends Omit<CommandOperationOptions, 'writeConcern' | 'explain'> {
extends Omit<CommandOperationOptions, 'writeConcern' | 'explain'>,
AbstractCursorOptions {
/** Sets the limit of documents returned in the query. */
limit?: number;
/** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */
Expand Down
17 changes: 17 additions & 0 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ export abstract class TimeoutContext {
else throw new MongoRuntimeError('Unrecognized options');
}

abstract get maxTimeMS(): number | null;

abstract get serverSelectionTimeout(): Timeout | null;

abstract get connectionCheckoutTimeout(): Timeout | null;
Expand All @@ -195,6 +197,9 @@ export abstract class TimeoutContext {
abstract refresh(): void;

abstract clear(): void;

/** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */
abstract refreshed(): TimeoutContext;
}

/** @internal */
Expand Down Expand Up @@ -317,6 +322,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`);
return remainingTimeMS;
}

override refreshed(): CSOTTimeoutContext {
return new CSOTTimeoutContext(this);
}
}

/** @internal */
Expand Down Expand Up @@ -363,4 +372,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
clear(): void {
return;
}

get maxTimeMS() {
return null;
}

override refreshed(): LegacyTimeoutContext {
return new LegacyTimeoutContext(this.options);
}
}
18 changes: 14 additions & 4 deletions test/integration/client-side-operations-timeout/node_csot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
MongoServerError,
ObjectId
} from '../../mongodb';
import { type FailPoint } from '../../tools/utils';
import { type FailPoint, waitUntilPoolsFilled } from '../../tools/utils';

const metadata = { requires: { mongodb: '>=4.4' } };

Expand Down Expand Up @@ -362,7 +362,7 @@ describe('CSOT driver tests', metadata, () => {
};

beforeEach(async function () {
internalClient = this.configuration.newClient();
internalClient = this.configuration.newClient({});
await internalClient
.db('db')
.dropCollection('coll')
Expand All @@ -378,7 +378,11 @@ describe('CSOT driver tests', metadata, () => {

await internalClient.db().admin().command(failpoint);

client = this.configuration.newClient(undefined, { monitorCommands: true });
client = this.configuration.newClient(undefined, { monitorCommands: true, minPoolSize: 10 });

// wait for a handful of connections to have been established
await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5);

commandStarted = [];
commandSucceeded = [];
client.on('commandStarted', ev => commandStarted.push(ev));
Expand Down Expand Up @@ -492,7 +496,13 @@ describe('CSOT driver tests', metadata, () => {

await internalClient.db().admin().command(failpoint);

client = this.configuration.newClient(undefined, { monitorCommands: true });
client = this.configuration.newClient(undefined, {
monitorCommands: true,
minPoolSize: 10
});
// wait for a handful of connections to have been established
await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5);

commandStarted = [];
commandSucceeded = [];
client.on('commandStarted', ev => commandStarted.push(ev));
Expand Down
Loading

0 comments on commit 3980489

Please sign in to comment.