Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3255): add minPoolSizeCheckIntervalMS option to connection pool #3429

Merged
merged 7 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
waitQueueTimeoutMS: number;
/** If we are in load balancer mode. */
loadBalanced: boolean;
/** @internal */
minPoolSizeCheckFrequencyMS?: number;
}

/** @internal */
Expand Down Expand Up @@ -234,6 +236,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
maxConnecting: options.maxConnecting ?? 2,
maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
minPoolSizeCheckFrequencyMS: options.minPoolSizeCheckFrequencyMS ?? 100,
autoEncrypter: options.autoEncrypter,
metadata: options.metadata
});
Expand Down Expand Up @@ -683,12 +686,18 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
if (this[kPoolState] === PoolState.ready) {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
this[kMinPoolSizeTimer] = setTimeout(
() => this.ensureMinPoolSize(),
this.options.minPoolSizeCheckFrequencyMS
);
}
});
} else {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100);
this[kMinPoolSizeTimer] = setTimeout(
() => this.ensureMinPoolSize(),
this.options.minPoolSizeCheckFrequencyMS
);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { expect } from 'chai';
import { setTimeout } from 'timers';
import { promisify } from 'util';
import { on } from 'events';

import { CommandStartedEvent } from '../../../src';
import { Collection } from '../../../src/collection';
import { MongoClient } from '../../../src/mongo_client';
import { sleep } from '../../tools/utils';

const failPoint = {
configureFailPoint: 'failCommand',
Expand All @@ -25,21 +25,14 @@ async function runTaskGroup(collection: Collection, count: 10 | 100 | 1000) {
}
}

async function ensurePoolIsFull(client: MongoClient) {
async function ensurePoolIsFull(client: MongoClient): Promise<boolean> {
let connectionCount = 0;
const onConnectionCreated = () => connectionCount++;
client.on('connectionCreated', onConnectionCreated);

// 250ms should be plenty of time to fill the connection pool,
// but just in case we'll loop a couple of times.
for (let i = 0; connectionCount < POOL_SIZE * 2 && i < 10; ++i) {
await promisify(setTimeout)(250);
}

client.removeListener('connectionCreated', onConnectionCreated);

if (connectionCount !== POOL_SIZE * 2) {
throw new Error('Connection pool did not fill up');
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _event of on(client, 'connectionCreated')) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
connectionCount++;
if (connectionCount === POOL_SIZE * 2) {
break;
}
}
}

Expand Down Expand Up @@ -82,7 +75,10 @@ describe('operationCount-based Selection Within Latency Window - Prose Test', fu
await client.connect();

// Step 4: Using CMAP events, ensure the client's connection pools for both mongoses have been saturated
await poolIsFullPromise;
const poolIsFull = Promise.race([poolIsFullPromise, sleep(30 * 1000)]);
if (!poolIsFull) {
throw new Error('Timed out waiting for connection pool to fill to minPoolSize');
}

seeds = client.topology.s.seedlist.map(address => address.toString());

Expand Down
7 changes: 5 additions & 2 deletions test/tools/cmap_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
const poolOptions = test.poolOptions || {};
expect(CMAP_POOL_OPTION_NAMES).to.include.members(Object.keys(poolOptions));

// TODO(NODE-3255): update condition to only remove option if set to -1
let minPoolSizeCheckFrequencyMS;
if (poolOptions.backgroundThreadIntervalMS) {
if (poolOptions.backgroundThreadIntervalMS !== -1) {
minPoolSizeCheckFrequencyMS = poolOptions.backgroundThreadIntervalMS;
}
delete poolOptions.backgroundThreadIntervalMS;
}

Expand All @@ -373,7 +376,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
const mainThread = threadContext.getThread(MAIN_THREAD_KEY);
mainThread.start();

threadContext.createPool({ ...poolOptions, metadata });
threadContext.createPool({ ...poolOptions, metadata, minPoolSizeCheckFrequencyMS });
// yield control back to the event loop so that the ConnectionPoolCreatedEvent
// has a chance to be fired before any synchronously-emitted events from
// the queued operations
Expand Down
88 changes: 88 additions & 0 deletions test/unit/cmap/connection_pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const { expect } = require('chai');
const { setImmediate } = require('timers');
const { ns, isHello } = require('../../../src/utils');
const { LEGACY_HELLO_COMMAND } = require('../../../src/constants');
const { createTimerSandbox } = require('../timer_sandbox');

describe('Connection Pool', function () {
let server;
Expand Down Expand Up @@ -128,6 +129,93 @@ describe('Connection Pool', function () {
});
});

describe('minPoolSize population', function () {
let clock, timerSandbox;
beforeEach(() => {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();
});

afterEach(() => {
if (clock) {
timerSandbox.restore();
clock.restore();
clock = undefined;
}
});

it('should respect the minPoolSizeCheckFrequencyMS option', function () {
const pool = new ConnectionPool(server, {
minPoolSize: 2,
minPoolSizeCheckFrequencyMS: 42,
hostAddress: server.hostAddress()
});
const ensureSpy = sinon.spy(pool, 'ensureMinPoolSize');

// return a fake connection that won't get identified as perished
const createConnStub = sinon
.stub(pool, 'createConnection')
.yields(null, { destroy: () => null, generation: 0 });

pool.ready();

// expect ensureMinPoolSize to execute immediately
expect(ensureSpy).to.have.been.calledOnce;
expect(createConnStub).to.have.been.calledOnce;

// check that the successful connection return schedules another run
clock.tick(42);
expect(ensureSpy).to.have.been.calledTwice;
expect(createConnStub).to.have.been.calledTwice;

// check that the 2nd successful connection return schedules another run
// but don't expect to get a new connection since we are at minPoolSize
clock.tick(42);
expect(ensureSpy).to.have.been.calledThrice;
expect(createConnStub).to.have.been.calledTwice;

// check that the next scheduled check runs even after we're at minPoolSize
clock.tick(42);
expect(ensureSpy).to.have.callCount(4);
expect(createConnStub).to.have.been.calledTwice;
});

it('should default minPoolSizeCheckFrequencyMS to 100ms', function () {
const pool = new ConnectionPool(server, {
minPoolSize: 2,
hostAddress: server.hostAddress()
});
const ensureSpy = sinon.spy(pool, 'ensureMinPoolSize');

// return a fake connection that won't get identified as perished
const createConnStub = sinon
.stub(pool, 'createConnection')
.yields(null, { destroy: () => null, generation: 0 });

pool.ready();

// expect ensureMinPoolSize to execute immediately
expect(ensureSpy).to.have.been.calledOnce;
expect(createConnStub).to.have.been.calledOnce;

// check that the successful connection return schedules another run
clock.tick(100);
expect(ensureSpy).to.have.been.calledTwice;
expect(createConnStub).to.have.been.calledTwice;

// check that the 2nd successful connection return schedules another run
// but don't expect to get a new connection since we are at minPoolSize
clock.tick(100);
expect(ensureSpy).to.have.been.calledThrice;
expect(createConnStub).to.have.been.calledTwice;

// check that the next scheduled check runs even after we're at minPoolSize
clock.tick(100);
expect(ensureSpy).to.have.callCount(4);
expect(createConnStub).to.have.been.calledTwice;
});
});

describe('withConnection', function () {
it('should manage a connection for a successful operation', function (done) {
server.setMessageHandler(request => {
Expand Down