Skip to content

Commit

Permalink
feat(NODE-6274): add CSOT support to bulkWrite (#4250)
Browse files Browse the repository at this point in the history
Co-authored-by: Bailey Pearson <[email protected]>
  • Loading branch information
nbbeeken and baileympearson committed Oct 10, 2024
1 parent d829857 commit cece3c2
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 22 deletions.
18 changes: 13 additions & 5 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ export function mergeBatchResults(

async function executeCommands(
bulkOperation: BulkOperationBase,
options: BulkWriteOptions
options: BulkWriteOptions & { timeoutContext?: TimeoutContext | null }
): Promise<BulkWriteResult> {
if (bulkOperation.s.batches.length === 0) {
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
Expand Down Expand Up @@ -552,7 +552,11 @@ async function executeCommands(
let thrownError = null;
let result;
try {
result = await executeOperation(bulkOperation.s.collection.client, operation);
result = await executeOperation(
bulkOperation.s.collection.client,
operation,
finalOptions.timeoutContext
);
} catch (error) {
thrownError = error;
}
Expand Down Expand Up @@ -866,15 +870,19 @@ export class BulkWriteShimOperation extends AbstractOperation {
return 'bulkWrite' as const;
}

async execute(_server: Server, session: ClientSession | undefined): Promise<any> {
async execute(
_server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return await executeCommands(this.bulkOperation, this.options);
return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext });
}
}

Expand Down Expand Up @@ -1203,7 +1211,7 @@ export abstract class BulkOperationBase {
const finalOptions = { ...this.s.options, ...options };
const operation = new BulkWriteShimOperation(this, finalOptions);

return await executeOperation(this.s.collection.client, operation);
return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as sinon from 'sinon';
import { type CommandStartedEvent } from '../../../mongodb';
import {
type CommandSucceededEvent,
MongoBulkWriteError,
MongoClient,
MongoOperationTimeoutError,
MongoServerSelectionError,
Expand All @@ -28,7 +29,7 @@ describe('CSOT spec prose tests', function () {
await client?.close();
});

context.skip('1. Multi-batch writes', () => {
describe('1. Multi-batch writes', { requires: { topology: 'single', mongodb: '>=4.4' } }, () => {
/**
* This test MUST only run against standalones on server versions 4.4 and higher.
* The `insertMany` call takes an exceedingly long time on replicasets and sharded
Expand All @@ -55,6 +56,46 @@ describe('CSOT spec prose tests', function () {
* - Expect this to fail with a timeout error.
* 1. Verify that two `insert` commands were executed against `db.coll` as part of the `insertMany` call.
*/

const failpoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 2
},
data: {
failCommands: ['insert'],
blockConnection: true,
blockTimeMS: 1010
}
};

beforeEach(async function () {
await internalClient
.db('db')
.collection('coll')
.drop()
.catch(() => null);
await internalClient.db('admin').command(failpoint);

client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
});

it('performs two inserts which fail to complete before 2000 ms', async () => {
const inserts = [];
client.on('commandStarted', ev => inserts.push(ev));

const a = new Uint8Array(1000000 - 22);
const oneMBDocs = Array.from({ length: 50 }, (_, _id) => ({ _id, a }));
const error = await client
.db('db')
.collection<{ _id: number; a: Uint8Array }>('coll')
.insertMany(oneMBDocs)
.catch(error => error);

expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.errorResponse).to.be.instanceOf(MongoOperationTimeoutError);
expect(inserts.map(ev => ev.commandName)).to.deep.equal(['insert', 'insert']);
});
});

context.skip('2. maxTimeMS is not set for commands sent to mongocryptd', () => {
Expand Down Expand Up @@ -901,4 +942,103 @@ describe('CSOT spec prose tests', function () {
});
});
});

describe.skip(
'11. Multi-batch bulkWrites',
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
function () {
/**
* ### 11. Multi-batch bulkWrites
*
* This test MUST only run against server versions 8.0+. This test must be skipped on Atlas Serverless.
*
* 1. Using `internalClient`, drop the `db.coll` collection.
*
* 2. Using `internalClient`, set the following fail point:
*
* @example
* ```javascript
* {
* configureFailPoint: "failCommand",
* mode: {
* times: 2
* },
* data: {
* failCommands: ["bulkWrite"],
* blockConnection: true,
* blockTimeMS: 1010
* }
* }
* ```
*
* 3. Using `internalClient`, perform a `hello` command and record the `maxBsonObjectSize` and `maxMessageSizeBytes` values
* in the response.
*
* 4. Create a new MongoClient (referred to as `client`) with `timeoutMS=2000`.
*
* 5. Create a list of write models (referred to as `models`) with the following write model repeated
* (`maxMessageSizeBytes / maxBsonObjectSize + 1`) times:
*
* @example
* ```json
* InsertOne {
* "namespace": "db.coll",
* "document": { "a": "b".repeat(maxBsonObjectSize - 500) }
* }
* ```
*
* 6. Call `bulkWrite` on `client` with `models`.
*
* - Expect this to fail with a timeout error.
*
* 7. Verify that two `bulkWrite` commands were executed as part of the `MongoClient.bulkWrite` call.
*/
const failpoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 2
},
data: {
failCommands: ['bulkWrite'],
blockConnection: true,
blockTimeMS: 1010
}
};

let maxBsonObjectSize: number;
let maxMessageSizeBytes: number;

beforeEach(async function () {
await internalClient
.db('db')
.collection('coll')
.drop()
.catch(() => null);
await internalClient.db('admin').command(failpoint);

const hello = await internalClient.db('admin').command({ hello: 1 });
maxBsonObjectSize = hello.maxBsonObjectSize;
maxMessageSizeBytes = hello.maxMessageSizeBytes;

client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
});

it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
const writes = [];
client.on('commandStarted', ev => writes.push(ev));

const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
const models = Array.from({ length }, () => ({
namespace: 'db.coll',
name: 'insertOne' as const,
document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
}));

const error = await client.bulkWrite(models).catch(error => error);

expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
}
);
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

