Skip to content

Commit

Permalink
client bulk write CSOT
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Oct 9, 2024
1 parent 1a06868 commit b5bf67e
Show file tree
Hide file tree
Showing 19 changed files with 834 additions and 113 deletions.
8 changes: 5 additions & 3 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
if (!options.omitMaxTimeMS) {
const maxTimeMS = options.timeoutContext?.maxTimeMS;
if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}

const message = this.supportsOpMsg
Expand Down Expand Up @@ -716,6 +716,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
throw new MongoOperationTimeoutError('Timed out at socket write');
}
throw error;
} finally {
timeout.clear();
}
}
return await drainEvent;
Expand Down
1 change: 1 addition & 0 deletions src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export function onData(
emitter.off('data', eventHandler);
emitter.off('error', errorHandler);
finished = true;
timeoutForSocketRead?.clear();
const doneResult = { value: undefined, done: finished } as const;

for (const promise of unconsumedPromises) {
Expand Down
136 changes: 104 additions & 32 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { TimeoutContext } from '../timeout';
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
import { type MongoDBNamespace, squashError } from '../utils';

/**
Expand Down Expand Up @@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
timeoutMS?: number;
/** @internal TODO(NODE-5688): make this public */
timeoutMode?: CursorTimeoutMode;

/**
* @internal
*
* A timeout context to govern the total time the cursor can live. If provided, the cursor
* cannot be used in ITERATION mode.
*/
timeoutContext?: CursorTimeoutContext;
}

/** @internal */
Expand Down Expand Up @@ -171,7 +179,7 @@ export abstract class AbstractCursor<
/** @internal */
protected readonly cursorOptions: InternalAbstractCursorOptions;
/** @internal */
protected timeoutContext?: TimeoutContext;
protected timeoutContext?: CursorTimeoutContext;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -205,22 +213,14 @@ export abstract class AbstractCursor<
};
this.cursorOptions.timeoutMS = options.timeoutMS;
if (this.cursorOptions.timeoutMS != null) {
if (options.timeoutMode == null) {
if (options.tailable) {
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
} else {
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
}
} else {
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError(
"Cannot set tailable cursor's timeoutMode to LIFETIME"
);
}
this.cursorOptions.timeoutMode = options.timeoutMode;
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
}
this.cursorOptions.timeoutMode =
options.timeoutMode ??
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
} else {
if (options.timeoutMode != null)
if (options.timeoutMode != null && options.timeoutContext == null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
}
this.cursorOptions.omitMaxTimeMS =
Expand Down Expand Up @@ -264,6 +264,17 @@ export abstract class AbstractCursor<
utf8: options?.enableUtf8Validation === false ? false : true
}
};

if (
options.timeoutContext != null &&
options.timeoutMS != null &&
this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME
) {
throw new MongoAPIError(
`cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.`
);
}
this.timeoutContext = options.timeoutContext;
}

