Skip to content

Commit

Permalink
feat(NODE-6305): Add CSOT support to tailable cursors (#4218)
Browse files Browse the repository at this point in the history
Co-authored-by: Neal Beeken <[email protected]>
  • Loading branch information
W-A-James and nbbeeken committed Nov 1, 2024
1 parent e5531f2 commit 38fc65e
Show file tree
Hide file tree
Showing 11 changed files with 641 additions and 32 deletions.
51 changes: 40 additions & 11 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,35 @@ export abstract class AbstractCursor<
options.readPreference && options.readPreference instanceof ReadPreference
? options.readPreference
: ReadPreference.primary,
...pluckBSONSerializeOptions(options)
...pluckBSONSerializeOptions(options),
timeoutMS: options.timeoutMS,
tailable: options.tailable,
awaitData: options.awaitData
};
this.cursorOptions.timeoutMS = options.timeoutMS;
if (this.cursorOptions.timeoutMS != null) {
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
if (options.timeoutMode == null) {
if (options.tailable) {
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;

if (options.awaitData) {
if (
options.maxAwaitTimeMS != null &&
options.maxAwaitTimeMS >= this.cursorOptions.timeoutMS
)
throw new MongoInvalidArgumentError(
'Cannot specify maxAwaitTimeMS >= timeoutMS for a tailable awaitData cursor'
);
}
} else {
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
}
} else {
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError(
"Cannot set tailable cursor's timeoutMode to LIFETIME"
);
}
this.cursorOptions.timeoutMode = options.timeoutMode;
}
this.cursorOptions.timeoutMode =
options.timeoutMode ??
Expand All @@ -223,6 +246,8 @@ export abstract class AbstractCursor<
if (options.timeoutMode != null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
}

// Set for initial command
this.cursorOptions.omitMaxTimeMS =
this.cursorOptions.timeoutMS != null &&
((this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
Expand Down Expand Up @@ -781,15 +806,17 @@ export abstract class AbstractCursor<
'Unexpected null selectedServer. A cursor creating command should have set this'
);
}
const getMoreOptions = {
...this.cursorOptions,
session: this.cursorSession,
batchSize
};

const getMoreOperation = new GetMoreOperation(
this.cursorNamespace,
this.cursorId,
this.selectedServer,
{
...this.cursorOptions,
session: this.cursorSession,
batchSize
}
getMoreOptions
);

return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext);
Expand All @@ -814,6 +841,8 @@ export abstract class AbstractCursor<
}
try {
const state = await this._initialize(this.cursorSession);
// Set omitMaxTimeMS to the value needed for subsequent getMore calls
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;
const response = state.response;
this.selectedServer = state.server;
this.cursorId = response.id;
Expand Down Expand Up @@ -866,9 +895,9 @@ export abstract class AbstractCursor<
} catch (error) {
try {
await this.cleanup(undefined, error);
} catch (error) {
} catch (cleanupError) {
// `cleanupCursor` should never throw, squash and throw the original error
squashError(error);
squashError(cleanupError);
}
throw error;
}
Expand Down
2 changes: 2 additions & 0 deletions src/cursor/run_command_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export type RunCursorCommandOptions = {
timeoutMS?: number;
/** @internal */
timeoutMode?: CursorTimeoutMode;
tailable?: boolean;
awaitData?: boolean;
} & BSONSerializeOptions;

/** @public */
Expand Down
5 changes: 5 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return this.s.bsonOptions;
}

/** @internal */
get timeoutMS(): number | undefined {
return this.options.timeoutMS;
}