const skippedSpecs = {
bulkWrite: 'TODO(NODE-6274)',
'change-streams': 'TODO(NODE-6035)',
'convenient-transactions': 'TODO(NODE-5687)',
'deprecated-options': 'TODO(NODE-5689)',
Expand All @@ -19,18 +18,12 @@ const skippedSpecs = {
};

const skippedTests = {
'timeoutMS can be configured on a MongoClient - insertMany on collection': 'TODO(NODE-6274)',
'timeoutMS can be configured on a MongoClient - bulkWrite on collection': 'TODO(NODE-6274)',
'timeoutMS can be configured on a MongoClient - createChangeStream on client': 'TODO(NODE-6305)',
'timeoutMS applies to whole operation, not individual attempts - createChangeStream on client':
'TODO(NODE-6305)',
'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(NODE-6305)',
'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure':
'TODO(NODE-6305)',
'timeoutMS applies to whole operation, not individual attempts - insertMany on collection':
'TODO(NODE-6274)',
'timeoutMS applies to whole operation, not individual attempts - bulkWrite on collection':
'TODO(NODE-6274)',
'command is not sent if RTT is greater than timeoutMS': 'TODO(DRIVERS-2965)',
'Non=tailable cursor iteration timeoutMS is refreshed for getMore if timeoutMode is iteration - failure':
'TODO(DRIVERS-2965)',
Expand Down
14 changes: 5 additions & 9 deletions test/tools/unified-spec-runner/match.ts
Original file line number Diff line number Diff line change
Expand Up @@ -788,15 +788,11 @@ export function expectErrorCheck(
if (expected.isTimeoutError === false) {
expect(error).to.not.be.instanceof(MongoOperationTimeoutError);
} else if (expected.isTimeoutError === true) {
expect(error).to.be.instanceof(MongoOperationTimeoutError);
}

// TODO(NODE-6274): Check for MongoBulkWriteErrors that have a MongoOperationTimeoutError in their
// errorResponse field
if (expected.isTimeoutError === false) {
expect(error).to.not.be.instanceof(MongoOperationTimeoutError);
} else if (expected.isTimeoutError === true) {
expect(error).to.be.instanceof(MongoOperationTimeoutError);
if ('errorResponse' in error) {
expect(error.errorResponse).to.be.instanceof(MongoOperationTimeoutError);
} else {
expect(error).to.be.instanceof(MongoOperationTimeoutError);
}
}

if (expected.errorContains != null) {
Expand Down

0 comments on commit cece3c2

Please sign in to comment.