From 2f8336051ee5cbd80125db43b321416602f1ee88 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 9 Sep 2020 17:43:28 -0400 Subject: [PATCH] fix: allow event loop to process during wait queue processing Running `processWaitQueue` on the next tick allows the event loop to process while the connection pool is processing large numbers of wait queue members. This also uncovered a few issues with timing in our tests, and in some cases our top-level API: - `commitTransaction` / `abortTransaction` use `maybePromise` now - `endSession` must wait for all the machinery behind the scenes to check out a connection and write a message before considering its job finished - internal calls to `kill` a cursor now await the the process of fully sending that command, even if they ignore the response NODE-2803 --- src/cmap/connection_pool.ts | 11 ++--- src/cursor/core_cursor.ts | 25 ++++++---- src/sessions.ts | 66 +++++++++++++++++--------- test/functional/cursor.test.js | 6 +-- test/functional/spec-runner/index.js | 11 +++-- test/unit/cmap/connection_pool.test.js | 3 +- 6 files changed, 72 insertions(+), 50 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 53930060f7..0f57fb4e24 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -283,7 +283,6 @@ export class ConnectionPool extends EventEmitter { return; } - // add this request to the wait queue const waitQueueMember: WaitQueueMember = { callback }; const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; if (waitQueueTimeoutMS) { @@ -299,11 +298,8 @@ export class ConnectionPool extends EventEmitter { }, waitQueueTimeoutMS); } - // place the member at the end of the wait queue this[kWaitQueue].push(waitQueueMember); - - // process the wait queue - processWaitQueue(this); + setImmediate(() => processWaitQueue(this)); } /** @@ -316,7 +312,6 @@ export class ConnectionPool extends EventEmitter { const stale = connectionIsStale(this, connection); const willDestroy = !!(poolClosed || stale || connection.closed); - // Properly adjust state of connection if (!willDestroy) { connection.markAvailable(); this[kConnections].push(connection); @@ -329,7 +324,7 @@ export class ConnectionPool extends EventEmitter { destroyConnection(this, connection, reason); } - processWaitQueue(this); + setImmediate(() => processWaitQueue(this)); } /** @@ -503,7 +498,7 @@ function createConnection(pool: ConnectionPool, callback?: Callback) // otherwise add it to the pool for later acquisition, and try to process the wait queue pool[kConnections].push(connection); - processWaitQueue(pool); + setImmediate(() => processWaitQueue(pool)); }); } diff --git a/src/cursor/core_cursor.ts b/src/cursor/core_cursor.ts index c3576feb92..cdf52819ff 100644 --- a/src/cursor/core_cursor.ts +++ b/src/cursor/core_cursor.ts @@ -701,9 +701,10 @@ function nextFunction(self: CoreCursor, callback: Callback) { if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, callback); + return self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, callback) + ); } else if ( self.cursorState.cursorIndex === self.cursorState.documents.length && !Long.ZERO.equals(cursorId) @@ -775,9 +776,12 @@ function nextFunction(self: CoreCursor, callback: Callback) { } else { if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, callback); + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, callback) + ); + + return; } // Increment the current cursor limit @@ -789,11 +793,12 @@ function nextFunction(self: CoreCursor, callback: Callback) { // Doc overflow if (!doc || doc.$err) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, () => - callback(new MongoError(doc ? doc.$err : undefined)) + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, () => callback(new MongoError(doc ? doc.$err : undefined))) ); + + return; } // Transform the doc with passed in transformation method if provided diff --git a/src/sessions.ts b/src/sessions.ts index a42c1c4349..d103eb274f 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -52,6 +52,8 @@ export interface ClientSessionOptions { /** @public */ export type WithTransactionCallback = (session: ClientSession) => Promise | void; +const kServerSession = Symbol('serverSession'); + /** * A class representing a client session on the server * @@ -62,7 +64,6 @@ class ClientSession extends EventEmitter { topology: Topology; sessionPool: ServerSessionPool; hasEnded: boolean; - serverSession?: ServerSession; clientOptions?: MongoClientOptions; supports: { causalConsistency: boolean }; clusterTime?: ClusterTime; @@ -71,6 +72,7 @@ class ClientSession extends EventEmitter { owner: symbol | CoreCursor; defaultTransactionOptions: TransactionOptions; transaction: Transaction; + [kServerSession]?: ServerSession; /** * Create a client session. @@ -102,8 +104,8 @@ class ClientSession extends EventEmitter { this.topology = topology; this.sessionPool = sessionPool; this.hasEnded = false; - this.serverSession = sessionPool.acquire(); this.clientOptions = clientOptions; + this[kServerSession] = undefined; this.supports = { causalConsistency: @@ -124,41 +126,61 @@ class ClientSession extends EventEmitter { return this.serverSession?.id; } + get serverSession(): ServerSession { + if (this[kServerSession] == null) { + this[kServerSession] = this.sessionPool.acquire(); + } + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this[kServerSession]!; + } + /** * Ends this session on the server * * @param options - Optional settings. Currently reserved for future use * @param callback - Optional callback for completion of this operation */ - endSession(): void; + endSession(): Promise; endSession(callback: Callback): void; + endSession(options: Record): Promise; endSession(options: Record, callback: Callback): void; - endSession(options?: Record | Callback, callback?: Callback): void { - if (typeof options === 'function') (callback = options as Callback), (options = {}); + endSession( + options?: Record | Callback, + callback?: Callback + ): void | Promise { + if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; - if (this.hasEnded) { - if (typeof callback === 'function') callback(); - return; - } + return maybePromise(callback, done => { + if (this.hasEnded) { + return done(); + } - if (this.serverSession && this.inTransaction()) { - this.abortTransaction(); // pass in callback? - } + const completeEndSession = () => { + // release the server session back to the pool + this.sessionPool.release(this.serverSession); + this[kServerSession] = undefined; - // mark the session as ended, and emit a signal - this.hasEnded = true; - this.emit('ended', this); + // mark the session as ended, and emit a signal + this.hasEnded = true; + this.emit('ended', this); - // release the server session back to the pool - if (this.serverSession) { - this.sessionPool.release(this.serverSession); - } + // spec indicates that we should ignore all errors for `endSessions` + done(); + }; - this.serverSession = undefined; + if (this.serverSession && this.inTransaction()) { + this.abortTransaction(err => { + if (err) return done(err); + completeEndSession(); + }); + + return; + } - // spec indicates that we should ignore all errors for `endSessions` - if (typeof callback === 'function') callback(); + completeEndSession(); + }); } /** diff --git a/test/functional/cursor.test.js b/test/functional/cursor.test.js index 4a97f52611..412145ab6d 100644 --- a/test/functional/cursor.test.js +++ b/test/functional/cursor.test.js @@ -3999,9 +3999,9 @@ describe('Cursor', function () { const cursor = collection.find(); const promise = cursor.forEach(); expect(promise).to.exist.and.to.be.an.instanceof(Promise); - promise.catch(() => {}); - - cursor.close(() => client.close(() => done())); + promise.then(() => { + cursor.close(() => client.close(() => done())); + }); }); }); diff --git a/test/functional/spec-runner/index.js b/test/functional/spec-runner/index.js index cbc34d21f7..45dcd03362 100644 --- a/test/functional/spec-runner/index.js +++ b/test/functional/spec-runner/index.js @@ -344,11 +344,12 @@ function runTestSuiteTest(configuration, spec, context) { throw err; }) .then(() => { - if (session0) session0.endSession(); - if (session1) session1.endSession(); - - return validateExpectations(context.commandEvents, spec, savedSessionData); - }); + const promises = []; + if (session0) promises.push(session0.endSession()); + if (session1) promises.push(session1.endSession()); + return Promise.all(promises); + }) + .then(() => validateExpectations(context.commandEvents, spec, savedSessionData)); }); } diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 6d47f302a7..a4cdd57f20 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -139,8 +139,7 @@ describe('Connection Pool', function () { sinon.stub(pool, 'availableConnectionCount').get(() => 0); pool.checkIn(conn); - expect(pool).property('waitQueueSize').to.equal(0); - + setImmediate(() => expect(pool).property('waitQueueSize').to.equal(0)); done(); }); });