Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-5197): add server monitoring mode #3899

Merged
merged 6 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from './mongo_logger';
import { ReadConcern, type ReadConcernLevel } from './read_concern';
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import { ServerMonitoringModes } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import {
DEFAULT_PK_FACTORY,
Expand Down Expand Up @@ -1055,6 +1056,17 @@ export const OPTIONS = {
serializeFunctions: {
type: 'boolean'
},
serverMonitoringMode: {
default: 'auto',
transform({ values: [value] }) {
if (!ServerMonitoringModes.includes(value as string)) {
throw new MongoParseError(
'serverMonitoringMode must be one of `auto`, `poll`, or `stream`'
);
}
return value;
}
},
serverSelectionTimeoutMS: {
default: 30000,
type: 'uint'
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,9 @@ export type {
MonitorOptions,
MonitorPrivate,
RTTPinger,
RTTPingerOptions
RTTPingerOptions,
ServerMonitoringMode,
ServerMonitoringModes
} from './sdam/monitor';
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
export type {
Expand Down
4 changes: 4 additions & 0 deletions src/mongo_client.ts
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import type { ServerMonitoringMode } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import { readPreferenceServerSelector } from './sdam/server_selection';
import type { SrvPoller } from './sdam/srv_polling';
Expand Down Expand Up @@ -257,6 +258,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
proxyUsername?: string;
/** Configures a Socks5 proxy password when the proxy in proxyHost requires username/password authentication. */
proxyPassword?: string;
/** Instructs the driver monitors to use a specific monitoring mode */
serverMonitoringMode?: ServerMonitoringMode;

/** @internal */
srvPoller?: SrvPoller;
Expand Down Expand Up @@ -816,6 +819,7 @@ export interface MongoOptions
proxyPort?: number;
proxyUsername?: string;
proxyPassword?: string;
serverMonitoringMode: ServerMonitoringMode;

/** @internal */
connectionType?: typeof Connection;
Expand Down
61 changes: 42 additions & 19 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';
import { type Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, type ConnectionOptions } from '../cmap/connection';
import { getFAASEnv } from '../cmap/handshake/client_metadata';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
Expand Down Expand Up @@ -44,6 +45,11 @@ function isInCloseState(monitor: Monitor) {
return monitor.s.state === STATE_CLOSED || monitor.s.state === STATE_CLOSING;
}

/** @public */
export const ServerMonitoringModes = ['auto', 'poll', 'stream'];
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
/** @public */
export type ServerMonitoringMode = (typeof ServerMonitoringModes)[number];

/** @internal */
export interface MonitorPrivate {
state: string;
Expand All @@ -55,6 +61,7 @@ export interface MonitorOptions
connectTimeoutMS: number;
heartbeatFrequencyMS: number;
minHeartbeatFrequencyMS: number;
serverMonitoringMode: ServerMonitoringMode;
}

/** @public */
Expand All @@ -73,9 +80,16 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
s: MonitorPrivate;
address: string;
options: Readonly<
Pick<MonitorOptions, 'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS'>
Pick<
MonitorOptions,
| 'connectTimeoutMS'
| 'heartbeatFrequencyMS'
| 'minHeartbeatFrequencyMS'
| 'serverMonitoringMode'
>
>;
connectOptions: ConnectionOptions;
isRunningInFaasEnv: boolean;
[kServer]: Server;
[kConnection]?: Connection;
[kCancellationToken]: CancellationToken;
Expand Down Expand Up @@ -103,8 +117,10 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
this.options = Object.freeze({
connectTimeoutMS: options.connectTimeoutMS ?? 10000,
heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
serverMonitoringMode: options.serverMonitoringMode
});
this.isRunningInFaasEnv = getFAASEnv() != null;

const cancellationToken = this[kCancellationToken];
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
Expand Down Expand Up @@ -207,27 +223,37 @@ function resetMonitorState(monitor: Monitor) {
monitor[kConnection] = undefined;
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
// If we have no topology version we always poll no matter
// what the user provided.
if (topologyVersion == null) return false;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

const serverMonitoringMode = monitor.options.serverMonitoringMode;
if (serverMonitoringMode === 'poll') return false;
if (serverMonitoringMode === 'stream') return true;

// If we are in auto mode, we need to figure out if we're in a FaaS
// environment or not and choose the appropriate mode.
if (monitor.isRunningInFaasEnv) return false;
return true;
}

function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = topologyVersion != null;
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function failureHandler(err: Error) {
function failureHandler(err: Error, awaited: boolean) {
monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;

monitor.emit(
Server.SERVER_HEARTBEAT_FAILED,
new ServerHeartbeatFailedEvent(
monitor.address,
calculateDurationInMs(start),
err,
isAwaitable
)
new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited)
);

const error = !(err instanceof MongoError)
Expand Down Expand Up @@ -274,7 +300,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => {
if (err) {
return failureHandler(err);
return failureHandler(err, isAwaitable);
}

if (!('isWritablePrimary' in hello)) {
Expand All @@ -286,15 +312,12 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);

const awaited = isAwaitable && hello.topologyVersion != null;
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited)
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);

// if we are using the streaming protocol then we immediately issue another `started`
// event, otherwise the "check" is complete and return to the main monitor loop
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
if (awaited) {
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
Expand All @@ -316,7 +339,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
if (err) {
monitor[kConnection] = undefined;

failureHandler(err);
failureHandler(err, false);
return;
}

Expand All @@ -337,7 +360,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
monitor.address,
calculateDurationInMs(start),
conn.hello,
false
useStreamingProtocol(monitor, conn.hello?.topologyVersion)
)
);

Expand Down Expand Up @@ -370,7 +393,7 @@ function monitorServer(monitor: Monitor) {
}

// if the check indicates streaming is supported, immediately reschedule monitoring
if (hello && hello.topologyVersion) {
if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
setTimeout(() => {
if (!isInCloseState(monitor)) {
monitor[kMonitorId]?.wake();
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
TopologyDescriptionChangedEvent,
TopologyOpeningEvent
} from './events';
import type { ServerMonitoringMode } from './monitor';
import { Server, type ServerEvents, type ServerOptions } from './server';
import { compareTopologyVersion, ServerDescription } from './server_description';
import { readPreferenceServerSelector, type ServerSelector } from './server_selection';
Expand Down Expand Up @@ -143,6 +144,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
directConnection: boolean;
loadBalanced: boolean;
metadata: ClientMetadata;
serverMonitoringMode: ServerMonitoringMode;
/** MongoDB server API version */
serverApi?: ServerApi;
[featureFlag: symbol]: any;
Expand Down
5 changes: 5 additions & 0 deletions test/lambda/mongodb/app.mjs
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import * as assert from 'node:assert/strict';

import { MongoClient } from 'mongodb';

// Creates the client that is cached for all requests, subscribes to
Expand Down Expand Up @@ -30,18 +32,21 @@ mongoClient.on('commandFailed', (event) => {

mongoClient.on('serverHeartbeatStarted', (event) => {
console.log('serverHeartbeatStarted', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('serverHeartbeatSucceeded', (event) => {
heartbeatCount++;
totalHeartbeatDuration += event.duration;
console.log('serverHeartbeatSucceeded', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('serverHeartbeatFailed', (event) => {
heartbeatCount++;
totalHeartbeatDuration += event.duration;
console.log('serverHeartbeatFailed', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('connectionCreated', (event) => {
Expand Down
Loading