Skip to content

Commit

Permalink
feat(NODE-6387): Add CSOT support to change streams (#4256)
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James authored and dariakp committed Nov 6, 2024
1 parent e86f11e commit 4588ff2
Show file tree
Hide file tree
Showing 10 changed files with 676 additions and 127 deletions.
177 changes: 125 additions & 52 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import type { Readable } from 'stream';
import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor';
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
import { Db } from './db';
import {
type AnyError,
isResumableError,
MongoAPIError,
MongoChangeStreamError,
MongoOperationTimeoutError,
MongoRuntimeError
} from './error';
import { MongoClient } from './mongo_client';
Expand All @@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
import type { ReadPreference } from './read_preference';
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
import type { ServerSessionId } from './sessions';
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

/** @internal */
Expand Down Expand Up @@ -538,7 +540,12 @@ export type ChangeStreamEvents<
end(): void;
error(error: Error): void;
change(change: TChange): void;
} & AbstractCursorEvents;
/**
* @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor`
* instance is closed, which can occur multiple times for a given `ChangeStream` instance.
*/
close(): void;
};

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
Expand Down Expand Up @@ -609,6 +616,13 @@ export class ChangeStream<
*/
static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED;

private timeoutContext?: TimeoutContext;
/**
* Note that this property is here to uniquely identify a ChangeStream instance as the owner of
* the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure
* that {@link AbstractCursor.close} does not mutate the timeoutContext.
*/
private contextOwner: symbol;
/**
* @internal
*
Expand All @@ -624,20 +638,25 @@ export class ChangeStream<

this.pipeline = pipeline;
this.options = { ...options };
let serverSelectionTimeoutMS: number;
delete this.options.writeConcern;

if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS;
} else if (parent instanceof Db) {
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS;
} else if (parent instanceof MongoClient) {
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS;
} else {
throw new MongoChangeStreamError(
'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
);
}

this.contextOwner = Symbol();
this.parent = parent;
this.namespace = parent.s.namespace;
if (!this.options.readPreference && parent.readPreference) {
Expand All @@ -662,6 +681,13 @@ export class ChangeStream<
this[kCursorStream]?.removeAllListeners('data');
}
});

if (this.options.timeoutMS != null) {
this.timeoutContext = new CSOTTimeoutContext({
timeoutMS: this.options.timeoutMS,
serverSelectionTimeoutMS
});
}
}

/** @internal */
Expand All @@ -681,22 +707,30 @@ export class ChangeStream<
// This loop continues until either a change event is received or until a resume attempt
// fails.

while (true) {
try {
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
this.timeoutContext?.refresh();
try {
while (true) {
try {
await this._processErrorIteratorMode(error);
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
try {
await this.close();
await this._processErrorIteratorMode(error, this.cursor.id != null);
} catch (error) {
squashError(error);
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) {
throw error;
}
try {
await this.close();
} catch (error) {
squashError(error);
}
throw error;
}
throw error;
}
}
} finally {
this.timeoutContext?.clear();
}
}

Expand All @@ -706,24 +740,32 @@ export class ChangeStream<
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
this.timeoutContext?.refresh();

while (true) {
try {
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
while (true) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this.close();
await this._processErrorIteratorMode(error, this.cursor.id != null);
} catch (error) {
squashError(error);
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) {
throw error;
}
try {
await this.close();
} catch (error) {
squashError(error);
}
throw error;
}
throw error;
}
}
} finally {
this.timeoutContext?.clear();
}
}

Expand All @@ -735,23 +777,29 @@ export class ChangeStream<
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
this.timeoutContext?.refresh();

while (true) {
try {
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
while (true) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
await this.close();
await this._processErrorIteratorMode(error, this.cursor.id != null);
} catch (error) {
squashError(error);
if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) throw error;
try {
await this.close();
} catch (error) {
squashError(error);
}
throw error;
}
throw error;
}
}
} finally {
this.timeoutContext?.clear();
}
}

