diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index 9c42e4220b..c4e2198fc2 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -90,8 +90,11 @@ export function onData( // Adding event handlers emitter.on('data', eventHandler); emitter.on('error', errorHandler); + + const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead; + timeoutForSocketRead?.throwIfExpired(); // eslint-disable-next-line github/no-then - timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler); + timeoutForSocketRead?.then(undefined, errorHandler); const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead; timeoutForSocketRead?.throwIfExpired(); diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index 40026da085..f49835e3c5 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -894,4 +894,153 @@ describe('CSOT driver tests', metadata, () => { }); }); }); + + describe('when using an explicit session', () => { + const metadata: MongoDBMetadataUI = { + requires: { topology: ['replicaset'], mongodb: '>=4.4' } + }; + + describe('created for a withTransaction callback', () => { + describe('passing a timeoutMS and a session with a timeoutContext', () => { + let client: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient({ timeoutMS: 123 }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('throws a validation error from the operation', metadata, async () => { + // Drivers MUST raise a validation error if an explicit session with a timeout is used and + // the timeoutMS option is set at the operation level for operations executed as part of a withTransaction callback. + + const coll = client.db('db').collection('coll'); + + const session = client.startSession(); + + let insertError: Error | null = null; + const withTransactionError = await session + .withTransaction(async session => { + insertError = await coll + .insertOne({ x: 1 }, { session, timeoutMS: 1234 }) + .catch(error => error); + throw insertError; + }) + .catch(error => error); + + expect(insertError).to.be.instanceOf(MongoInvalidArgumentError); + expect(withTransactionError).to.be.instanceOf(MongoInvalidArgumentError); + }); + }); + }); + + describe('created manually', () => { + describe('passing a timeoutMS and a session with an inherited timeoutMS', () => { + let client: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient({ timeoutMS: 123 }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('does not throw a validation error', metadata, async () => { + const coll = client.db('db').collection('coll'); + const session = client.startSession(); + session.startTransaction(); + await coll.insertOne({ x: 1 }, { session, timeoutMS: 1234 }); + await session.abortTransaction(); // this uses the inherited timeoutMS, not the insert + }); + }); + }); + }); + + describe('Convenient Transactions', () => { + /** Tests in this section MUST only run against replica sets and sharded clusters with server versions 4.4 or higher. */ + const metadata: MongoDBMetadataUI = { + requires: { topology: ['replicaset', 'sharded'], mongodb: '>=5.0' } + }; + + describe('when an operation fails inside withTransaction callback', () => { + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 2 }, + data: { + failCommands: ['insert', 'abortTransaction'], + blockConnection: true, + blockTimeMS: 600 + } + }; + + beforeEach(async function () { + if (!semver.satisfies(this.configuration.version, '>=4.4')) { + this.skipReason = 'Requires server version 4.4+'; + this.skip(); + } + const internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .collection('coll') + .drop() + .catch(() => null); + await internalClient.db('admin').command(failpoint); + await internalClient.close(); + }); + + let client: MongoClient; + + afterEach(async function () { + if (semver.satisfies(this.configuration.version, '>=4.4')) { + const internalClient = this.configuration.newClient(); + await internalClient + .db('admin') + .command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient.close(); + } + await client?.close(); + }); + + it( + 'timeoutMS is refreshed for abortTransaction and the timeout error is thrown from the operation', + metadata, + async function () { + const commandsFailed = []; + const commandsStarted = []; + + client = this.configuration + .newClient({ timeoutMS: 500, monitorCommands: true }) + .on('commandStarted', e => commandsStarted.push(e.commandName)) + .on('commandFailed', e => commandsFailed.push(e.commandName)); + + const coll = client.db('db').collection('coll'); + + const session = client.startSession(); + + let insertError: Error | null = null; + const withTransactionError = await session + .withTransaction(async session => { + insertError = await coll.insertOne({ x: 1 }, { session }).catch(error => error); + throw insertError; + }) + .catch(error => error); + + try { + expect(insertError).to.be.instanceOf(MongoOperationTimeoutError); + expect(withTransactionError).to.be.instanceOf(MongoOperationTimeoutError); + expect(commandsStarted, 'commands started').to.deep.equal([ + 'insert', + 'abortTransaction' + ]); + expect(commandsFailed, 'commands failed').to.deep.equal(['insert', 'abortTransaction']); + } finally { + await session.endSession(); + } + } + ); + }); + }); });