Skip to content

Commit

Permalink
POC
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Oct 1, 2024
1 parent 7a12914 commit 6ee5c9c
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 10 deletions.
60 changes: 54 additions & 6 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { TimeoutContext } from '../timeout';
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
import { type MongoDBNamespace, squashError } from '../utils';

/**
Expand Down Expand Up @@ -119,6 +119,9 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
timeoutMS?: number;
/** @internal TODO(NODE-5688): make this public */
timeoutMode?: CursorTimeoutMode;

/** @internal */
timeoutContext?: CursorTimeoutContext;
}

/** @internal */
Expand All @@ -140,6 +143,46 @@ export type AbstractCursorEvents = {
[AbstractCursor.CLOSE](): void;
};

export class CursorTimeoutContext extends TimeoutContext {
constructor(
public timeoutContext: TimeoutContext,
public owner: AbstractCursor | null = null
) {
super();
}

override get serverSelectionTimeout(): Timeout | null {
return this.timeoutContext.serverSelectionTimeout;
}
override get connectionCheckoutTimeout(): Timeout | null {
return this.timeoutContext.connectionCheckoutTimeout;
}
override get clearServerSelectionTimeout(): boolean {
return this.timeoutContext.clearServerSelectionTimeout;
}
override get clearConnectionCheckoutTimeout(): boolean {
return this.timeoutContext.clearConnectionCheckoutTimeout;
}
override get timeoutForSocketWrite(): Timeout | null {
return this.timeoutContext.timeoutForSocketWrite;
}
override get timeoutForSocketRead(): Timeout | null {
return this.timeoutContext.timeoutForSocketRead;
}
override csotEnabled(): this is CSOTTimeoutContext {
return this.timeoutContext.csotEnabled();
}
override refresh(): void {
return this.timeoutContext.refresh();
}
override clear(): void {
return this.timeoutContext.clear();
}
override refreshed(): TimeoutContext {
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
}
}

/** @public */
export abstract class AbstractCursor<
TSchema = any,
Expand Down Expand Up @@ -171,7 +214,7 @@ export abstract class AbstractCursor<
/** @internal */
protected readonly cursorOptions: InternalAbstractCursorOptions;
/** @internal */
protected timeoutContext?: TimeoutContext;
protected timeoutContext?: CursorTimeoutContext;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -264,6 +307,8 @@ export abstract class AbstractCursor<
utf8: options?.enableUtf8Validation === false ? false : true
}
};

this.timeoutContext = options.timeoutContext;
}

/**
Expand Down Expand Up @@ -790,10 +835,13 @@ export abstract class AbstractCursor<
*/
private async cursorInit(): Promise<void> {
if (this.cursorOptions.timeoutMS != null) {
this.timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
});
this.timeoutContext = new CursorTimeoutContext(
TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
}),
this
);
}
try {
const state = await this._initialize(this.cursorSession);
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
constructor(
client: MongoClient,
commandBuilder: ClientBulkWriteCommandBuilder,
options: ClientBulkWriteOptions = {}
options: ClientBulkWriteOptions & AbstractCursorOptions = {}
) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

Expand Down
2 changes: 1 addition & 1 deletion src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
return await new ClientBulkWriteExecutor(this, models, options).execute();
return await new ClientBulkWriteExecutor(this, models, resolveOptions(this, options)).execute();
}

/**
Expand Down
16 changes: 14 additions & 2 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { CursorTimeoutContext } from '../../cursor/abstract_cursor';
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import { type MongoClient } from '../../mongo_client';
import { TimeoutContext } from '../../timeout';
import { WriteConcern } from '../../write_concern';
import { executeOperation } from '../execute_operation';
import { ClientBulkWriteOperation } from './client_bulk_write';
Expand Down Expand Up @@ -55,19 +57,29 @@ export class ClientBulkWriteExecutor {
this.options,
pkFactory
);

const timeoutContext = TimeoutContext.create({
...this.options,
serverSelectionTimeoutMS: this.client.s.options.serverSelectionTimeoutMS,
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS,
socketTimeoutMS: this.client.s.options.socketTimeoutMS
});
// Unacknowledged writes need to execute all batches and return { ok: 1}
if (this.options.writeConcern?.w === 0) {
while (commandBuilder.hasNextBatch()) {
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
await executeOperation(this.client, operation);
await executeOperation(this.client, operation, timeoutContext);
}
return { ok: 1 };
} else {
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
// For each command will will create and exhaust a cursor for the results.
let currentBatchOffset = 0;
while (commandBuilder.hasNextBatch()) {
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, {
...this.options,
timeoutContext: new CursorTimeoutContext(timeoutContext)
});
const docs = await cursor.toArray();
const operations = cursor.operations;
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
Expand Down
10 changes: 10 additions & 0 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ export abstract class TimeoutContext {
abstract refresh(): void;

abstract clear(): void;

abstract refreshed(): TimeoutContext;
}

/** @internal */
Expand Down Expand Up @@ -305,6 +307,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
this._serverSelectionTimeout?.clear();
this._connectionCheckoutTimeout?.clear();
}

override refreshed(): CSOTTimeoutContext {
return new CSOTTimeoutContext(this);
}
}

/** @internal */
Expand Down Expand Up @@ -351,4 +357,8 @@ export class LegacyTimeoutContext extends TimeoutContext {
clear(): void {
return;
}

override refreshed(): LegacyTimeoutContext {
return new LegacyTimeoutContext(this.options);
}
}

0 comments on commit 6ee5c9c

Please sign in to comment.