/**
Expand Down Expand Up @@ -721,6 +732,9 @@ export abstract class AbstractCursor<
* if the resultant data has already been retrieved by this cursor.
*/
rewind(): void {
if (this.timeoutContext && this.timeoutContext.owner !== this) {
throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`);
}
if (!this.initialized) {
return;
}
Expand Down Expand Up @@ -790,10 +804,13 @@ export abstract class AbstractCursor<
*/
private async cursorInit(): Promise<void> {
if (this.cursorOptions.timeoutMS != null) {
this.timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
});
this.timeoutContext ??= new CursorTimeoutContext(
TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
}),
this
);
}
try {
const state = await this._initialize(this.cursorSession);
Expand Down Expand Up @@ -872,6 +889,20 @@ export abstract class AbstractCursor<
private async cleanup(timeoutMS?: number, error?: Error) {
this.isClosed = true;
const session = this.cursorSession;
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
if (timeoutMS != null) {
this.timeoutContext?.clear();
return new CursorTimeoutContext(
TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
}),
this
);
} else {
return this.timeoutContext?.refreshed();
}
};
try {
if (
!this.isKilled &&
Expand All @@ -884,23 +915,13 @@ export abstract class AbstractCursor<
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;
let timeoutContext: TimeoutContext | undefined;
if (timeoutMS != null) {
this.timeoutContext?.clear();
timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
});
} else {
this.timeoutContext?.refresh();
timeoutContext = this.timeoutContext;
}

await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
}),
timeoutContext
timeoutContextForKillCursors()
);
}
} catch (error) {
Expand Down Expand Up @@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable {
}

configureResourceManagement(AbstractCursor.prototype);

/**
* @internal
* The cursor timeout context is a wrapper around a timeout context
* that keeps track of the "owner" of the cursor. For timeout contexts
* instantiated inside a cursor, the owner will be the cursor.
*
* All timeout behavior is exactly the same as the wrapped timeout context's.
*/
export class CursorTimeoutContext extends TimeoutContext {
constructor(
public timeoutContext: TimeoutContext,
public owner: symbol | AbstractCursor
) {
super();
}
override get serverSelectionTimeout(): Timeout | null {
return this.timeoutContext.serverSelectionTimeout;
}
override get connectionCheckoutTimeout(): Timeout | null {
return this.timeoutContext.connectionCheckoutTimeout;
}
override get clearServerSelectionTimeout(): boolean {
return this.timeoutContext.clearServerSelectionTimeout;
}
override get clearConnectionCheckoutTimeout(): boolean {
return this.timeoutContext.clearConnectionCheckoutTimeout;
}
override get timeoutForSocketWrite(): Timeout | null {
return this.timeoutContext.timeoutForSocketWrite;
}
override get timeoutForSocketRead(): Timeout | null {
return this.timeoutContext.timeoutForSocketRead;
}
override csotEnabled(): this is CSOTTimeoutContext {
return this.timeoutContext.csotEnabled();
}
override refresh(): void {
return this.timeoutContext.refresh();
}
override clear(): void {
return this.timeoutContext.clear();
}
override get maxTimeMS(): number | null {
return this.timeoutContext.maxTimeMS;
}

override refreshed(): CursorTimeoutContext {
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
}
}
8 changes: 6 additions & 2 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
constructor(
client: MongoClient,
commandBuilder: ClientBulkWriteCommandBuilder,
options: ClientBulkWriteOptions = {}
options: ClientBulkWriteCursorOptions = {}
) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

Expand Down Expand Up @@ -78,7 +78,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
const response = await executeOperation(
this.client,
clientBulkWriteOperation,
this.timeoutContext
);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ export type {
CursorStreamOptions
} from './cursor/abstract_cursor';
export type {
CursorTimeoutContext,
InitialCursorResponse,
InternalAbstractCursorOptions
} from './cursor/abstract_cursor';
Expand Down
16 changes: 14 additions & 2 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor';
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import { type MongoClient } from '../../mongo_client';
import { TimeoutContext } from '../../timeout';
import { resolveTimeoutOptions } from '../../utils';
import { WriteConcern } from '../../write_concern';
import { executeOperation } from '../execute_operation';
import { ClientBulkWriteOperation } from './client_bulk_write';
Expand Down Expand Up @@ -56,18 +59,27 @@ export class ClientBulkWriteExecutor {
pkFactory
);
// Unacknowledged writes need to execute all batches and return { ok: 1}
const resolvedOptions = resolveTimeoutOptions(this.client, this.options);
const context = TimeoutContext.create(resolvedOptions);

if (this.options.writeConcern?.w === 0) {
while (commandBuilder.hasNextBatch()) {
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, context);
}
return { ok: 1 };
} else {
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
// For each command will will create and exhaust a cursor for the results.
let currentBatchOffset = 0;
while (commandBuilder.hasNextBatch()) {
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
const cursorContext = new CursorTimeoutContext(context, Symbol());
const options = {
...this.options,
timeoutContext: cursorContext,
...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
};
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
const docs = await cursor.toArray();
const operations = cursor.operations;
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
Expand Down
5 changes: 3 additions & 2 deletions src/operations/find.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Document } from '../bson';
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { MongoInvalidArgumentError } from '../error';
import { ReadConcern } from '../read_concern';
import type { Server } from '../sdam/server';
Expand All @@ -17,7 +17,8 @@ import { Aspect, defineAspects, type Hint } from './operation';
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface FindOptions<TSchema extends Document = Document>
extends Omit<CommandOperationOptions, 'writeConcern'> {
extends Omit<CommandOperationOptions, 'writeConcern'>,
AbstractCursorOptions {
/** Sets the limit of documents returned in the query. */
limit?: number;
/** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export type ServerEvents = {
EventEmitterWithState;

/** @internal */
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext'> & {
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
timeoutContext: TimeoutContext;
};

Expand Down
19 changes: 18 additions & 1 deletion src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,15 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions

/** @internal */
export abstract class TimeoutContext {
static create(options: TimeoutContextOptions): TimeoutContext {
static create(options: Partial<TimeoutContextOptions>): TimeoutContext {
if (options.session?.timeoutContext != null) return options.session?.timeoutContext;
if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options);
else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options);
else throw new MongoRuntimeError('Unrecognized options');
}

abstract get maxTimeMS(): number | null;

abstract get serverSelectionTimeout(): Timeout | null;

abstract get connectionCheckoutTimeout(): Timeout | null;
Expand All @@ -195,6 +197,9 @@ export abstract class TimeoutContext {
abstract refresh(): void;

abstract clear(): void;

/** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */
abstract refreshed(): TimeoutContext;
}

/** @internal */
Expand Down Expand Up @@ -317,6 +322,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`);
return remainingTimeMS;
}

override refreshed(): CSOTTimeoutContext {
return new CSOTTimeoutContext(this);
}
}

/** @internal */
Expand Down Expand Up @@ -363,4 +372,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
clear(): void {
return;
}

get maxTimeMS() {
return null;
}

override refreshed(): LegacyTimeoutContext {
return new LegacyTimeoutContext(this.options);
}
}
Loading

0 comments on commit b5bf67e

Please sign in to comment.