From fe70532f3c5379c4d8744c41236998791c4c212e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 22 Jul 2024 11:17:22 -0400 Subject: [PATCH] refactor(NODE-6230): executeOperation to use iterative retry mechanism (#4157) --- src/cmap/connection_pool.ts | 6 ++++-- src/operations/execute_operation.ts | 27 ++++++++++++++++----------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 5369cc155aa..2cd2bcc2c19 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -28,7 +28,7 @@ import { import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; import { type TimeoutContext, TimeoutError } from '../timeout'; -import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils'; +import { type Callback, List, makeCounter, now, promiseWithResolvers } from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -356,6 +356,7 @@ export class ConnectionPool extends TypedEventEmitter { * explicitly destroyed by the new owner. */ async checkOut(options: { timeoutContext: TimeoutContext }): Promise { + const checkoutTime = now(); this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) @@ -367,7 +368,8 @@ export class ConnectionPool extends TypedEventEmitter { const waitQueueMember: WaitQueueMember = { resolve, - reject + reject, + checkoutTime }; this[kWaitQueue].push(waitQueueMember); diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 0cffa0c35f7..15cad8c32a7 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -25,7 +25,7 @@ import { import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { TimeoutContext } from '../timeout'; -import { squashError, supportsRetryableWrites } from '../utils'; +import { supportsRetryableWrites } from '../utils'; import { AbstractOperation, Aspect } from './operation'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; @@ -87,12 +87,6 @@ export async function executeOperation< ); } - timeoutContext ??= TimeoutContext.create({ - serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS, - waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS, - timeoutMS: operation.options.timeoutMS - }); - const readPreference = operation.readPreference ?? ReadPreference.primary; const inTransaction = !!session?.inTransaction(); @@ -112,12 +106,18 @@ export async function executeOperation< session.unpin(); } + timeoutContext ??= TimeoutContext.create({ + serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS, + waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS, + timeoutMS: operation.options.timeoutMS + }); + try { return await tryOperation(operation, { topology, + timeoutContext, session, - readPreference, - timeoutContext + readPreference }); } finally { if (session?.owner != null && session.owner === owner) { @@ -156,6 +156,7 @@ type RetryOptions = { session: ClientSession | undefined; readPreference: ReadPreference; topology: Topology; + timeoutContext: TimeoutContext; }; /** @@ -179,7 +180,10 @@ type RetryOptions = { async function tryOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation ->(operation: T, { topology, session, readPreference }: RetryOptions): Promise { +>( + operation: T, + { topology, timeoutContext, session, readPreference }: RetryOptions +): Promise { let selector: ReadPreference | ServerSelector; if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) { @@ -197,7 +201,8 @@ async function tryOperation< let server = await topology.selectServer(selector, { session, - operationName: operation.commandName + operationName: operation.commandName, + timeoutContext }); const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);