/**
* Executes a client bulk write operation, available on server 8.0+.
* @param models - The client bulk write models.
Expand Down
1 change: 1 addition & 0 deletions src/operations/create_collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Aspect, defineAspects } from './operation';
const ILLEGAL_COMMAND_FIELDS = new Set([
'w',
'wtimeout',
'timeoutMS',
'j',
'fsync',
'autoIndexId',
Expand Down
4 changes: 3 additions & 1 deletion test/benchmarks/driverBench/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ function loadSpecString(filePath) {
}

function makeClient() {
this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017');
this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017', {
timeoutMS: 0
});
}

function connectClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('CSOT spec prose tests', function () {
beforeEach(async function () {
await internalClient
.db('db')
.collection('coll')
.collection('bulkWriteTest')
.drop()
.catch(() => null);
await internalClient.db('admin').command(failpoint);
Expand All @@ -93,7 +93,7 @@ describe('CSOT spec prose tests', function () {
const oneMBDocs = Array.from({ length: 50 }, (_, _id) => ({ _id, a }));
const error = await client
.db('db')
.collection<{ _id: number; a: Uint8Array }>('coll')
.collection<{ _id: number; a: Uint8Array }>('bulkWriteTest')
.insertMany(oneMBDocs)
.catch(error => error);

Expand Down Expand Up @@ -265,6 +265,7 @@ describe('CSOT spec prose tests', function () {
});

context('5. Blocking Iteration Methods', () => {
const metadata = { requires: { mongodb: '>=4.4' } };
/**
* Tests in this section MUST only be run against server versions 4.4 and higher and only apply to drivers that have a
* blocking method for cursor iteration that executes `getMore` commands in a loop until a document is available or an
Expand All @@ -276,7 +277,7 @@ describe('CSOT spec prose tests', function () {
data: {
failCommands: ['getMore'],
blockConnection: true,
blockTimeMS: 20
blockTimeMS: 90
}
};
let internalClient: MongoClient;
Expand All @@ -286,15 +287,25 @@ describe('CSOT spec prose tests', function () {

beforeEach(async function () {
internalClient = this.configuration.newClient();
await internalClient.db('db').dropCollection('coll');
await internalClient
.db('db')
.collection('coll')
.drop()
.catch(() => null);
// Creating capped collection to be able to create tailable find cursor
const coll = await internalClient
.db('db')
.createCollection('coll', { capped: true, size: 1_000_000 });
await coll.insertOne({ x: 1 });
await internalClient.db().admin().command(failpoint);

client = this.configuration.newClient(undefined, { timeoutMS: 20, monitorCommands: true });
client = this.configuration.newClient(undefined, {
monitorCommands: true,
timeoutMS: 100,
minPoolSize: 20
});
await client.connect();

commandStarted = [];
commandSucceeded = [];

Expand Down Expand Up @@ -337,11 +348,11 @@ describe('CSOT spec prose tests', function () {
* 1. Verify that a `find` command and two `getMore` commands were executed against the `db.coll` collection during the test.
*/

it.skip('send correct number of finds and getMores', async function () {
it('send correct number of finds and getMores', metadata, async function () {
const cursor = client
.db('db')
.collection('coll')
.find({}, { tailable: true, awaitData: true })
.find({}, { tailable: true })
.project({ _id: 0 });
const doc = await cursor.next();
expect(doc).to.deep.equal({ x: 1 });
Expand All @@ -358,7 +369,7 @@ describe('CSOT spec prose tests', function () {
expect(commandStarted.filter(e => e.command.find != null)).to.have.lengthOf(1);
// Expect 2 getMore
expect(commandStarted.filter(e => e.command.getMore != null)).to.have.lengthOf(2);
}).skipReason = 'TODO(NODE-6305)';
});
});

context('Change Streams', () => {
Expand All @@ -383,8 +394,11 @@ describe('CSOT spec prose tests', function () {
* - Expect this to fail with a timeout error.
* 1. Verify that an `aggregate` command and two `getMore` commands were executed against the `db.coll` collection during the test.
*/
it.skip('sends correct number of aggregate and getMores', async function () {
const changeStream = client.db('db').collection('coll').watch();
it.skip('sends correct number of aggregate and getMores', metadata, async function () {
const changeStream = client
.db('db')
.collection('coll')
.watch([], { timeoutMS: 20, maxAwaitTimeMS: 19 });
const maybeError = await changeStream.next().then(
() => null,
e => e
Expand All @@ -397,9 +411,9 @@ describe('CSOT spec prose tests', function () {
const getMores = commandStarted.filter(e => e.command.getMore != null).map(e => e.command);
// Expect 1 aggregate
expect(aggregates).to.have.lengthOf(1);
// Expect 1 getMore
expect(getMores).to.have.lengthOf(1);
}).skipReason = 'TODO(NODE-6305)';
// Expect 2 getMores
expect(getMores).to.have.lengthOf(2);
}).skipReason = 'TODO(NODE-6387)';
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ const skippedTests = {
'Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset':
'TODO(DRIVERS-2965)',
'maxTimeMS value in the command is less than timeoutMS':
'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs'
'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs',
'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure':
'TODO(DRIVERS-2965)',
'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(DRIVERS-2965)',
'timeoutMS is refreshed for getMore - failure':
'TODO(DRIVERS-2965): see modified test in unified-csot-node-specs' // Skipping for both tailable awaitData and tailable non-awaitData cursors
};

describe('CSOT spec tests', function () {
Expand Down
Loading

0 comments on commit 38fc65e

Please sign in to comment.