From 2398fc6b4a13b6c615315df2a1b8b8ffb737d8c3 Mon Sep 17 00:00:00 2001 From: Warren James Date: Mon, 7 Oct 2024 13:07:46 -0400 Subject: [PATCH] feat(NODE-6305): Add CSOT support to tailable cursors (#4218) Co-authored-by: Neal Beeken --- src/cursor/abstract_cursor.ts | 51 +++- src/cursor/run_command_cursor.ts | 2 + src/mongo_client.ts | 5 + src/operations/create_collection.ts | 1 + test/benchmarks/driverBench/common.js | 4 +- ...ient_side_operations_timeout.prose.test.ts | 40 ++-- ...lient_side_operations_timeout.spec.test.ts | 7 +- .../node_csot.test.ts | 221 +++++++++++++++++- .../tailable-awaitData.json | 146 ++++++++++++ .../tailable-non-awaitData.json | 151 ++++++++++++ test/tools/unified-spec-runner/operations.ts | 45 +++- 11 files changed, 641 insertions(+), 32 deletions(-) create mode 100644 test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json create mode 100644 test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-non-awaitData.json diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index f7e488d24b2..255a977a5f9 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -209,12 +209,35 @@ export abstract class AbstractCursor< options.readPreference && options.readPreference instanceof ReadPreference ? options.readPreference : ReadPreference.primary, - ...pluckBSONSerializeOptions(options) + ...pluckBSONSerializeOptions(options), + timeoutMS: options.timeoutMS, + tailable: options.tailable, + awaitData: options.awaitData }; - this.cursorOptions.timeoutMS = options.timeoutMS; if (this.cursorOptions.timeoutMS != null) { - if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { - throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME"); + if (options.timeoutMode == null) { + if (options.tailable) { + this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION; + + if (options.awaitData) { + if ( + options.maxAwaitTimeMS != null && + options.maxAwaitTimeMS >= this.cursorOptions.timeoutMS + ) + throw new MongoInvalidArgumentError( + 'Cannot specify maxAwaitTimeMS >= timeoutMS for a tailable awaitData cursor' + ); + } + } else { + this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME; + } + } else { + if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { + throw new MongoInvalidArgumentError( + "Cannot set tailable cursor's timeoutMode to LIFETIME" + ); + } + this.cursorOptions.timeoutMode = options.timeoutMode; } this.cursorOptions.timeoutMode = options.timeoutMode ?? @@ -223,6 +246,8 @@ export abstract class AbstractCursor< if (options.timeoutMode != null) throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS'); } + + // Set for initial command this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null && ((this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && @@ -781,15 +806,17 @@ export abstract class AbstractCursor< 'Unexpected null selectedServer. A cursor creating command should have set this' ); } + const getMoreOptions = { + ...this.cursorOptions, + session: this.cursorSession, + batchSize + }; + const getMoreOperation = new GetMoreOperation( this.cursorNamespace, this.cursorId, this.selectedServer, - { - ...this.cursorOptions, - session: this.cursorSession, - batchSize - } + getMoreOptions ); return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext); @@ -814,6 +841,8 @@ export abstract class AbstractCursor< } try { const state = await this._initialize(this.cursorSession); + // Set omitMaxTimeMS to the value needed for subsequent getMore calls + this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null; const response = state.response; this.selectedServer = state.server; this.cursorId = response.id; @@ -866,9 +895,9 @@ export abstract class AbstractCursor< } catch (error) { try { await this.cleanup(undefined, error); - } catch (error) { + } catch (cleanupError) { // `cleanupCursor` should never throw, squash and throw the original error - squashError(error); + squashError(cleanupError); } throw error; } diff --git a/src/cursor/run_command_cursor.ts b/src/cursor/run_command_cursor.ts index 6b31ce2263a..90e4a94fd42 100644 --- a/src/cursor/run_command_cursor.ts +++ b/src/cursor/run_command_cursor.ts @@ -23,6 +23,8 @@ export type RunCursorCommandOptions = { timeoutMS?: number; /** @internal */ timeoutMode?: CursorTimeoutMode; + tailable?: boolean; + awaitData?: boolean; } & BSONSerializeOptions; /** @public */ diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 0bc9165deee..ce115142bb2 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -482,6 +482,11 @@ export class MongoClient extends TypedEventEmitter implements return this.s.bsonOptions; } + /** @internal */ + get timeoutMS(): number | undefined { + return this.options.timeoutMS; + } + /** * Executes a client bulk write operation, available on server 8.0+. * @param models - The client bulk write models. diff --git a/src/operations/create_collection.ts b/src/operations/create_collection.ts index afb2680b9a0..293ecc8be52 100644 --- a/src/operations/create_collection.ts +++ b/src/operations/create_collection.ts @@ -17,6 +17,7 @@ import { Aspect, defineAspects } from './operation'; const ILLEGAL_COMMAND_FIELDS = new Set([ 'w', 'wtimeout', + 'timeoutMS', 'j', 'fsync', 'autoIndexId', diff --git a/test/benchmarks/driverBench/common.js b/test/benchmarks/driverBench/common.js index bb5b48babfd..3ffd309572a 100644 --- a/test/benchmarks/driverBench/common.js +++ b/test/benchmarks/driverBench/common.js @@ -24,7 +24,9 @@ function loadSpecString(filePath) { } function makeClient() { - this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017'); + this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017', { + timeoutMS: 0 + }); } function connectClient() { diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 1b8c34633b4..09b95d6dff0 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -77,7 +77,7 @@ describe('CSOT spec prose tests', function () { beforeEach(async function () { await internalClient .db('db') - .collection('coll') + .collection('bulkWriteTest') .drop() .catch(() => null); await internalClient.db('admin').command(failpoint); @@ -93,7 +93,7 @@ describe('CSOT spec prose tests', function () { const oneMBDocs = Array.from({ length: 50 }, (_, _id) => ({ _id, a })); const error = await client .db('db') - .collection<{ _id: number; a: Uint8Array }>('coll') + .collection<{ _id: number; a: Uint8Array }>('bulkWriteTest') .insertMany(oneMBDocs) .catch(error => error); @@ -265,6 +265,7 @@ describe('CSOT spec prose tests', function () { }); context('5. Blocking Iteration Methods', () => { + const metadata = { requires: { mongodb: '>=4.4' } }; /** * Tests in this section MUST only be run against server versions 4.4 and higher and only apply to drivers that have a * blocking method for cursor iteration that executes `getMore` commands in a loop until a document is available or an @@ -276,7 +277,7 @@ describe('CSOT spec prose tests', function () { data: { failCommands: ['getMore'], blockConnection: true, - blockTimeMS: 20 + blockTimeMS: 90 } }; let internalClient: MongoClient; @@ -286,7 +287,11 @@ describe('CSOT spec prose tests', function () { beforeEach(async function () { internalClient = this.configuration.newClient(); - await internalClient.db('db').dropCollection('coll'); + await internalClient + .db('db') + .collection('coll') + .drop() + .catch(() => null); // Creating capped collection to be able to create tailable find cursor const coll = await internalClient .db('db') @@ -294,7 +299,13 @@ describe('CSOT spec prose tests', function () { await coll.insertOne({ x: 1 }); await internalClient.db().admin().command(failpoint); - client = this.configuration.newClient(undefined, { timeoutMS: 20, monitorCommands: true }); + client = this.configuration.newClient(undefined, { + monitorCommands: true, + timeoutMS: 100, + minPoolSize: 20 + }); + await client.connect(); + commandStarted = []; commandSucceeded = []; @@ -337,11 +348,11 @@ describe('CSOT spec prose tests', function () { * 1. Verify that a `find` command and two `getMore` commands were executed against the `db.coll` collection during the test. */ - it.skip('send correct number of finds and getMores', async function () { + it('send correct number of finds and getMores', metadata, async function () { const cursor = client .db('db') .collection('coll') - .find({}, { tailable: true, awaitData: true }) + .find({}, { tailable: true }) .project({ _id: 0 }); const doc = await cursor.next(); expect(doc).to.deep.equal({ x: 1 }); @@ -358,7 +369,7 @@ describe('CSOT spec prose tests', function () { expect(commandStarted.filter(e => e.command.find != null)).to.have.lengthOf(1); // Expect 2 getMore expect(commandStarted.filter(e => e.command.getMore != null)).to.have.lengthOf(2); - }).skipReason = 'TODO(NODE-6305)'; + }); }); context('Change Streams', () => { @@ -383,8 +394,11 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a timeout error. * 1. Verify that an `aggregate` command and two `getMore` commands were executed against the `db.coll` collection during the test. */ - it.skip('sends correct number of aggregate and getMores', async function () { - const changeStream = client.db('db').collection('coll').watch(); + it.skip('sends correct number of aggregate and getMores', metadata, async function () { + const changeStream = client + .db('db') + .collection('coll') + .watch([], { timeoutMS: 20, maxAwaitTimeMS: 19 }); const maybeError = await changeStream.next().then( () => null, e => e @@ -397,9 +411,9 @@ describe('CSOT spec prose tests', function () { const getMores = commandStarted.filter(e => e.command.getMore != null).map(e => e.command); // Expect 1 aggregate expect(aggregates).to.have.lengthOf(1); - // Expect 1 getMore - expect(getMores).to.have.lengthOf(1); - }).skipReason = 'TODO(NODE-6305)'; + // Expect 2 getMores + expect(getMores).to.have.lengthOf(2); + }).skipReason = 'TODO(NODE-6387)'; }); }); diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts index 49ddabc924b..d72e9bc5ebe 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts @@ -25,7 +25,12 @@ const skippedTests = { 'Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset': 'TODO(DRIVERS-2965)', 'maxTimeMS value in the command is less than timeoutMS': - 'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs' + 'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs', + 'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure': + 'TODO(DRIVERS-2965)', + 'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(DRIVERS-2965)', + 'timeoutMS is refreshed for getMore - failure': + 'TODO(DRIVERS-2965): see modified test in unified-csot-node-specs' // Skipping for both tailable awaitData and tailable non-awaitData cursors }; describe('CSOT spec tests', function () { diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index f4cfc7d882c..b1516454cc7 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -31,13 +31,18 @@ import { type FailPoint, waitUntilPoolsFilled } from '../../tools/utils'; const metadata = { requires: { mongodb: '>=4.4' } }; describe('CSOT driver tests', metadata, () => { + // NOTE: minPoolSize here is set to ensure that connections are available when testing timeout + // behaviour. This reduces flakiness in our tests since operations will not spend time + // establishing connections, more closely mirroring long-running application behaviour + const minPoolSize = 20; + describe('timeoutMS inheritance', () => { let client: MongoClient; let db: Db; let coll: Collection; beforeEach(async function () { - client = this.configuration.newClient(undefined, { timeoutMS: 100 }); + client = this.configuration.newClient(undefined, { timeoutMS: 100, minPoolSize }); db = client.db('test', { timeoutMS: 200 }); }); @@ -159,7 +164,10 @@ describe('CSOT driver tests', metadata, () => { metadata: { requires: { mongodb: '>=4.4', topology: '!load-balanced' } }, test: async function () { const commandsStarted = []; - client = this.configuration.newClient(undefined, { timeoutMS: 1, monitorCommands: true }); + client = this.configuration.newClient(undefined, { + timeoutMS: 1, + monitorCommands: true + }); client.on('commandStarted', ev => commandsStarted.push(ev)); @@ -591,6 +599,211 @@ describe('CSOT driver tests', metadata, () => { }); }); + describe('Tailable cursors', function () { + let client: MongoClient; + let internalClient: MongoClient; + let commandStarted: CommandStartedEvent[]; + const metadata: MongoDBMetadataUI = { + requires: { mongodb: '>=4.4' } + }; + + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['aggregate', 'find', 'getMore'], + blockConnection: true, + blockTimeMS: 100 + } + }; + + beforeEach(async function () { + internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .dropCollection('coll') + .catch(() => null); + + await internalClient.db('db').createCollection('coll', { capped: true, size: 1_000_000 }); + + await internalClient + .db('db') + .collection('coll') + .insertMany( + Array.from({ length: 100 }, () => { + return { x: 1 }; + }) + ); + + await internalClient.db().admin().command(failpoint); + + client = this.configuration.newClient(undefined, { monitorCommands: true, minPoolSize }); + commandStarted = []; + client.on('commandStarted', ev => commandStarted.push(ev)); + await client.connect(); + }); + + afterEach(async function () { + await internalClient + .db() + .admin() + .command({ ...failpoint, mode: 'off' }); + await internalClient.close(); + await client.close(); + }); + + context('when in ITERATION mode', function () { + context('awaitData cursors', function () { + let cursor: FindCursor; + afterEach(async function () { + if (cursor) await cursor.close(); + }); + + it('applies timeoutMS to initial command', metadata, async function () { + cursor = client + .db('db') + .collection('coll') + .find({}, { timeoutMS: 50, tailable: true, awaitData: true, batchSize: 1 }); + const maybeError = await cursor.next().then( + () => null, + e => e + ); + expect(maybeError).to.be.instanceOf(MongoOperationTimeoutError); + + const finds = commandStarted.filter(x => x.commandName === 'find'); + const getMores = commandStarted.filter(x => x.commandName === 'getMore'); + expect(finds).to.have.lengthOf(1); + expect(getMores).to.have.lengthOf(0); + }); + + it('refreshes the timeout for subsequent getMores', metadata, async function () { + cursor = client + .db('db') + .collection('coll') + .find({}, { timeoutMS: 150, tailable: true, awaitData: true, batchSize: 1 }); + for (let i = 0; i < 5; i++) { + // Iterate cursor 5 times (server would have blocked for 500ms overall, but client + // should not throw + await cursor.next(); + } + }); + + it('does not use timeoutMS to compute maxTimeMS for getMores', metadata, async function () { + cursor = client + .db('db') + .collection('coll') + .find({}, { timeoutMS: 10_000, tailable: true, awaitData: true, batchSize: 1 }); + await cursor.next(); + await cursor.next(); + + const getMores = commandStarted + .filter(x => x.command.getMore != null) + .map(x => x.command); + expect(getMores).to.have.lengthOf(1); + + const [getMore] = getMores; + expect(getMore).to.not.haveOwnProperty('maxTimeMS'); + }); + + context('when maxAwaitTimeMS is specified', function () { + it( + 'sets maxTimeMS to the configured maxAwaitTimeMS value on getMores', + metadata, + async function () { + cursor = client.db('db').collection('coll').find( + {}, + { + timeoutMS: 10_000, + tailable: true, + awaitData: true, + batchSize: 1, + maxAwaitTimeMS: 100 + } + ); + await cursor.next(); + await cursor.next(); + + const getMores = commandStarted + .filter(x => x.command.getMore != null) + .map(x => x.command); + expect(getMores).to.have.lengthOf(1); + + const [getMore] = getMores; + expect(getMore).to.haveOwnProperty('maxTimeMS'); + expect(getMore.maxTimeMS).to.equal(100); + } + ); + }); + }); + + context('non-awaitData cursors', function () { + let cursor: FindCursor; + + afterEach(async function () { + if (cursor) await cursor.close(); + }); + + it('applies timeoutMS to initial command', metadata, async function () { + cursor = client + .db('db') + .collection('coll') + .find({}, { timeoutMS: 50, tailable: true, batchSize: 1 }); + const maybeError = await cursor.next().then( + () => null, + e => e + ); + expect(maybeError).to.be.instanceOf(MongoOperationTimeoutError); + + const finds = commandStarted.filter(x => x.commandName === 'find'); + const getMores = commandStarted.filter(x => x.commandName === 'getMore'); + expect(finds).to.have.lengthOf(1); + expect(getMores).to.have.lengthOf(0); + }); + + it('refreshes the timeout for subsequent getMores', metadata, async function () { + cursor = client + .db('db') + .collection('coll') + .find({}, { timeoutMS: 150, tailable: true, batchSize: 1 }); + for (let i = 0; i < 5; i++) { + // Iterate cursor 5 times (server would have blocked for 500ms overall, but client + // should not throw + await cursor.next(); + } + }); + + it('does not append a maxTimeMS field to original command', metadata, async function () { + cursor = client + .db('db') + .collection('coll') + .find({}, { timeoutMS: 2000, tailable: true, batchSize: 1 }); + + await cursor.next(); + + const finds = commandStarted.filter(x => x.command.find != null); + expect(finds).to.have.lengthOf(1); + expect(finds[0].command.find).to.exist; + expect(finds[0].command.maxTimeMS).to.not.exist; + }); + it('does not append a maxTimeMS field to subsequent getMores', metadata, async function () { + cursor = client + .db('db') + .collection('coll') + .find({}, { timeoutMS: 2000, tailable: true, batchSize: 1 }); + + await cursor.next(); + await cursor.next(); + + const getMores = commandStarted.filter(x => x.command.getMore != null); + + expect(getMores).to.have.lengthOf(1); + expect(getMores[0].command.getMore).to.exist; + expect(getMores[0].command.getMore.maxTimeMS).to.not.exist; + }); + }); + }); + }); + describe('GridFSBucket', () => { const blockTimeMS = 200; let internalClient: MongoClient; @@ -798,6 +1011,10 @@ describe('CSOT driver tests', metadata, () => { beforeEach(async function () { client = this.configuration.newClient({ timeoutMS: 123 }); + await client + .db('db') + .dropCollection('coll') + .catch(() => null); }); afterEach(async function () { diff --git a/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json b/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json new file mode 100644 index 00000000000..17da3e3c0c9 --- /dev/null +++ b/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json @@ -0,0 +1,146 @@ +{ + "description": "timeoutMS behaves correctly for tailable awaitData cursors", + "schemaVersion": "1.9", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + }, + { + "client": { + "id": "client", + "uriOptions": { + "timeoutMS": 200 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ], + "initialData": [ + { + "collectionName": "coll", + "databaseName": "test", + "createOptions": { + "capped": true, + "size": 500 + }, + "documents": [ + { + "_id": 0 + }, + { + "_id": 1 + } + ] + } + ], + "tests": [ + { + "description": "timeoutMS is refreshed for getMore - failure", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "getMore" + ], + "blockConnection": true, + "blockTimeMS": 250 + } + } + } + }, + { + "name": "createFindCursor", + "object": "collection", + "arguments": { + "filter": {}, + "cursorType": "tailableAwait", + "batchSize": 1 + }, + "saveResultAsEntity": "tailableCursor" + }, + { + "name": "iterateUntilDocumentOrError", + "object": "tailableCursor" + }, + { + "name": "iterateUntilDocumentOrError", + "object": "tailableCursor", + "expectError": { + "isTimeoutError": true + } + } + ], + "expectEvents": [ + { + "client": "client", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "commandName": "find", + "databaseName": "test", + "command": { + "find": "coll", + "tailable": true, + "awaitData": true, + "maxTimeMS": { + "$$exists": true + } + } + } + }, + { + "commandStartedEvent": { + "commandName": "getMore", + "databaseName": "test", + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "coll" + } + } + } + ] + } + ] + } + ] +} diff --git a/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-non-awaitData.json b/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-non-awaitData.json new file mode 100644 index 00000000000..80cf74a1116 --- /dev/null +++ b/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-non-awaitData.json @@ -0,0 +1,151 @@ +{ + "description": "timeoutMS behaves correctly for tailable non-awaitData cursors", + "schemaVersion": "1.9", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + }, + { + "client": { + "id": "client", + "uriOptions": { + "timeoutMS": 200 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ], + "initialData": [ + { + "collectionName": "coll", + "databaseName": "test", + "createOptions": { + "capped": true, + "size": 500 + }, + "documents": [ + { + "_id": 0 + }, + { + "_id": 1 + } + ] + } + ], + "tests": [ + { + "description": "timeoutMS is refreshed for getMore - failure", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "getMore" + ], + "blockConnection": true, + "blockTimeMS": 250 + } + } + } + }, + { + "name": "createFindCursor", + "object": "collection", + "arguments": { + "filter": {}, + "cursorType": "tailable", + "batchSize": 1 + }, + "saveResultAsEntity": "tailableCursor" + }, + { + "name": "iterateUntilDocumentOrError", + "object": "tailableCursor" + }, + { + "name": "iterateUntilDocumentOrError", + "object": "tailableCursor", + "expectError": { + "isTimeoutError": true + } + } + ], + "expectEvents": [ + { + "client": "client", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "commandName": "find", + "databaseName": "test", + "command": { + "find": "coll", + "tailable": true, + "awaitData": { + "$$exists": false + }, + "maxTimeMS": { + "$$exists": false + } + } + } + }, + { + "commandStartedEvent": { + "commandName": "getMore", + "databaseName": "test", + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "coll", + "maxTimeMS": { + "$$exists": false + } + } + } + } + ] + } + ] + } + ] +} diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts index a9f79842c31..f7c34a70239 100644 --- a/test/tools/unified-spec-runner/operations.ts +++ b/test/tools/unified-spec-runner/operations.ts @@ -268,7 +268,18 @@ operations.set('createCollection', async ({ entities, operation }) => { operations.set('createFindCursor', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - const { filter, ...opts } = operation.arguments!; + const { filter, cursorType, ...opts } = operation.arguments!; + switch (cursorType) { + case 'tailableAwait': + opts.tailable = true; + opts.awaitData = true; + break; + case 'tailable': + opts.tailable = true; + break; + default: + break; + } const cursor = collection.find(filter, opts); // The spec dictates that we create the cursor and force the find command // to execute, but don't move the cursor forward. hasNext() accomplishes @@ -332,7 +343,18 @@ operations.set('find', async ({ entities, operation }) => { } else { queryable = entities.getEntity('collection', operation.object); } - const { filter, ...opts } = operation.arguments!; + const { filter, cursorType, ...opts } = operation.arguments!; + switch (cursorType) { + case 'tailableAwait': + opts.tailable = true; + opts.awaitData = true; + break; + case 'tailable': + opts.tailable = true; + break; + default: + break; + } return queryable.find(filter, opts).toArray(); }); @@ -804,10 +826,25 @@ operations.set('runCursorCommand', async ({ entities, operation }: OperationFunc operations.set('createCommandCursor', async ({ entities, operation }: OperationFunctionParams) => { const collection = entities.getEntity('db', operation.object); - const { command, ...opts } = operation.arguments!; + const { command, cursorType, ...opts } = operation.arguments!; + switch (cursorType) { + case 'tailableAwait': + opts.tailable = true; + opts.awaitData = true; + break; + case 'tailable': + opts.tailable = true; + break; + default: + break; + } const cursor = collection.runCursorCommand(command, { readPreference: ReadPreference.fromOptions({ readPreference: opts.readPreference }), - session: opts.session + session: opts.session, + tailable: opts.tailable, + awaitData: opts.awaitData, + timeoutMode: opts.timeoutMode, + timeoutMS: opts.timeoutMS }); if (!Number.isNaN(+opts.batchSize)) cursor.setBatchSize(+opts.batchSize);