Skip to content

Commit

Permalink
refactor(NODE-6230): executeOperation to use iterative retry mechanism (
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Aug 20, 2024
1 parent f0d6ec3 commit 26800a9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
6 changes: 4 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { type TimeoutContext, TimeoutError } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { type Callback, List, makeCounter, now, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -356,6 +356,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* explicitly destroyed by the new owner.
*/
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
const checkoutTime = now();
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
Expand All @@ -367,7 +368,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

const waitQueueMember: WaitQueueMember = {
resolve,
reject
reject,
checkoutTime
};

this[kWaitQueue].push(waitQueueMember);
Expand Down
27 changes: 16 additions & 11 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { squashError, supportsRetryableWrites } from '../utils';
import { supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
Expand Down Expand Up @@ -88,12 +88,6 @@ export async function executeOperation<
);
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

const readPreference = operation.readPreference ?? ReadPreference.primary;
const inTransaction = !!session?.inTransaction();

Expand All @@ -113,12 +107,18 @@ export async function executeOperation<
session.unpin();
}

timeoutContext ??= TimeoutContext.create({
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
timeoutMS: operation.options.timeoutMS
});

try {
return await tryOperation(operation, {
topology,
timeoutContext,
session,
readPreference,
timeoutContext
readPreference
});
} finally {
if (session?.owner != null && session.owner === owner) {
Expand Down Expand Up @@ -157,6 +157,7 @@ type RetryOptions = {
session: ClientSession | undefined;
readPreference: ReadPreference;
topology: Topology;
timeoutContext: TimeoutContext;
};

/**
Expand All @@ -180,7 +181,10 @@ type RetryOptions = {
async function tryOperation<
T extends AbstractOperation<TResult>,
TResult = ResultTypeFromOperation<T>
>(operation: T, { topology, session, readPreference }: RetryOptions): Promise<TResult> {
>(
operation: T,
{ topology, timeoutContext, session, readPreference }: RetryOptions
): Promise<TResult> {
let selector: ReadPreference | ServerSelector;

if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
Expand All @@ -198,7 +202,8 @@ async function tryOperation<

let server = await topology.selectServer(selector, {
session,
operationName: operation.commandName
operationName: operation.commandName,
timeoutContext
});

const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
Expand Down

0 comments on commit 26800a9

Please sign in to comment.