Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James committed Sep 27, 2024
1 parent 05c3fcd commit ffcb23b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 28 deletions.
62 changes: 49 additions & 13 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
isResumableError,
MongoAPIError,
MongoChangeStreamError,
MongoOperationTimeoutError,
MongoRuntimeError
} from './error';
import { MongoClient } from './mongo_client';
Expand All @@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
import type { ReadPreference } from './read_preference';
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
import type { ServerSessionId } from './sessions';
import { TimeoutContext } from './timeout';
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';

/** @internal */
Expand Down Expand Up @@ -584,6 +586,8 @@ export class ChangeStream<
/** @internal */
[kMode]: false | 'iterator' | 'emitter';

private timeoutContext?: TimeoutContext;

/** @event */
static readonly RESPONSE = RESPONSE;
/** @event */
Expand Down Expand Up @@ -689,6 +693,9 @@ export class ChangeStream<
try {
await this._processErrorIteratorMode(error);
} catch (error) {
if (error instanceof MongoOperationTimeoutError) {
throw error;
}
try {
await this.close();
} catch (error) {
Expand All @@ -705,25 +712,33 @@ export class ChangeStream<
this._setIsIterator();
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
// fails or until a timeout error is encountered
this.timeoutContext?.refresh();

while (true) {
try {
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
while (true) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this.close();
await this._processErrorIteratorMode(error);
} catch (error) {
squashError(error);
if (error instanceof MongoOperationTimeoutError) {
throw error; // Don't close the change stream, but throw the timeout error
}
try {
await this.close();
} catch (error) {
squashError(error);
}
throw error;
}
throw error;
}
}
} finally {
this.timeoutContext?.clear();
}
}

Expand All @@ -744,6 +759,9 @@ export class ChangeStream<
try {
await this._processErrorIteratorMode(error);
} catch (error) {
if (error instanceof MongoOperationTimeoutError) {
throw error; // throw the error without closing the change stream
}
try {
await this.close();
} catch (error) {
Expand Down Expand Up @@ -862,11 +880,20 @@ export class ChangeStream<
);
}

if (this.options.timeoutMS != null) {
this.timeoutContext ??= TimeoutContext.create({
timeoutMS: this.options.timeoutMS,
serverSelectionTimeoutMS: client.options.serverSelectionTimeoutMS
});
delete this.options.timeoutMS;
}

const changeStreamCursor = new ChangeStreamCursor<TSchema, TChange>(
client,
this.namespace,
pipeline,
options
options,
this.timeoutContext
);

for (const event of CHANGE_STREAM_EVENTS) {
Expand Down Expand Up @@ -946,6 +973,10 @@ export class ChangeStream<
// If the change stream has been closed explicitly, do not process error.
if (this[kClosed]) return;

if (changeStreamError instanceof MongoOperationTimeoutError) {
return; // FIXME: At least emit the error
}

if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
this._endStream();

Expand Down Expand Up @@ -975,7 +1006,10 @@ export class ChangeStream<
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
}

if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
if (
!isResumableError(changeStreamError, this.cursor.maxWireVersion) &&
!(changeStreamError instanceof MongoOperationTimeoutError)
) {
try {
await this.close();
} catch (error) {
Expand All @@ -1000,6 +1034,8 @@ export class ChangeStream<
await this.close();
throw changeStreamError;
}

if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError;
}
}

Expand Down
28 changes: 17 additions & 11 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,11 @@ export type AbstractCursorEvents = {

/** @public */
export abstract class AbstractCursor<
TSchema = any,
CursorEvents extends AbstractCursorEvents = AbstractCursorEvents
>
TSchema = any,
CursorEvents extends AbstractCursorEvents = AbstractCursorEvents
>
extends TypedEventEmitter<CursorEvents>
implements AsyncDisposable
{
implements AsyncDisposable {
/** @internal */
private cursorId: Long | null;
/** @internal */
Expand All @@ -172,6 +171,8 @@ export abstract class AbstractCursor<
protected readonly cursorOptions: InternalAbstractCursorOptions;
/** @internal */
protected timeoutContext?: TimeoutContext;
/** @internal */
protected isChangeStreamCursor?: boolean;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -455,8 +456,9 @@ export abstract class AbstractCursor<
if (this.cursorId === Long.ZERO) {
return false;
}
const shouldRefresh = !this.isChangeStreamCursor && this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null;

if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
if (shouldRefresh) {
this.timeoutContext?.refresh();
}
try {
Expand All @@ -467,7 +469,7 @@ export abstract class AbstractCursor<
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
if (shouldRefresh) {
this.timeoutContext?.clear();
}
}
Expand All @@ -480,7 +482,9 @@ export abstract class AbstractCursor<
if (this.cursorId === Long.ZERO) {
throw new MongoCursorExhaustedError();
}
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
const shouldRefresh = !this.isChangeStreamCursor && this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null;

if (shouldRefresh) {
this.timeoutContext?.refresh();
}

Expand All @@ -494,7 +498,7 @@ export abstract class AbstractCursor<
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
if (shouldRefresh) {
this.timeoutContext?.clear();
}
}
Expand All @@ -510,7 +514,9 @@ export abstract class AbstractCursor<
throw new MongoCursorExhaustedError();
}

if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
const shouldRefresh = this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null;

if (shouldRefresh) {
this.timeoutContext?.refresh();
}
try {
Expand All @@ -528,7 +534,7 @@ export abstract class AbstractCursor<
return doc;
}
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
if (shouldRefresh) {
this.timeoutContext?.clear();
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/cursor/change_stream_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { AggregateOperation } from '../operations/aggregate';
import type { CollationOptions } from '../operations/command';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import {
AbstractCursor,
Expand Down Expand Up @@ -53,10 +54,13 @@ export class ChangeStreamCursor<
client: MongoClient,
namespace: MongoDBNamespace,
pipeline: Document[] = [],
options: ChangeStreamCursorOptions = {}
options: ChangeStreamCursorOptions = {},
timeoutContext?: TimeoutContext
) {
super(client, namespace, options);
super(client, namespace, { ...options, tailable: true, awaitData: true });
this.timeoutContext = timeoutContext;

this.isChangeStreamCursor = true;
this.pipeline = pipeline;
this.changeStreamCursorOptions = options;
this._resumeToken = null;
Expand Down Expand Up @@ -110,6 +114,7 @@ export class ChangeStreamCursor<
}

_processBatch(response: CursorResponse): void {
console.log(response.toObject());
const { postBatchResumeToken } = response;
if (postBatchResumeToken) {
this.postBatchResumeToken = postBatchResumeToken;
Expand All @@ -130,6 +135,7 @@ export class ChangeStreamCursor<
const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
...this.cursorOptions,
...this.changeStreamCursorOptions,
omitMaxTimeMS: false,
session
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ const skippedSpecs = {
'gridfs-delete': 'TODO(NODE-6275)',
'gridfs-download': 'TODO(NODE-6275)',
'gridfs-find': 'TODO(NODE-6275)',
'gridfs-upload': 'TODO(NODE-6275)',
'change-streams': 'TODO(NODE-6387)'
'gridfs-upload': 'TODO(NODE-6275)'
};

const skippedTests = {
Expand Down

0 comments on commit ffcb23b

Please sign in to comment.