Skip to content

Commit

Permalink
refactor(NODE-6230): executeOperation to use iterative retry mechanism (
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James committed Jul 8, 2024
1 parent 2207e49 commit 58d4e7b
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
Expand Down Expand Up @@ -88,12 +88,6 @@ export async function executeOperation<
);
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

const readPreference = operation.readPreference ?? ReadPreference.primary;
const inTransaction = !!session?.inTransaction();

Expand All @@ -113,12 +107,18 @@ export async function executeOperation<
session.unpin();
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

try {
return await tryOperation(operation, {
topology,
timeoutContext,
session,
readPreference,
timeoutContext
readPreference
});
} finally {
if (session?.owner != null && session.owner === owner) {
Expand Down Expand Up @@ -157,6 +157,7 @@ type RetryOptions = {
session: ClientSession | undefined;
readPreference: ReadPreference;
topology: Topology;
timeoutContext: TimeoutContext;
};

/**
Expand All @@ -180,7 +181,10 @@ type RetryOptions = {
async function tryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(operation: T, { topology, session, readPreference }: RetryOptions): Promise<TResult> {
>(
operation: T,
{ topology, timeoutContext, session, readPreference }: RetryOptions
): Promise<TResult> {
let selector: ReadPreference | ServerSelector;

if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
Expand All @@ -198,7 +202,8 @@ async function tryOperation<

let server = await topology.selectServer(selector, {
session,
operationName: operation.commandName
operationName: operation.commandName,
timeoutContext
});

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
Expand Down

0 comments on commit 58d4e7b

Please sign in to comment.