Skip to content

Commit

Permalink
chore: fix when timeout errors throw
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Aug 20, 2024
1 parent 4cafed6 commit bad814f
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 101 deletions.
7 changes: 7 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
return;
}
}
} catch (readError) {
if (TimeoutError.is(readError)) {
throw new MongoOperationTimeoutError(
`Timed out during socket read (${readError.duration}ms)`
);
}
throw readError;
} finally {
this.dataEvents = null;
this.throwIfAborted();
Expand Down
15 changes: 7 additions & 8 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { type EventEmitter } from 'events';

import { MongoOperationTimeoutError } from '../../error';
import { type TimeoutContext, TimeoutError } from '../../timeout';
import { type TimeoutContext } from '../../timeout';
import { List, promiseWithResolvers } from '../../utils';

/**
Expand Down Expand Up @@ -91,8 +90,11 @@ export function onData(
// Adding event handlers
emitter.on('data', eventHandler);
emitter.on('error', errorHandler);

const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
timeoutForSocketRead?.throwIfExpired();
// eslint-disable-next-line github/no-then
timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler);
timeoutForSocketRead?.then(undefined, errorHandler);

return iterator;

Expand All @@ -104,12 +106,9 @@ export function onData(

function errorHandler(err: Error) {
const promise = unconsumedPromises.shift();
const timeoutError = TimeoutError.is(err)
? new MongoOperationTimeoutError(`Timed out during socket read (${err.duration}ms)`)
: undefined;

if (promise != null) promise.reject(timeoutError ?? err);
else error = timeoutError ?? err;
if (promise != null) promise.reject(err);
else error = err;
void closeHandler();
}

Expand Down
205 changes: 120 additions & 85 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { ReadConcernLevel } from './read_concern';
import { ReadPreference } from './read_preference';
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
import { _advanceClusterTime, type ClusterTime, TopologyType } from './sdam/common';
import { type TimeoutContext } from './timeout';
import { TimeoutContext } from './timeout';
import {
isTransactionCommand,
Transaction,
Expand Down Expand Up @@ -280,12 +280,13 @@ export class ClientSession
async endSession(options?: EndSessionOptions): Promise<void> {
try {
if (this.inTransaction()) {
if (typeof options?.timeoutMS === 'number') {
await endTransaction(this, 'abortTransaction', { timeoutMS: options.timeoutMS });
} else {
await endTransaction(this, 'abortTransaction');
}
await this.abortTransaction({ ...options, throwTimeout: true });
}
} catch (error) {
// spec indicates that we should ignore all errors for `endSessions`
if (MongoOperationTimeoutError.is(error)) throw error;
squashError(error);
} finally {
if (!this.hasEnded) {
const serverSession = this[kServerSession];
if (serverSession != null) {
Expand All @@ -301,11 +302,6 @@ export class ClientSession
this.hasEnded = true;
this.emit('ended', this);
}
} catch (error) {
// spec indicates that we should ignore all errors for `endSessions`
if (MongoOperationTimeoutError.is(error)) throw error;
squashError(error);
} finally {
maybeClearPinnedConnection(this, { force: true, ...options });
}
}
Expand Down Expand Up @@ -460,7 +456,7 @@ export class ClientSession
*
* @param options - Optional options, can be used to override `defaultTimeoutMS`.
*/
async commitTransaction(): Promise<void> {
async commitTransaction(options?: { timeoutMS?: number }): Promise<void> {
if (this.transaction.state === TxnState.NO_TRANSACTION) {
throw new MongoTransactionError('No transaction started');
}
Expand Down Expand Up @@ -510,8 +506,19 @@ export class ClientSession
bypassPinningCheck: true
});

const timeoutMS =
typeof options?.timeoutMS === 'number'
? options.timeoutMS
: typeof this.timeoutMS === 'number'
? this.timeoutMS
: null;

const timeoutContext = this.timeoutContext?.csotEnabled()
? this.timeoutContext
: TimeoutContext.create({ timeoutMS, ...this.clientOptions });

try {
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, timeoutContext);
return;
} catch (firstCommitError) {
if (firstCommitError instanceof MongoError && isRetryableWriteError(firstCommitError)) {
Expand All @@ -521,7 +528,7 @@ export class ClientSession
this.unpin({ force: true });

try {
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, timeoutContext);
return;
} catch (retryCommitError) {
// If the retry failed, we process that error instead of the original
Expand Down Expand Up @@ -556,7 +563,10 @@ export class ClientSession
*
* @param options - Optional options, can be used to override `defaultTimeoutMS`.
*/
async abortTransaction(): Promise<void> {
async abortTransaction(options?: { timeoutMS?: number }): Promise<void>;
/** @internal */
async abortTransaction(options?: { timeoutMS?: number; throwTimeout?: true }): Promise<void>;
async abortTransaction(options?: { timeoutMS?: number; throwTimeout?: true }): Promise<void> {
if (this.transaction.state === TxnState.NO_TRANSACTION) {
throw new MongoTransactionError('No transaction started');
}
Expand Down Expand Up @@ -601,18 +611,34 @@ export class ClientSession
bypassPinningCheck: true
});

const timeoutMS =
typeof options?.timeoutMS === 'number'
? options.timeoutMS
: this.timeoutContext?.csotEnabled()
? this.timeoutContext.timeoutMS // refresh timeoutMS for abort operation
: typeof this.timeoutMS === 'number'
? this.timeoutMS
: null;

const timeoutContext = TimeoutContext.create({ timeoutMS, ...this.clientOptions });

try {
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, timeoutContext);
this.unpin();
return;
} catch (firstAbortError) {
this.unpin();

if (options?.throwTimeout && MongoOperationTimeoutError.is(firstAbortError))
throw firstAbortError;

if (firstAbortError instanceof MongoError && isRetryableWriteError(firstAbortError)) {
try {
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, timeoutContext);
return;
} catch (secondAbortError) {
if (options?.throwTimeout && MongoOperationTimeoutError.is(secondAbortError))
throw secondAbortError;
// we do not retry the retry
}
}
Expand Down Expand Up @@ -670,93 +696,102 @@ export class ClientSession
options?: TransactionOptions
): Promise<T> {
const MAX_TIMEOUT = 120000;
const startTime = now();

let committed = false;
let result: any;

while (!committed) {
this.startTransaction(options); // may throw on error
this.timeoutContext =
options != null && 'timeoutMS' in options && typeof options.timeoutMS === 'number'
? TimeoutContext.create({ timeoutMS: options.timeoutMS, ...this.clientOptions })
: null;

try {
const promise = fn(this);
if (!isPromiseLike(promise)) {
throw new MongoInvalidArgumentError(
'Function provided to `withTransaction` must return a Promise'
);
}
const startTime = this.timeoutContext?.csotEnabled() ? this.timeoutContext.start : now();

result = await promise;
let committed = false;
let result: any;

if (
this.transaction.state === TxnState.NO_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_COMMITTED ||
this.transaction.state === TxnState.TRANSACTION_ABORTED
) {
// Assume callback intentionally ended the transaction
return result;
}
} catch (fnError) {
if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) {
await this.abortTransaction();
throw fnError;
}
try {
while (!committed) {
this.startTransaction(options); // may throw on error

if (
this.transaction.state === TxnState.STARTING_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS
) {
await this.abortTransaction();
}
try {
const promise = fn(this);
if (!isPromiseLike(promise)) {
throw new MongoInvalidArgumentError(
'Function provided to `withTransaction` must return a Promise'
);
}

if (
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
now() - startTime < MAX_TIMEOUT
) {
continue;
}
result = await promise;

throw fnError;
}
if (
this.transaction.state === TxnState.NO_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_COMMITTED ||
this.transaction.state === TxnState.TRANSACTION_ABORTED
) {
// Assume callback intentionally ended the transaction
return result;
}
} catch (fnError) {
if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) {
await this.abortTransaction();
throw fnError;
}

while (!committed) {
try {
/*
* We will rely on ClientSession.commitTransaction() to
* apply a majority write concern if commitTransaction is
* being retried (see: DRIVERS-601)
*/
await this.commitTransaction();
committed = true;
} catch (commitError) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) &&
now() - startTime < MAX_TIMEOUT
this.transaction.state === TxnState.STARTING_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS
) {
continue;
await this.abortTransaction();
}

if (
commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
now() - startTime < MAX_TIMEOUT
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
) {
break;
continue;
}

throw commitError;
throw fnError;
}

while (!committed) {
try {
/*
* We will rely on ClientSession.commitTransaction() to
* apply a majority write concern if commitTransaction is
* being retried (see: DRIVERS-601)
*/
await this.commitTransaction();
committed = true;
} catch (commitError) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
) {
continue;
}

if (
commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || now() - startTime < MAX_TIMEOUT)
) {
break;
}

