Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6337): implement client bulk write batching #4248

Merged
merged 11 commits into from
Oct 1, 2024
70 changes: 53 additions & 17 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,60 @@ export interface OpMsgOptions {

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];
serializedDocumentsLength: number;
private chunks: Uint8Array[];
private header: Buffer;

constructor(documents: Document[]) {
this.documents = documents;
/**
* Create a new document sequence for the provided field.
* @param field - The field it will replace.
*/
constructor(field: string, documents?: Document[]) {
this.field = field;
this.documents = [];
this.chunks = [];
this.serializedDocumentsLength = 0;
// Document sequences starts with type 1 at the first byte.
// Field strings must always be UTF-8.
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${this.field}\0`, 5);
this.chunks.push(buffer);
this.header = buffer;
if (documents) {
for (const doc of documents) {
this.push(doc, BSON.serialize(doc));
}
}
}

/**
* Push a document to the document sequence. Will serialize the document
* as well and return the current serialized length of all documents.
* @param document - The document to add.
* @param buffer - The serialized document in raw BSON.
* @returns The new total document sequence length.
*/
push(document: Document, buffer: Uint8Array): number {
this.serializedDocumentsLength += buffer.length;
// Push the document.
this.documents.push(document);
// Push the document raw bson.
this.chunks.push(buffer);
// Write the new length.
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
return this.serializedDocumentsLength + this.header.length;
}

/**
* Get the fully serialized bytes for the document sequence section.
* @returns The section bytes.
*/
toBin(): Uint8Array {
return Buffer.concat(this.chunks);
}
}

Expand Down Expand Up @@ -543,21 +593,7 @@ export class OpMsgRequest {
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
chunks.push(value.toBin());
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
29 changes: 21 additions & 8 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { Document } from '../bson';
import { type Document } from 'bson';

import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoBulkWriteCursorError } from '../error';
import { MongoClientBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
Expand All @@ -24,17 +26,21 @@ export interface ClientBulkWriteCursorOptions
* @internal
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
commandBuilder: ClientBulkWriteCommandBuilder;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
constructor(
client: MongoClient,
commandBuilder: ClientBulkWriteCommandBuilder,
options: ClientBulkWriteOptions = {}
) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

this.command = command;
this.commandBuilder = commandBuilder;
this.clientBulkWriteOptions = options;
}

Expand All @@ -44,22 +50,29 @@ export class ClientBulkWriteCursor extends AbstractCursor {
*/
get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoBulkWriteCursorError(
throw new MongoClientBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
}

/**
* Get the last set of operations the cursor executed.
*/
get operations(): Document[] {
return this.commandBuilder.lastOperations;
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
return new ClientBulkWriteCursor(this.client, this.commandBuilder, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
Expand Down
31 changes: 29 additions & 2 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ export class MongoGCPError extends MongoOIDCError {
* @public
* @category Error
*/
export class MongoBulkWriteCursorError extends MongoRuntimeError {
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
Expand All @@ -639,7 +639,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError {
}

override get name(): string {
return 'MongoBulkWriteCursorError';
return 'MongoClientBulkWriteCursorError';
}
}

/**
* An error indicating that an error occurred on the client when executing a client bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoClientBulkWriteExecutionError';
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ export {
MongoAWSError,
MongoAzureError,
MongoBatchReExecutionError,
MongoBulkWriteCursorError,
MongoChangeStreamError,
MongoClientBulkWriteCursorError,
MongoClientBulkWriteExecutionError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
Expand Down
45 changes: 38 additions & 7 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import { type Document } from 'bson';

import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta';
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { MongoDBNamespace } from '../../utils';
import { CommandOperation } from '../command';
import { Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteCommandBuilder } from './command_builder';
import { type ClientBulkWriteOptions } from './common';

/**
* Executes a single client bulk write operation within a potential batch.
* @internal
*/
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
command: Document;
commandBuilder: ClientBulkWriteCommandBuilder;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) {
super(undefined, options);
this.command = command;
this.commandBuilder = commandBuilder;
this.options = options;
this.ns = new MongoDBNamespace('admin', '$cmd');
}
Expand All @@ -37,9 +37,40 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
let command;

if (server.description.type === ServerType.LoadBalancer) {
if (session) {
// Checkout a connection to build the command.
const connection = await server.pool.checkOut();
// Pin the connection to the session so it get used to execute the command and we do not
// perform a double check-in/check-out.
session.pin(connection);
command = this.commandBuilder.buildBatch(
connection.hello?.maxMessageSizeBytes,
connection.hello?.maxWriteBatchSize
);
} else {
throw new MongoClientBulkWriteExecutionError(
'Session provided to the client bulk write operation must be present.'
);
}
} else {
// At this point we have a server and the auto connect code has already
// run in executeOperation, so the server description will be populated.
// We can use that to build the command.
command = this.commandBuilder.buildBatch(
server.description.maxMessageSizeBytes,
server.description.maxWriteBatchSize
);
}
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
}
}

// Skipping the collation as it goes on the individual ops.
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
defineAspects(ClientBulkWriteOperation, [
Aspect.WRITE_OPERATION,
Aspect.SKIP_COLLATION,
Aspect.CURSOR_CREATING
]);
Loading