From fee5ac4d6b14edb9ba55a40ad4b10a16be179bd1 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 11 Oct 2024 16:00:52 -0400 Subject: [PATCH] add CSOT support to change streams --- src/change_stream.ts | 180 +++++++++++++----- src/cmap/connection.ts | 8 +- src/cursor/abstract_cursor.ts | 43 ++--- src/cursor/change_stream_cursor.ts | 2 +- ...ient_side_operations_timeout.prose.test.ts | 12 +- ...lient_side_operations_timeout.spec.test.ts | 12 +- .../node_csot.test.ts | 180 +++++++++++++++++- .../node-specific/abstract_cursor.test.ts | 52 +++-- 8 files changed, 381 insertions(+), 108 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 34f92a4477c..6a1731a791e 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -3,7 +3,7 @@ import type { Readable } from 'stream'; import type { Binary, Document, Timestamp } from './bson'; import { Collection } from './collection'; import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants'; -import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor'; +import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor'; import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor'; import { Db } from './db'; import { @@ -11,6 +11,7 @@ import { isResumableError, MongoAPIError, MongoChangeStreamError, + MongoOperationTimeoutError, MongoRuntimeError } from './error'; import { MongoClient } from './mongo_client'; @@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command'; import type { ReadPreference } from './read_preference'; import { type AsyncDisposable, configureResourceManagement } from './resource_management'; import type { ServerSessionId } from './sessions'; +import { CSOTTimeoutContext, type TimeoutContext } from './timeout'; import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils'; /** @internal */ @@ -538,7 +540,13 @@ export type ChangeStreamEvents< end(): void; error(error: Error): void; change(change: TChange): void; -} & AbstractCursorEvents; + /** + * @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor` + * instance is closed, which can occur multiple times for a given `ChangeStream` instance. + * When this event is emitted is subject to change outside of major versions. + */ + close(): void; +}; /** * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}. @@ -609,6 +617,13 @@ export class ChangeStream< */ static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED; + private timeoutContext?: TimeoutContext; + /** + * Note that this property is here to uniquely identify a ChangeStream instance as the owner of + * the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure + * that {@link AbstractCursor.close} does not mutate the timeoutContext. + */ + private contextOwner: symbol; /** * @internal * @@ -624,20 +639,25 @@ export class ChangeStream< this.pipeline = pipeline; this.options = { ...options }; + let serverSelectionTimeoutMS: number; delete this.options.writeConcern; if (parent instanceof Collection) { this.type = CHANGE_DOMAIN_TYPES.COLLECTION; + serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS; } else if (parent instanceof Db) { this.type = CHANGE_DOMAIN_TYPES.DATABASE; + serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS; } else if (parent instanceof MongoClient) { this.type = CHANGE_DOMAIN_TYPES.CLUSTER; + serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS; } else { throw new MongoChangeStreamError( 'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient' ); } + this.contextOwner = Symbol(); this.parent = parent; this.namespace = parent.s.namespace; if (!this.options.readPreference && parent.readPreference) { @@ -662,6 +682,13 @@ export class ChangeStream< this[kCursorStream]?.removeAllListeners('data'); } }); + + if (this.options.timeoutMS != null) { + this.timeoutContext = new CSOTTimeoutContext({ + timeoutMS: this.options.timeoutMS, + serverSelectionTimeoutMS + }); + } } /** @internal */ @@ -681,22 +708,31 @@ export class ChangeStream< // This loop continues until either a change event is received or until a resume attempt // fails. - while (true) { - try { - const hasNext = await this.cursor.hasNext(); - return hasNext; - } catch (error) { + this.timeoutContext?.refresh(); + try { + while (true) { + const cursorInitialized = this.cursor.id != null; try { - await this._processErrorIteratorMode(error); + const hasNext = await this.cursor.hasNext(); + return hasNext; } catch (error) { try { - await this.close(); + await this._processErrorIteratorMode(error, cursorInitialized); } catch (error) { - squashError(error); + if (error instanceof MongoOperationTimeoutError && cursorInitialized) { + throw error; + } + try { + await this.close(); + } catch (error) { + squashError(error); + } + throw error; } - throw error; } } + } finally { + this.timeoutContext?.clear(); } } @@ -706,24 +742,33 @@ export class ChangeStream< // Change streams must resume indefinitely while each resume event succeeds. // This loop continues until either a change event is received or until a resume attempt // fails. + this.timeoutContext?.refresh(); - while (true) { - try { - const change = await this.cursor.next(); - const processedChange = this._processChange(change ?? null); - return processedChange; - } catch (error) { + try { + while (true) { + const cursorInitialized = this.cursor.id != null; try { - await this._processErrorIteratorMode(error); + const change = await this.cursor.next(); + const processedChange = this._processChange(change ?? null); + return processedChange; } catch (error) { try { - await this.close(); + await this._processErrorIteratorMode(error, cursorInitialized); } catch (error) { - squashError(error); + if (error instanceof MongoOperationTimeoutError && cursorInitialized) { + throw error; + } + try { + await this.close(); + } catch (error) { + squashError(error); + } + throw error; } - throw error; } } + } finally { + this.timeoutContext?.clear(); } } @@ -735,23 +780,30 @@ export class ChangeStream< // Change streams must resume indefinitely while each resume event succeeds. // This loop continues until either a change event is received or until a resume attempt // fails. + this.timeoutContext?.refresh(); - while (true) { - try { - const change = await this.cursor.tryNext(); - return change ?? null; - } catch (error) { + try { + while (true) { + const cursorInitialized = this.cursor.id != null; try { - await this._processErrorIteratorMode(error); + const change = await this.cursor.tryNext(); + return change ?? null; } catch (error) { try { - await this.close(); + await this._processErrorIteratorMode(error, cursorInitialized); } catch (error) { - squashError(error); + if (error instanceof MongoOperationTimeoutError && cursorInitialized) throw error; + try { + await this.close(); + } catch (error) { + squashError(error); + } + throw error; } - throw error; } } + } finally { + this.timeoutContext?.clear(); } } @@ -784,6 +836,8 @@ export class ChangeStream< * Frees the internal resources used by the change stream. */ async close(): Promise { + this.timeoutContext?.clear(); + this.timeoutContext = undefined; this[kClosed] = true; const cursor = this.cursor; @@ -866,7 +920,12 @@ export class ChangeStream< client, this.namespace, pipeline, - options + { + ...options, + timeoutContext: this.timeoutContext + ? new CursorTimeoutContext(this.timeoutContext, this.contextOwner) + : undefined + } ); for (const event of CHANGE_STREAM_EVENTS) { @@ -900,7 +959,7 @@ export class ChangeStream< this.emit(ChangeStream.ERROR, error); } }); - stream.on('error', error => this._processErrorStreamMode(error)); + stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null)); } /** @internal */ @@ -942,24 +1001,30 @@ export class ChangeStream< } /** @internal */ - private _processErrorStreamMode(changeStreamError: AnyError) { + private _processErrorStreamMode(changeStreamError: AnyError, cursorInitialized: boolean) { // If the change stream has been closed explicitly, do not process error. if (this[kClosed]) return; - if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + if ( + cursorInitialized && + (isResumableError(changeStreamError, this.cursor.maxWireVersion) || + changeStreamError instanceof MongoOperationTimeoutError) + ) { this._endStream(); - this.cursor.close().then(undefined, squashError); - - const topology = getTopology(this.parent); - topology - .selectServer(this.cursor.readPreference, { - operationName: 'reconnect topology in change stream' - }) - + this.cursor + .close() + .then( + () => this._resume(changeStreamError), + e => { + squashError(e); + return this._resume(changeStreamError); + } + ) .then( () => { - this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); + if (changeStreamError instanceof MongoOperationTimeoutError) + this.emit(ChangeStream.ERROR, changeStreamError); }, () => this._closeEmitterModeWithError(changeStreamError) ); @@ -969,33 +1034,44 @@ export class ChangeStream< } /** @internal */ - private async _processErrorIteratorMode(changeStreamError: AnyError) { + private async _processErrorIteratorMode(changeStreamError: AnyError, cursorInitialized: boolean) { if (this[kClosed]) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR); } if ( - this.cursor.id == null || - !isResumableError(changeStreamError, this.cursor.maxWireVersion) + cursorInitialized && + (isResumableError(changeStreamError, this.cursor.maxWireVersion) || + changeStreamError instanceof MongoOperationTimeoutError) ) { + try { + await this.cursor.close(); + } catch (error) { + squashError(error); + } + + await this._resume(changeStreamError); + + if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError; + } else { try { await this.close(); } catch (error) { squashError(error); } + throw changeStreamError; } + } - try { - await this.cursor.close(); - } catch (error) { - squashError(error); - } + private async _resume(changeStreamError: AnyError) { + this.timeoutContext?.refresh(); const topology = getTopology(this.parent); try { await topology.selectServer(this.cursor.readPreference, { - operationName: 'reconnect topology in change stream' + operationName: 'reconnect topology in change stream', + timeoutContext: this.timeoutContext }); this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); } catch { diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 6b1d3c24171..ca7c86a0bad 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -505,7 +505,6 @@ export class Connection extends TypedEventEmitter { responseType?: MongoDBResponseConstructor ) { const message = this.prepareCommand(ns.db, command, options); - let started = 0; if (this.shouldEmitAndLogCommand) { started = now(); @@ -717,8 +716,10 @@ export class Connection extends TypedEventEmitter { try { return await Promise.race([drainEvent, timeout]); } catch (error) { + let err = error; if (TimeoutError.is(error)) { - throw new MongoOperationTimeoutError('Timed out at socket write'); + err = new MongoOperationTimeoutError('Timed out at socket write'); + this.cleanup(err); } throw error; } finally { @@ -753,6 +754,7 @@ export class Connection extends TypedEventEmitter { } } } catch (readError) { + const err = readError; if (TimeoutError.is(readError)) { const error = new MongoOperationTimeoutError( `Timed out during socket read (${readError.duration}ms)` @@ -761,7 +763,7 @@ export class Connection extends TypedEventEmitter { this.onError(error); throw error; } - throw readError; + throw err; } finally { this.dataEvents = null; this.messageStream.pause(); diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 96d28d05584..077e82f9c13 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -170,7 +170,11 @@ export abstract class AbstractCursor< private cursorClient: MongoClient; /** @internal */ private transform?: (doc: TSchema) => any; - /** @internal */ + /** + * @internal + * This is true whether or not the first command fails. It only indicates whether or not the first + * command has been run. + */ private initialized: boolean; /** @internal */ private isClosed: boolean; @@ -210,15 +214,16 @@ export abstract class AbstractCursor< ? options.readPreference : ReadPreference.primary, ...pluckBSONSerializeOptions(options), - timeoutMS: options.timeoutMS, + timeoutMS: options?.timeoutContext?.csotEnabled() + ? options.timeoutContext.timeoutMS + : options.timeoutMS, tailable: options.tailable, awaitData: options.awaitData }; + if (this.cursorOptions.timeoutMS != null) { if (options.timeoutMode == null) { if (options.tailable) { - this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION; - if (options.awaitData) { if ( options.maxAwaitTimeMS != null && @@ -228,22 +233,21 @@ export abstract class AbstractCursor< 'Cannot specify maxAwaitTimeMS >= timeoutMS for a tailable awaitData cursor' ); } + + this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION; } else { this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME; } } else { - if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { + if (options.tailable && options.timeoutMode === CursorTimeoutMode.LIFETIME) { throw new MongoInvalidArgumentError( "Cannot set tailable cursor's timeoutMode to LIFETIME" ); } this.cursorOptions.timeoutMode = options.timeoutMode; } - this.cursorOptions.timeoutMode = - options.timeoutMode ?? - (options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME); } else { - if (options.timeoutMode != null && options.timeoutContext == null) + if (options.timeoutMode != null) throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS'); } @@ -290,15 +294,6 @@ export abstract class AbstractCursor< } }; - if ( - options.timeoutContext != null && - options.timeoutMS != null && - this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME - ) { - throw new MongoAPIError( - `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.` - ); - } this.timeoutContext = options.timeoutContext; } @@ -489,7 +484,7 @@ export abstract class AbstractCursor< await this.fetchBatch(); } while (!this.isDead || (this.documents?.length ?? 0) !== 0); } finally { - if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION) { this.timeoutContext?.clear(); } } @@ -502,6 +497,7 @@ export abstract class AbstractCursor< if (this.cursorId === Long.ZERO) { throw new MongoCursorExhaustedError(); } + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { this.timeoutContext?.refresh(); } @@ -516,7 +512,7 @@ export abstract class AbstractCursor< await this.fetchBatch(); } while (!this.isDead || (this.documents?.length ?? 0) !== 0); } finally { - if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION) { this.timeoutContext?.clear(); } } @@ -550,7 +546,7 @@ export abstract class AbstractCursor< return doc; } } finally { - if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION) { this.timeoutContext?.clear(); } } @@ -886,7 +882,6 @@ export abstract class AbstractCursor< // otherwise need to call getMore const batchSize = this.cursorOptions.batchSize || 1000; - this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null; try { const response = await this.getMore(batchSize); @@ -1130,10 +1125,10 @@ export class CursorTimeoutContext extends TimeoutContext { return this.timeoutContext.csotEnabled(); } override refresh(): void { - return this.timeoutContext.refresh(); + if (typeof this.owner !== 'symbol') return this.timeoutContext.refresh(); } override clear(): void { - return this.timeoutContext.clear(); + if (typeof this.owner !== 'symbol') return this.timeoutContext.clear(); } override get maxTimeMS(): number | null { return this.timeoutContext.maxTimeMS; diff --git a/src/cursor/change_stream_cursor.ts b/src/cursor/change_stream_cursor.ts index 13f58675552..73a256cdeea 100644 --- a/src/cursor/change_stream_cursor.ts +++ b/src/cursor/change_stream_cursor.ts @@ -55,7 +55,7 @@ export class ChangeStreamCursor< pipeline: Document[] = [], options: ChangeStreamCursorOptions = {} ) { - super(client, namespace, options); + super(client, namespace, { ...options, tailable: true, awaitData: true }); this.pipeline = pipeline; this.changeStreamCursorOptions = options; diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 146a2585c52..e3d7333d13c 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -436,11 +436,17 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a timeout error. * 1. Verify that an `aggregate` command and two `getMore` commands were executed against the `db.coll` collection during the test. */ - it.skip('sends correct number of aggregate and getMores', metadata, async function () { + it('sends correct number of aggregate and getMores', metadata, async function () { + // NOTE: we don't check for a non-zero ID since we lazily send the initial aggregate to the + // server. See ChangeStreamCursor._initialize const changeStream = client .db('db') .collection('coll') - .watch([], { timeoutMS: 20, maxAwaitTimeMS: 19 }); + .watch([], { timeoutMS: 120, maxAwaitTimeMS: 10 }); + + // @ts-expect-error private method + await changeStream.cursor.cursorInit(); + const maybeError = await changeStream.next().then( () => null, e => e @@ -455,7 +461,7 @@ describe('CSOT spec prose tests', function () { expect(aggregates).to.have.lengthOf(1); // Expect 2 getMores expect(getMores).to.have.lengthOf(2); - }).skipReason = 'TODO(NODE-6387)'; + }); }); }); diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts index c519da8039f..b1a7ac8b62a 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts @@ -5,11 +5,7 @@ import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; const skippedSpecs = { - 'change-streams': 'TODO(NODE-6035)', - 'convenient-transactions': 'TODO(NODE-5687)', - 'deprecated-options': 'TODO(NODE-5689)', - 'tailable-awaitData': 'TODO(NODE-6035)', - 'tailable-non-awaitData': 'TODO(NODE-6035)' + 'deprecated-options': 'TODO(NODE-5689)' }; const skippedTests = { @@ -26,11 +22,9 @@ const skippedTests = { 'TODO(DRIVERS-2965)', 'maxTimeMS value in the command is less than timeoutMS': 'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs', - 'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure': - 'TODO(DRIVERS-2965)', - 'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(DRIVERS-2965)', 'timeoutMS is refreshed for getMore - failure': - 'TODO(DRIVERS-2965): see modified test in unified-csot-node-specs' // Skipping for both tailable awaitData and tailable non-awaitData cursors + 'TODO(DRIVERS-2965): see modified test in unified-csot-node-specs', // Skipping for both tailable awaitData and tailable non-awaitData cursors + 'timeoutMS applies to full resume attempt in a next call': 'TODO(DRIVERS-3006)' }; describe('CSOT spec tests', function () { diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index 12b380d8f1a..9bca114ddd4 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -1,5 +1,5 @@ /* Anything javascript specific relating to timeouts */ -import { once } from 'node:events'; +import { on, once } from 'node:events'; import { Readable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { setTimeout } from 'node:timers/promises'; @@ -10,6 +10,7 @@ import * as sinon from 'sinon'; import { BSON, + type ChangeStream, type ClientSession, type Collection, type CommandFailedEvent, @@ -815,6 +816,183 @@ describe('CSOT driver tests', metadata, () => { }); }); + describe('Change Streams', function () { + const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: '!single' } }; + let internalClient: MongoClient; + let client: MongoClient; + let commandsStarted: CommandStartedEvent[]; + + beforeEach(async function () { + internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .dropCollection('coll') + .catch(() => null); + await internalClient.db('db').collection('coll').insertOne({ x: 0 }); + commandsStarted = []; + + client = await this.configuration.newClient(undefined, { monitorCommands: true }).connect(); + client.on('commandStarted', ev => { + commandsStarted.push(ev); + }); + }); + + afterEach(async function () { + await internalClient + .db() + .admin() + ?.command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient?.close(); + await client?.close(); + }); + + context('when in stream mode', function () { + let data: any[]; + let cs: ChangeStream; + let errorIter: AsyncIterableIterator; + + afterEach(async function () { + await cs?.close(); + }); + + context('when the initial aggregate times out', function () { + beforeEach(async function () { + data = []; + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, // fail twice to account for executeOperation's retry attempt + data: { + failCommands: ['aggregate'], + blockConnection: true, + blockTimeMS: 130 + } + }; + + await internalClient.db().admin().command(failpoint); + cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 }); + errorIter = on(cs, 'error'); + cs.on('change', () => { + // Add empty listener just to get the change stream running + }); + }); + + it('emits an error event', metadata, async function () { + const err = (await errorIter.next()).value[0]; + + expect(data).to.have.lengthOf(0); + expect(err).to.be.instanceof(MongoOperationTimeoutError); + }); + + it('closes the change stream', metadata, async function () { + const err = (await errorIter.next()).value[0]; + expect(err).to.be.instanceof(MongoOperationTimeoutError); + expect(cs.closed).to.be.true; + }); + }); + + context('when the getMore times out', function () { + beforeEach(async function () { + data = []; + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + blockConnection: true, + blockTimeMS: 130 + } + }; + + await internalClient.db().admin().command(failpoint); + cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 }); + errorIter = on(cs, 'error'); + cs.on('change', () => { + // Add empty listener just to get the change stream running + }); + }); + + it('emits an error event', metadata, async function () { + const [err] = (await errorIter.next()).value; + expect(data).to.have.lengthOf(0); + expect(err).to.be.instanceof(MongoOperationTimeoutError); + }); + + it('continues emitting change events', metadata, async function () { + const err = (await errorIter.next()).value[0]; + expect(err).to.be.instanceof(MongoOperationTimeoutError); + + await once(cs, 'resumeTokenChanged'); + + await client.db('db').collection('coll').insertOne({ x: 1 }); + + const [change] = await once(cs, 'change'); + expect(change).to.have.ownProperty('operationType', 'insert'); + }); + + it('does not close the change stream', metadata, async function () { + const [err] = (await errorIter.next()).value; + expect(err).to.be.instanceof(MongoOperationTimeoutError); + + expect(cs.closed).to.be.false; + }); + + it('attempts to create a new change stream cursor', metadata, async function () { + await errorIter.next(); + let aggregates = commandsStarted + .filter(x => x.commandName === 'aggregate') + .map(x => x.command); + expect(aggregates).to.have.lengthOf(1); + + await once(cs, 'resumeTokenChanged'); + + aggregates = commandsStarted + .filter(x => x.commandName === 'aggregate') + .map(x => x.command); + + expect(aggregates).to.have.lengthOf(2); + + expect(aggregates[0].pipeline).to.deep.equal([{ $changeStream: {} }]); + expect(aggregates[1].pipeline).to.deep.equal([ + { $changeStream: { resumeAfter: cs.resumeToken } } + ]); + }); + }); + + context('when the resume attempt times out', function () { + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 2 }, // timeout the getMore, and the aggregate + data: { + failCommands: ['getMore', 'aggregate'], + blockConnection: true, + blockTimeMS: 130 + } + }; + + beforeEach(async function () { + cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 }); + const _changePromise = once(cs, 'change'); + await once(cs.cursor, 'init'); + + await internalClient.db().admin().command(failpoint); + }); + + it('emits an error event', async function () { + let [err] = await once(cs, 'error'); // getMore failure + expect(err).to.be.instanceof(MongoOperationTimeoutError); + [err] = await once(cs, 'error'); // aggregate failure + expect(err).to.be.instanceof(MongoOperationTimeoutError); + }); + + it('closes the change stream', async function () { + await once(cs, 'error'); // await the getMore Failure + await once(cs, 'error'); // await the aggregate failure + expect(cs.closed).to.be.true; + }); + }); + }); + }); + describe('GridFSBucket', () => { const blockTimeMS = 200; let internalClient: MongoClient; diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 8e154e1dc3e..ac060c9d459 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -8,6 +8,7 @@ import { AbstractCursor, type Collection, type CommandStartedEvent, + CSOTTimeoutContext, CursorTimeoutContext, CursorTimeoutMode, type FindCursor, @@ -408,37 +409,58 @@ describe('class AbstractCursor', function () { let collection: Collection; let context: CursorTimeoutContext; const commands: CommandStartedEvent[] = []; + let internalContext: TimeoutContext; beforeEach(async function () { client = this.configuration.newClient({}, { monitorCommands: true }); client.on('commandStarted', filterForCommands('killCursors', commands)); collection = client.db('abstract_cursor_integration').collection('test'); + internalContext = TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 }); - context = new CursorTimeoutContext( - TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 }), - Symbol() - ); + context = new CursorTimeoutContext(internalContext, Symbol()); await collection.insertMany([{ a: 1 }, { b: 2 }, { c: 3 }]); }); afterEach(async function () { + sinon.restore(); await collection.deleteMany({}); await client.close(); }); - describe('when timeoutMode != LIFETIME', function () { - it('an error is thrown', function () { - expect(() => - collection.find( - {}, - { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.ITERATION } - ) - ).to.throw( - `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME` - ); - }); + it('CursorTimeoutMode.refresh is a no-op', async function () { + const cursorTimeoutRefreshSpy = sinon.spy(CursorTimeoutContext.prototype, 'refresh'); + const csotTimeoutContextRefreshSpy = sinon.spy(CSOTTimeoutContext.prototype, 'refresh'); + const abstractCursorGetMoreSpy = sinon.spy(AbstractCursor.prototype, 'getMore'); + + const cursor = collection.find( + {}, + { timeoutMode: CursorTimeoutMode.ITERATION, timeoutContext: context, batchSize: 1 } + ); + await cursor.toArray(); + + expect(abstractCursorGetMoreSpy).to.have.been.calledThrice; + + expect(cursorTimeoutRefreshSpy.getCalls()).to.have.length(3); + expect(csotTimeoutContextRefreshSpy).to.not.have.been.called; + }); + + it('CursorTimeoutMode.clear is a no-op', async function () { + const cursorTimeoutClearSpy = sinon.spy(CursorTimeoutContext.prototype, 'clear'); + const csotTimeoutContextRefreshSpy = sinon.spy(CSOTTimeoutContext.prototype, 'clear'); + const abstractCursorGetMoreSpy = sinon.spy(AbstractCursor.prototype, 'getMore'); + + const cursor = collection.find( + {}, + { timeoutMode: CursorTimeoutMode.ITERATION, timeoutContext: context, batchSize: 1 } + ); + await cursor.toArray(); + + expect(abstractCursorGetMoreSpy).to.have.been.calledThrice; + + expect(cursorTimeoutClearSpy.getCalls()).to.have.length(4); + expect(csotTimeoutContextRefreshSpy).to.not.have.been.called; }); describe('when timeoutMode is omitted', function () {