Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6403): Add CSOT support to client bulk write #4261

Merged
merged 6 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
throw new MongoOperationTimeoutError('Timed out at socket write');
}
throw error;
} finally {
timeout.clear();
}
}
return await drainEvent;
Expand Down
1 change: 1 addition & 0 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export function onData(
emitter.off('data', eventHandler);
emitter.off('error', errorHandler);
finished = true;
timeoutForSocketRead?.clear();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the cause of the hanging test suites - some CSOT tests spawn huge timeouts that we never clean up.

const doneResult = { value: undefined, done: finished } as const;

for (const promise of unconsumedPromises) {
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ export abstract class AbstractCursor<
options.timeoutMode ??
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
} else {
if (options.timeoutMode != null)
if (options.timeoutMode != null && options.timeoutContext == null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
}

Expand Down
8 changes: 6 additions & 2 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
constructor(
client: MongoClient,
commandBuilder: ClientBulkWriteCommandBuilder,
options: ClientBulkWriteOptions = {}
options: ClientBulkWriteCursorOptions = {}
) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

Expand Down Expand Up @@ -72,7 +72,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
const response = await executeOperation(
this.client,
clientBulkWriteOperation,
this.timeoutContext
);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
Expand Down
16 changes: 14 additions & 2 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor';
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import {
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoServerError
} from '../../error';
import { type MongoClient } from '../../mongo_client';
import { TimeoutContext } from '../../timeout';
import { resolveTimeoutOptions } from '../../utils';
import { WriteConcern } from '../../write_concern';
import { executeOperation } from '../execute_operation';
import { ClientBulkWriteOperation } from './client_bulk_write';
Expand Down Expand Up @@ -70,17 +73,26 @@ export class ClientBulkWriteExecutor {
pkFactory
);
// Unacknowledged writes need to execute all batches and return { ok: 1}
const resolvedOptions = resolveTimeoutOptions(this.client, this.options);
const context = TimeoutContext.create(resolvedOptions);

if (this.options.writeConcern?.w === 0) {
while (commandBuilder.hasNextBatch()) {
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, context);
}
return { ok: 1 };
} else {
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
// For each command will will create and exhaust a cursor for the results.
while (commandBuilder.hasNextBatch()) {
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
const cursorContext = new CursorTimeoutContext(context, Symbol());
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
const options = {
...this.options,
timeoutContext: cursorContext,
...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
};
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
try {
await resultsMerger.merge(cursor);
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export type ServerEvents = {
EventEmitterWithState;

/** @internal */
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext'> & {
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
timeoutContext: TimeoutContext;
};

Expand Down
13 changes: 13 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { ServerType } from './sdam/common';
import type { Server } from './sdam/server';
import type { Topology } from './sdam/topology';
import type { ClientSession } from './sessions';
import { type TimeoutContextOptions } from './timeout';
import { WriteConcern } from './write_concern';

/**
Expand Down Expand Up @@ -514,6 +515,18 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean {
return keys.length > 0 && keys[0][0] === '$';
}

export function resolveTimeoutOptions<T extends Partial<TimeoutContextOptions>>(
client: MongoClient,
options: T
): T &
Pick<
MongoClient['s']['options'],
'timeoutMS' | 'serverSelectionTimeoutMS' | 'waitQueueTimeoutMS' | 'socketTimeoutMS'
> {
const { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS } =
client.s.options;
return { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS, ...options };
}
/**
* Merge inherited properties from parent into options, prioritizing values from options,
* then values from parent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
promiseWithResolvers,
squashError
} from '../../mongodb';
import { type FailPoint } from '../../tools/utils';
import { type FailPoint, makeMultiBatchWrite } from '../../tools/utils';
import { filterForCommands } from '../shared';

// TODO(NODE-5824): Implement CSOT prose tests
describe('CSOT spec prose tests', function () {
Expand Down Expand Up @@ -1183,9 +1184,9 @@ describe('CSOT spec prose tests', function () {
});
});

describe.skip(
describe(
'11. Multi-batch bulkWrites',
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
{ requires: { mongodb: '>=8.0', serverless: 'forbid', topology: 'single' } },
function () {
/**
* ### 11. Multi-batch bulkWrites
Expand Down Expand Up @@ -1245,9 +1246,6 @@ describe('CSOT spec prose tests', function () {
}
};

let maxBsonObjectSize: number;
let maxMessageSizeBytes: number;

beforeEach(async function () {
await internalClient
.db('db')
Expand All @@ -1256,29 +1254,20 @@ describe('CSOT spec prose tests', function () {
.catch(() => null);
await internalClient.db('admin').command(failpoint);

const hello = await internalClient.db('admin').command({ hello: 1 });
maxBsonObjectSize = hello.maxBsonObjectSize;
maxMessageSizeBytes = hello.maxMessageSizeBytes;

client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
});

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prose test incorporates changes from mongodb/specifications#1672.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify drivers may provide maxWriteBatchSize

where are we providing that value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah in the makeMultiBatchWrite helper, got it

it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
it('performs two bulkWrites which fail to complete before 2000 ms', async function () {
const writes = [];
client.on('commandStarted', ev => writes.push(ev));
client.on('commandStarted', filterForCommands('bulkWrite', writes));
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
const models = Array.from({ length }, () => ({
namespace: 'db.coll',
name: 'insertOne' as const,
document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
}));
const models = await makeMultiBatchWrite(this.configuration);

const error = await client.bulkWrite(models).catch(error => error);

expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
expect(writes).to.have.lengthOf(2);
});
}
);
});
16 changes: 10 additions & 6 deletions test/integration/client-side-operations-timeout/node_csot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,16 @@ describe('CSOT driver tests', metadata, () => {
.stub(Connection.prototype, 'readMany')
.callsFake(async function* (...args) {
const realIterator = readManyStub.wrappedMethod.call(this, ...args);
const cmd = commandSpy.lastCall.args.at(1);
if ('giveMeWriteErrors' in cmd) {
await realIterator.next().catch(() => null); // dismiss response
yield { parse: () => writeErrorsReply };
} else {
yield (await realIterator.next()).value;
try {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the change on onData, we need to be sure we call finally in the mock too.

const cmd = commandSpy.lastCall.args.at(1);
if ('giveMeWriteErrors' in cmd) {
await realIterator.next().catch(() => null); // dismiss response
yield { parse: () => writeErrorsReply };
} else {
yield (await realIterator.next()).value;
}
} finally {
realIterator.return();
}
});
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';

import { Collection, type Db, type MongoClient } from '../../mongodb';
import { Collection, type Db, type MongoClient, ObjectId } from '../../mongodb';

describe('Collection Management and Db Management', function () {
let client: MongoClient;
Expand All @@ -16,7 +16,7 @@ describe('Collection Management and Db Management', function () {
});

it('returns a collection object after calling createCollection', async function () {
const collection = await db.createCollection('collection');
const collection = await db.createCollection(new ObjectId().toHexString());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the test suite twice and this failed the second time - I can remove this if we want but this guarantees we never create the same collection twice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests should be more responsible about their side effects, like: returns true after calling dropDatabase is dropping the common default 'test' db. This is fine to leave in, but we could do a bit more improvement in the before/after dept. up to u!

expect(collection).to.be.instanceOf(Collection);
});

Expand Down
Loading