throw commitError;
}
}
}
return result;
} finally {
this.timeoutContext = null;
}

return result;
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class Timeout extends Promise<never> {
this.duration = duration;
this.start = Math.trunc(performance.now());

if (this.duration > 0) {
if (rejection == null && this.duration > 0) {
this.id = setTimeout(() => {
this.ended = Math.trunc(performance.now());
this.timedOut = true;
Expand All @@ -84,9 +84,11 @@ export class Timeout extends Promise<never> {
// Ensure we do not keep the Node.js event loop running
this.id.unref();
}
} else if (rejection != null) {
this.ended = Math.trunc(performance.now());
this.timedOut = true;
reject(rejection);
}

if (rejection != null) reject(rejection);
}

/**
Expand Down Expand Up @@ -260,10 +262,8 @@ export class CSOTTimeoutContext extends TimeoutContext {
// null or Timeout
this._connectionCheckoutTimeout = this._serverSelectionTimeout;
} else {
return Timeout.reject(
new MongoRuntimeError(
'Unreachable. If you are seeing this error, please file a ticket on the NODE driver project on Jira'
)
throw new MongoRuntimeError(
'Unreachable. If you are seeing this error, please file a ticket on the NODE driver project on Jira'
);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ export interface TransactionOptions extends CommandOperationOptions {
writeConcern?: WriteConcern;
/** A default read preference for commands in this transaction */
readPreference?: ReadPreferenceLike;
/** Specifies the maximum amount of time to allow a commit action on a transaction to run in milliseconds */
/**
* Specifies the maximum amount of time to allow a commit action on a transaction to run in milliseconds
* @deprecated This option is deprecated in favor of `timeoutMS` or `defaultTimeoutMS`.
*/
maxCommitTimeMS?: number;
}

Expand Down

0 comments on commit bad814f

Please sign in to comment.