Skip to content

Commit

Permalink
refactor(NODE-6398): bulkWrite internals to use async/await (#4252)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored Oct 9, 2024
1 parent 91f3035 commit 8785132
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 136 deletions.
200 changes: 80 additions & 120 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, EJSON, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
type AnyError,
MongoBatchReExecutionError,
MONGODB_ERROR_CODES,
MongoInvalidArgumentError,
MongoRuntimeError,
MongoServerError,
MongoWriteConcernError
} from '../error';
Expand All @@ -22,7 +21,6 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import {
applyRetryableWrites,
type Callback,
getTopology,
hasAtomicOperators,
maybeAddIdToDocuments,
Expand Down Expand Up @@ -500,86 +498,46 @@ export function mergeBatchResults(
}
}

function executeCommands(
async function executeCommands(
bulkOperation: BulkOperationBase,
options: BulkWriteOptions,
callback: Callback<BulkWriteResult>
) {
options: BulkWriteOptions
): Promise<BulkWriteResult> {
if (bulkOperation.s.batches.length === 0) {
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
}

const batch = bulkOperation.s.batches.shift() as Batch;
for (const batch of bulkOperation.s.batches) {
const finalOptions = resolveOptions(bulkOperation, {
...options,
ordered: bulkOperation.isOrdered
});

function resultHandler(err?: AnyError, result?: Document) {
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
if (finalOptions.bypassDocumentValidation !== true) {
delete finalOptions.bypassDocumentValidation;
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
// Is the bypassDocumentValidation options specific
if (bulkOperation.s.bypassDocumentValidation === true) {
finalOptions.bypassDocumentValidation = true;
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
executeCommands(bulkOperation, options, callback);
}

const finalOptions = resolveOptions(bulkOperation, {
...options,
ordered: bulkOperation.isOrdered
});

if (finalOptions.bypassDocumentValidation !== true) {
delete finalOptions.bypassDocumentValidation;
}

// Set an operationIf if provided
if (bulkOperation.operationId) {
resultHandler.operationId = bulkOperation.operationId;
}

// Is the bypassDocumentValidation options specific
if (bulkOperation.s.bypassDocumentValidation === true) {
finalOptions.bypassDocumentValidation = true;
}

// Is the checkKeys option disabled
if (bulkOperation.s.checkKeys === false) {
finalOptions.checkKeys = false;
}

if (finalOptions.retryWrites) {
if (isUpdateBatch(batch)) {
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
// Is the checkKeys option disabled
if (bulkOperation.s.checkKeys === false) {
finalOptions.checkKeys = false;
}

if (isDeleteBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
if (finalOptions.retryWrites) {
if (isUpdateBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.multi);
}

if (isDeleteBatch(batch)) {
finalOptions.retryWrites =
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
}
}
}

try {
const operation = isInsertBatch(batch)
? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: isUpdateBatch(batch)
Expand All @@ -588,39 +546,50 @@ function executeCommands(
? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
: null;

if (operation != null) {
executeOperation(bulkOperation.s.collection.client, operation).then(
result => resultHandler(undefined, result),
error => resultHandler(error)
);
if (operation == null) throw new MongoRuntimeError(`Unknown batchType: ${batch.batchType}`);

let thrownError = null;
let result;
try {
result = await executeOperation(bulkOperation.s.collection.client, operation);
} catch (error) {
thrownError = error;
}

if (thrownError != null) {
if (thrownError instanceof MongoWriteConcernError) {
mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result);
const writeResult = new BulkWriteResult(
bulkOperation.s.bulkResult,
bulkOperation.isOrdered
);

throw new MongoBulkWriteError(
{
message: thrownError.result.writeConcernError.errmsg,
code: thrownError.result.writeConcernError.code
},
writeResult
);
} else {
// Error is a driver related error not a bulk op error, return early
throw new MongoBulkWriteError(
thrownError,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}
}
} catch (err) {
// Force top level error
err.ok = 0;
// Merge top level error and return
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined);
callback();

mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
bulkOperation.handleWriteError(writeResult);
}
}

function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
mergeBatchResults(batch, bulkResult, undefined, err.result);

callback(
new MongoBulkWriteError(
{
message: err.result.writeConcernError.errmsg,
code: err.result.writeConcernError.code
},
new BulkWriteResult(bulkResult, isOrdered)
)
);
bulkOperation.s.batches.length = 0;

const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
bulkOperation.handleWriteError(writeResult);
return writeResult;
}

/**
Expand Down Expand Up @@ -875,8 +844,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
let?: Document;
}

const executeCommandsAsync = promisify(executeCommands);

/**
* TODO(NODE-4063)
* BulkWrites merge complexity is implemented in executeCommands
Expand All @@ -895,15 +862,15 @@ export class BulkWriteShimOperation extends AbstractOperation {
return 'bulkWrite' as const;
}

execute(_server: Server, session: ClientSession | undefined): Promise<any> {
async execute(_server: Server, session: ClientSession | undefined): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return executeCommandsAsync(this.bulkOperation, this.options);
return await executeCommands(this.bulkOperation, this.options);
}
}

Expand Down Expand Up @@ -1239,33 +1206,26 @@ export abstract class BulkOperationBase {
* Handles the write error before executing commands
* @internal
*/
handleWriteError(callback: Callback<BulkWriteResult>, writeResult: BulkWriteResult): boolean {
handleWriteError(writeResult: BulkWriteResult): void {
if (this.s.bulkResult.writeErrors.length > 0) {
const msg = this.s.bulkResult.writeErrors[0].errmsg
? this.s.bulkResult.writeErrors[0].errmsg
: 'write operation failed';

callback(
new MongoBulkWriteError(
{
message: msg,
code: this.s.bulkResult.writeErrors[0].code,
writeErrors: this.s.bulkResult.writeErrors
},
writeResult
)
throw new MongoBulkWriteError(
{
message: msg,
code: this.s.bulkResult.writeErrors[0].code,
writeErrors: this.s.bulkResult.writeErrors
},
writeResult
);

return true;
}

const writeConcernError = writeResult.getWriteConcernError();
if (writeConcernError) {
callback(new MongoBulkWriteError(writeConcernError, writeResult));
return true;
throw new MongoBulkWriteError(writeConcernError, writeResult);
}

return false;
}

abstract addToOperationsList(
Expand Down
7 changes: 3 additions & 4 deletions src/bulk/unordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import type { Collection } from '../collection';
import { MongoInvalidArgumentError } from '../error';
import type { DeleteStatement } from '../operations/delete';
import type { UpdateStatement } from '../operations/update';
import { type Callback } from '../utils';
import {
Batch,
BatchType,
Expand All @@ -20,12 +19,12 @@ export class UnorderedBulkOperation extends BulkOperationBase {
super(collection, options, false);
}

override handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean {
override handleWriteError(writeResult: BulkWriteResult): void {
if (this.s.batches.length) {
return false;
return;
}

return super.handleWriteError(callback, writeResult);
return super.handleWriteError(writeResult);
}

addToOperationsList(
Expand Down
22 changes: 10 additions & 12 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import {
Collection,
CommandFailedEvent,
CommandSucceededEvent,
MongoBulkWriteError,
type MongoClient,
MongoError,
MongoServerError,
ObjectId,
ReturnDocument
Expand Down Expand Up @@ -1093,22 +1093,16 @@ describe('CRUD API', function () {
}
});

it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
metadata: {
requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] }
},

test: async function () {
describe('when performing a multi-batch unordered bulk write that has a duplicate key', function () {
it('throws a MongoBulkWriteError indicating the duplicate key document failed', async function () {
const ops = [];
// Create a set of operations that go over the 1000 limit causing two messages
let i = 0;
for (; i < 1005; i++) {
ops.push({ insertOne: { _id: i, a: i } });
}

ops.push({ insertOne: { _id: 0, a: i } });
ops[500] = { insertOne: { _id: 0, a: i } };

const db = client.db();

Expand All @@ -1117,8 +1111,12 @@ describe('CRUD API', function () {
.bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } })
.catch(error => error);

expect(error).to.be.instanceOf(MongoError);
}
expect(error).to.be.instanceOf(MongoBulkWriteError);
// 1004 because one of them is duplicate key
// but since it is unordered we continued to write
expect(error).to.have.property('insertedCount', 1004);
expect(error.writeErrors[0]).to.have.nested.property('err.index', 500);
});
});

it('should correctly throw error on illegal callback when ordered bulkWrite encounters error', {
Expand Down

0 comments on commit 8785132

Please sign in to comment.