Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6387): Add CSOT support to change streams #4256

Merged
merged 22 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 125 additions & 52 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import type { Readable } from 'stream';
import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor';
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
import { Db } from './db';
import {
type AnyError,
isResumableError,
MongoAPIError,
MongoChangeStreamError,
MongoOperationTimeoutError,
MongoRuntimeError
} from './error';
import { MongoClient } from './mongo_client';
Expand All @@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
import type { ReadPreference } from './read_preference';
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

/** @internal */
Expand Down Expand Up @@ -538,7 +540,12 @@ export type ChangeStreamEvents<
end(): void;
error(error: Error): void;
change(change: TChange): void;
} & AbstractCursorEvents;
/**
* @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor`
* instance is closed, which can occur multiple times for a given `ChangeStream` instance.
*/
close(): void;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
};

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
Expand Down Expand Up @@ -609,6 +616,13 @@ export class ChangeStream<
*/
static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED;

private timeoutContext?: TimeoutContext;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
/**
* Note that this property is here to uniquely identify a ChangeStream instance as the owner of
* the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure
* that {@link AbstractCursor.close} does not mutate the timeoutContext.
*/
private contextOwner: symbol;
/**
* @internal
*
Expand All @@ -624,20 +638,25 @@ export class ChangeStream<

this.pipeline = pipeline;
this.options = { ...options };
let serverSelectionTimeoutMS: number;
delete this.options.writeConcern;

if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS;
} else if (parent instanceof Db) {
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS;
} else if (parent instanceof MongoClient) {
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS;
} else {
throw new MongoChangeStreamError(
'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
);
}

this.contextOwner = Symbol();
this.parent = parent;
this.namespace = parent.s.namespace;
if (!this.options.readPreference && parent.readPreference) {
Expand All @@ -662,6 +681,13 @@ export class ChangeStream<
this[kCursorStream]?.removeAllListeners('data');
}
});

if (this.options.timeoutMS != null) {
this.timeoutContext = new CSOTTimeoutContext({
timeoutMS: this.options.timeoutMS,
serverSelectionTimeoutMS
});
}
}

/** @internal */
Expand All @@ -681,22 +707,30 @@ export class ChangeStream<
// This loop continues until either a change event is received or until a resume attempt
// fails.

while (true) {
try {
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
this.timeoutContext?.refresh();
try {
while (true) {
try {
await this._processErrorIteratorMode(error);
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
try {
await this.close();
await this._processErrorIteratorMode(error, this.cursor.id != null);
} catch (error) {
squashError(error);
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) {
throw error;
}
try {
await this.close();
} catch (error) {
squashError(error);
}
throw error;
}
throw error;
}
}
} finally {
this.timeoutContext?.clear();
}
}

Expand All @@ -706,24 +740,32 @@ export class ChangeStream<
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
this.timeoutContext?.refresh();

while (true) {
try {
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
while (true) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this.close();
await this._processErrorIteratorMode(error, this.cursor.id != null);
} catch (error) {
squashError(error);
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) {
throw error;
}
try {
await this.close();
} catch (error) {
squashError(error);
}
throw error;
}
throw error;
}
}
} finally {
this.timeoutContext?.clear();
}
}

Expand All @@ -735,23 +777,29 @@ export class ChangeStream<
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
this.timeoutContext?.refresh();

while (true) {
try {
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
while (true) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
await this.close();
await this._processErrorIteratorMode(error, this.cursor.id != null);
} catch (error) {
squashError(error);
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) throw error;
try {
await this.close();
} catch (error) {
squashError(error);
}
throw error;
}
throw error;
}
}
} finally {
this.timeoutContext?.clear();
}
}

