Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-6230): executeOperation to use iterative retry mechanism #4157

Merged
merged 14 commits into from
Jun 27, 2024
2 changes: 1 addition & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
if (this.hasAspect(Aspect.EXPLAINABLE)) {
return this.explain == null;
}
return true;
return super.canRetryWrite;
}

public async executeCommand<T extends MongoDBResponseConstructor>(
Expand Down
280 changes: 143 additions & 137 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
Expand All @@ -46,10 +46,9 @@ type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<
* not provided.
*
* The expectation is that this function:
* - Connects the MongoClient if it has not already been connected
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
* - Creates a session if none is provided and cleans up the session it creates
* - Selects a server based on readPreference or various factors
* - Retries an operation if it fails for certain errors, see {@link retryOperation}
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
Expand All @@ -66,23 +65,7 @@ export async function executeOperation<
throw new MongoRuntimeError('This method requires a valid operation instance');
}

if (client.topology == null) {
// Auto connect on operation
if (client.s.hasBeenClosed) {
throw new MongoNotConnectedError('Client must be connected before running operations');
}
client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
try {
await client.connect();
} finally {
delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
}
}

const { topology } = client;
if (topology == null) {
throw new MongoRuntimeError('client.connect did not create a topology but also did not throw');
}
const topology = await autoConnect(client);

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
Expand All @@ -105,17 +88,10 @@ export async function executeOperation<
);
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

const readPreference = operation.readPreference ?? ReadPreference.primary;
const inTransaction = !!session?.inTransaction();

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);

if (
inTransaction &&
Expand All @@ -131,6 +107,84 @@ export async function executeOperation<
session.unpin();
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

try {
return await tryOperation(operation, {
topology,
timeoutContext,
session,
readPreference
});
} finally {
if (session?.owner != null && session.owner === owner) {
await session.endSession();
}
}
}

/**
 * Returns the client's topology, connecting the client first when it has none.
 *
 * While the implicit `client.connect()` call is in flight, the
 * `@@mdb.skipPingOnConnect` flag is set on the client's options; the flag is
 * always removed again, whether or not the connect attempt succeeds.
 *
 * @internal
 *
 * @param client - The client whose topology is required
 * @returns The topology of the (now connected) client
 * @throws MongoNotConnectedError if the client has no topology and has already been closed
 * @throws MongoRuntimeError if `client.connect()` resolves without creating a topology
 */
async function autoConnect(client: MongoClient): Promise<Topology> {
  // Fast path: a topology already exists, so there is nothing to connect.
  if (client.topology != null) {
    return client.topology;
  }

  // A client that was explicitly closed is never implicitly reconnected.
  if (client.s.hasBeenClosed) {
    throw new MongoNotConnectedError('Client must be connected before running operations');
  }

  const skipPingKey: symbol = Symbol.for('@@mdb.skipPingOnConnect');
  client.s.options[skipPingKey] = true;
  try {
    await client.connect();
  } finally {
    // Remove the temporary flag regardless of how connect() settled.
    delete client.s.options[skipPingKey];
  }

  const { topology } = client;
  if (topology == null) {
    throw new MongoRuntimeError('client.connect did not create a topology but also did not throw');
  }
  return topology;
}

/**
 * Bundle of values that `tryOperation` receives alongside the operation itself.
 * @internal
 */
type RetryOptions = {
  /** Topology on which servers are selected for each attempt. */
  topology: Topology;
  /** Read preference for the operation; may serve as the server selector. */
  readPreference: ReadPreference;
  /** Session forwarded to server selection and to the operation; may be absent. */
  session: ClientSession | undefined;
  /** Timeout context forwarded to server selection and to the operation's execution. */
  timeoutContext: TimeoutContext;
};

/**
* Executes an operation and retries as appropriate
* @internal
*
* @remarks
* Implements behaviour described in the [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable
* Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specifications
*
* This function:
* - performs initial server selection
* - attempts to execute an operation
* - retries the operation if it meets the criteria for a retryable read or a retryable write
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
*
* @param operation - The operation to execute
* */
async function tryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
{ topology, timeoutContext, session, readPreference }: RetryOptions
): Promise<TResult> {
let selector: ReadPreference | ServerSelector;

if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
Expand All @@ -146,31 +200,15 @@ export async function executeOperation<
selector = readPreference;
}

