Skip to content

Commit

Permalink
chore: adjust timeouts and correct promise chain
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Aug 14, 2024
1 parent 6d9266e commit 85c086d
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export function onData(
function errorHandler(err: Error) {
const promise = unconsumedPromises.shift();
const timeoutError = TimeoutError.is(err)
? new MongoOperationTimeoutError('Timed out during socket read')
? new MongoOperationTimeoutError(`Timed out during socket read (${err.duration}ms)`)
: undefined;

if (promise != null) promise.reject(timeoutError ?? err);
Expand Down
43 changes: 23 additions & 20 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ export class ClientSession
try {
if (this.inTransaction()) {
if (typeof options?.timeoutMS === 'number') {
await this.abortTransaction({ timeoutMS: options.timeoutMS });
await endTransaction(this, 'abortTransaction', { timeoutMS: options.timeoutMS });
} else {
await this.abortTransaction();
await endTransaction(this, 'abortTransaction');
}
}
if (!this.hasEnded) {
Expand Down Expand Up @@ -470,7 +470,11 @@ export class ClientSession
* @param options - Optional options, can be used to override `defaultTimeoutMS`.
*/
async abortTransaction(options?: { timeoutMS: number }): Promise<void> {
return await endTransaction(this, 'abortTransaction', options);
try {
return await endTransaction(this, 'abortTransaction', options);
} catch (error) {
squashError(error);
}
}

/**
Expand Down Expand Up @@ -664,7 +668,7 @@ async function attemptTransaction<T>(

if (!isPromiseLike(promise)) {
try {
await session.abortTransaction();
await endTransaction(session, 'abortTransaction');
} catch (error) {
squashError(error);
}
Expand All @@ -681,7 +685,7 @@ async function attemptTransaction<T>(
return await attemptTransactionCommit(session, startTime, fn, result, options);
} catch (err) {
if (session.inTransaction()) {
await session.abortTransaction();
await endTransaction(session, 'abortTransaction');
}

if (
Expand Down Expand Up @@ -768,20 +772,20 @@ async function endTransaction(
}

if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
command.maxTimeMS = session.transaction.options.maxTimeMS;
}

if (session.transaction.recoveryToken) {
command.recoveryToken = session.transaction.recoveryToken;
}

const timeoutMS =
'timeoutMS' in options && typeof options.timeoutMS === 'number'
'timeoutMS' in options && typeof options.timeoutMS === 'number' // timeoutMS provided to method
? options.timeoutMS
: typeof session.timeoutMS === 'number'
: session.timeoutContext?.csotEnabled() // timeoutMS provided to withTxn, ctx created
? session.timeoutContext.timeoutMS // refresh!
: typeof session.timeoutMS === 'number' // timeoutMS inherited from MamaClient
? session.timeoutMS
: session.timeoutContext?.csotEnabled()
? session.timeoutContext.timeoutMS
: null;

const timeoutContext =
Expand All @@ -795,20 +799,19 @@ async function endTransaction(
});

try {
// send the command
await executeOperation(
session.client,
new RunAdminCommandOperation(command, {
session,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
}),
timeoutContext
);
const adminCommand = new RunAdminCommandOperation(command, {
session,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
});

await executeOperation(session.client, adminCommand, timeoutContext);

if (command.abortTransaction) {
// always unpin on abort regardless of command outcome
session.unpin();
}

if (commandName !== 'commitTransaction') {
session.transaction.transition(TxnState.TRANSACTION_ABORTED);
if (session.loadBalanced) {
Expand Down
35 changes: 25 additions & 10 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import { csotMin, noop } from './utils';

/** @internal */
export class TimeoutError extends Error {
duration: number;
override get name(): 'TimeoutError' {
return 'TimeoutError';
}

constructor(message: string, options?: { cause?: Error }) {
constructor(message: string, options: { cause?: Error; duration: number }) {
super(message, options);
this.duration = options.duration;
}

static is(error: unknown): error is TimeoutError {
Expand Down Expand Up @@ -52,7 +54,12 @@ export class Timeout extends Promise<never> {
}

/** Create a new timeout that expires in `duration` ms */
private constructor(executor: Executor = () => null, duration: number, unref = true) {
private constructor(
executor: Executor = () => null,
duration: number,
unref = true,
rejection: Error | null = null
) {
let reject!: Reject;
if (duration < 0) {
throw new MongoInvalidArgumentError('Cannot create a Timeout with a negative duration');
Expand All @@ -71,13 +78,15 @@ export class Timeout extends Promise<never> {
this.id = setTimeout(() => {
this.ended = Math.trunc(performance.now());
this.timedOut = true;
reject(new TimeoutError(`Expired after ${duration}ms`));
reject(new TimeoutError(`Expired after ${duration}ms`, { duration }));
}, this.duration);
if (typeof this.id.unref === 'function' && unref) {
// Ensure we do not keep the Node.js event loop running
this.id.unref();
}
}

if (rejection != null) reject(rejection);
}

/**
Expand All @@ -90,7 +99,7 @@ export class Timeout extends Promise<never> {
}

throwIfExpired(): void {
if (this.timedOut) throw new TimeoutError('Timed out');
if (this.timedOut) throw new TimeoutError('Timed out', { duration: this.duration });
}

public static expires(durationMS: number, unref?: boolean): Timeout {
Expand All @@ -108,6 +117,10 @@ export class Timeout extends Promise<never> {
typeof timeout.then === 'function'
);
}

static override reject(rejection?: Error): Timeout {
return new Timeout(undefined, 0, true, rejection);
}
}

/** @internal */
Expand Down Expand Up @@ -218,8 +231,8 @@ export class CSOTTimeoutContext extends TimeoutContext {
if (typeof this._serverSelectionTimeout !== 'object' || this._serverSelectionTimeout?.cleared) {
const { remainingTimeMS, serverSelectionTimeoutMS } = this;
if (remainingTimeMS <= 0)
throw new MongoOperationTimeoutError(
`Timed out in server selection after ${this.timeoutMS}ms`
return Timeout.reject(
new MongoOperationTimeoutError(`Timed out in server selection after ${this.timeoutMS}ms`)
);
const usingServerSelectionTimeoutMS =
serverSelectionTimeoutMS !== 0 &&
Expand Down Expand Up @@ -247,8 +260,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
// null or Timeout
this._connectionCheckoutTimeout = this._serverSelectionTimeout;
} else {
throw new MongoRuntimeError(
'Unreachable. If you are seeing this error, please file a ticket on the NODE driver project on Jira'
return Timeout.reject(
new MongoRuntimeError(
'Unreachable. If you are seeing this error, please file a ticket on the NODE driver project on Jira'
)
);
}
}
Expand All @@ -259,14 +274,14 @@ export class CSOTTimeoutContext extends TimeoutContext {
const { remainingTimeMS } = this;
if (!Number.isFinite(remainingTimeMS)) return null;
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
throw new MongoOperationTimeoutError('Timed out before socket write');
return Timeout.reject(new MongoOperationTimeoutError('Timed out before socket write'));
}

get timeoutForSocketRead(): Timeout | null {
const { remainingTimeMS } = this;
if (!Number.isFinite(remainingTimeMS)) return null;
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
throw new MongoOperationTimeoutError('Timed out before socket read');
return Timeout.reject(new MongoOperationTimeoutError('Timed out before socket read'));
}
}

Expand Down
6 changes: 0 additions & 6 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,12 +528,6 @@ export function resolveOptions<T extends CommandOperationOptions>(
result.readPreference = readPreference;
}

if (session?.explicit && session.timeoutMS != null && options?.timeoutMS != null) {
throw new MongoInvalidArgumentError(
'Do not specify timeoutMS on operation if already specified on an explicit session'
);
}

const timeoutMS = options?.timeoutMS;

result.timeoutMS = timeoutMS ?? parent?.timeoutMS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,11 @@ describe('CSOT spec prose tests', function () {
const failpoint: FailPoint = {
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: { failCommands: ['abortTransaction'], blockConnection: true, blockTimeMS: 15 }
data: {
failCommands: ['abortTransaction'],
blockConnection: true,
blockTimeMS: 60
}
};

beforeEach(async function () {
Expand All @@ -653,13 +657,18 @@ describe('CSOT spec prose tests', function () {

let client: MongoClient;

afterEach(async () => {
afterEach(async function () {
if (semver.satisfies(this.configuration.version, '>=4.4')) {
const internalClient = this.configuration.newClient();
await internalClient.db('admin').command({ ...failpoint, mode: 'off' });
await internalClient.close();
}
await client?.close();
});

describe('when timeoutMS is provided to the client', () => {
it('throws a timeout error from endSession', async function () {
client = this.configuration.newClient({ timeoutMS: 10 });
client = this.configuration.newClient({ timeoutMS: 50, monitorCommands: true });
const coll = client.db('db').collection('coll');
const session = client.startSession();
session.startTransaction();
Expand All @@ -673,7 +682,7 @@ describe('CSOT spec prose tests', function () {
it('throws a timeout error from endSession', async function () {
client = this.configuration.newClient();
const coll = client.db('db').collection('coll');
const session = client.startSession({ defaultTimeoutMS: 10 });
const session = client.startSession({ defaultTimeoutMS: 50 });
session.startTransaction();
await coll.insertOne({ x: 1 }, { session });
const error = await session.endSession().catch(error => error);
Expand All @@ -688,7 +697,7 @@ describe('CSOT spec prose tests', function () {
const session = client.startSession();
session.startTransaction();
await coll.insertOne({ x: 1 }, { session });
const error = await session.endSession({ timeoutMS: 10 }).catch(error => error);
const error = await session.endSession({ timeoutMS: 50 }).catch(error => error);
expect(error).to.be.instanceOf(MongoOperationTimeoutError);
});
});
Expand Down Expand Up @@ -736,7 +745,7 @@ describe('CSOT spec prose tests', function () {
data: {
failCommands: ['insert', 'abortTransaction'],
blockConnection: true,
blockTimeMS: 15
blockTimeMS: 60
}
};

Expand All @@ -757,15 +766,20 @@ describe('CSOT spec prose tests', function () {

let client: MongoClient;

afterEach(async () => {
afterEach(async function () {
if (semver.satisfies(this.configuration.version, '>=4.4')) {
const internalClient = this.configuration.newClient();
await internalClient.db('admin').command({ ...failpoint, mode: 'off' });
await internalClient.close();
}
await client?.close();
});

it('timeoutMS is refreshed for abortTransaction', async function () {
const commandsFailed = [];

client = this.configuration
.newClient({ timeoutMS: 10, monitorCommands: true })
.newClient({ timeoutMS: 50, monitorCommands: true })
.on('commandFailed', e => commandsFailed.push(e));

const coll = client.db('db').collection('coll');
Expand Down

0 comments on commit 85c086d

Please sign in to comment.