Expand Down Expand Up @@ -784,6 +832,8 @@ export class ChangeStream<
* Frees the internal resources used by the change stream.
*/
async close(): Promise<void> {
this.timeoutContext?.clear();
this.timeoutContext = undefined;
this[kClosed] = true;

const cursor = this.cursor;
Expand Down Expand Up @@ -866,7 +916,12 @@ export class ChangeStream<
client,
this.namespace,
pipeline,
options
{
...options,
timeoutContext: this.timeoutContext
? new CursorTimeoutContext(this.timeoutContext, this.contextOwner)
: undefined
}
);

for (const event of CHANGE_STREAM_EVENTS) {
Expand Down Expand Up @@ -899,8 +954,9 @@ export class ChangeStream<
} catch (error) {
this.emit(ChangeStream.ERROR, error);
}
this.timeoutContext?.refresh();
});
stream.on('error', error => this._processErrorStreamMode(error));
stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null));
}

/** @internal */
Expand Down Expand Up @@ -942,24 +998,30 @@ export class ChangeStream<
}

/** @internal */
private _processErrorStreamMode(changeStreamError: AnyError) {
private _processErrorStreamMode(changeStreamError: AnyError, cursorInitialized: boolean) {
// If the change stream has been closed explicitly, do not process error.
if (this[kClosed]) return;

if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
if (
cursorInitialized &&
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
changeStreamError instanceof MongoOperationTimeoutError)
) {
this._endStream();

this.cursor.close().then(undefined, squashError);

const topology = getTopology(this.parent);
topology
.selectServer(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream'
})

this.cursor
.close()
.then(
() => this._resume(changeStreamError),
e => {
squashError(e);
return this._resume(changeStreamError);
}
)
.then(
() => {
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
if (changeStreamError instanceof MongoOperationTimeoutError)
this.emit(ChangeStream.ERROR, changeStreamError);
},
() => this._closeEmitterModeWithError(changeStreamError)
);
Expand All @@ -969,33 +1031,44 @@ export class ChangeStream<
}

/** @internal */
private async _processErrorIteratorMode(changeStreamError: AnyError) {
private async _processErrorIteratorMode(changeStreamError: AnyError, cursorInitialized: boolean) {
if (this[kClosed]) {
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
}

if (
this.cursor.id == null ||
!isResumableError(changeStreamError, this.cursor.maxWireVersion)
cursorInitialized &&
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
changeStreamError instanceof MongoOperationTimeoutError)
) {
try {
await this.cursor.close();
} catch (error) {
squashError(error);
}

await this._resume(changeStreamError);

if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError;
} else {
try {
await this.close();
} catch (error) {
squashError(error);
}

throw changeStreamError;
}
}

try {
await this.cursor.close();
} catch (error) {
squashError(error);
}
private async _resume(changeStreamError: AnyError) {
this.timeoutContext?.refresh();
const topology = getTopology(this.parent);
try {
await topology.selectServer(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream'
operationName: 'reconnect topology in change stream',
timeoutContext: this.timeoutContext
});
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
} catch {
Expand Down
8 changes: 5 additions & 3 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
responseType?: MongoDBResponseConstructor
) {
const message = this.prepareCommand(ns.db, command, options);

let started = 0;
if (this.shouldEmitAndLogCommand) {
started = now();
Expand Down Expand Up @@ -717,8 +716,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
try {
return await Promise.race([drainEvent, timeout]);
} catch (error) {
let err = error;
if (TimeoutError.is(error)) {
throw new MongoOperationTimeoutError('Timed out at socket write');
err = new MongoOperationTimeoutError('Timed out at socket write');
this.cleanup(err);
}
throw error;
} finally {
Expand Down Expand Up @@ -753,6 +754,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}
}
} catch (readError) {
const err = readError;
if (TimeoutError.is(readError)) {
const error = new MongoOperationTimeoutError(
`Timed out during socket read (${readError.duration}ms)`
Expand All @@ -761,7 +763,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.onError(error);
throw error;
}
throw readError;
throw err;
} finally {
this.dataEvents = null;
this.messageStream.pause();
Expand Down
Loading