Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow event loop to process during wait queue processing #2541

Merged
merged 1 commit into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ export class ConnectionPool extends EventEmitter {
return;
}

// add this request to the wait queue
const waitQueueMember: WaitQueueMember = { callback };
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
if (waitQueueTimeoutMS) {
Expand All @@ -299,11 +298,8 @@ export class ConnectionPool extends EventEmitter {
}, waitQueueTimeoutMS);
}

// place the member at the end of the wait queue
this[kWaitQueue].push(waitQueueMember);

// process the wait queue
processWaitQueue(this);
setImmediate(() => processWaitQueue(this));
}

/**
Expand All @@ -316,7 +312,6 @@ export class ConnectionPool extends EventEmitter {
const stale = connectionIsStale(this, connection);
const willDestroy = !!(poolClosed || stale || connection.closed);

// Properly adjust state of connection
if (!willDestroy) {
connection.markAvailable();
this[kConnections].push(connection);
Expand All @@ -329,7 +324,7 @@ export class ConnectionPool extends EventEmitter {
destroyConnection(this, connection, reason);
}

processWaitQueue(this);
setImmediate(() => processWaitQueue(this));
}

/**
Expand Down Expand Up @@ -503,7 +498,7 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)

// otherwise add it to the pool for later acquisition, and try to process the wait queue
pool[kConnections].push(connection);
processWaitQueue(pool);
setImmediate(() => processWaitQueue(pool));
});
}

Expand Down
25 changes: 15 additions & 10 deletions src/cursor/core_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,9 +701,10 @@ function nextFunction(self: CoreCursor, callback: Callback) {

if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, callback);
return self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, callback)
);
} else if (
self.cursorState.cursorIndex === self.cursorState.documents.length &&
!Long.ZERO.equals(cursorId)
Expand Down Expand Up @@ -775,9 +776,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
} else {
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, callback);
self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, callback)
);

return;
}

// Increment the current cursor limit
Expand All @@ -789,11 +793,12 @@ function nextFunction(self: CoreCursor, callback: Callback) {
// Doc overflow
if (!doc || doc.$err) {
// Ensure we kill the cursor on the server
self.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(self, () =>
callback(new MongoError(doc ? doc.$err : undefined))
self.kill(() =>
// Set cursor in dead and notified state
setCursorDeadAndNotified(self, () => callback(new MongoError(doc ? doc.$err : undefined)))
);

return;
}

// Transform the doc with passed in transformation method if provided
Expand Down
42 changes: 9 additions & 33 deletions src/cursor/cursor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { emitDeprecatedOptionWarning } from '../utils';
import { PromiseProvider } from '../promise_provider';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import { Transform, PassThrough } from 'stream';
import { deprecate } from 'util';
Expand Down Expand Up @@ -656,46 +655,23 @@ export class Cursor<
forEach(iterator: (doc: Document) => void): Promise<Document>;
forEach(iterator: (doc: Document) => void, callback: Callback): void;
forEach(iterator: (doc: Document) => void, callback?: Callback): Promise<Document> | void {
const Promise = PromiseProvider.get();
if (typeof iterator !== 'function') {
throw new TypeError('Missing required parameter `iterator`');
}

// Rewind cursor state
this.rewind();

// Set current cursor to INIT
this.s.state = CursorState.INIT;

if (typeof callback === 'function') {
return maybePromise(callback, done => {
each(this, (err, doc) => {
if (err) {
callback(err);
return false;
}

if (doc != null) {
iterator(doc);
return true;
}

if (doc == null) {
callback(undefined);
return false;
}
if (err) return done(err);
if (doc != null) return iterator(doc);
done();
});
} else {
return new Promise<Document>((fulfill, reject) => {
each(this, (err, doc) => {
if (err) {
reject(err);
return false;
} else if (doc == null) {
fulfill();
return false;
} else {
iterator(doc);
return true;
}
});
});
}
});
}

/**
Expand Down
66 changes: 44 additions & 22 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export interface ClientSessionOptions {
/** @public */
export type WithTransactionCallback = (session: ClientSession) => Promise<any> | void;

const kServerSession = Symbol('serverSession');

