Skip to content

Commit

Permalink
feat(NODE-6313): add CSOT support to sessions and transactions (#4199)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored and W-A-James committed Oct 1, 2024
1 parent 72ccc6a commit 8c01d69
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ export function onData(
// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);

const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
timeoutForSocketRead?.throwIfExpired();
// eslint-disable-next-line github/no-then
timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler);
timeoutForSocketRead?.then(undefined, errorHandler);

const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
timeoutForSocketRead?.throwIfExpired();
Expand Down
149 changes: 149 additions & 0 deletions test/integration/client-side-operations-timeout/node_csot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -894,4 +894,153 @@ describe('CSOT driver tests', metadata, () => {
});
});
});

describe('when using an explicit session', () => {
const metadata: MongoDBMetadataUI = {
requires: { topology: ['replicaset'], mongodb: '>=4.4' }
};

describe('created for a withTransaction callback', () => {
describe('passing a timeoutMS and a session with a timeoutContext', () => {
let client: MongoClient;

beforeEach(async function () {
client = this.configuration.newClient({ timeoutMS: 123 });
});

afterEach(async function () {
await client.close();
});

it('throws a validation error from the operation', metadata, async () => {
// Drivers MUST raise a validation error if an explicit session with a timeout is used and
// the timeoutMS option is set at the operation level for operations executed as part of a withTransaction callback.

const coll = client.db('db').collection('coll');

const session = client.startSession();

let insertError: Error | null = null;
const withTransactionError = await session
.withTransaction(async session => {
insertError = await coll
.insertOne({ x: 1 }, { session, timeoutMS: 1234 })
.catch(error => error);
throw insertError;
})
.catch(error => error);

expect(insertError).to.be.instanceOf(MongoInvalidArgumentError);
expect(withTransactionError).to.be.instanceOf(MongoInvalidArgumentError);
});
});
});

describe('created manually', () => {
describe('passing a timeoutMS and a session with an inherited timeoutMS', () => {
let client: MongoClient;

beforeEach(async function () {
client = this.configuration.newClient({ timeoutMS: 123 });
});

afterEach(async function () {
await client.close();
});

it('does not throw a validation error', metadata, async () => {
const coll = client.db('db').collection('coll');
const session = client.startSession();
session.startTransaction();
await coll.insertOne({ x: 1 }, { session, timeoutMS: 1234 });
await session.abortTransaction(); // this uses the inherited timeoutMS, not the insert
});
});
});
});

describe('Convenient Transactions', () => {
/** Tests in this section MUST only run against replica sets and sharded clusters with server versions 4.4 or higher. */
const metadata: MongoDBMetadataUI = {
requires: { topology: ['replicaset', 'sharded'], mongodb: '>=5.0' }
};

describe('when an operation fails inside withTransaction callback', () => {
const failpoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: { times: 2 },
data: {
failCommands: ['insert', 'abortTransaction'],
blockConnection: true,
blockTimeMS: 600
}
};

beforeEach(async function () {
if (!semver.satisfies(this.configuration.version, '>=4.4')) {
this.skipReason = 'Requires server version 4.4+';
this.skip();
}
const internalClient = this.configuration.newClient();
await internalClient
.db('db')
.collection('coll')
.drop()
.catch(() => null);
await internalClient.db('admin').command(failpoint);
await internalClient.close();
});

let client: MongoClient;

afterEach(async function () {
if (semver.satisfies(this.configuration.version, '>=4.4')) {
const internalClient = this.configuration.newClient();
await internalClient
.db('admin')
.command({ configureFailPoint: 'failCommand', mode: 'off' });
await internalClient.close();
}
await client?.close();
});

it(
'timeoutMS is refreshed for abortTransaction and the timeout error is thrown from the operation',
metadata,
async function () {
const commandsFailed = [];
const commandsStarted = [];

client = this.configuration
.newClient({ timeoutMS: 500, monitorCommands: true })
.on('commandStarted', e => commandsStarted.push(e.commandName))
.on('commandFailed', e => commandsFailed.push(e.commandName));

const coll = client.db('db').collection('coll');

const session = client.startSession();

let insertError: Error | null = null;
const withTransactionError = await session
.withTransaction(async session => {
insertError = await coll.insertOne({ x: 1 }, { session }).catch(error => error);
throw insertError;
})
.catch(error => error);

try {
expect(insertError).to.be.instanceOf(MongoOperationTimeoutError);
expect(withTransactionError).to.be.instanceOf(MongoOperationTimeoutError);
expect(commandsStarted, 'commands started').to.deep.equal([
'insert',
'abortTransaction'
]);
expect(commandsFailed, 'commands failed').to.deep.equal(['insert', 'abortTransaction']);
} finally {
await session.endSession();
}
}
);
});
});
});

0 comments on commit 8c01d69

Please sign in to comment.