Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6274): add CSOT support to bulkWrite #4250

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ export function mergeBatchResults(

function executeCommands(
bulkOperation: BulkOperationBase,
options: BulkWriteOptions,
options: BulkWriteOptions & { timeoutContext?: TimeoutContext | null },
callback: Callback<BulkWriteResult>
) {
if (bulkOperation.s.batches.length === 0) {
Expand Down Expand Up @@ -590,7 +590,11 @@ function executeCommands(
: null;

if (operation != null) {
executeOperation(bulkOperation.s.collection.client, operation).then(
executeOperation(
bulkOperation.s.collection.client,
operation,
finalOptions.timeoutContext
).then(
result => resultHandler(undefined, result),
error => resultHandler(error)
);
Expand Down Expand Up @@ -899,15 +903,19 @@ export class BulkWriteShimOperation extends AbstractOperation {
return 'bulkWrite' as const;
}

execute(_server: Server, session: ClientSession | undefined): Promise<any> {
async execute(
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
_server: Server,
session: ClientSession | undefined,
timeoutContext: TimeoutContext
): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return executeCommandsAsync(this.bulkOperation, this.options);
return await executeCommandsAsync(this.bulkOperation, { ...this.options, timeoutContext });
}
}

Expand Down Expand Up @@ -1236,7 +1244,7 @@ export abstract class BulkOperationBase {
const finalOptions = { ...this.s.options, ...options };
const operation = new BulkWriteShimOperation(this, finalOptions);

return await executeOperation(this.s.collection.client, operation);
return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as sinon from 'sinon';
import { type CommandStartedEvent } from '../../../mongodb';
import {
type CommandSucceededEvent,
MongoBulkWriteError,
MongoClient,
MongoOperationTimeoutError,
MongoServerSelectionError,
Expand All @@ -28,7 +29,7 @@ describe('CSOT spec prose tests', function () {
await client?.close();
});

context.skip('1. Multi-batch writes', () => {
describe('1. Multi-batch writes', { requires: { topology: 'single', mongodb: '>=4.4' } }, () => {
/**
* This test MUST only run against standalones on server versions 4.4 and higher.
* The `insertMany` call takes an exceedingly long time on replicasets and sharded
Expand All @@ -55,6 +56,46 @@ describe('CSOT spec prose tests', function () {
* - Expect this to fail with a timeout error.
* 1. Verify that two `insert` commands were executed against `db.coll` as part of the `insertMany` call.
*/

const failpoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 2
},
data: {
failCommands: ['insert'],
blockConnection: true,
blockTimeMS: 1010
}
};

beforeEach(async function () {
await internalClient
.db('db')
.collection('coll')
.drop()
.catch(() => null);
await internalClient.db('admin').command(failpoint);

client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
});

it('performs two inserts which fail to complete before 2000 ms', async () => {
const inserts = [];
client.on('commandStarted', ev => inserts.push(ev));

const a = new Uint8Array(1000000 - 22);
const oneMBDocs = Array.from({ length: 50 }, (_, _id) => ({ _id, a }));
const error = await client
.db('db')
.collection<{ _id: number; a: Uint8Array }>('coll')
.insertMany(oneMBDocs)
.catch(error => error);

expect(error).to.be.instanceOf(MongoBulkWriteError);
expect(error.errorResponse).to.be.instanceOf(MongoOperationTimeoutError);
expect(inserts.map(ev => ev.commandName)).to.deep.equal(['insert', 'insert']);
});
});

context.skip('2. maxTimeMS is not set for commands sent to mongocryptd', () => {
Expand Down Expand Up @@ -901,4 +942,103 @@ describe('CSOT spec prose tests', function () {
});
});
});

describe.skip(
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
'11. Multi-batch bulkWrites',
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
function () {
/**
* ### 11. Multi-batch bulkWrites
*
* This test MUST only run against server versions 8.0+. This test must be skipped on Atlas Serverless.
*
* 1. Using `internalClient`, drop the `db.coll` collection.
*
* 2. Using `internalClient`, set the following fail point:
*
* @example
* ```javascript
* {
* configureFailPoint: "failCommand",
* mode: {
* times: 2
* },
* data: {
* failCommands: ["bulkWrite"],
* blockConnection: true,
* blockTimeMS: 1010
* }
* }
* ```
*
* 3. Using `internalClient`, perform a `hello` command and record the `maxBsonObjectSize` and `maxMessageSizeBytes` values
* in the response.
*
* 4. Create a new MongoClient (referred to as `client`) with `timeoutMS=2000`.
*
* 5. Create a list of write models (referred to as `models`) with the following write model repeated
* (`maxMessageSizeBytes / maxBsonObjectSize + 1`) times:
*
* @example
* ```json
* InsertOne {
* "namespace": "db.coll",
* "document": { "a": "b".repeat(maxBsonObjectSize - 500) }
* }
* ```
*
* 6. Call `bulkWrite` on `client` with `models`.
*
* - Expect this to fail with a timeout error.
*
* 7. Verify that two `bulkWrite` commands were executed as part of the `MongoClient.bulkWrite` call.
*/
const failpoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 2
},
data: {
failCommands: ['bulkWrite'],
blockConnection: true,
blockTimeMS: 1010
}
};

let maxBsonObjectSize: number;
let maxMessageSizeBytes: number;

beforeEach(async function () {
await internalClient
.db('db')
.collection('coll')
.drop()
.catch(() => null);
await internalClient.db('admin').command(failpoint);

const hello = await internalClient.db('admin').command({ hello: 1 });
maxBsonObjectSize = hello.maxBsonObjectSize;
maxMessageSizeBytes = hello.maxMessageSizeBytes;

client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
});

it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
const writes = [];
client.on('commandStarted', ev => writes.push(ev));

const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
const models = Array.from({ length }, () => ({
namespace: 'db.coll',
name: 'insertOne' as const,
document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
}));

const error = await client.bulkWrite(models).catch(error => error);

expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
}
);
});
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

const skippedSpecs = {
bulkWrite: 'TODO(NODE-6274)',
'change-streams': 'TODO(NODE-6035)',
'convenient-transactions': 'TODO(NODE-5687)',
'deprecated-options': 'TODO(NODE-5689)',
Expand All @@ -19,18 +18,12 @@ const skippedSpecs = {
};

const skippedTests = {
'timeoutMS can be configured on a MongoClient - insertMany on collection': 'TODO(NODE-6274)',
'timeoutMS can be configured on a MongoClient - bulkWrite on collection': 'TODO(NODE-6274)',
'timeoutMS can be configured on a MongoClient - createChangeStream on client': 'TODO(NODE-6305)',
'timeoutMS applies to whole operation, not individual attempts - createChangeStream on client':
'TODO(NODE-6305)',
'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(NODE-6305)',
'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure':
'TODO(NODE-6305)',
'timeoutMS applies to whole operation, not individual attempts - insertMany on collection':
'TODO(NODE-6274)',
'timeoutMS applies to whole operation, not individual attempts - bulkWrite on collection':
'TODO(NODE-6274)',
'command is not sent if RTT is greater than timeoutMS': 'TODO(DRIVERS-2965)',
'Non=tailable cursor iteration timeoutMS is refreshed for getMore if timeoutMode is iteration - failure':
'TODO(DRIVERS-2965)',
Expand Down
14 changes: 5 additions & 9 deletions test/tools/unified-spec-runner/match.ts
Original file line number Diff line number Diff line change
Expand Up @@ -787,15 +787,11 @@ export function expectErrorCheck(
if (expected.isTimeoutError === false) {
expect(error).to.not.be.instanceof(MongoOperationTimeoutError);
} else if (expected.isTimeoutError === true) {
expect(error).to.be.instanceof(MongoOperationTimeoutError);
}

// TODO(NODE-6274): Check for MongoBulkWriteErrors that have a MongoOperationTimeoutError in their
// errorResponse field
if (expected.isTimeoutError === false) {
expect(error).to.not.be.instanceof(MongoOperationTimeoutError);
} else if (expected.isTimeoutError === true) {
expect(error).to.be.instanceof(MongoOperationTimeoutError);
if ('errorResponse' in error) {
expect(error.errorResponse).to.be.instanceof(MongoOperationTimeoutError);
} else {
expect(error).to.be.instanceof(MongoOperationTimeoutError);
}
}

if (expected.errorContains != null) {
Expand Down