From 0ed7ce8394b64d23c720d35610071493f31ac6d9 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 30 Sep 2024 22:56:31 +0200 Subject: [PATCH] refactor: build commands with connection --- src/cursor/client_bulk_write_cursor.ts | 25 ++- .../client_bulk_write/client_bulk_write.ts | 39 ++++- .../client_bulk_write/command_builder.ts | 154 ++++++------------ src/operations/client_bulk_write/executor.ts | 78 ++------- src/sdam/server.ts | 3 +- src/sdam/topology_description.ts | 22 --- .../client_bulk_write/command_builder.test.ts | 90 +++++----- 7 files changed, 164 insertions(+), 247 deletions(-) diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts index cd853a4647..06f34dfc52 100644 --- a/src/cursor/client_bulk_write_cursor.ts +++ b/src/cursor/client_bulk_write_cursor.ts @@ -1,8 +1,10 @@ -import type { Document } from '../bson'; +import { type Document } from 'bson'; + import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses'; import { MongoClientBulkWriteCursorError } from '../error'; import type { MongoClient } from '../mongo_client'; import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write'; +import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder'; import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common'; import { executeOperation } from '../operations/execute_operation'; import type { ClientSession } from '../sessions'; @@ -24,17 +26,21 @@ export interface ClientBulkWriteCursorOptions * @internal */ export class ClientBulkWriteCursor extends AbstractCursor { - public readonly command: Document; + commandBuilder: ClientBulkWriteCommandBuilder; /** @internal */ private cursorResponse?: ClientBulkWriteCursorResponse; /** @internal */ private clientBulkWriteOptions: ClientBulkWriteOptions; /** @internal */ - constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) { + constructor( + client: MongoClient, + commandBuilder: ClientBulkWriteCommandBuilder, + options: ClientBulkWriteOptions = {} + ) { super(client, new MongoDBNamespace('admin', '$cmd'), options); - this.command = command; + this.commandBuilder = commandBuilder; this.clientBulkWriteOptions = options; } @@ -49,17 +55,24 @@ export class ClientBulkWriteCursor extends AbstractCursor { ); } + /** + * Get the last set of operations the cursor executed. + */ + get operations(): Document[] { + return this.commandBuilder.lastOperations; + } + clone(): ClientBulkWriteCursor { const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions); delete clonedOptions.session; - return new ClientBulkWriteCursor(this.client, this.command, { + return new ClientBulkWriteCursor(this.client, this.commandBuilder, { ...clonedOptions }); } /** @internal */ async _initialize(session: ClientSession): Promise { - const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, { + const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, { ...this.clientBulkWriteOptions, ...this.cursorOptions, session diff --git a/src/operations/client_bulk_write/client_bulk_write.ts b/src/operations/client_bulk_write/client_bulk_write.ts index cb020bde40..966756394a 100644 --- a/src/operations/client_bulk_write/client_bulk_write.ts +++ b/src/operations/client_bulk_write/client_bulk_write.ts @@ -1,11 +1,11 @@ -import { type Document } from 'bson'; - +import { ServerType } from '../../beta'; import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses'; import type { Server } from '../../sdam/server'; import type { ClientSession } from '../../sessions'; import { MongoDBNamespace } from '../../utils'; import { CommandOperation } from '../command'; import { Aspect, defineAspects } from '../operation'; +import { type ClientBulkWriteCommandBuilder } from './command_builder'; import { type ClientBulkWriteOptions } from './common'; /** @@ -13,16 +13,16 @@ import { type ClientBulkWriteOptions } from './common'; * @internal */ export class ClientBulkWriteOperation extends CommandOperation { - command: Document; + commandBuilder: ClientBulkWriteCommandBuilder; override options: ClientBulkWriteOptions; override get commandName() { return 'bulkWrite' as const; } - constructor(command: Document, options: ClientBulkWriteOptions) { + constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) { super(undefined, options); - this.command = command; + this.commandBuilder = commandBuilder; this.options = options; this.ns = new MongoDBNamespace('admin', '$cmd'); } @@ -37,9 +37,34 @@ export class ClientBulkWriteOperation extends CommandOperation { - return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse); + let command; + + if (server.description.type === ServerType.LoadBalancer) { + // Checkout a connection to build the command. + const connection = await server.pool.checkOut(); + command = this.commandBuilder.buildBatch( + connection.hello?.maxMessageSizeBytes, + connection.hello?.maxWriteBatchSize + ); + // Pin the connection to the session so it get used to execute the command and we do not + // perform a double check-in/check-out. + session?.pin(connection); + } else { + // At this point we have a server and the auto connect code has already + // run in executeOperation, so the server description will be populated. + // We can use that to build the command. + command = this.commandBuilder.buildBatch( + server.description.maxMessageSizeBytes, + server.description.maxWriteBatchSize + ); + } + return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse); } } // Skipping the collation as it goes on the individual ops. -defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]); +defineAspects(ClientBulkWriteOperation, [ + Aspect.WRITE_OPERATION, + Aspect.SKIP_COLLATION, + Aspect.CURSOR_CREATING +]); diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts index 646dd0f254..543a82bf41 100644 --- a/src/operations/client_bulk_write/command_builder.ts +++ b/src/operations/client_bulk_write/command_builder.ts @@ -38,6 +38,8 @@ export class ClientBulkWriteCommandBuilder { models: AnyClientBulkWriteModel[]; options: ClientBulkWriteOptions; pkFactory: PkFactory; + currentModelIndex: number; + lastOperations: Document[]; /** * Create the command builder. @@ -51,6 +53,8 @@ export class ClientBulkWriteCommandBuilder { this.models = models; this.options = options; this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY; + this.currentModelIndex = 0; + this.lastOperations = []; } /** @@ -65,68 +69,55 @@ export class ClientBulkWriteCommandBuilder { } /** - * Build the bulk write commands from the models. + * Determines if there is another batch to process. + * @returns True if not all batches have been built. */ - buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] { - // Iterate the models to build the ops and nsInfo fields. - // We need to do this in a loop which creates one command each up - // to the max bson size or max message size. - const commands: ClientBulkWriteCommand[] = []; - let currentCommandLength = 0; + hasNextBatch(): boolean { + return this.currentModelIndex < this.models.length; + } + + /** + * Build a single batch of a client bulk write command. + * @param maxMessageSizeBytes - The max message size in bytes. + * @param maxWriteBatchSize - The max write batch size. + * @returns The client bulk write command. + */ + buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand { + let commandLength = 0; let currentNamespaceIndex = 0; - let currentCommand: ClientBulkWriteCommand = this.baseCommand(); + const command: ClientBulkWriteCommand = this.baseCommand(); const namespaces = new Map(); - for (const model of this.models) { + while (this.currentModelIndex < this.models.length) { + const model = this.models[this.currentModelIndex]; const ns = model.namespace; - const index = namespaces.get(ns); - - /** - * Convenience function for resetting everything when a new batch - * is started. - */ - const reset = () => { - commands.push(currentCommand); - namespaces.clear(); - currentNamespaceIndex = 0; - currentCommand = this.baseCommand(); - namespaces.set(ns, currentNamespaceIndex); - }; + const nsIndex = namespaces.get(ns); - if (index != null) { - // Pushing to the ops document sequence returns the bytes length added. - const operation = buildOperation(model, index, this.pkFactory); + if (nsIndex != null) { + // Build the operation and serialize it to get the bytes buffer. + const operation = buildOperation(model, nsIndex, this.pkFactory); const operationBuffer = BSON.serialize(operation); - // Check if the operation buffer can fit in the current command. If it can, + // Check if the operation buffer can fit in the command. If it can, // then add the operation to the document sequence and increment the // current length as long as the ops don't exceed the maxWriteBatchSize. if ( - currentCommandLength + operationBuffer.length < maxMessageSizeBytes && - currentCommand.ops.documents.length < maxWriteBatchSize + commandLength + operationBuffer.length < maxMessageSizeBytes && + command.ops.documents.length < maxWriteBatchSize ) { // Pushing to the ops document sequence returns the total byte length of the document sequence. - currentCommandLength = - MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer); + commandLength = MESSAGE_OVERHEAD_BYTES + command.ops.push(operation, operationBuffer); + // Increment the builder's current model index. + this.currentModelIndex++; } else { - // We need to batch. Push the current command to the commands - // array and create a new current command. We aslo need to clear the namespaces - // map for the new command. - reset(); - - const nsInfo = { ns: ns }; - const nsInfoBuffer = BSON.serialize(nsInfo); - currentCommandLength = - MESSAGE_OVERHEAD_BYTES + - this.addOperationAndNsInfo( - currentCommand, - operation, - operationBuffer, - nsInfo, - nsInfoBuffer - ); + // The operation cannot fit in the current command and will need to + // go in the next batch. Exit the loop and set the last ops. + this.lastOperations = command.ops.documents; + break; } } else { + // The namespace is not already in the nsInfo so we will set it in the map, and + // construct our nsInfo and ops documents and buffers. namespaces.set(ns, currentNamespaceIndex); const nsInfo = { ns: ns }; const nsInfoBuffer = BSON.serialize(nsInfo); @@ -138,68 +129,27 @@ export class ClientBulkWriteCommandBuilder { // sequences and increment the current length as long as the ops don't exceed // the maxWriteBatchSize. if ( - currentCommandLength + nsInfoBuffer.length + operationBuffer.length < - maxMessageSizeBytes && - currentCommand.ops.documents.length < maxWriteBatchSize + commandLength + nsInfoBuffer.length + operationBuffer.length < maxMessageSizeBytes && + command.ops.documents.length < maxWriteBatchSize ) { - currentCommandLength = + // Pushing to the ops document sequence returns the total byte length of the document sequence. + commandLength = MESSAGE_OVERHEAD_BYTES + - this.addOperationAndNsInfo( - currentCommand, - operation, - operationBuffer, - nsInfo, - nsInfoBuffer - ); + command.nsInfo.push(nsInfo, nsInfoBuffer) + + command.ops.push(operation, operationBuffer); + // We've added a new namespace, increment the namespace index. + currentNamespaceIndex++; + // Increment the builder's current model index. + this.currentModelIndex++; } else { - // We need to batch. Push the current command to the commands - // array and create a new current command. Aslo clear the namespaces map. - reset(); - - currentCommandLength = - MESSAGE_OVERHEAD_BYTES + - this.addOperationAndNsInfo( - currentCommand, - operation, - operationBuffer, - nsInfo, - nsInfoBuffer - ); + // The operation cannot fit in the current command and will need to + // go in the next batch. Exit the loop and set the last ops. + this.lastOperations = command.ops.documents; + break; } - // We've added a new namespace, increment the namespace index. - currentNamespaceIndex++; } } - - // After we've finisihed iterating all the models put the last current command - // only if there are operations in it. - if (currentCommand.ops.documents.length > 0) { - commands.push(currentCommand); - } - - return commands; - } - - private addOperation( - command: ClientBulkWriteCommand, - operation: Document, - operationBuffer: Uint8Array - ): number { - // Pushing to the ops document sequence returns the total byte length of the document sequence. - return command.ops.push(operation, operationBuffer); - } - - private addOperationAndNsInfo( - command: ClientBulkWriteCommand, - operation: Document, - operationBuffer: Uint8Array, - nsInfo: Document, - nsInfoBuffer: Uint8Array - ): number { - // Pushing to the nsInfo document sequence returns the total byte length of the document sequence. - const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer); - const opsLength = this.addOperation(command, operation, operationBuffer); - return nsInfoLength + opsLength; + return command; } private baseCommand(): ClientBulkWriteCommand { diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts index 1c02a42add..c8ec2a2aae 100644 --- a/src/operations/client_bulk_write/executor.ts +++ b/src/operations/client_bulk_write/executor.ts @@ -1,12 +1,7 @@ -import { type Document } from 'bson'; - import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor'; -import { MongoClientBulkWriteExecutionError } from '../../error'; import { type MongoClient } from '../../mongo_client'; import { WriteConcern } from '../../write_concern'; -import { executeOperation } from '../execute_operation'; -import { ClientBulkWriteOperation } from './client_bulk_write'; -import { type ClientBulkWriteCommand, ClientBulkWriteCommandBuilder } from './command_builder'; +import { ClientBulkWriteCommandBuilder } from './command_builder'; import { type AnyClientBulkWriteModel, type ClientBulkWriteOptions, @@ -50,23 +45,6 @@ export class ClientBulkWriteExecutor { * @returns The result. */ async execute(): Promise { - const topologyDescription = this.client.topology?.description; - const maxMessageSizeBytes = topologyDescription?.maxMessageSizeBytes; - const maxWriteBatchSize = topologyDescription?.maxWriteBatchSize; - // If we don't know the maxMessageSizeBytes or for some reason it's 0 - // then we cannot calculate the batch. - if (!maxMessageSizeBytes) { - throw new MongoClientBulkWriteExecutionError( - 'No maxMessageSizeBytes value found - client bulk writes cannot execute without this value set from the monitoring connections.' - ); - } - - if (!maxWriteBatchSize) { - throw new MongoClientBulkWriteExecutionError( - 'No maxWriteBatchSize value found - client bulk writes cannot execute without this value set from the monitoring connections.' - ); - } - // The command builder will take the user provided models and potential split the batch // into multiple commands due to size. const pkFactory = this.client.s.options.pkFactory; @@ -75,47 +53,19 @@ export class ClientBulkWriteExecutor { this.options, pkFactory ); - const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize); - if (this.options.writeConcern?.w === 0) { - return await executeUnacknowledged(this.client, this.options, commands); + // const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize); + // if (this.options.writeConcern?.w === 0) { + const resultsMerger = new ClientBulkWriteResultsMerger(this.options); + // For each command will will create and exhaust a cursor for the results. + let currentBatchOffset = 0; + while (commandBuilder.hasNextBatch()) { + const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options); + const docs = await cursor.toArray(); + const operations = cursor.operations; + resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + // Set the new batch index so we can back back to the index in the original models. + currentBatchOffset += operations.length; } - return await executeAcknowledged(this.client, this.options, commands); - } -} - -/** - * Execute an acknowledged bulk write. - */ -async function executeAcknowledged( - client: MongoClient, - options: ClientBulkWriteOptions, - commands: ClientBulkWriteCommand[] -): Promise { - const resultsMerger = new ClientBulkWriteResultsMerger(options); - // For each command will will create and exhaust a cursor for the results. - let currentBatchOffset = 0; - for (const command of commands) { - const cursor = new ClientBulkWriteCursor(client, command, options); - const docs = await cursor.toArray(); - const operations = command.ops.documents; - resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); - // Set the new batch index so we can back back to the index in the original models. - currentBatchOffset += operations.length; - } - return resultsMerger.result; -} - -/** - * Execute an unacknowledged bulk write. - */ -async function executeUnacknowledged( - client: MongoClient, - options: ClientBulkWriteOptions, - commands: Document[] -): Promise<{ ok: 1 }> { - for (const command of commands) { - const operation = new ClientBulkWriteOperation(command, options); - await executeOperation(client, operation); + return resultsMerger.result; } - return { ok: 1 }; } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index b4450f0072..4c1d37519a 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -513,7 +513,8 @@ function isPinnableCommand(cmd: Document, session?: ClientSession): boolean { 'find' in cmd || 'getMore' in cmd || 'listCollections' in cmd || - 'listIndexes' in cmd + 'listIndexes' in cmd || + 'bulkWrite' in cmd ); } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 3f646975f2..f171423f59 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -43,9 +43,6 @@ export class TopologyDescription { heartbeatFrequencyMS: number; localThresholdMS: number; commonWireVersion: number; - maxMessageSizeBytes?: number; - maxWriteBatchSize?: number; - /** * Create a TopologyDescription */ @@ -73,25 +70,6 @@ export class TopologyDescription { // determine server compatibility for (const serverDescription of this.servers.values()) { - // Find the lowest maxMessageSizeBytes from all the servers. - if (this.maxMessageSizeBytes == null) { - this.maxMessageSizeBytes = serverDescription.maxMessageSizeBytes; - } else { - this.maxMessageSizeBytes = Math.min( - this.maxMessageSizeBytes, - serverDescription.maxMessageSizeBytes - ); - } - - // Find the lowest maxWriteBatchSize from all the servers. - if (this.maxWriteBatchSize == null) { - this.maxWriteBatchSize = serverDescription.maxWriteBatchSize; - } else { - this.maxWriteBatchSize = Math.min( - this.maxWriteBatchSize, - serverDescription.maxWriteBatchSize - ); - } // Load balancer mode is always compatible. if ( serverDescription.type === ServerType.Unknown || diff --git a/test/unit/operations/client_bulk_write/command_builder.test.ts b/test/unit/operations/client_bulk_write/command_builder.test.ts index fade57d408..e92966795b 100644 --- a/test/unit/operations/client_bulk_write/command_builder.test.ts +++ b/test/unit/operations/client_bulk_write/command_builder.test.ts @@ -20,7 +20,7 @@ import { } from '../../../mongodb'; describe('ClientBulkWriteCommandBuilder', function () { - describe('#buildCommand', function () { + describe('#buildBatch', function () { context('when custom options are provided', function () { const id = new ObjectId(); const model: ClientInsertOneModel = { @@ -34,39 +34,39 @@ describe('ClientBulkWriteCommandBuilder', function () { ordered: false, comment: { bulk: 'write' } }); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the errorsOnly field to the inverse of verboseResults', function () { - expect(commands[0].errorsOnly).to.be.false; + expect(command.errorsOnly).to.be.false; }); it('sets the ordered field', function () { - expect(commands[0].ordered).to.be.false; + expect(command.ordered).to.be.false; }); it('sets the bypassDocumentValidation field', function () { - expect(commands[0].bypassDocumentValidation).to.be.true; + expect(command.bypassDocumentValidation).to.be.true; }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents[0]).to.deep.equal({ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents[0]).to.deep.equal({ insert: 0, document: { _id: id, name: 1 } }); }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); }); it('passes comment options into the commands', function () { - expect(commands[0].comment).to.deep.equal({ bulk: 'write' }); + expect(command.comment).to.deep.equal({ bulk: 'write' }); }); }); @@ -79,31 +79,31 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: id, name: 1 } }; const builder = new ClientBulkWriteCommandBuilder([model], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the default errorsOnly field', function () { - expect(commands[0].errorsOnly).to.be.true; + expect(command.errorsOnly).to.be.true; }); it('sets the default ordered field', function () { - expect(commands[0].ordered).to.be.true; + expect(command.ordered).to.be.true; }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents[0]).to.deep.equal({ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents[0]).to.deep.equal({ insert: 0, document: { _id: id, name: 1 } }); }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); }); }); @@ -122,14 +122,14 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(48000000, 1); + const commandOne = builder.buildBatch(48000000, 1); + const commandTwo = builder.buildBatch(48000000, 1); it('splits the operations into multiple commands', function () { - expect(commands.length).to.equal(2); - expect(commands[0].ops.documents).to.deep.equal([ + expect(commandOne.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } } ]); - expect(commands[1].ops.documents).to.deep.equal([ + expect(commandTwo.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idTwo, name: 2 } } ]); }); @@ -149,14 +149,14 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(1090, 100000); + const commandOne = builder.buildBatch(1090, 100000); + const commandTwo = builder.buildBatch(1090, 100000); it('splits the operations into multiple commands', function () { - expect(commands.length).to.equal(2); - expect(commands[0].ops.documents).to.deep.equal([ + expect(commandOne.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } } ]); - expect(commands[1].ops.documents).to.deep.equal([ + expect(commandTwo.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idTwo, name: 2 } } ]); }); @@ -176,23 +176,23 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents).to.deep.equal([ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } }, { insert: 0, document: { _id: idTwo, name: 2 } } ]); }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents).to.deep.equal([{ ns: 'test.coll' }]); + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents).to.deep.equal([{ ns: 'test.coll' }]); }); }); @@ -210,23 +210,23 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents).to.deep.equal([ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } }, { insert: 1, document: { _id: idTwo, name: 2 } } ]); }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents).to.deep.equal([ + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents).to.deep.equal([ { ns: 'test.coll' }, { ns: 'test.coll2' } ]); @@ -253,15 +253,15 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idThree, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo, modelThree], {}); - const commands = builder.buildCommands(48000000, 100000); + const command = builder.buildBatch(48000000, 100000); it('sets the bulkWrite command', function () { - expect(commands[0].bulkWrite).to.equal(1); + expect(command.bulkWrite).to.equal(1); }); it('sets the ops document sequence', function () { - expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents).to.deep.equal([ + expect(command.ops).to.be.instanceOf(DocumentSequence); + expect(command.ops.documents).to.deep.equal([ { insert: 0, document: { _id: idOne, name: 1 } }, { insert: 1, document: { _id: idTwo, name: 2 } }, { insert: 0, document: { _id: idThree, name: 2 } } @@ -269,8 +269,8 @@ describe('ClientBulkWriteCommandBuilder', function () { }); it('sets the nsInfo document sequence', function () { - expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); - expect(commands[0].nsInfo.documents).to.deep.equal([ + expect(command.nsInfo).to.be.instanceOf(DocumentSequence); + expect(command.nsInfo.documents).to.deep.equal([ { ns: 'test.coll' }, { ns: 'test.coll2' } ]);