Skip to content

Commit

Permalink
feat(NODE-2014)!: return executor result from withSession and withTra…
Browse files Browse the repository at this point in the history
…nsaction (#3783)
  • Loading branch information
nbbeeken authored Jul 27, 2023
1 parent 787bdbf commit 65aa288
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 65 deletions.
33 changes: 17 additions & 16 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
}

/** @public */
export type WithSessionCallback = (session: ClientSession) => Promise<any>;
export type WithSessionCallback<T = unknown> = (session: ClientSession) => Promise<T>;

/** @internal */
export interface MongoClientPrivate {
Expand Down Expand Up @@ -605,29 +605,30 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
}

/**
* Runs a given operation with an implicitly created session. The lifetime of the session
* will be handled without the need for user interaction.
* A convenience method for creating and handling the clean up of a ClientSession.
* The session will always be ended when the executor finishes.
*
* NOTE: presently the operation MUST return a Promise (either explicit or implicitly as an async function)
*
* @param options - Optional settings for the command
* @param callback - An callback to execute with an implicitly created session
* @param executor - An executor function that all operations using the provided session must be invoked in
* @param options - optional settings for the session
*/
async withSession(callback: WithSessionCallback): Promise<void>;
async withSession(options: ClientSessionOptions, callback: WithSessionCallback): Promise<void>;
async withSession(
optionsOrOperation: ClientSessionOptions | WithSessionCallback,
callback?: WithSessionCallback
): Promise<void> {
async withSession<T = any>(executor: WithSessionCallback<T>): Promise<T>;
async withSession<T = any>(
options: ClientSessionOptions,
executor: WithSessionCallback<T>
): Promise<T>;
async withSession<T = any>(
optionsOrExecutor: ClientSessionOptions | WithSessionCallback<T>,
executor?: WithSessionCallback<T>
): Promise<T> {
const options = {
// Always define an owner
owner: Symbol(),
// If it's an object inherit the options
...(typeof optionsOrOperation === 'object' ? optionsOrOperation : {})
...(typeof optionsOrExecutor === 'object' ? optionsOrExecutor : {})
};

const withSessionCallback =
typeof optionsOrOperation === 'function' ? optionsOrOperation : callback;
typeof optionsOrExecutor === 'function' ? optionsOrExecutor : executor;

if (withSessionCallback == null) {
throw new MongoInvalidArgumentError('Missing required callback parameter');
Expand All @@ -636,7 +637,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
const session = this.startSession(options);

try {
await withSessionCallback(session);
return await withSessionCallback(session);
} finally {
try {
await session.endSession();
Expand Down
66 changes: 34 additions & 32 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export interface ClientSessionOptions {
}

/** @public */
export type WithTransactionCallback<T = void> = (session: ClientSession) => Promise<T>;
export type WithTransactionCallback<T = any> = (session: ClientSession) => Promise<T>;

/** @public */
export type ClientSessionEvents = {
Expand Down Expand Up @@ -432,18 +432,16 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
}

/**
* Runs a provided callback within a transaction, retrying either the commitTransaction operation
* or entire transaction as needed (and when the error permits) to better ensure that
* the transaction can complete successfully.
* Starts a transaction and runs a provided function, ensuring the commitTransaction is always attempted when all operations run in the function have completed.
*
* **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
* Any callbacks that do not return a Promise will result in undefined behavior.
*
* @remarks
* This function:
* - Will return the command response from the final commitTransaction if every operation is successful (can be used as a truthy object)
* - Will return `undefined` if the transaction is explicitly aborted with `await session.abortTransaction()`
* - Will throw if one of the operations throws or `throw` statement is used inside the `withTransaction` callback
* - If all operations successfully complete and the `commitTransaction` operation is successful, then this function will return the result of the provided function.
* - If the transaction is unable to complete or an error is thrown from within the provided function, then this function will throw an error.
* - If the transaction is manually aborted within the provided function it will not throw.
* - May be called multiple times if the driver needs to attempt to retry the operations.
*
* Checkout a descriptive example here:
* @see https://www.mongodb.com/developer/quickstart/node-transactions/
Expand All @@ -452,7 +450,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
* @param options - optional settings for the transaction
* @returns A raw command response or undefined
*/
async withTransaction<T = void>(
async withTransaction<T = any>(
fn: WithTransactionCallback<T>,
options?: TransactionOptions
): Promise<Document | undefined> {
Expand Down Expand Up @@ -543,25 +541,29 @@ function attemptTransactionCommit<T>(
session: ClientSession,
startTime: number,
fn: WithTransactionCallback<T>,
options?: TransactionOptions
result: any,
options: TransactionOptions
): Promise<T> {
return session.commitTransaction().catch((err: MongoError) => {
if (
err instanceof MongoError &&
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
!isMaxTimeMSExpiredError(err)
) {
if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
return attemptTransactionCommit(session, startTime, fn, options);
}
return session.commitTransaction().then(
() => result,
(err: MongoError) => {
if (
err instanceof MongoError &&
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
!isMaxTimeMSExpiredError(err)
) {
if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
return attemptTransactionCommit(session, startTime, fn, result, options);
}

if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
return attemptTransaction(session, startTime, fn, options);
if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
return attemptTransaction(session, startTime, fn, options);
}
}
}

throw err;
});
throw err;
}
);
}

const USER_EXPLICIT_TXN_END_STATES = new Set<TxnState>([
Expand All @@ -574,11 +576,11 @@ function userExplicitlyEndedTransaction(session: ClientSession) {
return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
}

function attemptTransaction<TSchema>(
function attemptTransaction<T>(
session: ClientSession,
startTime: number,
fn: WithTransactionCallback<TSchema>,
options?: TransactionOptions
fn: WithTransactionCallback<T>,
options: TransactionOptions = {}
): Promise<any> {
session.startTransaction(options);

Expand All @@ -591,18 +593,18 @@ function attemptTransaction<TSchema>(

if (!isPromiseLike(promise)) {
session.abortTransaction().catch(() => null);
throw new MongoInvalidArgumentError(
'Function provided to `withTransaction` must return a Promise'
return Promise.reject(
new MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise')
);
}

return promise.then(
() => {
result => {
if (userExplicitlyEndedTransaction(session)) {
return;
return result;
}

return attemptTransactionCommit(session, startTime, fn, options);
return attemptTransactionCommit(session, startTime, fn, result, options);
},
err => {
function maybeRetryOrThrow(err: MongoError): Promise<any> {
Expand Down
8 changes: 8 additions & 0 deletions test/integration/sessions/sessions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ describe('Sessions Spec', function () {

describe('withSession', function () {
let client: MongoClient;

beforeEach(async function () {
client = await this.configuration.newClient().connect();
});
Expand Down Expand Up @@ -184,6 +185,13 @@ describe('Sessions Spec', function () {
expect(client.s.sessionPool.sessions).to.have.length(1);
expect(sessionWasEnded).to.be.true;
});

it('resolves with the value the callback returns', async () => {
const result = await client.withSession(async session => {
return client.db('test').collection('foo').find({}, { session }).toArray();
});
expect(result).to.be.an('array');
});
});

context('unacknowledged writes', () => {
Expand Down
55 changes: 49 additions & 6 deletions test/integration/transactions/transactions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
MongoNetworkError,
type ServerSessionPool
} from '../../mongodb';
import { type FailPoint } from '../../tools/utils';

describe('Transactions', function () {
describe('withTransaction', function () {
Expand Down Expand Up @@ -90,33 +91,33 @@ describe('Transactions', function () {
await client.close();
});

it('should return undefined when transaction is aborted explicitly', async () => {
it('returns result of executor when transaction is aborted explicitly', async () => {
const session = client.startSession();

const withTransactionResult = await session
.withTransaction(async session => {
await collection.insertOne({ a: 1 }, { session });
await collection.findOne({ a: 1 }, { session });
await session.abortTransaction();
return 'aborted!';
})
.finally(async () => await session.endSession());

expect(withTransactionResult).to.be.undefined;
expect(withTransactionResult).to.equal('aborted!');
});

it('should return raw command when transaction is successfully committed', async () => {
it('returns result of executor when transaction is successfully committed', async () => {
const session = client.startSession();

const withTransactionResult = await session
.withTransaction(async session => {
await collection.insertOne({ a: 1 }, { session });
await collection.findOne({ a: 1 }, { session });
return 'committed!';
})
.finally(async () => await session.endSession());

expect(withTransactionResult).to.exist;
expect(withTransactionResult).to.be.an('object');
expect(withTransactionResult).to.have.property('ok', 1);
expect(withTransactionResult).to.equal('committed!');
});

it('should throw when transaction is aborted due to an error', async () => {
Expand All @@ -136,6 +137,48 @@ describe('Transactions', function () {
});
}
);

context('when retried', { requires: { mongodb: '>=4.2.0', topology: '!single' } }, () => {
let client: MongoClient;
let collection: Collection<{ a: number }>;

beforeEach(async function () {
client = this.configuration.newClient();

await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 },
data: {
failCommands: ['commitTransaction'],
errorCode: 24,
errorLabels: ['TransientTransactionError'],
closeConnection: false
}
} as FailPoint);

collection = await client.db('withTransaction').createCollection('withTransactionRetry');
});

afterEach(async () => {
await client?.close();
});

it('returns the value of the final call to the executor', async () => {
const session = client.startSession();

let counter = 0;
const withTransactionResult = await session
.withTransaction(async session => {
await collection.insertOne({ a: 1 }, { session });
counter += 1;
return counter;
})
.finally(async () => await session.endSession());

expect(counter).to.equal(3);
expect(withTransactionResult).to.equal(3);
});
});
});

describe('startTransaction', function () {
Expand Down
15 changes: 4 additions & 11 deletions test/tools/unified-spec-runner/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -593,18 +593,11 @@ operations.set('withTransaction', async ({ entities, operation, client, testConf
maxCommitTimeMS: operation.arguments!.maxCommitTimeMS
};

let errorFromOperations = null;
const result = await session.withTransaction(async () => {
errorFromOperations = await (async () => {
for (const callbackOperation of operation.arguments!.callback) {
await executeOperationAndCheck(callbackOperation, entities, client, testConfig);
}
})().catch(error => error);
await session.withTransaction(async () => {
for (const callbackOperation of operation.arguments!.callback) {
await executeOperationAndCheck(callbackOperation, entities, client, testConfig);
}
}, options);

if (result == null || errorFromOperations) {
throw errorFromOperations ?? Error('transaction not committed');
}
});

operations.set('countDocuments', async ({ entities, operation }) => {
Expand Down
10 changes: 10 additions & 0 deletions test/types/sessions.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ expectType<ClientSession>(
})
);
expectError(client.startSession({ defaultTransactionOptions: { readConcern: 1 } }));

let something: any;
expectType<number>(await client.withSession(async () => 2));
expectType<string>(await client.withSession<string>(async () => something));
const untypedFn: any = () => 2;
expectType<any>(await client.withSession(untypedFn));
const unknownFn: () => Promise<unknown> = async () => 2;
expectType<unknown>(await client.withSession(unknownFn));
// Not a promise returning function
expectError(await client.withSession(() => null));

0 comments on commit 65aa288

Please sign in to comment.