const server = await topology.selectServer(selector, {
let server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
timeoutContext
});

if (session == null) {
// No session also means it is not retryable, early exit
return await operation.execute(server, undefined, timeoutContext);
}

if (!operation.hasAspect(Aspect.RETRYABLE)) {
// non-retryable operation, early exit
try {
return await operation.execute(server, session, timeoutContext);
} finally {
if (session?.owner != null && session.owner === owner) {
try {
await session.endSession();
} catch (error) {
squashError(error);
}
}
}
}
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
const inTransaction = session?.inTransaction() ?? false;

const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;

Expand All @@ -180,108 +218,76 @@ export async function executeOperation<
supportsRetryableWrites(server) &&
operation.canRetryWrite;

const willRetry = (hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite);
const willRetry =
operation.hasAspect(Aspect.RETRYABLE) &&
session != null &&
((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite));

if (hasWriteAspect && willRetryWrite) {
if (hasWriteAspect && willRetryWrite && session != null) {
operation.options.willRetryWrite = true;
session.incrementTransactionNumber();
}

try {
return await operation.execute(server, session, timeoutContext);
} catch (operationError) {
if (willRetry && operationError instanceof MongoError) {
return await retryOperation(operation, operationError, {
session,
topology,
selector,
previousServer: server.description,
timeoutContext
});
}
throw operationError;
} finally {
if (session?.owner != null && session.owner === owner) {
try {
await session.endSession();
} catch (error) {
squashError(error);
}
}
}
}
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
const maxTries = willRetry ? 2 : 1;
let previousOperationError: MongoError | undefined;
let previousServer: ServerDescription | undefined;

/** @internal */
type RetryOptions = {
session: ClientSession;
topology: Topology;
selector: ReadPreference | ServerSelector;
previousServer: ServerDescription;
timeoutContext: TimeoutContext;
};
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError: previousOperationError
});
}

async function retryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
originalError: MongoError,
{ session, topology, selector, previousServer, timeoutContext }: RetryOptions
): Promise<TResult> {
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);

if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError
});
}
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
throw previousOperationError;

if (isWriteOperation && !isRetryableWriteError(originalError)) {
throw originalError;
}
if (hasReadAspect && !isRetryableReadError(previousOperationError))
throw previousOperationError;

if (isReadOperation && !isRetryableReadError(originalError)) {
throw originalError;
}
if (
previousOperationError instanceof MongoNetworkError &&
operation.hasAspect(Aspect.CURSOR_CREATING) &&
session != null &&
session.isPinned &&
!session.inTransaction()
) {
session.unpin({ force: true, forceClear: true });
}

if (
originalError instanceof MongoNetworkError &&
session.isPinned &&
!session.inTransaction() &&
operation.hasAspect(Aspect.CURSOR_CREATING)
) {
// If we have a cursor and the initial command fails with a network error,
// we can retry it on another connection. So we need to check it back in, clear the
// pool for the service id, and retry again.
session.unpin({ force: true, forceClear: true });
}
server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
previousServer
});

// select a new server, and attempt to retry the operation
const server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
previousServer,
timeoutContext
});
if (hasWriteAspect && !supportsRetryableWrites(server)) {
throw new MongoUnexpectedServerResponseError(
'Selected server does not support retryable writes'
);
}
}

if (isWriteOperation && !supportsRetryableWrites(server)) {
throw new MongoUnexpectedServerResponseError(
'Selected server does not support retryable writes'
);
}
try {
return await operation.execute(server, session, timeoutContext);
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;

try {
return await operation.execute(server, session, timeoutContext);
} catch (retryError) {
if (
retryError instanceof MongoError &&
retryError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
) {
throw originalError;
if (
previousOperationError != null &&
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
) {
throw previousOperationError;
}
previousServer = server.description;
previousOperationError = operationError;
}
throw retryError;
}

throw previousOperationError;
}
4 changes: 2 additions & 2 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ export abstract class AbstractOperation<TResult = any> {
}

get canRetryRead(): boolean {
return true;
return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.READ_OPERATION);
}

get canRetryWrite(): boolean {
return true;
return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.WRITE_OPERATION);
}
}

Expand Down