Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6304): Add CSOT support for non-tailable cursors #4195

Merged
merged 53 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
78eb356
add timeoutMode to find, findOne, listIndexes, aggregate, listCollect…
W-A-James Aug 14, 2024
a22784d
unskip spec tests
W-A-James Aug 14, 2024
6c423e0
implement prose tests
W-A-James Aug 14, 2024
978d8dd
Add timeoutMode to abstract cursor and add validation
W-A-James Aug 14, 2024
5317378
Add $out and $merge validation
W-A-James Aug 14, 2024
7c459e0
start csot cursor implementation
W-A-James Aug 15, 2024
0a2e4ce
WIP: int testing
W-A-James Aug 15, 2024
ac97b5a
WIP
W-A-James Aug 16, 2024
02f835e
fix test
W-A-James Aug 16, 2024
1e65e25
correctly propagate timeoutMode
W-A-James Aug 16, 2024
0bc7793
rework test skip logic
W-A-James Aug 16, 2024
f83d3ef
wip
W-A-James Aug 20, 2024
9549fd4
prose tests WIP
W-A-James Aug 20, 2024
173796b
put CSOT options on runCursorCommand
W-A-James Aug 20, 2024
9fc4c4e
finish up int tests
W-A-James Aug 20, 2024
507af55
timeoutContext refresh fix
W-A-James Aug 20, 2024
1f27040
apply CSOT to hasNext and tryNext
W-A-James Aug 20, 2024
af02eb8
sync spec tests
W-A-James Aug 21, 2024
679c02c
add omitMaxTimeMS
W-A-James Aug 21, 2024
abc1cfd
skipping spec tests
W-A-James Aug 21, 2024
b64af7b
pass through timeoutMS arg to cursor.close
W-A-James Aug 21, 2024
628eb55
Fix tests and dropindexes
W-A-James Aug 23, 2024
e90e7ed
add catch
W-A-James Aug 23, 2024
cbbd7e7
ensure that CSOT tests only run against 4.4 or greater
W-A-James Aug 26, 2024
fd43e60
lint
W-A-James Aug 26, 2024
bcb6e70
ignore
W-A-James Aug 26, 2024
2ac4b0f
update export test
W-A-James Aug 26, 2024
ba0dc00
remove TODO
W-A-James Aug 26, 2024
1fb6c7d
use cursorOptions instead of CursorInitializeOptions
W-A-James Aug 28, 2024
343829f
remove CursorInitializeOptions and fix export
W-A-James Aug 28, 2024
441f71d
Review fixes
W-A-James Aug 28, 2024
a9b4f02
fix setting of omitMaxTimeMS opts
W-A-James Aug 28, 2024
041b202
ensure that timeoutMS is provided to cursor.close in UTR
W-A-James Aug 28, 2024
cff3c3c
add modified spec test
W-A-James Aug 28, 2024
0488d50
remove redundant optional chaining
W-A-James Aug 28, 2024
4a6607f
remove cursorTimeoutMode from timeoutContext options
W-A-James Aug 28, 2024
96ecddc
remove types from expected exports
W-A-James Aug 28, 2024
504eec2
remove cursorTimeoutMode from TimeoutContext.create call
W-A-James Aug 29, 2024
cb810e2
review comments
W-A-James Sep 3, 2024
54286ae
lint
W-A-James Sep 3, 2024
9114290
type annotation and bracket reorganization
W-A-James Sep 3, 2024
a47337b
skip flaky test and replace with more consistent one
W-A-James Sep 4, 2024
3236c83
spec test fixes
W-A-James Sep 4, 2024
743b55f
lint
W-A-James Sep 4, 2024
12dc4bf
Merge branch 'NODE-6090' into NODE-6304
W-A-James Sep 9, 2024
122487f
eslint
W-A-James Sep 9, 2024
6a7e900
bump timeoutMS
W-A-James Sep 9, 2024
0e14e98
bump timeoutMS and failpoint blockTimeMS
W-A-James Sep 9, 2024
c1572b1
lint
W-A-James Sep 9, 2024
1cbfbeb
chore: bump timeout avg
nbbeeken Sep 10, 2024
829d4fe
WIP
W-A-James Sep 11, 2024
5318659
review comments
W-A-James Sep 11, 2024
c48ca9f
lint
W-A-James Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export interface CommandOptions extends BSONSerializeOptions {
documentsReturnedIn?: string;
noResponse?: boolean;
omitReadPreference?: boolean;
omitMaxTimeMS?: boolean;

// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
// from executeOperation that the txnNum should be applied to this command.
Expand Down Expand Up @@ -418,7 +419,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (options.timeoutContext?.csotEnabled()) {
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}
Expand Down Expand Up @@ -614,7 +615,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
for await (const document of this.sendCommand(ns, command, options, responseType)) {
if (options.timeoutContext?.csotEnabled()) {
if (MongoDBResponse.is(document)) {
// TODO(NODE-5684): test coverage to be added once cursors are enabling CSOT
if (document.isMaxTimeExpiredError) {
throw new MongoOperationTimeoutError('Server reported a timeout error', {
cause: new MongoServerError(document.toObject())
Expand Down
6 changes: 4 additions & 2 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
type ListSearchIndexesOptions
} from './cursor/list_search_indexes_cursor';
import type { Db } from './db';
import { MongoInvalidArgumentError } from './error';
import { MongoInvalidArgumentError, MongoOperationTimeoutError } from './error';
import type { MongoClient, PkFactory } from './mongo_client';
import type {
Filter,
Expand Down Expand Up @@ -677,7 +677,9 @@ export class Collection<TSchema extends Document = Document> {
new DropIndexOperation(this as TODO_NODE_3286, '*', resolveOptions(this, options))
);
return true;
} catch {
} catch (error) {
if (error instanceof MongoOperationTimeoutError) throw error; // TODO: Check the spec for index management behaviour/file a drivers ticket for this
// Seems like we should throw all errors
return false;
}
}
Expand Down
146 changes: 118 additions & 28 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { TimeoutContext } from '../timeout';
import { type MongoDBNamespace, squashError } from '../utils';

/**
Expand Down Expand Up @@ -59,6 +60,17 @@ export interface CursorStreamOptions {
/** @public */
export type CursorFlag = (typeof CURSOR_FLAGS)[number];

/** @public*/
export const CursorTimeoutMode = Object.freeze({
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
ITERATION: 'iteration',
LIFETIME: 'cursorLifetime'
} as const);

/** @public
* TODO(NODE-5688): Document and release
* */
export type CursorTimeoutMode = (typeof CursorTimeoutMode)[keyof typeof CursorTimeoutMode];

/** @public */
export interface AbstractCursorOptions extends BSONSerializeOptions {
session?: ClientSession;
Expand Down Expand Up @@ -104,6 +116,8 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
noCursorTimeout?: boolean;
/** @internal TODO(NODE-5688): make this public */
timeoutMS?: number;
/** @internal TODO(NODE-5688): make this public */
timeoutMode?: CursorTimeoutMode;
}

/** @internal */
Expand All @@ -116,6 +130,8 @@ export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPre
oplogReplay?: boolean;
exhaust?: boolean;
partial?: boolean;

omitMaxTimeMS?: boolean;
};

/** @public */
Expand Down Expand Up @@ -153,6 +169,8 @@ export abstract class AbstractCursor<
private isKilled: boolean;
/** @internal */
protected readonly cursorOptions: InternalAbstractCursorOptions;
/** @internal */
protected timeoutContext?: TimeoutContext;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -182,6 +200,30 @@ export abstract class AbstractCursor<
...pluckBSONSerializeOptions(options)
};
this.cursorOptions.timeoutMS = options.timeoutMS;
if (this.cursorOptions.timeoutMS != null) {
if (options.timeoutMode == null) {
if (options.tailable) {
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
} else {
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
}
} else {
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError(
"Cannot set tailable cursor's timeoutMode to LIFETIME"
);
}
this.cursorOptions.timeoutMode = options.timeoutMode;
}
} else {
if (options.timeoutMode != null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
}
this.cursorOptions.omitMaxTimeMS =
this.cursorOptions.timeoutMS != null &&
((this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
!this.cursorOptions.tailable) ||
(this.cursorOptions.tailable && !this.cursorOptions.awaitData));

const readConcern = ReadConcern.fromOptions(options);
if (readConcern) {
Expand Down Expand Up @@ -389,12 +431,21 @@ export abstract class AbstractCursor<
return false;
}

do {
if ((this.documents?.length ?? 0) !== 0) {
return true;
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.refresh();
}
try {
do {
if ((this.documents?.length ?? 0) !== 0) {
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.clear();
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
}

return false;
}
Expand All @@ -404,15 +455,24 @@ export abstract class AbstractCursor<
if (this.cursorId === Long.ZERO) {
throw new MongoCursorExhaustedError();
}
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.refresh();
}

do {
const doc = this.documents?.shift(this.cursorOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
try {
do {
const doc = this.documents?.shift(this.cursorOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.clear();
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
}

return null;
}
Expand All @@ -425,18 +485,27 @@ export abstract class AbstractCursor<
throw new MongoCursorExhaustedError();
}

let doc = this.documents?.shift(this.cursorOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.refresh();
}
try {
let doc = this.documents?.shift(this.cursorOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
}

await this.fetchBatch();
await this.fetchBatch();

doc = this.documents?.shift(this.cursorOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
doc = this.documents?.shift(this.cursorOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
}
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.clear();
}
}

return null;
Expand Down Expand Up @@ -465,8 +534,8 @@ export abstract class AbstractCursor<
/**
* Frees any client-side resources used by the cursor.
*/
async close(): Promise<void> {
await this.cleanup();
async close(options?: { timeoutMS?: number }): Promise<void> {
await this.cleanup(options?.timeoutMS);
}

/**
Expand Down Expand Up @@ -647,6 +716,8 @@ export abstract class AbstractCursor<

this.cursorId = null;
this.documents?.clear();
this.timeoutContext?.clear();
this.timeoutContext = undefined;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
this.isClosed = false;
this.isKilled = false;
this.initialized = false;
Expand Down Expand Up @@ -697,7 +768,7 @@ export abstract class AbstractCursor<
}
);

return await executeOperation(this.cursorClient, getMoreOperation);
return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext);
}

/**
Expand All @@ -708,6 +779,12 @@ export abstract class AbstractCursor<
* a significant refactor.
*/
private async cursorInit(): Promise<void> {
if (this.cursorOptions.timeoutMS != null) {
this.timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
});
}
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
try {
const state = await this._initialize(this.cursorSession);
const response = state.response;
Expand All @@ -719,7 +796,7 @@ export abstract class AbstractCursor<
} catch (error) {
// the cursor is now initialized, even if an error occurred
this.initialized = true;
await this.cleanup(error);
await this.cleanup(undefined, error);
throw error;
}

Expand Down Expand Up @@ -753,14 +830,15 @@ export abstract class AbstractCursor<

// otherwise need to call getMore
const batchSize = this.cursorOptions.batchSize || 1000;
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;

try {
const response = await this.getMore(batchSize);
this.cursorId = response.id;
this.documents = response;
} catch (error) {
try {
await this.cleanup(error);
await this.cleanup(undefined, error);
} catch (error) {
// `cleanupCursor` should never throw, squash and throw the original error
squashError(error);
Expand All @@ -781,7 +859,7 @@ export abstract class AbstractCursor<
}

/** @internal */
private async cleanup(error?: Error) {
private async cleanup(timeoutMS?: number, error?: Error) {
this.isClosed = true;
const session = this.cursorSession;
try {
Expand All @@ -796,11 +874,23 @@ export abstract class AbstractCursor<
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;
let timeoutContext: TimeoutContext | undefined;
if (timeoutMS != null) {
this.timeoutContext?.clear();
timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
});
} else {
this.timeoutContext?.refresh();
timeoutContext = this.timeoutContext;
}
await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
})
}),
timeoutContext
);
}
} catch (error) {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
21 changes: 19 additions & 2 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Document } from '../bson';
import { MongoAPIError } from '../error';
import type { ExplainVerbosityLike } from '../explain';
import type { MongoClient } from '../mongo_client';
import { AggregateOperation, type AggregateOptions } from '../operations/aggregate';
Expand All @@ -8,7 +9,7 @@ import type { Sort } from '../sort';
import type { MongoDBNamespace } from '../utils';
import { mergeOptions } from '../utils';
import type { AbstractCursorOptions, InitialCursorResponse } from './abstract_cursor';
import { AbstractCursor } from './abstract_cursor';
import { AbstractCursor, CursorTimeoutMode } from './abstract_cursor';

/** @public */
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}
Expand Down Expand Up @@ -36,6 +37,15 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {

this.pipeline = pipeline;
this.aggregateOptions = options;

const lastStage: Document | undefined = this.pipeline[this.pipeline.length - 1];

if (
this.cursorOptions.timeoutMS != null &&
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
(lastStage?.$merge != null || lastStage?.$out != null)
)
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
}

clone(): AggregationCursor<TSchema> {
Expand All @@ -58,7 +68,7 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
session
});

const response = await executeOperation(this.client, aggregateOperation);
const response = await executeOperation(this.client, aggregateOperation, this.timeoutContext);

return { server: aggregateOperation.server, session, response };
}
Expand Down Expand Up @@ -93,6 +103,13 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
addStage<T = Document>(stage: Document): AggregationCursor<T>;
addStage<T = Document>(stage: Document): AggregationCursor<T> {
this.throwIfInitialized();
if (
this.cursorOptions.timeoutMS != null &&
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
(stage.$out != null || stage.$merge != null)
) {
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
}
this.pipeline.push(stage);
return this as unknown as AggregationCursor<T>;
}
Expand Down
6 changes: 5 additions & 1 deletion src/cursor/change_stream_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ export class ChangeStreamCursor<
session
});

const response = await executeOperation(session.client, aggregateOperation);
const response = await executeOperation(
session.client,
aggregateOperation,
this.timeoutContext
);

const server = aggregateOperation.server;
this.maxWireVersion = maxWireVersion(server);
Expand Down
Loading