Expand Down Expand Up @@ -784,6 +832,8 @@ export class ChangeStream<
* Frees the internal resources used by the change stream.
*/
async close(): Promise<void> {
this.timeoutContext?.clear();
this.timeoutContext = undefined;
this[kClosed] = true;

const cursor = this.cursor;
Expand Down Expand Up @@ -866,7 +916,12 @@ export class ChangeStream<
client,
this.namespace,
pipeline,
options
{
...options,
timeoutContext: this.timeoutContext
? new CursorTimeoutContext(this.timeoutContext, this.contextOwner)
: undefined
}
);

for (const event of CHANGE_STREAM_EVENTS) {
Expand Down Expand Up @@ -899,8 +954,9 @@ export class ChangeStream<
} catch (error) {
this.emit(ChangeStream.ERROR, error);
}
this.timeoutContext?.refresh();
});
stream.on('error', error => this._processErrorStreamMode(error));
stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null));
}

/** @internal */
Expand Down Expand Up @@ -942,24 +998,30 @@ export class ChangeStream<
}

/** @internal */
private _processErrorStreamMode(changeStreamError: AnyError) {
private _processErrorStreamMode(changeStreamError: AnyError, cursorInitialized: boolean) {
// If the change stream has been closed explicitly, do not process error.
if (this[kClosed]) return;

if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
if (
cursorInitialized &&
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
changeStreamError instanceof MongoOperationTimeoutError)
) {
this._endStream();

this.cursor.close().then(undefined, squashError);

const topology = getTopology(this.parent);
topology
.selectServer(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream'
})

this.cursor
.close()
.then(
() => this._resume(changeStreamError),
e => {
squashError(e);
return this._resume(changeStreamError);
}
)
.then(
() => {
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
if (changeStreamError instanceof MongoOperationTimeoutError)
this.emit(ChangeStream.ERROR, changeStreamError);
},
() => this._closeEmitterModeWithError(changeStreamError)
);
Expand All @@ -969,33 +1031,44 @@ export class ChangeStream<
}

/** @internal */
private async _processErrorIteratorMode(changeStreamError: AnyError) {
private async _processErrorIteratorMode(changeStreamError: AnyError, cursorInitialized: boolean) {
if (this[kClosed]) {
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
}

if (
this.cursor.id == null ||
!isResumableError(changeStreamError, this.cursor.maxWireVersion)
cursorInitialized &&
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
changeStreamError instanceof MongoOperationTimeoutError)
) {
try {
await this.cursor.close();
} catch (error) {
squashError(error);
}

await this._resume(changeStreamError);

if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError;
} else {
try {
await this.close();
} catch (error) {
squashError(error);
}

throw changeStreamError;
}
}

try {
await this.cursor.close();
} catch (error) {
squashError(error);
}
private async _resume(changeStreamError: AnyError) {
this.timeoutContext?.refresh();
const topology = getTopology(this.parent);
try {
await topology.selectServer(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream'
operationName: 'reconnect topology in change stream',
timeoutContext: this.timeoutContext
});
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
} catch {
Expand Down
8 changes: 5 additions & 3 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
responseType?: MongoDBResponseConstructor
) {
const message = this.prepareCommand(ns.db, command, options);

let started = 0;
if (this.shouldEmitAndLogCommand) {
started = now();
Expand Down Expand Up @@ -717,8 +716,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
try {
return await Promise.race([drainEvent, timeout]);
} catch (error) {
let err = error;
if (TimeoutError.is(error)) {
throw new MongoOperationTimeoutError('Timed out at socket write');
err = new MongoOperationTimeoutError('Timed out at socket write');
this.cleanup(err);
}
throw error;
} finally {
Expand Down Expand Up @@ -753,6 +754,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}
}
} catch (readError) {
const err = readError;
if (TimeoutError.is(readError)) {
const error = new MongoOperationTimeoutError(
`Timed out during socket read (${readError.duration}ms)`
Expand All @@ -761,7 +763,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.onError(error);
throw error;
}
throw readError;
throw err;
} finally {
this.dataEvents = null;
this.messageStream.pause();
Expand Down
Loading

0 comments on commit 4588ff2

Please sign in to comment.