From bf7fed9fe8f6e19f4c6a1b2080c8ad8e17b38b73 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 19 Sep 2024 16:00:55 +0200 Subject: [PATCH 01/11] feat(NODE-6337): implement client bulk write batching --- src/cmap/commands.ts | 76 +++- src/cursor/client_bulk_write_cursor.ts | 4 +- src/error.ts | 31 +- src/index.ts | 3 +- .../client_bulk_write/command_builder.ts | 142 ++++++- src/operations/client_bulk_write/executor.ts | 26 +- .../client_bulk_write/results_merger.ts | 12 +- src/sdam/server_description.ts | 12 + src/sdam/topology_description.ts | 21 + test/integration/crud/crud.prose.test.ts | 131 ++++++ test/unit/cmap/commands.test.ts | 6 +- test/unit/index.test.ts | 3 +- .../client_bulk_write/command_builder.test.ts | 64 ++- .../client_bulk_write/results_merger.test.ts | 402 +++++++++++------- 14 files changed, 737 insertions(+), 196 deletions(-) diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index 9322fc5341..e04897eedf 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -429,10 +429,66 @@ export interface OpMsgOptions { /** @internal */ export class DocumentSequence { + field: string; documents: Document[]; + serializedDocumentsLength: number; + private chunks: Uint8Array[]; + private header?: Buffer; - constructor(documents: Document[]) { - this.documents = documents; + /** + * Create a new document sequence for the provided field. + * @param field - The field it will replace. + */ + constructor(field: string, documents?: Document[]) { + this.field = field; + this.documents = []; + this.chunks = []; + this.serializedDocumentsLength = 0; + this.init(); + if (documents) { + for (const doc of documents) { + this.push(doc, BSON.serialize(doc)); + } + } + } + + /** + * Initialize the buffer chunks. + */ + private init() { + // Document sequences starts with type 1 at the first byte. + const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1); + buffer[0] = 1; + // Third part is the field name at offset 5 with trailing null byte. 
+ encodeUTF8Into(buffer, `${this.field}\0`, 5); + this.chunks.push(buffer); + this.header = buffer; + } + + /** + * Push a document to the document sequence. Will serialize the document + * as well and return the current serialized length of all documents. + * @param document - The document to add. + * @param buffer - The serialized document in raw BSON. + * @returns The new total document sequence length. + */ + push(document: Document, buffer: Uint8Array): number { + this.serializedDocumentsLength += buffer.length; + // Push the document. + this.documents.push(document); + // Push the document raw bson. + this.chunks.push(buffer); + // Write the new length. + this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1); + return this.serializedDocumentsLength + (this.header?.length ?? 0); + } + + /** + * Get the fully serialized bytes for the document sequence section. + * @returns The section bytes. + */ + toBin(): Uint8Array { + return Buffer.concat(this.chunks); } } @@ -543,21 +599,7 @@ export class OpMsgRequest { const chunks = []; for (const [key, value] of Object.entries(document)) { if (value instanceof DocumentSequence) { - // Document sequences starts with type 1 at the first byte. - const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1); - buffer[0] = 1; - // Third part is the field name at offset 5 with trailing null byte. - encodeUTF8Into(buffer, `${key}\0`, 5); - chunks.push(buffer); - // Fourth part are the documents' bytes. - let docsLength = 0; - for (const doc of value.documents) { - const docBson = this.serializeBson(doc); - docsLength += docBson.length; - chunks.push(docBson); - } - // Second part of the sequence is the length at offset 1; - buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1); + chunks.push(value.toBin()); // Why are we removing the field from the command?
This is because it needs to be // removed in the OP_MSG request first section, and DocumentSequence is not a // BSON type and is specific to the MongoDB wire protocol so there's nothing diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts index a1ae31fba3..cd853a4647 100644 --- a/src/cursor/client_bulk_write_cursor.ts +++ b/src/cursor/client_bulk_write_cursor.ts @@ -1,6 +1,6 @@ import type { Document } from '../bson'; import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses'; -import { MongoBulkWriteCursorError } from '../error'; +import { MongoClientBulkWriteCursorError } from '../error'; import type { MongoClient } from '../mongo_client'; import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write'; import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common'; @@ -44,7 +44,7 @@ export class ClientBulkWriteCursor extends AbstractCursor { */ get response(): ClientBulkWriteCursorResponse { if (this.cursorResponse) return this.cursorResponse; - throw new MongoBulkWriteCursorError( + throw new MongoClientBulkWriteCursorError( 'No client bulk write cursor response returned from the server.' ); } diff --git a/src/error.ts b/src/error.ts index c9652877cb..4aed6b9314 100644 --- a/src/error.ts +++ b/src/error.ts @@ -622,7 +622,7 @@ export class MongoGCPError extends MongoOIDCError { * @public * @category Error */ -export class MongoBulkWriteCursorError extends MongoRuntimeError { +export class MongoClientBulkWriteCursorError extends MongoRuntimeError { /** * **Do not use this constructor!** * @@ -639,7 +639,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError { } override get name(): string { - return 'MongoBulkWriteCursorError'; + return 'MongoClientBulkWriteCursorError'; + } +} + +/** + * An error indicating that an error occurred on the client when executing a client bulk write. 
+ * + * @public + * @category Error + */ +export class MongoClientBulkWriteExecutionError extends MongoRuntimeError { + /** + * **Do not use this constructor!** + * + * Meant for internal use only. + * + * @remarks + * This class is only meant to be constructed within the driver. This constructor is + * not subject to semantic versioning compatibility guarantees and may change at any time. + * + * @public + **/ + constructor(message: string) { + super(message); + } + + override get name(): string { + return 'MongoClientBulkWriteExecutionError'; } } diff --git a/src/index.ts b/src/index.ts index f68dd7699e..97f964ce54 100644 --- a/src/index.ts +++ b/src/index.ts @@ -44,8 +44,9 @@ export { MongoAWSError, MongoAzureError, MongoBatchReExecutionError, - MongoBulkWriteCursorError, MongoChangeStreamError, + MongoClientBulkWriteCursorError, + MongoClientBulkWriteExecutionError, MongoCompatibilityError, MongoCursorExhaustedError, MongoCursorInUseError, diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index ad7ab95360..6b809a08c5 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -1,4 +1,4 @@ -import { type Document } from '../../bson'; +import { BSON, type Document } from '../../bson'; import { DocumentSequence } from '../../cmap/commands'; import { type PkFactory } from '../../mongo_client'; import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types'; @@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand { comment?: any; } +/** + * The bytes overhead for the extra fields added post command generation. + */ +const MESSAGE_OVERHEAD_BYTES = 1000; + /** @internal */ export class ClientBulkWriteCommandBuilder { models: AnyClientBulkWriteModel[]; @@ -62,32 +67,148 @@ export class ClientBulkWriteCommandBuilder { /** * Build the bulk write commands from the models. 
*/ - buildCommands(): ClientBulkWriteCommand[] { + buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] { // Iterate the models to build the ops and nsInfo fields. - const operations = []; + // We need to do this in a loop which creates one command each up + // to the max bson size or max message size. + const commands: ClientBulkWriteCommand[] = []; + let currentCommandLength = 0; let currentNamespaceIndex = 0; + let currentCommand: ClientBulkWriteCommand = this.baseCommand(); const namespaces = new Map(); + for (const model of this.models) { const ns = model.namespace; const index = namespaces.get(ns); + + /** + * Convenience function for resetting everything when a new batch + * is started. + */ + const reset = () => { + commands.push(currentCommand); + namespaces.clear(); + currentNamespaceIndex = 0; + currentCommand = this.baseCommand(); + namespaces.set(ns, currentNamespaceIndex); + }; + if (index != null) { - operations.push(buildOperation(model, index, this.pkFactory)); + // Pushing to the ops document sequence returns the bytes length added. + const operation = buildOperation(model, index, this.pkFactory); + const operationBuffer = BSON.serialize(operation); + + // Check if the operation buffer can fit in the current command. If it can, + // then add the operation to the document sequence and increment the + // current length as long as the ops don't exceed the maxWriteBatchSize. + if ( + currentCommandLength + operationBuffer.length < maxMessageSizeBytes && + currentCommand.ops.documents.length < maxWriteBatchSize + ) { + // Pushing to the ops document sequence returns the bytes length added. + currentCommandLength = + MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer); + } else { + // We need to batch. Push the current command to the commands + // array and create a new current command. We also need to clear the namespaces + // map for the new command.
+ reset(); + + const nsInfo = { ns: ns }; + const nsInfoBuffer = BSON.serialize(nsInfo); + currentCommandLength = + MESSAGE_OVERHEAD_BYTES + + this.addOperationAndNsInfo( + currentCommand, + operation, + operationBuffer, + nsInfo, + nsInfoBuffer + ); + } } else { namespaces.set(ns, currentNamespaceIndex); + const nsInfo = { ns: ns }; + const nsInfoBuffer = BSON.serialize(nsInfo); + const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory); + const operationBuffer = BSON.serialize(operation); + + // Check if the operation and nsInfo buffers can fit in the command. If they + // can, then add the operation and nsInfo to their respective document + // sequences and increment the current length as long as the ops don't exceed + // the maxWriteBatchSize. + if ( + currentCommandLength + nsInfoBuffer.length + operationBuffer.length < + maxMessageSizeBytes && + currentCommand.ops.documents.length < maxWriteBatchSize + ) { + currentCommandLength = + MESSAGE_OVERHEAD_BYTES + + this.addOperationAndNsInfo( + currentCommand, + operation, + operationBuffer, + nsInfo, + nsInfoBuffer + ); + } else { + // We need to batch. Push the current command to the commands + // array and create a new current command. Also clear the namespaces map. + reset(); + + currentCommandLength = + MESSAGE_OVERHEAD_BYTES + + this.addOperationAndNsInfo( + currentCommand, + operation, + operationBuffer, + nsInfo, + nsInfoBuffer + ); + } + // We've added a new namespace, increment the namespace index. + currentNamespaceIndex++; + } } - const nsInfo = Array.from(namespaces.keys(), ns => ({ ns })); + // After we've finished iterating all the models put the last current command + // only if there are operations in it. + if (currentCommand.ops.documents.length > 0) { + commands.push(currentCommand); + } - // The base command.
+ return commands; + } + + private addOperation( + command: ClientBulkWriteCommand, + operation: Document, + operationBuffer: Uint8Array + ): number { + // Pushing to the ops document sequence returns the bytes length added. + return command.ops.push(operation, operationBuffer); + } + + private addOperationAndNsInfo( + command: ClientBulkWriteCommand, + operation: Document, + operationBuffer: Uint8Array, + nsInfo: Document, + nsInfoBuffer: Uint8Array + ): number { + // Pushing to the nsInfo document sequence returns the bytes length added. + const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer); + const opsLength = this.addOperation(command, operation, operationBuffer); + return nsInfoLength + opsLength; + } + + private baseCommand(): ClientBulkWriteCommand { const command: ClientBulkWriteCommand = { bulkWrite: 1, errorsOnly: this.errorsOnly, ordered: this.options.ordered ?? true, - ops: new DocumentSequence(operations), - nsInfo: new DocumentSequence(nsInfo) + ops: new DocumentSequence('ops'), + nsInfo: new DocumentSequence('nsInfo') }; // Add bypassDocumentValidation if it was present in the options. 
if (this.options.bypassDocumentValidation != null) { @@ -103,7 +224,8 @@ export class ClientBulkWriteCommandBuilder { if (this.options.comment !== undefined) { command.comment = this.options.comment; } - return [command]; + + return command; } } diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 74511ede9d..1c02a42add 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,6 +1,7 @@ import { type Document } from 'bson'; import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; +import { MongoClientBulkWriteExecutionError } from '../../error'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; @@ -49,6 +50,23 @@ export class ClientBulkWriteExecutor { * @returns The result. */ async execute(): Promise { + const topologyDescription = this.client.topology?.description; + const maxMessageSizeBytes = topologyDescription?.maxMessageSizeBytes; + const maxWriteBatchSize = topologyDescription?.maxWriteBatchSize; + // If we don't know the maxMessageSizeBytes or for some reason it's 0 + // then we cannot calculate the batch. + if (!maxMessageSizeBytes) { + throw new MongoClientBulkWriteExecutionError( + 'No maxMessageSizeBytes value found - client bulk writes cannot execute without this value set from the monitoring connections.' + ); + } + + if (!maxWriteBatchSize) { + throw new MongoClientBulkWriteExecutionError( + 'No maxWriteBatchSize value found - client bulk writes cannot execute without this value set from the monitoring connections.' + ); + } + // The command builder will take the user provided models and potential split the batch // into multiple commands due to size. 
const pkFactory = this.client.s.options.pkFactory; @@ -57,7 +75,7 @@ export class ClientBulkWriteExecutor { this.options, pkFactory ); - const commands = commandBuilder.buildCommands(); + const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize); if (this.options.writeConcern?.w === 0) { return await executeUnacknowledged(this.client, this.options, commands); } @@ -75,10 +93,14 @@ async function executeAcknowledged( ): Promise { const resultsMerger = new ClientBulkWriteResultsMerger(options); // For each command will will create and exhaust a cursor for the results. + let currentBatchOffset = 0; for (const command of commands) { const cursor = new ClientBulkWriteCursor(client, command, options); const docs = await cursor.toArray(); - resultsMerger.merge(command.ops.documents, cursor.response, docs); + const operations = command.ops.documents; + resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + // Set the new batch index so we can map back to the index in the original models. + currentBatchOffset += operations.length; } return resultsMerger.result; } diff --git a/src/operations/client_bulk_write/results_merger.ts index 48169b93b7..ca5f3f1604 100644 --- a/src/operations/client_bulk_write/results_merger.ts +++ b/src/operations/client_bulk_write/results_merger.ts @@ -42,11 +42,13 @@ export class ClientBulkWriteResultsMerger { /** * Merge the results in the cursor to the existing result. + * @param currentBatchOffset - The offset index to the original models. * @param response - The cursor response. * @param documents - The documents in the cursor. * @returns The current result. */ merge( + currentBatchOffset: number, operations: Document[], response: ClientBulkWriteCursorResponse, documents: Document[] @@ -67,7 +69,9 @@ const operation = operations[document.idx]; // Handle insert results.
if ('insert' in operation) { - this.result.insertResults?.set(document.idx, { insertedId: operation.document._id }); + this.result.insertResults?.set(document.idx + currentBatchOffset, { + insertedId: operation.document._id + }); } // Handle update results. if ('update' in operation) { @@ -80,11 +84,13 @@ export class ClientBulkWriteResultsMerger { if (document.upserted) { result.upsertedId = document.upserted._id; } - this.result.updateResults?.set(document.idx, result); + this.result.updateResults?.set(document.idx + currentBatchOffset, result); } // Handle delete results. if ('delete' in operation) { - this.result.deleteResults?.set(document.idx, { deletedCount: document.n }); + this.result.deleteResults?.set(document.idx + currentBatchOffset, { + deletedCount: document.n + }); } } } diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index cd32f4968b..320a43bc16 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -18,6 +18,12 @@ const DATA_BEARING_SERVER_TYPES = new Set([ ServerType.LoadBalancer ]); +/** Default in case we are in load balanced mode. */ +const MAX_MESSAGE_SIZE_BYTES = 48000000; + +/** Default in case we are in load balanced mode. */ +const MAX_WRITE_BATCH_SIZE = 100000; + /** @public */ export interface TopologyVersion { processId: ObjectId; @@ -69,6 +75,10 @@ export class ServerDescription { setVersion: number | null; electionId: ObjectId | null; logicalSessionTimeoutMinutes: number | null; + /** The max message size in bytes for the server. */ + maxMessageSizeBytes: number; + /** The max number of writes in a bulk write command. */ + maxWriteBatchSize: number; // NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level $clusterTime?: ClusterTime; @@ -111,6 +121,8 @@ export class ServerDescription { this.setVersion = hello?.setVersion ?? null; this.electionId = hello?.electionId ?? 
null; this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null; + this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? MAX_MESSAGE_SIZE_BYTES; + this.maxWriteBatchSize = hello?.maxWriteBatchSize ?? MAX_WRITE_BATCH_SIZE; this.primary = hello?.primary ?? null; this.me = hello?.me?.toLowerCase() ?? null; this.$clusterTime = hello?.$clusterTime ?? null; diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 436321c7f1..3f646975f2 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -43,6 +43,8 @@ export class TopologyDescription { heartbeatFrequencyMS: number; localThresholdMS: number; commonWireVersion: number; + maxMessageSizeBytes?: number; + maxWriteBatchSize?: number; /** * Create a TopologyDescription @@ -71,6 +73,25 @@ export class TopologyDescription { // determine server compatibility for (const serverDescription of this.servers.values()) { + // Find the lowest maxMessageSizeBytes from all the servers. + if (this.maxMessageSizeBytes == null) { + this.maxMessageSizeBytes = serverDescription.maxMessageSizeBytes; + } else { + this.maxMessageSizeBytes = Math.min( + this.maxMessageSizeBytes, + serverDescription.maxMessageSizeBytes + ); + } + + // Find the lowest maxWriteBatchSize from all the servers. + if (this.maxWriteBatchSize == null) { + this.maxWriteBatchSize = serverDescription.maxWriteBatchSize; + } else { + this.maxWriteBatchSize = Math.min( + this.maxWriteBatchSize, + serverDescription.maxWriteBatchSize + ); + } // Load balancer mode is always compatible. 
if ( serverDescription.type === ServerType.Unknown || diff --git a/test/integration/crud/crud.prose.test.ts b/test/integration/crud/crud.prose.test.ts index 3ddc126d33..dbd93bebf9 100644 --- a/test/integration/crud/crud.prose.test.ts +++ b/test/integration/crud/crud.prose.test.ts @@ -3,6 +3,7 @@ import { once } from 'events'; import { type CommandStartedEvent } from '../../../mongodb'; import { + type AnyClientBulkWriteModel, type Collection, MongoBulkWriteError, type MongoClient, @@ -151,6 +152,136 @@ describe('CRUD Prose Spec Tests', () => { }); }); + describe('3. MongoClient.bulkWrite batch splits a writeModels input with greater than maxWriteBatchSize operations', function () { + // Test that MongoClient.bulkWrite properly handles writeModels inputs containing a number of writes greater than + // maxWriteBatchSize. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe CommandStartedEvents. + // Perform a hello command using client and record the maxWriteBatchSize value contained in the response. Then, + // construct the following write model (referred to as model): + // InsertOne: { + // "namespace": "db.coll", + // "document": { "a": "b" } + // } + // Construct a list of write models (referred to as models) with model repeated maxWriteBatchSize + 1 times. Execute + // bulkWrite on client with models. Assert that the bulk write succeeds and returns a BulkWriteResult with an + // insertedCount value of maxWriteBatchSize + 1. + // Assert that two CommandStartedEvents (referred to as firstEvent and secondEvent) were observed for the bulkWrite + // command. Assert that the length of firstEvent.command.ops is maxWriteBatchSize. Assert that the length of + // secondEvent.command.ops is 1. If the driver exposes operationIds in its CommandStartedEvents, assert that + // firstEvent.operationId is equal to secondEvent.operationId. 
+ let client: MongoClient; + let maxWriteBatchSize; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxWriteBatchSize = hello.maxWriteBatchSize; + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + commands.length = 0; + + Array.from({ length: maxWriteBatchSize + 1 }, () => { + models.push({ + namespace: 'db.coll', + name: 'insertOne', + document: { a: 'b' } + }); + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('splits the commands into 2 operations', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const result = await client.bulkWrite(models); + expect(result.insertedCount).to.equal(maxWriteBatchSize + 1); + expect(commands.length).to.equal(2); + expect(commands[0].command.ops.length).to.equal(maxWriteBatchSize); + expect(commands[1].command.ops.length).to.equal(1); + } + }); + }); + + describe('4. MongoClient.bulkWrite batch splits when an ops payload exceeds maxMessageSizeBytes', function () { + // Test that MongoClient.bulkWrite properly handles a writeModels input which constructs an ops array larger + // than maxMessageSizeBytes. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe CommandStartedEvents. + // Perform a hello command using client and record the following values from the response: maxBsonObjectSize + // and maxMessageSizeBytes. 
Then, construct the following document (referred to as document): + // { + // "a": "b".repeat(maxBsonObjectSize - 500) + // } + // Construct the following write model (referred to as model): + // InsertOne: { + // "namespace": "db.coll", + // "document": document + // } + // Use the following calculation to determine the number of inserts that should be provided to + // MongoClient.bulkWrite: maxMessageSizeBytes / maxBsonObjectSize + 1 (referred to as numModels). This number + // ensures that the inserts provided to MongoClient.bulkWrite will require multiple bulkWrite commands to be + // sent to the server. + // Construct as list of write models (referred to as models) with model repeated numModels times. Then execute + // bulkWrite on client with models. Assert that the bulk write succeeds and returns a BulkWriteResult with + // an insertedCount value of numModels. + // Assert that two CommandStartedEvents (referred to as firstEvent and secondEvent) were observed. Assert + // that the length of firstEvent.command.ops is numModels - 1. Assert that the length of secondEvent.command.ops + // is 1. If the driver exposes operationIds in its CommandStartedEvents, assert that firstEvent.operationId is + // equal to secondEvent.operationId. 
+ let client: MongoClient; + let maxBsonObjectSize; + let maxMessageSizeBytes; + let numModels; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + maxMessageSizeBytes = hello.maxMessageSizeBytes; + numModels = Math.floor(maxMessageSizeBytes / maxBsonObjectSize + 1); + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + commands.length = 0; + + Array.from({ length: numModels }, () => { + models.push({ + name: 'insertOne', + namespace: 'db.coll', + document: { + a: 'b'.repeat(maxBsonObjectSize - 500) + } + }); + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('splits the commands into 2 operations', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const result = await client.bulkWrite(models); + expect(result.insertedCount).to.equal(numModels); + expect(commands.length).to.equal(2); + expect(commands[0].command.ops.length).to.equal(numModels - 1); + expect(commands[1].command.ops.length).to.equal(1); + } + }); + }); + describe('14. 
`explain` helpers allow users to specify `maxTimeMS`', function () { let client: MongoClient; const commands: CommandStartedEvent[] = []; diff --git a/test/unit/cmap/commands.test.ts b/test/unit/cmap/commands.test.ts index f4b3fdf025..5725f5b249 100644 --- a/test/unit/cmap/commands.test.ts +++ b/test/unit/cmap/commands.test.ts @@ -15,7 +15,7 @@ describe('commands', function () { context('when there is one document sequence', function () { const command = { test: 1, - field: new DocumentSequence([{ test: 1 }]) + field: new DocumentSequence('field', [{ test: 1 }]) }; const msg = new OpMsgRequest('admin', command, {}); const buffers = msg.toBin(); @@ -53,8 +53,8 @@ describe('commands', function () { context('when there are multiple document sequences', function () { const command = { test: 1, - fieldOne: new DocumentSequence([{ test: 1 }]), - fieldTwo: new DocumentSequence([{ test: 1 }]) + fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]), + fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }]) }; const msg = new OpMsgRequest('admin', command, {}); const buffers = msg.toBin(); diff --git a/test/unit/index.test.ts b/test/unit/index.test.ts index c8a1406a00..883cc4b4ba 100644 --- a/test/unit/index.test.ts +++ b/test/unit/index.test.ts @@ -69,11 +69,12 @@ const EXPECTED_EXPORTS = [ 'MongoAWSError', 'MongoAzureError', 'MongoBatchReExecutionError', - 'MongoBulkWriteCursorError', 'MongoBulkWriteError', 'MongoChangeStreamError', 'MongoClient', 'MongoClientAuthProviders', + 'MongoClientBulkWriteCursorError', + 'MongoClientBulkWriteExecutionError', 'MongoCompatibilityError', 'MongoCryptAzureKMSRequestError', 'MongoCryptCreateDataKeyError', diff --git a/test/unit/operations/client_bulk_write/command_builder.test.ts b/test/unit/operations/client_bulk_write/command_builder.test.ts index 6b34ef9a81..fade57d408 100644 --- a/test/unit/operations/client_bulk_write/command_builder.test.ts +++ b/test/unit/operations/client_bulk_write/command_builder.test.ts @@ -34,7 +34,7 
@@ describe('ClientBulkWriteCommandBuilder', function () { ordered: false, comment: { bulk: 'write' } }); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -79,7 +79,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: id, name: 1 } }; const builder = new ClientBulkWriteCommandBuilder([model], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -108,6 +108,60 @@ describe('ClientBulkWriteCommandBuilder', function () { }); context('when multiple models are provided', function () { + context('when exceeding the max batch size', function () { + const idOne = new ObjectId(); + const idTwo = new ObjectId(); + const modelOne: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: idOne, name: 1 } + }; + const modelTwo: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: idTwo, name: 2 } + }; + const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); + const commands = builder.buildCommands(48000000, 1); + + it('splits the operations into multiple commands', function () { + expect(commands.length).to.equal(2); + expect(commands[0].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idOne, name: 1 } } + ]); + expect(commands[1].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idTwo, name: 2 } } + ]); + }); + }); + + context('when exceeding the max message size in bytes', function () { + const idOne = new ObjectId(); + const idTwo = new ObjectId(); + const modelOne: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: idOne, name: 1 } + }; + const modelTwo: ClientInsertOneModel = { + name: 'insertOne', + 
namespace: 'test.coll', + document: { _id: idTwo, name: 2 } + }; + const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); + const commands = builder.buildCommands(1090, 100000); + + it('splits the operations into multiple commands', function () { + expect(commands.length).to.equal(2); + expect(commands[0].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idOne, name: 1 } } + ]); + expect(commands[1].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idTwo, name: 2 } } + ]); + }); + }); + context('when the namespace is the same', function () { const idOne = new ObjectId(); const idTwo = new ObjectId(); @@ -122,7 +176,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -156,7 +210,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -199,7 +253,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idThree, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo, modelThree], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); diff --git a/test/unit/operations/client_bulk_write/results_merger.test.ts b/test/unit/operations/client_bulk_write/results_merger.test.ts index ec43843af6..342502eebb 100644 
--- a/test/unit/operations/client_bulk_write/results_merger.test.ts +++ b/test/unit/operations/client_bulk_write/results_merger.test.ts @@ -28,180 +28,282 @@ describe('ClientBulkWriteResultsMerger', function () { describe('#merge', function () { context('when the bulk write is acknowledged', function () { - context('when requesting verbose results', function () { - // An example verbose response from the server without errors: - // { - // cursor: { - // id: Long('0'), - // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], - // ns: 'admin.$cmd.bulkWrite' - // }, - // nErrors: 0, - // nInserted: 2, - // nMatched: 0, - // nModified: 0, - // nUpserted: 0, - // nDeleted: 0, - // ok: 1 - // } - context('when there are no errors', function () { - const operations = [ - { insert: 0, document: { _id: 1 } }, - { update: 0 }, - { update: 0 }, - { delete: 0 } - ]; - const documents = [ - { ok: 1, idx: 0, n: 1 }, // Insert - { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match - { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert - { ok: 1, idx: 3, n: 1 } // Delete - ]; - const serverResponse = { - cursor: { - id: new Long('0'), - firstBatch: documents, - ns: 'admin.$cmd.bulkWrite' - }, - nErrors: 0, - nInserted: 1, - nMatched: 1, - nModified: 1, - nUpserted: 1, - nDeleted: 1, - ok: 1 - }; - const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); - const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); - let result: ClientBulkWriteResult; - - before(function () { - result = merger.merge(operations, response, documents); - }); + context('when merging on the first batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // 
nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; - it('merges the inserted count', function () { - expect(result.insertedCount).to.equal(1); - }); + before(function () { + result = merger.merge(0, operations, response, documents); + }); - it('sets insert results', function () { - expect(result.insertResults.get(0).insertedId).to.equal(1); - }); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); - it('merges the upserted count', function () { - expect(result.upsertedCount).to.equal(1); - }); + it('sets insert results', function () { + expect(result.insertResults.get(0).insertedId).to.equal(1); + }); - it('merges the matched count', function () { - expect(result.matchedCount).to.equal(1); - }); + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); - it('merges the modified count', function () { - expect(result.modifiedCount).to.equal(1); - }); + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); - it('sets the update results', function () { - 
expect(result.updateResults.get(1)).to.deep.equal({ - matchedCount: 1, - modifiedCount: 1, - didUpsert: false + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); }); - }); - it('sets the upsert results', function () { - expect(result.updateResults.get(2)).to.deep.equal({ - matchedCount: 0, - modifiedCount: 0, - upsertedId: 1, - didUpsert: true + it('sets the update results', function () { + expect(result.updateResults.get(1)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1, + didUpsert: false + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(2)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1, + didUpsert: true + }); }); - }); - it('merges the deleted count', function () { - expect(result.deletedCount).to.equal(1); + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(3).deletedCount).to.equal(1); + }); }); + }); + + context('when not requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = []; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: 
false }); + let result: ClientBulkWriteResult; + + before(function () { + result = merger.merge(0, operations, response, documents); + }); - it('sets the delete results', function () { - expect(result.deleteResults.get(3).deletedCount).to.equal(1); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); + + it('sets no insert results', function () { + expect(result.insertResults).to.equal(undefined); + }); + + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); + + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); + + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); + + it('sets no update results', function () { + expect(result.updateResults).to.equal(undefined); + }); + + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets no delete results', function () { + expect(result.deleteResults).to.equal(undefined); + }); }); }); }); - context('when not requesting verbose results', function () { - // An example verbose response from the server without errors: - // { - // cursor: { - // id: Long('0'), - // firstBatch: [], - // ns: 'admin.$cmd.bulkWrite' - // }, - // nErrors: 0, - // nInserted: 2, - // nMatched: 0, - // nModified: 0, - // nUpserted: 0, - // nDeleted: 0, - // ok: 1 - // } - context('when there are no errors', function () { - const operations = [ - { insert: 0, document: { _id: 1 } }, - { update: 0 }, - { update: 0 }, - { delete: 0 } - ]; - const documents = []; - const serverResponse = { - cursor: { - id: new Long('0'), - firstBatch: documents, - ns: 'admin.$cmd.bulkWrite' - }, - nErrors: 0, - nInserted: 1, - nMatched: 1, - nModified: 1, - nUpserted: 1, - nDeleted: 1, - ok: 1 - }; - const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); - const merger = new ClientBulkWriteResultsMerger({ 
verboseResults: false }); - let result: ClientBulkWriteResult; - - before(function () { - result = merger.merge(operations, response, documents); - }); + context('when merging on a later batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; - it('merges the inserted count', function () { - expect(result.insertedCount).to.equal(1); - }); + before(function () { + result = merger.merge(20, operations, response, documents); + }); - it('sets no insert results', function () { - expect(result.insertResults).to.equal(undefined); - }); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); - it('merges the upserted count', function () { - expect(result.upsertedCount).to.equal(1); - }); + it('sets insert results', function () { + 
expect(result.insertResults.get(20).insertedId).to.equal(1); + }); - it('merges the matched count', function () { - expect(result.matchedCount).to.equal(1); - }); + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); - it('merges the modified count', function () { - expect(result.modifiedCount).to.equal(1); - }); + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); - it('sets no update results', function () { - expect(result.updateResults).to.equal(undefined); - }); + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); - it('merges the deleted count', function () { - expect(result.deletedCount).to.equal(1); - }); + it('sets the update results', function () { + expect(result.updateResults.get(21)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1, + didUpsert: false + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(22)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1, + didUpsert: true + }); + }); - it('sets no delete results', function () { - expect(result.deleteResults).to.equal(undefined); + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(23).deletedCount).to.equal(1); + }); }); }); }); From 1b2e8de6abfcce8b26e0c13154515af4f3e5a622 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 27 Sep 2024 00:08:44 +0200 Subject: [PATCH 02/11] feat(NODE-6338): implement client bulk write error handling --- src/cmap/wire_protocol/responses.ts | 4 + src/error.ts | 27 +++++ .../client_bulk_write/command_builder.ts | 43 ++++++++ src/operations/client_bulk_write/common.ts | 50 +++++++++ src/operations/client_bulk_write/executor.ts | 49 ++++++++- .../client_bulk_write/results_merger.ts | 104 ++++++++++++------ test/integration/crud/crud.spec.test.ts | 
28 +---- test/tools/unified-spec-runner/match.ts | 7 +- 8 files changed, 250 insertions(+), 62 deletions(-) diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts index 6c166afd61..18afde92e7 100644 --- a/src/cmap/wire_protocol/responses.ts +++ b/src/cmap/wire_protocol/responses.ts @@ -354,4 +354,8 @@ export class ClientBulkWriteCursorResponse extends CursorResponse { get deletedCount() { return this.get('nDeleted', BSONType.int, true); } + + get writeConcernError() { + return this.get('writeConcernError', BSONType.object, false); + } } diff --git a/src/error.ts b/src/error.ts index 4aed6b9314..4e3679bd9a 100644 --- a/src/error.ts +++ b/src/error.ts @@ -643,6 +643,33 @@ export class MongoClientBulkWriteCursorError extends MongoRuntimeError { } } +/** + * An error indicating that an error occurred when generating a bulk write update. + * + * @public + * @category Error + */ +export class MongoClientBulkWriteUpdateError extends MongoRuntimeError { + /** + * **Do not use this constructor!** + * + * Meant for internal use only. + * + * @remarks + * This class is only meant to be constructed within the driver. This constructor is + * not subject to semantic versioning compatibility guarantees and may change at any time. + * + * @public + **/ + constructor(message: string) { + super(message); + } + + override get name(): string { + return 'MongoClientBulkWriteUpdateError'; + } +} + /** * An error indicating that an error occurred on the client when executing a client bulk write. 
* diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index 6b809a08c5..bd6aee836b 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -1,5 +1,6 @@ import { BSON, type Document } from '../../bson'; import { DocumentSequence } from '../../cmap/commands'; +import { MongoClientBulkWriteUpdateError } from '../../error'; import { type PkFactory } from '../../mongo_client'; import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types'; import { DEFAULT_PK_FACTORY } from '../../utils'; @@ -343,6 +344,22 @@ export const buildUpdateManyOperation = ( return createUpdateOperation(model, index, true); }; +/** + * Validate the update document. + * @param update - The update document. + */ +function validateUpdate(update: Document) { + const keys = Object.keys(update); + if (keys.length === 0) { + throw new MongoClientBulkWriteUpdateError('Client bulk write update models may not be empty.'); + } + if (!keys[0].startsWith('$')) { + throw new MongoClientBulkWriteUpdateError( + 'Client bulk write update models must only contain atomic modifiers (start with $).' + ); + } +} + /** * Creates a delete operation based on the parameters. */ @@ -351,6 +368,22 @@ function createUpdateOperation( index: number, multi: boolean ): ClientUpdateOperation { + // Update documents provided in UpdateOne and UpdateMany write models are + // required only to contain atomic modifiers (i.e. keys that start with "$"). + // Drivers MUST throw an error if an update document is empty or if the + // document's first key does not start with "$". + if (Array.isArray(model.update)) { + if (model.update.length === 0) { + throw new MongoClientBulkWriteUpdateError( + 'Client bulk write update model pipelines may not be empty.' 
+ ); + } + for (const update of model.update) { + validateUpdate(update); + } + } else { + validateUpdate(model.update); + } const document: ClientUpdateOperation = { update: index, multi: multi, @@ -393,6 +426,16 @@ export const buildReplaceOneOperation = ( model: ClientReplaceOneModel, index: number ): ClientReplaceOneOperation => { + const keys = Object.keys(model.replacement); + if (keys.length === 0) { + throw new MongoClientBulkWriteUpdateError('Client bulk write replace models may not be empty.'); + } + if (keys[0].startsWith('$')) { + throw new MongoClientBulkWriteUpdateError( + 'Client bulk write replace models must not contain atomic modifiers (start with $).' + ); + } + const document: ClientReplaceOneOperation = { update: index, multi: false, diff --git a/src/operations/client_bulk_write/common.ts b/src/operations/client_bulk_write/common.ts index c41d971f02..29d3e5e04f 100644 --- a/src/operations/client_bulk_write/common.ts +++ b/src/operations/client_bulk_write/common.ts @@ -1,4 +1,5 @@ import { type Document } from '../../bson'; +import { type ErrorDescription, type MongoRuntimeError, MongoServerError } from '../../error'; import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types'; import type { CollationOptions, CommandOperationOptions } from '../../operations/command'; import type { Hint } from '../../operations/operation'; @@ -181,6 +182,55 @@ export interface ClientBulkWriteResult { deleteResults?: Map; } +export interface ClientBulkWriteError { + code: number; + message: string; +} + +/** + * An error indicating that an error occurred when executing the bulk write. + * + * @public + * @category Error + */ +export class MongoClientBulkWriteError extends MongoServerError { + /** + * A top-level error that occurred when attempting to communicate with the server or execute + * the bulk write. This value may not be populated if the exception was thrown due to errors + * occurring on individual writes. 
+ */ + error?: MongoRuntimeError; + /** + * Write concern errors that occurred while executing the bulk write. This list may have + * multiple items if more than one server command was required to execute the bulk write. + */ + writeConcernErrors: Document[]; + /** + * Errors that occurred during the execution of individual write operations. This map will + * contain at most one entry if the bulk write was ordered. + */ + writeErrors: Map; + /** + * The results of any successful operations that were performed before the error was + * encountered. + */ + partialResult?: ClientBulkWriteResult; + + /** + * Initialize the client bulk write error. + * @param message - The error message. + */ + constructor(message: ErrorDescription) { + super(message); + this.writeConcernErrors = []; + this.writeErrors = new Map(); + } + + override get name(): string { + return 'MongoClientBulkWriteError'; + } +} + /** @public */ export interface ClientInsertOneResult { /** diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 1c02a42add..0925f7661b 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,7 +1,7 @@ import { type Document } from 'bson'; import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; -import { MongoClientBulkWriteExecutionError } from '../../error'; +import { MongoClientBulkWriteExecutionError, MongoWriteConcernError } from '../../error'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; @@ -10,7 +10,8 @@ import { type ClientBulkWriteCommand, ClientBulkWriteCommandBuilder } from './co import { type AnyClientBulkWriteModel, type ClientBulkWriteOptions, - type ClientBulkWriteResult + type ClientBulkWriteResult, + MongoClientBulkWriteError } from './common'; import { ClientBulkWriteResultsMerger } from 
'./results_merger'; @@ -34,9 +35,13 @@ export class ClientBulkWriteExecutor { operations: AnyClientBulkWriteModel[], options?: ClientBulkWriteOptions ) { + if (operations.length === 0) { + throw new MongoClientBulkWriteExecutionError('No client bulk write models were provided.'); + } + this.client = client; this.operations = operations; - this.options = { ...options }; + this.options = { ordered: true, ...options }; // If no write concern was provided, we inherit one from the client. if (!this.options.writeConcern) { @@ -96,12 +101,46 @@ async function executeAcknowledged( let currentBatchOffset = 0; for (const command of commands) { const cursor = new ClientBulkWriteCursor(client, command, options); - const docs = await cursor.toArray(); + let docs = []; + let writeConcernErrorResult; + try { + docs = await cursor.toArray(); + } catch (error) { + // Write concern errors are recorded in the writeConcernErrors field on MongoClientBulkWriteError. + // When a write concern error is encountered, it should not terminate execution of the bulk write + // for either ordered or unordered bulk writes. However, drivers MUST throw an exception at the end + // of execution if any write concern errors were observed. + if (error instanceof MongoWriteConcernError) { + const result = error.result; + writeConcernErrorResult = { + insertedCount: result.nInserted, + upsertedCount: result.nUpserted, + matchedCount: result.nMatched, + modifiedCount: result.nModified, + deletedCount: result.nDeleted, + writeConcernError: result.writeConcernError + }; + docs = result.cursor.firstBatch; + } else { + throw error; + } + } + // Note if we have a write concern error there will be no cursor response present. + const response = writeConcernErrorResult ?? 
cursor.response; const operations = command.ops.documents; - resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + resultsMerger.merge(currentBatchOffset, operations, response, docs); // Set the new batch index so we can map back to the index in the original models. currentBatchOffset += operations.length; } + + if (resultsMerger.writeConcernErrors.length > 0) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client bulk write encountered write concern errors during execution.' + }); + error.writeConcernErrors = resultsMerger.writeConcernErrors; + error.partialResult = resultsMerger.result; + throw error; + } return resultsMerger.result; } diff --git a/src/operations/client_bulk_write/results_merger.ts b/src/operations/client_bulk_write/results_merger.ts index ca5f3f1604..34bf615e89 100644 --- a/src/operations/client_bulk_write/results_merger.ts +++ b/src/operations/client_bulk_write/results_merger.ts @@ -5,7 +5,8 @@ import { type ClientBulkWriteResult, type ClientDeleteResult, type ClientInsertOneResult, - type ClientUpdateResult + type ClientUpdateResult, + MongoClientBulkWriteError } from './common'; /** @@ -15,6 +16,7 @@ import { export class ClientBulkWriteResultsMerger { result: ClientBulkWriteResult; options: ClientBulkWriteOptions; + writeConcernErrors: Document[]; /** * Instantiate the merger. @@ -22,6 +24,7 @@ export class ClientBulkWriteResultsMerger { */ constructor(options: ClientBulkWriteOptions) { this.options = options; + this.writeConcernErrors = []; this.result = { insertedCount: 0, upsertedCount: 0, @@ -50,7 +53,7 @@ export class ClientBulkWriteResultsMerger { merge( currentBatchOffset: number, operations: Document[], - response: ClientBulkWriteCursorResponse, + response: ClientBulkWriteCursorResponse | Document, documents: Document[] ): ClientBulkWriteResult { // Update the counts from the cursor response. 
@@ -60,42 +63,77 @@ export class ClientBulkWriteResultsMerger { this.result.modifiedCount += response.modifiedCount; this.result.deletedCount += response.deletedCount; - if (this.options.verboseResults) { - // Iterate all the documents in the cursor and update the result. - for (const document of documents) { - // Only add to maps if ok: 1 - if (document.ok === 1) { - // Get the corresponding operation from the command. - const operation = operations[document.idx]; - // Handle insert results. - if ('insert' in operation) { - this.result.insertResults?.set(document.idx + currentBatchOffset, { - insertedId: operation.document._id - }); - } - // Handle update results. - if ('update' in operation) { - const result: ClientUpdateResult = { - matchedCount: document.n, - modifiedCount: document.nModified ?? 0, - // Check if the bulk did actually upsert. - didUpsert: document.upserted != null - }; - if (document.upserted) { - result.upsertedId = document.upserted._id; - } - this.result.updateResults?.set(document.idx + currentBatchOffset, result); - } - // Handle delete results. - if ('delete' in operation) { - this.result.deleteResults?.set(document.idx + currentBatchOffset, { - deletedCount: document.n - }); + if (response.writeConcernError) { + this.writeConcernErrors.push({ + code: response.writeConcernError.code, + message: response.writeConcernError.errmsg + }); + } + // Iterate all the documents in the cursor and update the result. + const writeErrors = new Map(); + for (const document of documents) { + // Only add to maps if ok: 1 + if (document.ok === 1 && this.options.verboseResults) { + // Get the corresponding operation from the command. + const operation = operations[document.idx]; + // Handle insert results. + if ('insert' in operation) { + this.result.insertResults?.set(document.idx + currentBatchOffset, { + insertedId: operation.document._id + }); + } + // Handle update results. 
+ if ('update' in operation) { + const result: ClientUpdateResult = { + matchedCount: document.n, + modifiedCount: document.nModified ?? 0, + // Check if the bulk did actually upsert. + didUpsert: document.upserted != null + }; + if (document.upserted) { + result.upsertedId = document.upserted._id; } + this.result.updateResults?.set(document.idx + currentBatchOffset, result); + } + // Handle delete results. + if ('delete' in operation) { + this.result.deleteResults?.set(document.idx + currentBatchOffset, { + deletedCount: document.n + }); + } + } else { + // If an individual write error is encountered during an ordered bulk write, drivers MUST + // record the error in writeErrors and immediately throw the exception. Otherwise, drivers + // MUST continue to iterate the results cursor and execute any further bulkWrite batches. + if (this.options.ordered) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client ordered bulk write encountered a write error.' + }); + error.writeErrors.set(document.idx + currentBatchOffset, { + code: document.code, + message: document.errmsg + }); + error.partialResult = this.result; + throw error; + } else { + writeErrors.set(document.idx + currentBatchOffset, { + code: document.code, + message: document.errmsg + }); } } } + // Handle the unordered bulk write errors here. + if (writeErrors.size > 0) { + const error = new MongoClientBulkWriteError({ + message: 'Mongo client unordered bulk write encountered write errors.' 
+ }); + error.writeErrors = writeErrors; + error.partialResult = this.result; + throw error; + } + return this.result; } } diff --git a/test/integration/crud/crud.spec.test.ts b/test/integration/crud/crud.spec.test.ts index a8a0d2987f..5439c77523 100644 --- a/test/integration/crud/crud.spec.test.ts +++ b/test/integration/crud/crud.spec.test.ts @@ -3,22 +3,6 @@ import * as path from 'path'; import { loadSpecTests } from '../../spec/index'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; -const clientBulkWriteTests = new RegExp( - [ - 'client bulkWrite operations support errorResponse assertions', - 'an individual operation fails during an ordered bulkWrite', - 'an individual operation fails during an unordered bulkWrite', - 'detailed results are omitted from error when verboseResults is false', - 'a top-level failure occurs during a bulkWrite', - 'a bulk write with only errors does not report a partial result', - 'an empty list of write models is a client-side error', - 'a write concern error occurs during a bulkWrite', - 'client bulkWrite replaceOne prohibits atomic modifiers', - 'client bulkWrite updateOne requires atomic modifiers', - 'client bulkWrite updateMany requires atomic modifiers' - ].join('|') -); - const unacknowledgedHintTests = [ 'Unacknowledged updateOne with hint document on 4.2+ server', 'Unacknowledged updateOne with hint string on 4.2+ server', @@ -59,13 +43,11 @@ describe('CRUD unified', function () { runUnifiedSuite( loadSpecTests(path.join('crud', 'unified')), ({ description }, { isLoadBalanced }) => { - return description.match(clientBulkWriteTests) - ? 'TODO(NODE-6257): implement client level bulk write' - : unacknowledgedHintTests.includes(description) - ? `TODO(NODE-3541)` - : isLoadBalanced && loadBalancedCollationTests.includes(description) - ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` - : false; + return unacknowledgedHintTests.includes(description) + ? 
`TODO(NODE-3541)` + : isLoadBalanced && loadBalancedCollationTests.includes(description) + ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` + : false; } ); }); diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index f92004c776..cb1266d006 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -240,6 +240,7 @@ export function resultCheck( } if (typeof actual !== 'object') { + console.log(expected, actual); expect.fail('Expected actual value to be an object'); } @@ -793,7 +794,11 @@ export function expectErrorCheck( } if (expected.expectResult != null) { - resultCheck(error, expected.expectResult as any, entities); + if ('partialResult' in error) { + resultCheck(error.partialResult, expected.expectResult as any, entities); + } else { + resultCheck(error, expected.expectResult as any, entities); + } } if (expected.errorResponse != null) { From a088639da07762c0f487019efa3ccbd5cd8013f6 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 27 Sep 2024 00:12:15 +0200 Subject: [PATCH 03/11] Revert "feat(NODE-6338): implement client bulk write error handling" This reverts commit dff773615fea5ae72b7b6e452d816d1e9c5dd9b3. 
--- src/cmap/wire_protocol/responses.ts | 4 - src/error.ts | 27 ----- .../client_bulk_write/command_builder.ts | 43 -------- src/operations/client_bulk_write/common.ts | 50 --------- src/operations/client_bulk_write/executor.ts | 49 +-------- .../client_bulk_write/results_merger.ts | 104 ++++++------------ test/integration/crud/crud.spec.test.ts | 28 ++++- test/tools/unified-spec-runner/match.ts | 7 +- 8 files changed, 62 insertions(+), 250 deletions(-) diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts index 18afde92e7..6c166afd61 100644 --- a/src/cmap/wire_protocol/responses.ts +++ b/src/cmap/wire_protocol/responses.ts @@ -354,8 +354,4 @@ export class ClientBulkWriteCursorResponse extends CursorResponse { get deletedCount() { return this.get('nDeleted', BSONType.int, true); } - - get writeConcernError() { - return this.get('writeConcernError', BSONType.object, false); - } } diff --git a/src/error.ts b/src/error.ts index 4e3679bd9a..4aed6b9314 100644 --- a/src/error.ts +++ b/src/error.ts @@ -643,33 +643,6 @@ export class MongoClientBulkWriteCursorError extends MongoRuntimeError { } } -/** - * An error indicating that an error occurred when generating a bulk write update. - * - * @public - * @category Error - */ -export class MongoClientBulkWriteUpdateError extends MongoRuntimeError { - /** - * **Do not use this constructor!** - * - * Meant for internal use only. - * - * @remarks - * This class is only meant to be constructed within the driver. This constructor is - * not subject to semantic versioning compatibility guarantees and may change at any time. - * - * @public - **/ - constructor(message: string) { - super(message); - } - - override get name(): string { - return 'MongoClientBulkWriteUpdateError'; - } -} - /** * An error indicating that an error occurred on the client when executing a client bulk write. 
* diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index bd6aee836b..6b809a08c5 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -1,6 +1,5 @@ import { BSON, type Document } from '../../bson'; import { DocumentSequence } from '../../cmap/commands'; -import { MongoClientBulkWriteUpdateError } from '../../error'; import { type PkFactory } from '../../mongo_client'; import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types'; import { DEFAULT_PK_FACTORY } from '../../utils'; @@ -344,22 +343,6 @@ export const buildUpdateManyOperation = ( return createUpdateOperation(model, index, true); }; -/** - * Validate the update document. - * @param update - The update document. - */ -function validateUpdate(update: Document) { - const keys = Object.keys(update); - if (keys.length === 0) { - throw new MongoClientBulkWriteUpdateError('Client bulk write update models may not be empty.'); - } - if (!keys[0].startsWith('$')) { - throw new MongoClientBulkWriteUpdateError( - 'Client bulk write update models must only contain atomic modifiers (start with $).' - ); - } -} - /** * Creates a delete operation based on the parameters. */ @@ -368,22 +351,6 @@ function createUpdateOperation( index: number, multi: boolean ): ClientUpdateOperation { - // Update documents provided in UpdateOne and UpdateMany write models are - // required only to contain atomic modifiers (i.e. keys that start with "$"). - // Drivers MUST throw an error if an update document is empty or if the - // document's first key does not start with "$". - if (Array.isArray(model.update)) { - if (model.update.length === 0) { - throw new MongoClientBulkWriteUpdateError( - 'Client bulk write update model pipelines may not be empty.' 
- ); - } - for (const update of model.update) { - validateUpdate(update); - } - } else { - validateUpdate(model.update); - } const document: ClientUpdateOperation = { update: index, multi: multi, @@ -426,16 +393,6 @@ export const buildReplaceOneOperation = ( model: ClientReplaceOneModel, index: number ): ClientReplaceOneOperation => { - const keys = Object.keys(model.replacement); - if (keys.length === 0) { - throw new MongoClientBulkWriteUpdateError('Client bulk write replace models may not be empty.'); - } - if (keys[0].startsWith('$')) { - throw new MongoClientBulkWriteUpdateError( - 'Client bulk write replace models must not contain atomic modifiers (start with $).' - ); - } - const document: ClientReplaceOneOperation = { update: index, multi: false, diff --git a/src/operations/client_bulk_write/common.ts b/src/operations/client_bulk_write/common.ts index 29d3e5e04f..c41d971f02 100644 --- a/src/operations/client_bulk_write/common.ts +++ b/src/operations/client_bulk_write/common.ts @@ -1,5 +1,4 @@ import { type Document } from '../../bson'; -import { type ErrorDescription, type MongoRuntimeError, MongoServerError } from '../../error'; import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types'; import type { CollationOptions, CommandOperationOptions } from '../../operations/command'; import type { Hint } from '../../operations/operation'; @@ -182,55 +181,6 @@ export interface ClientBulkWriteResult { deleteResults?: Map; } -export interface ClientBulkWriteError { - code: number; - message: string; -} - -/** - * An error indicating that an error occurred when executing the bulk write. - * - * @public - * @category Error - */ -export class MongoClientBulkWriteError extends MongoServerError { - /** - * A top-level error that occurred when attempting to communicate with the server or execute - * the bulk write. This value may not be populated if the exception was thrown due to errors - * occurring on individual writes. 
- */ - error?: MongoRuntimeError; - /** - * Write concern errors that occurred while executing the bulk write. This list may have - * multiple items if more than one server command was required to execute the bulk write. - */ - writeConcernErrors: Document[]; - /** - * Errors that occurred during the execution of individual write operations. This map will - * contain at most one entry if the bulk write was ordered. - */ - writeErrors: Map; - /** - * The results of any successful operations that were performed before the error was - * encountered. - */ - partialResult?: ClientBulkWriteResult; - - /** - * Initialize the client bulk write error. - * @param message - The error message. - */ - constructor(message: ErrorDescription) { - super(message); - this.writeConcernErrors = []; - this.writeErrors = new Map(); - } - - override get name(): string { - return 'MongoClientBulkWriteError'; - } -} - /** @public */ export interface ClientInsertOneResult { /** diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 0925f7661b..1c02a42add 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,7 +1,7 @@ import { type Document } from 'bson'; import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; -import { MongoClientBulkWriteExecutionError, MongoWriteConcernError } from '../../error'; +import { MongoClientBulkWriteExecutionError } from '../../error'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; import { executeOperation } from '../execute_operation'; @@ -10,8 +10,7 @@ import { type ClientBulkWriteCommand, ClientBulkWriteCommandBuilder } from './co import { type AnyClientBulkWriteModel, type ClientBulkWriteOptions, - type ClientBulkWriteResult, - MongoClientBulkWriteError + type ClientBulkWriteResult } from './common'; import { ClientBulkWriteResultsMerger } from 
'./results_merger'; @@ -35,13 +34,9 @@ export class ClientBulkWriteExecutor { operations: AnyClientBulkWriteModel[], options?: ClientBulkWriteOptions ) { - if (operations.length === 0) { - throw new MongoClientBulkWriteExecutionError('No client bulk write models were provided.'); - } - this.client = client; this.operations = operations; - this.options = { ordered: true, ...options }; + this.options = { ...options }; // If no write concern was provided, we inherit one from the client. if (!this.options.writeConcern) { @@ -101,46 +96,12 @@ async function executeAcknowledged( let currentBatchOffset = 0; for (const command of commands) { const cursor = new ClientBulkWriteCursor(client, command, options); - let docs = []; - let writeConcernErrorResult; - try { - docs = await cursor.toArray(); - } catch (error) { - // Write concern errors are recorded in the writeConcernErrors field on MongoClientBulkWriteError. - // When a write concern error is encountered, it should not terminate execution of the bulk write - // for either ordered or unordered bulk writes. However, drivers MUST throw an exception at the end - // of execution if any write concern errors were observed. - if (error instanceof MongoWriteConcernError) { - const result = error.result; - writeConcernErrorResult = { - insertedCount: result.nInserted, - upsertedCount: result.nUpserted, - matchedCount: result.nMatched, - modifiedCount: result.nModified, - deletedCount: result.nDeleted, - writeConcernError: result.writeConcernError - }; - docs = result.cursor.firstBatch; - } else { - throw error; - } - } - // Note if we have a write concern error there will be no cursor response present. - const response = writeConcernErrorResult ?? 
cursor.response; + const docs = await cursor.toArray(); const operations = command.ops.documents; - resultsMerger.merge(currentBatchOffset, operations, response, docs); + resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); // Set the new batch index so we can back back to the index in the original models. currentBatchOffset += operations.length; } - - if (resultsMerger.writeConcernErrors.length > 0) { - const error = new MongoClientBulkWriteError({ - message: 'Mongo client bulk write encountered write concern errors during execution.' - }); - error.writeConcernErrors = resultsMerger.writeConcernErrors; - error.partialResult = resultsMerger.result; - throw error; - } return resultsMerger.result; } diff --git a/src/operations/client_bulk_write/results_merger.ts b/src/operations/client_bulk_write/results_merger.ts index 34bf615e89..ca5f3f1604 100644 --- a/src/operations/client_bulk_write/results_merger.ts +++ b/src/operations/client_bulk_write/results_merger.ts @@ -5,8 +5,7 @@ import { type ClientBulkWriteResult, type ClientDeleteResult, type ClientInsertOneResult, - type ClientUpdateResult, - MongoClientBulkWriteError + type ClientUpdateResult } from './common'; /** @@ -16,7 +15,6 @@ import { export class ClientBulkWriteResultsMerger { result: ClientBulkWriteResult; options: ClientBulkWriteOptions; - writeConcernErrors: Document[]; /** * Instantiate the merger. @@ -24,7 +22,6 @@ export class ClientBulkWriteResultsMerger { */ constructor(options: ClientBulkWriteOptions) { this.options = options; - this.writeConcernErrors = []; this.result = { insertedCount: 0, upsertedCount: 0, @@ -53,7 +50,7 @@ export class ClientBulkWriteResultsMerger { merge( currentBatchOffset: number, operations: Document[], - response: ClientBulkWriteCursorResponse | Document, + response: ClientBulkWriteCursorResponse, documents: Document[] ): ClientBulkWriteResult { // Update the counts from the cursor response. 
@@ -63,77 +60,42 @@ export class ClientBulkWriteResultsMerger { this.result.modifiedCount += response.modifiedCount; this.result.deletedCount += response.deletedCount; - if (response.writeConcernError) { - this.writeConcernErrors.push({ - code: response.writeConcernError.code, - message: response.writeConcernError.errmsg - }); - } - // Iterate all the documents in the cursor and update the result. - const writeErrors = new Map(); - for (const document of documents) { - // Only add to maps if ok: 1 - if (document.ok === 1 && this.options.verboseResults) { - // Get the corresponding operation from the command. - const operation = operations[document.idx]; - // Handle insert results. - if ('insert' in operation) { - this.result.insertResults?.set(document.idx + currentBatchOffset, { - insertedId: operation.document._id - }); - } - // Handle update results. - if ('update' in operation) { - const result: ClientUpdateResult = { - matchedCount: document.n, - modifiedCount: document.nModified ?? 0, - // Check if the bulk did actually upsert. - didUpsert: document.upserted != null - }; - if (document.upserted) { - result.upsertedId = document.upserted._id; + if (this.options.verboseResults) { + // Iterate all the documents in the cursor and update the result. + for (const document of documents) { + // Only add to maps if ok: 1 + if (document.ok === 1) { + // Get the corresponding operation from the command. + const operation = operations[document.idx]; + // Handle insert results. + if ('insert' in operation) { + this.result.insertResults?.set(document.idx + currentBatchOffset, { + insertedId: operation.document._id + }); + } + // Handle update results. + if ('update' in operation) { + const result: ClientUpdateResult = { + matchedCount: document.n, + modifiedCount: document.nModified ?? 0, + // Check if the bulk did actually upsert. 
+ didUpsert: document.upserted != null + }; + if (document.upserted) { + result.upsertedId = document.upserted._id; + } + this.result.updateResults?.set(document.idx + currentBatchOffset, result); + } + // Handle delete results. + if ('delete' in operation) { + this.result.deleteResults?.set(document.idx + currentBatchOffset, { + deletedCount: document.n + }); } - this.result.updateResults?.set(document.idx + currentBatchOffset, result); - } - // Handle delete results. - if ('delete' in operation) { - this.result.deleteResults?.set(document.idx + currentBatchOffset, { - deletedCount: document.n - }); - } - } else { - // If an individual write error is encountered during an ordered bulk write, drivers MUST - // record the error in writeErrors and immediately throw the exception. Otherwise, drivers - // MUST continue to iterate the results cursor and execute any further bulkWrite batches. - if (this.options.ordered) { - const error = new MongoClientBulkWriteError({ - message: 'Mongo client ordered bulk write encountered a write error.' - }); - error.writeErrors.set(document.idx + currentBatchOffset, { - code: document.code, - message: document.errmsg - }); - error.partialResult = this.result; - throw error; - } else { - writeErrors.set(document.idx + currentBatchOffset, { - code: document.code, - message: document.errmsg - }); } } } - // Handle the unordered bulk write errors here. - if (writeErrors.size > 0) { - const error = new MongoClientBulkWriteError({ - message: 'Mongo client unordered bulk write encountered write errors.' 
- }); - error.writeErrors = writeErrors; - error.partialResult = this.result; - throw error; - } - return this.result; } } diff --git a/test/integration/crud/crud.spec.test.ts b/test/integration/crud/crud.spec.test.ts index 5439c77523..a8a0d2987f 100644 --- a/test/integration/crud/crud.spec.test.ts +++ b/test/integration/crud/crud.spec.test.ts @@ -3,6 +3,22 @@ import * as path from 'path'; import { loadSpecTests } from '../../spec/index'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; +const clientBulkWriteTests = new RegExp( + [ + 'client bulkWrite operations support errorResponse assertions', + 'an individual operation fails during an ordered bulkWrite', + 'an individual operation fails during an unordered bulkWrite', + 'detailed results are omitted from error when verboseResults is false', + 'a top-level failure occurs during a bulkWrite', + 'a bulk write with only errors does not report a partial result', + 'an empty list of write models is a client-side error', + 'a write concern error occurs during a bulkWrite', + 'client bulkWrite replaceOne prohibits atomic modifiers', + 'client bulkWrite updateOne requires atomic modifiers', + 'client bulkWrite updateMany requires atomic modifiers' + ].join('|') +); + const unacknowledgedHintTests = [ 'Unacknowledged updateOne with hint document on 4.2+ server', 'Unacknowledged updateOne with hint string on 4.2+ server', @@ -43,11 +59,13 @@ describe('CRUD unified', function () { runUnifiedSuite( loadSpecTests(path.join('crud', 'unified')), ({ description }, { isLoadBalanced }) => { - return unacknowledgedHintTests.includes(description) - ? `TODO(NODE-3541)` - : isLoadBalanced && loadBalancedCollationTests.includes(description) - ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` - : false; + return description.match(clientBulkWriteTests) + ? 'TODO(NODE-6257): implement client level bulk write' + : unacknowledgedHintTests.includes(description) + ? 
`TODO(NODE-3541)` + : isLoadBalanced && loadBalancedCollationTests.includes(description) + ? `TODO(NODE-6280): fix collation for find and modify commands on load balanced mode` + : false; } ); }); diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index cb1266d006..f92004c776 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -240,7 +240,6 @@ export function resultCheck( } if (typeof actual !== 'object') { - console.log(expected, actual); expect.fail('Expected actual value to be an object'); } @@ -794,11 +793,7 @@ export function expectErrorCheck( } if (expected.expectResult != null) { - if ('partialResult' in error) { - resultCheck(error.partialResult, expected.expectResult as any, entities); - } else { - resultCheck(error, expected.expectResult as any, entities); - } + resultCheck(error, expected.expectResult as any, entities); } if (expected.errorResponse != null) { From 55b68b715be0ced904ade806938f3b95206fe026 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 27 Sep 2024 00:27:11 +0200 Subject: [PATCH 04/11] chore: comments --- src/cmap/commands.ts | 22 +++++++------------ .../client_bulk_write/command_builder.ts | 6 ++--- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index e04897eedf..fe23159270 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -433,7 +433,7 @@ export class DocumentSequence { documents: Document[]; serializedDocumentsLength: number; private chunks: Uint8Array[]; - private header?: Buffer; + private header: Buffer; /** * Create a new document sequence for the provided field. @@ -444,25 +444,19 @@ export class DocumentSequence { this.documents = []; this.chunks = []; this.serializedDocumentsLength = 0; - this.init(); - if (documents) { - for (const doc of documents) { - this.push(doc, BSON.serialize(doc)); - } - } - } - - /** - * Initialize the buffer chunks. 
- */ - private init() { // Document sequences starts with type 1 at the first byte. + // Field strings must always be UTF-8. const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1); buffer[0] = 1; // Third part is the field name at offset 5 with trailing null byte. encodeUTF8Into(buffer, `${this.field}\0`, 5); this.chunks.push(buffer); this.header = buffer; + if (documents) { + for (const doc of documents) { + this.push(doc, BSON.serialize(doc)); + } + } } /** @@ -470,7 +464,7 @@ export class DocumentSequence { * as well and return the current serialized length of all documents. * @param document - The document to add. * @param buffer - The serialized document in raw BSON. - * @returns The new totoal document sequence length. + * @returns The new total document sequence length. */ push(document: Document, buffer: Uint8Array): number { this.serializedDocumentsLength += buffer.length; diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index 6b809a08c5..646dd0f254 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -105,7 +105,7 @@ export class ClientBulkWriteCommandBuilder { currentCommandLength + operationBuffer.length < maxMessageSizeBytes && currentCommand.ops.documents.length < maxWriteBatchSize ) { - // Pushing to the ops document sequence returns the bytes length added. + // Pushing to the ops document sequence returns the total byte length of the document sequence. currentCommandLength = MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer); } else { @@ -185,7 +185,7 @@ export class ClientBulkWriteCommandBuilder { operation: Document, operationBuffer: Uint8Array ): number { - // Pushing to the ops document sequence returns the bytes length added. + // Pushing to the ops document sequence returns the total byte length of the document sequence. 
return command.ops.push(operation, operationBuffer); } @@ -196,7 +196,7 @@ export class ClientBulkWriteCommandBuilder { nsInfo: Document, nsInfoBuffer: Uint8Array ): number { - // Pushing to the nsInfo document sequence returns the bytes length added. + // Pushing to the nsInfo document sequence returns the total byte length of the document sequence. const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer); const opsLength = this.addOperation(command, operation, operationBuffer); return nsInfoLength + opsLength; From d15f7cd5c8f911620421d19c8593c14ae175a587 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 27 Sep 2024 00:28:38 +0200 Subject: [PATCH 05/11] chore: comments --- src/cmap/commands.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index fe23159270..f14c3f5de4 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -474,7 +474,7 @@ export class DocumentSequence { this.chunks.push(buffer); // Write the new length. this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1); - return this.serializedDocumentsLength + (this.header?.length ?? 
0); + return this.serializedDocumentsLength + this.header.length; } /** From 0ed7ce8394b64d23c720d35610071493f31ac6d9 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 30 Sep 2024 22:56:31 +0200 Subject: [PATCH 06/11] refactor: build commands with connection --- src/cursor/client_bulk_write_cursor.ts | 25 ++- .../client_bulk_write/client_bulk_write.ts | 39 ++++- .../client_bulk_write/command_builder.ts | 154 ++++++------------ src/operations/client_bulk_write/executor.ts | 78 ++------- src/sdam/server.ts | 3 +- src/sdam/topology_description.ts | 22 --- .../client_bulk_write/command_builder.test.ts | 90 +++++----- 7 files changed, 164 insertions(+), 247 deletions(-) diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts index cd853a4647..06f34dfc52 100644 --- a/src/cursor/client_bulk_write_cursor.ts +++ b/src/cursor/client_bulk_write_cursor.ts @@ -1,8 +1,10 @@ -import type { Document } from '../bson'; +import { type Document } from 'bson'; + import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses'; import { MongoClientBulkWriteCursorError } from '../error'; import type { MongoClient } from '../mongo_client'; import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write'; +import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder'; import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common'; import { executeOperation } from '../operations/execute_operation'; import type { ClientSession } from '../sessions'; @@ -24,17 +26,21 @@ export interface ClientBulkWriteCursorOptions * @internal */ export class ClientBulkWriteCursor extends AbstractCursor { - public readonly command: Document; + commandBuilder: ClientBulkWriteCommandBuilder; /** @internal */ private cursorResponse?: ClientBulkWriteCursorResponse; /** @internal */ private clientBulkWriteOptions: ClientBulkWriteOptions; /** @internal */ - 
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) { + constructor( + client: MongoClient, + commandBuilder: ClientBulkWriteCommandBuilder, + options: ClientBulkWriteOptions = {} + ) { super(client, new MongoDBNamespace('admin', '$cmd'), options); - this.command = command; + this.commandBuilder = commandBuilder; this.clientBulkWriteOptions = options; } @@ -49,17 +55,24 @@ export class ClientBulkWriteCursor extends AbstractCursor { ); } + /** + * Get the last set of operations the cursor executed. + */ + get operations(): Document[] { + return this.commandBuilder.lastOperations; + } + clone(): ClientBulkWriteCursor { const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions); delete clonedOptions.session; - return new ClientBulkWriteCursor(this.client, this.command, { + return new ClientBulkWriteCursor(this.client, this.commandBuilder, { ...clonedOptions }); } /** @internal */ async _initialize(session: ClientSession): Promise { - const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, { + const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, { ...this.clientBulkWriteOptions, ...this.cursorOptions, session diff --git a/src/operations/client_bulk_write/client_bulk_write.ts b/src/operations/client_bulk_write/client_bulk_write.ts index cb020bde40..966756394a 100644 --- a/src/operations/client_bulk_write/client_bulk_write.ts +++ b/src/operations/client_bulk_write/client_bulk_write.ts @@ -1,11 +1,11 @@ -import { type Document } from 'bson'; - +import { ServerType } from '../../beta'; import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; import { MongoDBNamespace } from '../../utils'; import { CommandOperation } from '../command'; import { Aspect, defineAspects } from '../operation'; +import { type ClientBulkWriteCommandBuilder } from 
'./command_builder'; import { type ClientBulkWriteOptions } from './common'; /** @@ -13,16 +13,16 @@ import { type ClientBulkWriteOptions } from './common'; * @internal */ export class ClientBulkWriteOperation extends CommandOperation { - command: Document; + commandBuilder: ClientBulkWriteCommandBuilder; override options: ClientBulkWriteOptions; override get commandName() { return 'bulkWrite' as const; } - constructor(command: Document, options: ClientBulkWriteOptions) { + constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) { super(undefined, options); - this.command = command; + this.commandBuilder = commandBuilder; this.options = options; this.ns = new MongoDBNamespace('admin', '$cmd'); } @@ -37,9 +37,34 @@ export class ClientBulkWriteOperation extends CommandOperation { - return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse); + let command; + + if (server.description.type === ServerType.LoadBalancer) { + // Checkout a connection to build the command. + const connection = await server.pool.checkOut(); + command = this.commandBuilder.buildBatch( + connection.hello?.maxMessageSizeBytes, + connection.hello?.maxWriteBatchSize + ); + // Pin the connection to the session so it get used to execute the command and we do not + // perform a double check-in/check-out. + session?.pin(connection); + } else { + // At this point we have a server and the auto connect code has already + // run in executeOperation, so the server description will be populated. + // We can use that to build the command. + command = this.commandBuilder.buildBatch( + server.description.maxMessageSizeBytes, + server.description.maxWriteBatchSize + ); + } + return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse); } } // Skipping the collation as it goes on the individual ops. 
-defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]); +defineAspects(ClientBulkWriteOperation, [ + Aspect.WRITE_OPERATION, + Aspect.SKIP_COLLATION, + Aspect.CURSOR_CREATING +]); diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index 646dd0f254..543a82bf41 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -38,6 +38,8 @@ export class ClientBulkWriteCommandBuilder { models: AnyClientBulkWriteModel[]; options: ClientBulkWriteOptions; pkFactory: PkFactory; + currentModelIndex: number; + lastOperations: Document[]; /** * Create the command builder. @@ -51,6 +53,8 @@ export class ClientBulkWriteCommandBuilder { this.models = models; this.options = options; this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY; + this.currentModelIndex = 0; + this.lastOperations = []; } /** @@ -65,68 +69,55 @@ export class ClientBulkWriteCommandBuilder { } /** - * Build the bulk write commands from the models. + * Determines if there is another batch to process. + * @returns True if not all batches have been built. */ - buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] { - // Iterate the models to build the ops and nsInfo fields. - // We need to do this in a loop which creates one command each up - // to the max bson size or max message size. - const commands: ClientBulkWriteCommand[] = []; - let currentCommandLength = 0; + hasNextBatch(): boolean { + return this.currentModelIndex < this.models.length; + } + + /** + * Build a single batch of a client bulk write command. + * @param maxMessageSizeBytes - The max message size in bytes. + * @param maxWriteBatchSize - The max write batch size. + * @returns The client bulk write command. 
+ */ + buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand { + let commandLength = 0; let currentNamespaceIndex = 0; - let currentCommand: ClientBulkWriteCommand = this.baseCommand(); + const command: ClientBulkWriteCommand = this.baseCommand(); const namespaces = new Map(); - for (const model of this.models) { + while (this.currentModelIndex < this.models.length) { + const model = this.models[this.currentModelIndex]; const ns = model.namespace; - const index = namespaces.get(ns); - - /** - * Convenience function for resetting everything when a new batch - * is started. - */ - const reset = () => { - commands.push(currentCommand); - namespaces.clear(); - currentNamespaceIndex = 0; - currentCommand = this.baseCommand(); - namespaces.set(ns, currentNamespaceIndex); - }; + const nsIndex = namespaces.get(ns); - if (index != null) { - // Pushing to the ops document sequence returns the bytes length added. - const operation = buildOperation(model, index, this.pkFactory); + if (nsIndex != null) { + // Build the operation and serialize it to get the bytes buffer. + const operation = buildOperation(model, nsIndex, this.pkFactory); const operationBuffer = BSON.serialize(operation); - // Check if the operation buffer can fit in the current command. If it can, + // Check if the operation buffer can fit in the command. If it can, // then add the operation to the document sequence and increment the // current length as long as the ops don't exceed the maxWriteBatchSize. if ( - currentCommandLength + operationBuffer.length < maxMessageSizeBytes && - currentCommand.ops.documents.length < maxWriteBatchSize + commandLength + operationBuffer.length < maxMessageSizeBytes && + command.ops.documents.length < maxWriteBatchSize ) { // Pushing to the ops document sequence returns the total byte length of the document sequence. 
- currentCommandLength = - MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer); + commandLength = MESSAGE_OVERHEAD_BYTES + command.ops.push(operation, operationBuffer); + // Increment the builder's current model index. + this.currentModelIndex++; } else { - // We need to batch. Push the current command to the commands - // array and create a new current command. We aslo need to clear the namespaces - // map for the new command. - reset(); - - const nsInfo = { ns: ns }; - const nsInfoBuffer = BSON.serialize(nsInfo); - currentCommandLength = - MESSAGE_OVERHEAD_BYTES + - this.addOperationAndNsInfo( - currentCommand, - operation, - operationBuffer, - nsInfo, - nsInfoBuffer - ); + // The operation cannot fit in the current command and will need to + // go in the next batch. Exit the loop and set the last ops. + this.lastOperations = command.ops.documents; + break; } } else { + // The namespace is not already in the nsInfo so we will set it in the map, and + // construct our nsInfo and ops documents and buffers. namespaces.set(ns, currentNamespaceIndex); const nsInfo = { ns: ns }; const nsInfoBuffer = BSON.serialize(nsInfo); @@ -138,68 +129,27 @@ export class ClientBulkWriteCommandBuilder { // sequences and increment the current length as long as the ops don't exceed // the maxWriteBatchSize. if ( - currentCommandLength + nsInfoBuffer.length + operationBuffer.length < - maxMessageSizeBytes && - currentCommand.ops.documents.length < maxWriteBatchSize + commandLength + nsInfoBuffer.length + operationBuffer.length < maxMessageSizeBytes && + command.ops.documents.length < maxWriteBatchSize ) { - currentCommandLength = + // Pushing to the ops document sequence returns the total byte length of the document sequence. 
+ commandLength = MESSAGE_OVERHEAD_BYTES + - this.addOperationAndNsInfo( - currentCommand, - operation, - operationBuffer, - nsInfo, - nsInfoBuffer - ); + command.nsInfo.push(nsInfo, nsInfoBuffer) + + command.ops.push(operation, operationBuffer); + // We've added a new namespace, increment the namespace index. + currentNamespaceIndex++; + // Increment the builder's current model index. + this.currentModelIndex++; } else { - // We need to batch. Push the current command to the commands - // array and create a new current command. Aslo clear the namespaces map. - reset(); - - currentCommandLength = - MESSAGE_OVERHEAD_BYTES + - this.addOperationAndNsInfo( - currentCommand, - operation, - operationBuffer, - nsInfo, - nsInfoBuffer - ); + // The operation cannot fit in the current command and will need to + // go in the next batch. Exit the loop and set the last ops. + this.lastOperations = command.ops.documents; + break; } - // We've added a new namespace, increment the namespace index. - currentNamespaceIndex++; } } - - // After we've finisihed iterating all the models put the last current command - // only if there are operations in it. - if (currentCommand.ops.documents.length > 0) { - commands.push(currentCommand); - } - - return commands; - } - - private addOperation( - command: ClientBulkWriteCommand, - operation: Document, - operationBuffer: Uint8Array - ): number { - // Pushing to the ops document sequence returns the total byte length of the document sequence. - return command.ops.push(operation, operationBuffer); - } - - private addOperationAndNsInfo( - command: ClientBulkWriteCommand, - operation: Document, - operationBuffer: Uint8Array, - nsInfo: Document, - nsInfoBuffer: Uint8Array - ): number { - // Pushing to the nsInfo document sequence returns the total byte length of the document sequence. 
- const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer); - const opsLength = this.addOperation(command, operation, operationBuffer); - return nsInfoLength + opsLength; + return command; } private baseCommand(): ClientBulkWriteCommand { diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 1c02a42add..c8ec2a2aae 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,12 +1,7 @@ -import { type Document } from 'bson'; - import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; -import { MongoClientBulkWriteExecutionError } from '../../error'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; -import { executeOperation } from '../execute_operation'; -import { ClientBulkWriteOperation } from './client_bulk_write'; -import { type ClientBulkWriteCommand, ClientBulkWriteCommandBuilder } from './command_builder'; +import { ClientBulkWriteCommandBuilder } from './command_builder'; import { type AnyClientBulkWriteModel, type ClientBulkWriteOptions, @@ -50,23 +45,6 @@ export class ClientBulkWriteExecutor { * @returns The result. */ async execute(): Promise { - const topologyDescription = this.client.topology?.description; - const maxMessageSizeBytes = topologyDescription?.maxMessageSizeBytes; - const maxWriteBatchSize = topologyDescription?.maxWriteBatchSize; - // If we don't know the maxMessageSizeBytes or for some reason it's 0 - // then we cannot calculate the batch. - if (!maxMessageSizeBytes) { - throw new MongoClientBulkWriteExecutionError( - 'No maxMessageSizeBytes value found - client bulk writes cannot execute without this value set from the monitoring connections.' 
- ); - } - - if (!maxWriteBatchSize) { - throw new MongoClientBulkWriteExecutionError( - 'No maxWriteBatchSize value found - client bulk writes cannot execute without this value set from the monitoring connections.' - ); - } - // The command builder will take the user provided models and potential split the batch // into multiple commands due to size. const pkFactory = this.client.s.options.pkFactory; @@ -75,47 +53,19 @@ export class ClientBulkWriteExecutor { this.options, pkFactory ); - const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize); - if (this.options.writeConcern?.w === 0) { - return await executeUnacknowledged(this.client, this.options, commands); + // const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize); + // if (this.options.writeConcern?.w === 0) { + const resultsMerger = new ClientBulkWriteResultsMerger(this.options); + // For each command will will create and exhaust a cursor for the results. + let currentBatchOffset = 0; + while (commandBuilder.hasNextBatch()) { + const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options); + const docs = await cursor.toArray(); + const operations = cursor.operations; + resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + // Set the new batch index so we can back back to the index in the original models. + currentBatchOffset += operations.length; } - return await executeAcknowledged(this.client, this.options, commands); - } -} - -/** - * Execute an acknowledged bulk write. - */ -async function executeAcknowledged( - client: MongoClient, - options: ClientBulkWriteOptions, - commands: ClientBulkWriteCommand[] -): Promise { - const resultsMerger = new ClientBulkWriteResultsMerger(options); - // For each command will will create and exhaust a cursor for the results. 
- let currentBatchOffset = 0; - for (const command of commands) { - const cursor = new ClientBulkWriteCursor(client, command, options); - const docs = await cursor.toArray(); - const operations = command.ops.documents; - resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); - // Set the new batch index so we can back back to the index in the original models. - currentBatchOffset += operations.length; - } - return resultsMerger.result; -} - -/** - * Execute an unacknowledged bulk write. - */ -async function executeUnacknowledged( - client: MongoClient, - options: ClientBulkWriteOptions, - commands: Document[] -): Promise<{ ok: 1 }> { - for (const command of commands) { - const operation = new ClientBulkWriteOperation(command, options); - await executeOperation(client, operation); + return resultsMerger.result; } - return { ok: 1 }; } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index b4450f0072..4c1d37519a 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -513,7 +513,8 @@ function isPinnableCommand(cmd: Document, session?: ClientSession): boolean { 'find' in cmd || 'getMore' in cmd || 'listCollections' in cmd || - 'listIndexes' in cmd + 'listIndexes' in cmd || + 'bulkWrite' in cmd ); } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 3f646975f2..f171423f59 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -43,9 +43,6 @@ export class TopologyDescription { heartbeatFrequencyMS: number; localThresholdMS: number; commonWireVersion: number; - maxMessageSizeBytes?: number; - maxWriteBatchSize?: number; - /** * Create a TopologyDescription */ @@ -73,25 +70,6 @@ export class TopologyDescription { // determine server compatibility for (const serverDescription of this.servers.values()) { - // Find the lowest maxMessageSizeBytes from all the servers. 
- if (this.maxMessageSizeBytes == null) { - this.maxMessageSizeBytes = serverDescription.maxMessageSizeBytes; - } else { - this.maxMessageSizeBytes = Math.min( - this.maxMessageSizeBytes, - serverDescription.maxMessageSizeBytes - ); - } - - // Find the lowest maxWriteBatchSize from all the servers. - if (this.maxWriteBatchSize == null) { - this.maxWriteBatchSize = serverDescription.maxWriteBatchSize; - } else { - this.maxWriteBatchSize = Math.min( - this.maxWriteBatchSize, - serverDescription.maxWriteBatchSize - ); - } // Load balancer mode is always compatible. if ( serverDescription.type === ServerType.Unknown || diff --git a/test/unit/operations/client_bulk_write/command_builder.test.ts b/test/unit/operations/client_bulk_write/command_builder.test.ts index fade57d408..e92966795b 100644 --- a/test/unit/operations/client_bulk_write/command_builder.test.ts +++ b/test/unit/operations/client_bulk_write/command_builder.test.ts @@ -20,7 +20,7 @@ import { } from '../../../mongodb'; describe('ClientBulkWriteCommandBuilder', function () { - describe('#buildCommand', function () { + describe('#buildBatch', function () { context('when custom options are provided', function () { const id = new ObjectId(); const model: ClientInsertOneModel = { @@ -34,39 +34,39 @@ describe('ClientBulkWriteCommandBuilder', function () { ordered: false, comment: { bulk: 'write' } }); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the errorsOnly field to the inverse of verboseResults', function () { - expect(commands[0].errorsOnly).to.be.false; + expect(command.errorsOnly).to.be.false; }); it('sets the ordered field', function () { - expect(commands[0].ordered).to.be.false; + expect(command.ordered).to.be.false; }); it('sets the bypassDocumentValidation field', function () { - 
expect(commands[0].bypassDocumentValidation).to.be.true; + expect(command.bypassDocumentValidation).to.be.true; }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents[0]).to.deep.equal({ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents[0]).to.deep.equal({ insert: 0, document: { _id: id, name: 1 } }); }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); }); it('passes comment options into the commands', function () { - expect(commands[0].comment).to.deep.equal({ bulk: 'write' }); + expect(command.comment).to.deep.equal({ bulk: 'write' }); }); }); @@ -79,31 +79,31 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: id, name: 1 } }; const builder = new ClientBulkWriteCommandBuilder([model], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the default errorsOnly field', function () { - expect(commands[0].errorsOnly).to.be.true; + expect(command.errorsOnly).to.be.true; }); it('sets the default ordered field', function () { - expect(commands[0].ordered).to.be.true; + expect(command.ordered).to.be.true; }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents[0]).to.deep.equal({ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents[0]).to.deep.equal({ insert: 0, document: { _id: id, name: 1 } 
}); }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); }); }); @@ -122,14 +122,14 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(48000000, 1); + const commandOne = builder.buildBatch(48000000, 1); + const commandTwo = builder.buildBatch(48000000, 1); it('splits the operations into multiple commands', function () { - expect(commands.length).to.equal(2); - expect(commands[0].ops.documents).to.deep.equal([ + expect(commandOne.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } } ]); - expect(commands[1].ops.documents).to.deep.equal([ + expect(commandTwo.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idTwo, name: 2 } } ]); }); @@ -149,14 +149,14 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(1090, 100000); + const commandOne = builder.buildBatch(1090, 100000); + const commandTwo = builder.buildBatch(1090, 100000); it('splits the operations into multiple commands', function () { - expect(commands.length).to.equal(2); - expect(commands[0].ops.documents).to.deep.equal([ + expect(commandOne.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } } ]); - expect(commands[1].ops.documents).to.deep.equal([ + expect(commandTwo.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idTwo, name: 2 } } ]); }); @@ -176,23 +176,23 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: 
idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents).to.deep.equal([ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } }, { insert: 0, document: { _id: idTwo, name: 2 } } ]); }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents).to.deep.equal([{ ns: 'test.coll' }]); + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents).to.deep.equal([{ ns: 'test.coll' }]); }); }); @@ -210,23 +210,23 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents).to.deep.equal([ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } }, { insert: 1, document: { _id: idTwo, name: 2 } } ]); }); it('sets the nsInfo document sequence', function () { - 
expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents).to.deep.equal([ + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents).to.deep.equal([ { ns: 'test.coll' }, { ns: 'test.coll2' } ]); @@ -253,15 +253,15 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idThree, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo, modelThree], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents).to.deep.equal([ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } }, { insert: 1, document: { _id: idTwo, name: 2 } }, { insert: 0, document: { _id: idThree, name: 2 } } @@ -269,8 +269,8 @@ describe('ClientBulkWriteCommandBuilder', function () { }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents).to.deep.equal([ + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents).to.deep.equal([ { ns: 'test.coll' }, { ns: 'test.coll2' } ]); From 6b5725f1ac8eff8a1816954122e68703aee9e493 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 1 Oct 2024 00:11:32 +0200 Subject: [PATCH 07/11] fix: last documents setting --- src/operations/client_bulk_write/command_builder.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index 
543a82bf41..bf1b72b2b2 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -111,8 +111,7 @@ export class ClientBulkWriteCommandBuilder { this.currentModelIndex++; } else { // The operation cannot fit in the current command and will need to - // go in the next batch. Exit the loop and set the last ops. - this.lastOperations = command.ops.documents; + // go in the next batch. Exit the loop. break; } } else { @@ -143,12 +142,13 @@ export class ClientBulkWriteCommandBuilder { this.currentModelIndex++; } else { // The operation cannot fit in the current command and will need to - // go in the next batch. Exit the loop and set the last ops. - this.lastOperations = command.ops.documents; + // go in the next batch. Exit the loop. break; } } } + // Set the last operations and return the command. + this.lastOperations = command.ops.documents; return command; } From 9532f5657c78d2d1dd4414e32380a6cea50fd1ef Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 1 Oct 2024 00:45:23 +0200 Subject: [PATCH 08/11] fix: unacknowledged bulk --- src/operations/client_bulk_write/executor.ts | 35 ++++++++++++-------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index c8ec2a2aae..5baf1ed6b6 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,6 +1,8 @@ import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; +import { executeOperation } from '../execute_operation'; +import { ClientBulkWriteOperation } from './client_bulk_write'; import { ClientBulkWriteCommandBuilder } from './command_builder'; import { type AnyClientBulkWriteModel, @@ -53,19 +55,26 @@ export class ClientBulkWriteExecutor { this.options, pkFactory ); - 
// const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize); - // if (this.options.writeConcern?.w === 0) { - const resultsMerger = new ClientBulkWriteResultsMerger(this.options); - // For each command will will create and exhaust a cursor for the results. - let currentBatchOffset = 0; - while (commandBuilder.hasNextBatch()) { - const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options); - const docs = await cursor.toArray(); - const operations = cursor.operations; - resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); - // Set the new batch index so we can back back to the index in the original models. - currentBatchOffset += operations.length; + // Unacknowledged writes need to execute all batches and return { ok: 1 } + if (this.options.writeConcern?.w === 0) { + while (commandBuilder.hasNextBatch()) { + const operation = new ClientBulkWriteOperation(commandBuilder, this.options); + await executeOperation(this.client, operation); + } + return { ok: 1 }; + } else { + const resultsMerger = new ClientBulkWriteResultsMerger(this.options); + // For each command we will create and exhaust a cursor for the results. + let currentBatchOffset = 0; + while (commandBuilder.hasNextBatch()) { + const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options); + const docs = await cursor.toArray(); + const operations = cursor.operations; + resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + // Set the new batch index so we can map back to the index in the original models. 
+ currentBatchOffset += operations.length; + } + return resultsMerger.result; } - return resultsMerger.result; } } From de4a82ec9f986e6f0af94d7bd01da1e0e114a27e Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 1 Oct 2024 16:36:30 +0200 Subject: [PATCH 09/11] chore: comments --- .../client_bulk_write/client_bulk_write.ts | 26 +- test/integration/crud/crud.prose.test.ts | 291 ++++++++++++++++++ 2 files changed, 307 insertions(+), 10 deletions(-) diff --git a/src/operations/client_bulk_write/client_bulk_write.ts b/src/operations/client_bulk_write/client_bulk_write.ts index 966756394a..1cec238590 100644 --- a/src/operations/client_bulk_write/client_bulk_write.ts +++ b/src/operations/client_bulk_write/client_bulk_write.ts @@ -1,4 +1,4 @@ -import { ServerType } from '../../beta'; +import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta'; import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; @@ -40,15 +40,21 @@ export class ClientBulkWriteOperation extends CommandOperation { }); }); + describe('7. MongoClient.bulkWrite handles a cursor requiring a getMore', function () { + // Test that MongoClient.bulkWrite properly iterates the results cursor when getMore is required. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe + // CommandStartedEvents. Perform a hello command using client and record the maxBsonObjectSize value from the response. + // Construct a MongoCollection (referred to as collection) with the namespace "db.coll" (referred to as namespace). + // Drop collection. 
Then create the following list of write models (referred to as models): + // UpdateOne { + // "namespace": namespace, + // "filter": { "_id": "a".repeat(maxBsonObjectSize / 2) }, + // "update": { "$set": { "x": 1 } }, + // "upsert": true + // }, + // UpdateOne { + // "namespace": namespace, + // "filter": { "_id": "b".repeat(maxBsonObjectSize / 2) }, + // "update": { "$set": { "x": 1 } }, + // "upsert": true + // }, + // Execute bulkWrite on client with models and verboseResults set to true. Assert that the bulk write succeeds and returns a BulkWriteResult (referred to as result). + // Assert that result.upsertedCount is equal to 2. + // Assert that the length of result.updateResults is equal to 2. + // Assert that a CommandStartedEvent was observed for the getMore command. + let client: MongoClient; + let maxBsonObjectSize; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + + client.on('commandStarted', filterForCommands('getMore', commands)); + commands.length = 0; + + models.push({ + name: 'updateOne', + namespace: 'db.coll', + filter: { _id: 'a'.repeat(maxBsonObjectSize / 2) }, + update: { $set: { x: 1 } }, + upsert: true + }); + models.push({ + name: 'updateOne', + namespace: 'db.coll', + filter: { _id: 'b'.repeat(maxBsonObjectSize / 2) }, + update: { $set: { x: 1 } }, + upsert: true + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('handles a getMore on the results', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const result = await client.bulkWrite(models, { verboseResults: true }); + expect(result.upsertedCount).to.equal(2); + 
expect(result.updateResults.size).to.equal(2); + expect(commands.length).to.equal(1); + } + }); + }); + + describe('8. MongoClient.bulkWrite handles a cursor requiring getMore within a transaction', function () { + // Test that MongoClient.bulkWrite executed within a transaction properly iterates the results + // cursor when getMore is required. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // This test must not be run against standalone servers. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe + // CommandStartedEvents. Perform a hello command using client and record the maxBsonObjectSize value from the response. + // Construct a MongoCollection (referred to as collection) with the namespace "db.coll" (referred to as namespace). Drop collection. + // Start a session on client (referred to as session). Start a transaction on session. + // Create the following list of write models (referred to as models): + // UpdateOne { + // "namespace": namespace, + // "filter": { "_id": "a".repeat(maxBsonObjectSize / 2) }, + // "update": { "$set": { "x": 1 } }, + // "upsert": true + // }, + // UpdateOne { + // "namespace": namespace, + // "filter": { "_id": "b".repeat(maxBsonObjectSize / 2) }, + // "update": { "$set": { "x": 1 } }, + // "upsert": true + // }, + // Execute bulkWrite on client with models, session, and verboseResults set to true. Assert that the bulk + // write succeeds and returns a BulkWriteResult (referred to as result). + // Assert that result.upsertedCount is equal to 2. + // Assert that the length of result.updateResults is equal to 2. + // Assert that a CommandStartedEvent was observed for the getMore command. 
+ let client: MongoClient; + let session: ClientSession; + let maxBsonObjectSize; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + + client.on('commandStarted', filterForCommands('getMore', commands)); + commands.length = 0; + + models.push({ + name: 'updateOne', + namespace: 'db.coll', + filter: { _id: 'a'.repeat(maxBsonObjectSize / 2) }, + update: { $set: { x: 1 } }, + upsert: true + }); + models.push({ + name: 'updateOne', + namespace: 'db.coll', + filter: { _id: 'b'.repeat(maxBsonObjectSize / 2) }, + update: { $set: { x: 1 } }, + upsert: true + }); + + session = client.startSession(); + session.startTransaction(); + }); + + afterEach(async function () { + await session.endSession(); + await client.close(); + }); + + it('handles a getMore on the results in a transaction', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid', topology: '!single' } }, + async test() { + const result = await client.bulkWrite(models, { verboseResults: true, session }); + expect(result.upsertedCount).to.equal(2); + expect(result.updateResults.size).to.equal(2); + expect(commands.length).to.equal(1); + } + }); + }); + + describe('11. MongoClient.bulkWrite batch splits when the addition of a new namespace exceeds the maximum message size', function () { + // Test that MongoClient.bulkWrite batch splits a bulk write when the addition of a new namespace to nsInfo causes the size + // of the message to exceed maxMessageSizeBytes - 1000. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. 
+ // Repeat the following setup for each test case: + // Setup + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe CommandStartedEvents. Perform + // a hello command using client and record the following values from the response: maxBsonObjectSize and maxMessageSizeBytes. + // Calculate the following values: + // opsBytes = maxMessageSizeBytes - 1122 + // numModels = opsBytes / maxBsonObjectSize + // remainderBytes = opsBytes % maxBsonObjectSize + // Construct the following write model (referred to as firstModel): + // InsertOne { + // "namespace": "db.coll", + // "document": { "a": "b".repeat(maxBsonObjectSize - 57) } + // } + // Create a list of write models (referred to as models) with firstModel repeated numModels times. + // If remainderBytes is greater than or equal to 217, add 1 to numModels and append the following write model to models: + // InsertOne { + // "namespace": "db.coll", + // "document": { "a": "b".repeat(remainderBytes - 57) } + // } + // Then perform the following two tests: + let client: MongoClient; + let maxBsonObjectSize; + let maxMessageSizeBytes; + let opsBytes; + let numModels; + let remainderBytes; + let models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + await client.db('db').collection('coll').drop(); + const hello = await client.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + maxMessageSizeBytes = hello.maxMessageSizeBytes; + opsBytes = maxMessageSizeBytes - 1122; + numModels = Math.floor(opsBytes / maxBsonObjectSize); + remainderBytes = opsBytes % maxBsonObjectSize; + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + commands.length = 0; + models = []; + + Array.from({ length: numModels }, () => { + models.push({ + namespace: 'db.coll', + name: 'insertOne', + 
document: { a: 'b'.repeat(maxBsonObjectSize - 57) } + }); + }); + + if (remainderBytes >= 217) { + numModels++; + models.push({ + namespace: 'db.coll', + name: 'insertOne', + document: { a: 'b'.repeat(remainderBytes - 57) } + }); + } + }); + + afterEach(async function () { + await client.close(); + }); + + context('when no batch splitting is required', function () { + // Case 1: No batch-splitting required + // Create the following write model (referred to as sameNamespaceModel): + // InsertOne { + // "namespace": "db.coll", + // "document": { "a": "b" } + // } + // Append sameNamespaceModel to models. + // Execute bulkWrite on client with models. Assert that the bulk write succeeds and returns a BulkWriteResult (referred to as result). + // Assert that result.insertedCount is equal to numModels + 1. + // Assert that one CommandStartedEvent was observed for the bulkWrite command (referred to as event). + // Assert that the length of event.command.ops is numModels + 1. Assert that the length of event.command.nsInfo is 1. + // Assert that the namespace contained in event.command.nsInfo is "db.coll". + it('executes in a single batch', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const sameNamespaceModel: AnyClientBulkWriteModel = { + name: 'insertOne', + namespace: 'db.coll', + document: { a: 'b' } + }; + const testModels = models.concat([sameNamespaceModel]); + const result = await client.bulkWrite(testModels); + expect(result.insertedCount).to.equal(numModels + 1); + expect(commands.length).to.equal(1); + expect(commands[0].command.ops.length).to.equal(numModels + 1); + expect(commands[0].command.nsInfo.length).to.equal(1); + expect(commands[0].command.nsInfo[0].ns).to.equal('db.coll'); + } + }); + }); + + context('when batch splitting is required', function () { + // Case 2: Batch-splitting required + // Construct the following namespace (referred to as namespace): + // "db." 
+ "c".repeat(200) + // Create the following write model (referred to as newNamespaceModel): + // InsertOne { + // "namespace": namespace, + // "document": { "a": "b" } + // } + // Append newNamespaceModel to models. + // Execute bulkWrite on client with models. Assert that the bulk write succeeds and returns a BulkWriteResult (referred to as result). + // Assert that result.insertedCount is equal to numModels + 1. + // Assert that two CommandStartedEvents were observed for the bulkWrite command (referred to as firstEvent and secondEvent). + // Assert that the length of firstEvent.command.ops is equal to numModels. Assert that the length of firstEvent.command.nsInfo + // is equal to 1. Assert that the namespace contained in firstEvent.command.nsInfo is "db.coll". + // Assert that the length of secondEvent.command.ops is equal to 1. Assert that the length of secondEvent.command.nsInfo + // is equal to 1. Assert that the namespace contained in secondEvent.command.nsInfo is namespace. + it('executes in multiple batches', { + metadata: { requires: { mongodb: '>=8.0.0', serverless: 'forbid' } }, + async test() { + const namespace = `db.${'c'.repeat(200)}`; + const newNamespaceModel: AnyClientBulkWriteModel = { + name: 'insertOne', + namespace: namespace, + document: { a: 'b' } + }; + const testModels = models.concat([newNamespaceModel]); + const result = await client.bulkWrite(testModels); + expect(result.insertedCount).to.equal(numModels + 1); + expect(commands.length).to.equal(2); + expect(commands[0].command.ops.length).to.equal(numModels); + expect(commands[0].command.nsInfo.length).to.equal(1); + expect(commands[0].command.nsInfo[0].ns).to.equal('db.coll'); + expect(commands[1].command.ops.length).to.equal(1); + expect(commands[1].command.nsInfo.length).to.equal(1); + expect(commands[1].command.nsInfo[0].ns).to.equal(namespace); + } + }); + }); + }); + describe('14. 
`explain` helpers allow users to specify `maxTimeMS`', function () { let client: MongoClient; const commands: CommandStartedEvent[] = []; From b76b143bf6e7eebc916069e024da2e6a3d82f81f Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 1 Oct 2024 16:39:38 +0200 Subject: [PATCH 10/11] chore: remove defaults --- src/sdam/server_description.ts | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 320a43bc16..4817e012a0 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -18,12 +18,6 @@ const DATA_BEARING_SERVER_TYPES = new Set([ ServerType.LoadBalancer ]); -/** Default in case we are in load balanced mode. */ -const MAX_MESSAGE_SIZE_BYTES = 48000000; - -/** Default in case we are in load balanced mode. */ -const MAX_WRITE_BATCH_SIZE = 100000; - /** @public */ export interface TopologyVersion { processId: ObjectId; @@ -79,6 +73,8 @@ export class ServerDescription { maxMessageSizeBytes: number; /** The max number of writes in a bulk write command. */ maxWriteBatchSize: number; + /** The max bson object size. */ + maxBsonObjectSize: number; // NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level $clusterTime?: ClusterTime; @@ -121,8 +117,9 @@ export class ServerDescription { this.setVersion = hello?.setVersion ?? null; this.electionId = hello?.electionId ?? null; this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null; - this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? MAX_MESSAGE_SIZE_BYTES; - this.maxWriteBatchSize = hello?.maxWriteBatchSize ?? MAX_WRITE_BATCH_SIZE; + this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? 0; + this.maxWriteBatchSize = hello?.maxWriteBatchSize ?? 0; + this.maxBsonObjectSize = hello?.maxBsonObjectSize ?? 0; this.primary = hello?.primary ?? null; this.me = hello?.me?.toLowerCase() ?? null; this.$clusterTime = hello?.$clusterTime ?? 
null; From 884e6b766dc866dbc7e387c1b217cb6e29d5d28b Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 1 Oct 2024 21:14:47 +0200 Subject: [PATCH 11/11] chore: update server description --- .../client_bulk_write/client_bulk_write.ts | 5 +++++ src/sdam/server_description.ts | 12 ++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/operations/client_bulk_write/client_bulk_write.ts b/src/operations/client_bulk_write/client_bulk_write.ts index 1cec238590..b04c978114 100644 --- a/src/operations/client_bulk_write/client_bulk_write.ts +++ b/src/operations/client_bulk_write/client_bulk_write.ts @@ -59,6 +59,11 @@ export class ClientBulkWriteOperation extends CommandOperation