/**
* A class representing a client session on the server
*
Expand All @@ -62,7 +64,6 @@ class ClientSession extends EventEmitter {
topology: Topology;
sessionPool: ServerSessionPool;
hasEnded: boolean;
serverSession?: ServerSession;
clientOptions?: MongoClientOptions;
supports: { causalConsistency: boolean };
clusterTime?: ClusterTime;
Expand All @@ -71,6 +72,7 @@ class ClientSession extends EventEmitter {
owner: symbol | CoreCursor;
defaultTransactionOptions: TransactionOptions;
transaction: Transaction;
[kServerSession]?: ServerSession;

/**
* Create a client session.
Expand Down Expand Up @@ -102,8 +104,8 @@ class ClientSession extends EventEmitter {
this.topology = topology;
this.sessionPool = sessionPool;
this.hasEnded = false;
this.serverSession = sessionPool.acquire();
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.supports = {
causalConsistency:
Expand All @@ -124,41 +126,61 @@ class ClientSession extends EventEmitter {
return this.serverSession?.id;
}

get serverSession(): ServerSession {
if (this[kServerSession] == null) {
this[kServerSession] = this.sessionPool.acquire();
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return this[kServerSession]!;
}

/**
* Ends this session on the server
*
* @param options - Optional settings. Currently reserved for future use
* @param callback - Optional callback for completion of this operation
*/
endSession(): void;
endSession(): Promise<void>;
endSession(callback: Callback<void>): void;
endSession(options: Record<string, unknown>): Promise<void>;
endSession(options: Record<string, unknown>, callback: Callback<void>): void;
endSession(options?: Record<string, unknown> | Callback<void>, callback?: Callback<void>): void {
if (typeof options === 'function') (callback = options as Callback), (options = {});
endSession(
options?: Record<string, unknown> | Callback<void>,
callback?: Callback<void>
): void | Promise<void> {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

if (this.hasEnded) {
if (typeof callback === 'function') callback();
return;
}
return maybePromise(callback, done => {
if (this.hasEnded) {
return done();
}

if (this.serverSession && this.inTransaction()) {
this.abortTransaction(); // pass in callback?
}
const completeEndSession = () => {
// release the server session back to the pool
this.sessionPool.release(this.serverSession);
this[kServerSession] = undefined;

// mark the session as ended, and emit a signal
this.hasEnded = true;
this.emit('ended', this);
// mark the session as ended, and emit a signal
this.hasEnded = true;
this.emit('ended', this);

// release the server session back to the pool
if (this.serverSession) {
this.sessionPool.release(this.serverSession);
}
// spec indicates that we should ignore all errors for `endSessions`
done();
};

this.serverSession = undefined;
if (this.serverSession && this.inTransaction()) {
this.abortTransaction(err => {
if (err) return done(err);
completeEndSession();
});

return;
}

// spec indicates that we should ignore all errors for `endSessions`
if (typeof callback === 'function') callback();
completeEndSession();
});
}

/**
Expand Down
12 changes: 4 additions & 8 deletions test/functional/cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3987,21 +3987,17 @@ describe('Cursor', function () {
}
);

it('should return a promise when no callback supplied to forEach method', function (done) {
it('should return a promise when no callback supplied to forEach method', function () {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { poolSize: 1, auto_reconnect: false });

client.connect(function (err, client) {
expect(err).to.not.exist;
return client.connect(() => {
const db = client.db(configuration.db);
const collection = db.collection('cursor_session_tests2');

const cursor = collection.find();
const promise = cursor.forEach();
const promise = cursor.forEach(() => {});
expect(promise).to.exist.and.to.be.an.instanceof(Promise);
promise.catch(() => {});

cursor.close(() => client.close(() => done()));
return promise.then(() => cursor.close()).then(() => client.close());
});
});

Expand Down
11 changes: 6 additions & 5 deletions test/functional/spec-runner/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,12 @@ function runTestSuiteTest(configuration, spec, context) {
throw err;
})
.then(() => {
if (session0) session0.endSession();
if (session1) session1.endSession();

return validateExpectations(context.commandEvents, spec, savedSessionData);
});
const promises = [];
if (session0) promises.push(session0.endSession());
if (session1) promises.push(session1.endSession());
return Promise.all(promises);
})
.then(() => validateExpectations(context.commandEvents, spec, savedSessionData));
});
}

Expand Down
3 changes: 1 addition & 2 deletions test/unit/cmap/connection_pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ describe('Connection Pool', function () {
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
pool.checkIn(conn);

expect(pool).property('waitQueueSize').to.equal(0);

setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0));
done();
});
});
Expand Down