From 1ebb2847318578c6123351786d4f5376dd85801c Mon Sep 17 00:00:00 2001 From: Seth Silesky Date: Wed, 28 Sep 2022 22:34:52 -0500 Subject: [PATCH] add graceful shutdown Conflicts: packages/node/src/app/analytics-node.ts --- internal/test-helpers/.eslintrc.js | 7 + internal/test-helpers/.lintstagedrc.js | 1 + internal/test-helpers/README.md | 3 + internal/test-helpers/jest.config.js | 3 + internal/test-helpers/package.json | 21 ++ internal/test-helpers/tsconfig.json | 11 + packages/core/src/callback/index.ts | 17 +- packages/core/src/index.ts | 1 + packages/node/README.md | 38 +++- .../src/__tests__/graceful-shutdown.test.ts | 188 ++++++++++++++++++ .../src/__tests__/http-integration.test.ts | 4 +- .../node/src/__tests__/test-helpers/sleep.ts | 3 + packages/node/src/app/analytics-node.ts | 70 ++++++- packages/node/src/app/plugin.ts | 17 +- packages/node/src/app/settings.ts | 2 + yarn.lock | 9 + 16 files changed, 354 insertions(+), 41 deletions(-) create mode 100644 internal/test-helpers/.eslintrc.js create mode 100644 internal/test-helpers/.lintstagedrc.js create mode 100644 internal/test-helpers/README.md create mode 100644 internal/test-helpers/jest.config.js create mode 100644 internal/test-helpers/package.json create mode 100644 internal/test-helpers/tsconfig.json create mode 100644 packages/node/src/__tests__/graceful-shutdown.test.ts create mode 100644 packages/node/src/__tests__/test-helpers/sleep.ts diff --git a/internal/test-helpers/.eslintrc.js b/internal/test-helpers/.eslintrc.js new file mode 100644 index 000000000..7e8674ea7 --- /dev/null +++ b/internal/test-helpers/.eslintrc.js @@ -0,0 +1,7 @@ +/** @type { import('eslint').Linter.Config } */ +module.exports = { + extends: ['../../.eslintrc'], + env: { + node: true, + }, +} diff --git a/internal/test-helpers/.lintstagedrc.js b/internal/test-helpers/.lintstagedrc.js new file mode 100644 index 000000000..bc1f1c780 --- /dev/null +++ b/internal/test-helpers/.lintstagedrc.js @@ -0,0 +1 @@ +module.exports = require("@internal/config").lintStagedConfig diff --git a/internal/test-helpers/README.md b/internal/test-helpers/README.md new file mode 100644 index 000000000..7aff58836 --- /dev/null +++ b/internal/test-helpers/README.md @@ -0,0 +1,3 @@ +# This is for code that will used as part of testing + +There is no build step included because we use ts-jest, so this could gets compiled in-memory. diff --git a/internal/test-helpers/jest.config.js b/internal/test-helpers/jest.config.js new file mode 100644 index 000000000..43505965a --- /dev/null +++ b/internal/test-helpers/jest.config.js @@ -0,0 +1,3 @@ +const { createJestTSConfig } = require('@internal/config') + +module.exports = createJestTSConfig() diff --git a/internal/test-helpers/package.json b/internal/test-helpers/package.json new file mode 100644 index 000000000..88ddf7fcf --- /dev/null +++ b/internal/test-helpers/package.json @@ -0,0 +1,21 @@ +{ + "name": "@internal/test-helpers", + "version": "0.0.0", + "private": true, + "engines": { + "node": ">=12" + }, + "scripts": { + "lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'", + "tsc": "yarn run -T tsc", + "eslint": "yarn run -T eslint", + "concurrently": "yarn run -T concurrently" + }, + "dependencies": { + "tslib": "^2.4.0" + }, + "packageManager": "yarn@3.2.1", + "devDependencies": { + "@internal/config": "0.0.0" + } +} diff --git a/internal/test-helpers/tsconfig.json b/internal/test-helpers/tsconfig.json new file mode 100644 index 000000000..a79f0e174 --- /dev/null +++ b/internal/test-helpers/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "exclude": ["node_modules", "dist"], + "compilerOptions": { + "resolveJsonModule": true, + "module": "esnext", + "target": "ES5", + "moduleResolution": "node", + "lib": ["es2020"] + } +} diff --git a/packages/core/src/callback/index.ts b/packages/core/src/callback/index.ts index 1fca70a94..864f74802 100644 --- a/packages/core/src/callback/index.ts +++ b/packages/core/src/callback/index.ts @@ -1,23 +1,22 @@ import { CoreContext } from '../context' import type { Callback } from '../events' -export function pTimeout( - cb: Promise, - timeout: number -): Promise { +export function pTimeout(promise: Promise, timeout: number): Promise { return new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { reject(Error('Promise timed out')) }, timeout) - cb.then((val) => { - clearTimeout(timeoutId) - return resolve(val) - }).catch(reject) + promise + .then((val) => { + clearTimeout(timeoutId) + return resolve(val) + }) + .catch(reject) }) } -function sleep(timeoutInMs: number): Promise { +export function sleep(timeoutInMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 525ff7d97..8e6a384d4 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -4,6 +4,7 @@ export * from './plugins' export * from './plugins/middleware' export * from './events/interfaces' export * from './events' +export * from './callback' export * from './priority-queue' export * from './context' export * from './queue/event-queue' diff --git a/packages/node/README.md b/packages/node/README.md index 0173ebc25..2f701800c 100644 --- a/packages/node/README.md +++ b/packages/node/README.md @@ -5,7 +5,7 @@ https://segment.com/docs/connections/sources/catalog/libraries/server/node/ NOTE: @segment/analytics-node is unstable! do not use. -## Basic Usage +## Quick Start ```ts // analytics.ts import { AnalyticsNode } from '@segment/analytics-node' @@ -21,7 +21,34 @@ analytics.track('hello world', {}, { userId: "123456" }) ``` -# Event Emitter (Advanced Usage) +## Graceful Shutdown +### Avoid losing events on exit! + * Call `.closeAndFlush()` to stop collecting new events and flush all existing events. + * If a callback on an event call is included, this also waits for all callbacks to be called, and any of their subsequent promises to be resolved. +```ts +await analytics.closeAndFlush() +``` +### Graceful Shutdown: Advanced Example +```ts +import express from 'express' +const app = express() + +const server = app.listen(3000) +app.get('/', (req, res) => res.send('Hello World!')); + +const onExit = async () => { + await analytics.closeAndFlush() // flush all existing events + setTimeout(() => { + server.close(() => process.exit()) + }, 0); +}; + +process.on('SIGINT', onExit) +process.on('SIGTERM', onExit); + +``` + +## Event Emitter ```ts import { analytics } from './analytics' import { ContextCancelation, CoreContext } from '@segment/analytics-node' @@ -31,13 +58,12 @@ analytics.on('identify', (ctx) => console.log(ctx.event)) // listen for errors (if needed) analytics.on('error', (err) => { - if (err instanceof ContextCancelation) { - console.error('event cancelled', err.logs()) - } else if (err instanceof CoreContext) { - console.error('event failed', err.logs()) + if (err.code === 'http_delivery') { + console.error(err.response) } else { console.error(err) } }) ``` + diff --git a/packages/node/src/__tests__/graceful-shutdown.test.ts b/packages/node/src/__tests__/graceful-shutdown.test.ts new file mode 100644 index 000000000..1fb7e8c19 --- /dev/null +++ b/packages/node/src/__tests__/graceful-shutdown.test.ts @@ -0,0 +1,188 @@ +import { createSuccess } from './test-helpers/factories' + +const fetcher = jest.fn().mockReturnValue(createSuccess()) +jest.mock('node-fetch', () => fetcher) + +import { AnalyticsNode } from '../app/analytics-node' +import { sleep } from './test-helpers/sleep' +import { CoreContext, CorePlugin } from '@segment/analytics-core' + +const testPlugin: CorePlugin = { + type: 'after', + load: () => Promise.resolve(), + name: 'foo', + version: 'bar', + isLoaded: () => true, +} + +describe('Ability for users to exit without losing events', () => { + let ajs!: AnalyticsNode + beforeEach(async () => { + jest.resetAllMocks() + ajs = new AnalyticsNode({ + writeKey: 'abc123', + drainedDelay: 200, + }) + await ajs.ready + }) + const _helpers = { + makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => { + analytics.track({ userId: 'foo', event: 'Thing Updated', callback: cb }) + }, + listenOnDrain: (): Promise => { + return new Promise((resolve) => { + ajs.once('drained', () => resolve(undefined)) + }) + }, + } + + describe('drained emitted event', () => { + test('Analytics should emit a drained event and respect the drained delay', async () => { + _helpers.makeTrackCall() + const startTime = Date.now() + const drainedCbArgs = await _helpers.listenOnDrain() + const drainedTime = Date.now() - startTime + expect(drainedTime).toBeGreaterThan(200) + expect(drainedTime).toBeLessThan(250) + + expect(drainedCbArgs).toBeUndefined() + }) + + test('delay should be customizable', async () => { + ajs = new AnalyticsNode({ + writeKey: 'abc123', + drainedDelay: 500, + }) + _helpers.makeTrackCall(undefined) + const startTime = Date.now() + await _helpers.listenOnDrain() + const drainedTime = Date.now() - startTime + expect(drainedTime).toBeGreaterThan(500) + expect(drainedTime).toBeLessThan(550) + }) + + test('every time a new event enters the queue, the timeout should be reset (like debounce)', async () => { + const DRAINED_DELAY = 250 + ajs = new AnalyticsNode({ + writeKey: 'abc123', + drainedDelay: DRAINED_DELAY, + }) + await ajs.register({ + ...testPlugin, + track: async (ctx) => { + await sleep(50) // should be + return ctx + }, + }) + await new Promise((resolve) => + _helpers.makeTrackCall(undefined, () => resolve(undefined)) + ) + _helpers.makeTrackCall() + + const startTime = Date.now() + await _helpers.listenOnDrain() + const drainedTime = Date.now() - startTime + expect(drainedTime).toBeGreaterThan(DRAINED_DELAY) + expect(drainedTime).toBeLessThan(DRAINED_DELAY + 200) + }) + + test('all callbacks should be called ', async () => { + const cb = jest.fn() + ajs.track({ userId: 'foo', event: 'bar', callback: cb }) + expect(cb).not.toHaveBeenCalled() + await ajs.closeAndFlush() + expect(cb).toBeCalled() + }) + + test('all async callbacks should be called', async () => { + const trackCall = new Promise((resolve) => + ajs.track({ + userId: 'abc', + event: 'def', + callback: (ctx) => { + return sleep(100).then(() => resolve(ctx)) + }, + }) + ) + const res = await Promise.race([ajs.closeAndFlush(), trackCall]) + expect(res instanceof CoreContext).toBe(true) + }) + }) + + describe('.closeAndFlush()', () => { + test('should auto resolve after a certain timeout', async () => { + await ajs.register({ + ...testPlugin, + track: async (ctx) => { + await sleep(1000) + return ctx + }, + }) + _helpers.makeTrackCall(ajs) + const startTime = Date.now() + await ajs.closeAndFlush({ timeout: 500 }) + const elapsedTime = Math.round(Date.now() - startTime) + expect(elapsedTime).toBeLessThanOrEqual(510) + expect(elapsedTime).toBeGreaterThan(490) + }) + + test('no new events should be accepted (but existing ones should be flushed)', async () => { + let trackCallCount = 0 + ajs.on('track', () => { + // track should only happen after successful dispatch + trackCallCount += 1 + }) + _helpers.makeTrackCall() + const closed = ajs.closeAndFlush() + _helpers.makeTrackCall() // should not trigger + _helpers.makeTrackCall() // should not trigger + await closed + expect(fetcher).toBeCalledTimes(1) + expect(trackCallCount).toBe(1) + }) + + test('if queue has multiple track events, all of those items should be dispatched, and drain and track events should be emitted', async () => { + let drainedCalls = 0 + ajs.on('drained', () => { + drainedCalls++ + }) + let trackCalls = 0 + ajs.on('track', () => { + trackCalls++ + }) + await ajs.register({ + ...testPlugin, + track: async (ctx) => { + await sleep(300) + return ctx + }, + }) + _helpers.makeTrackCall() + _helpers.makeTrackCall() + + await ajs.closeAndFlush() + + expect(fetcher.mock.calls.length).toBe(2) + + expect(trackCalls).toBe(2) + + expect(drainedCalls).toBe(1) + }) + + test('if no pending events, resolves immediately', async () => { + const startTime = Date.now() + await ajs.closeAndFlush() + const elapsedTime = startTime - Date.now() + expect(elapsedTime).toBeLessThan(20) + }) + + test('if no pending events, drained should not be emitted an extra time when close is called', async () => { + let called = false + ajs.on('drained', () => { + called = true + }) + await ajs.closeAndFlush() + expect(called).toBeFalsy() + }) + }) +}) diff --git a/packages/node/src/__tests__/http-integration.test.ts b/packages/node/src/__tests__/http-integration.test.ts index 0ef162210..8a7737b97 100644 --- a/packages/node/src/__tests__/http-integration.test.ts +++ b/packages/node/src/__tests__/http-integration.test.ts @@ -88,8 +88,8 @@ describe('Analytics Node', () => { }) test('Track: Fires http requests to the correct endoint', async () => { - ajs.track({ event: 'track', userId: 'foo' }) - ajs.track({ event: 'track', userId: 'foo', properties: { foo: 'bar' } }) + ajs.track({ event: 'foo', userId: 'foo' }) + ajs.track({ event: 'bar', userId: 'foo', properties: { foo: 'bar' } }) await resolveCtx(ajs, 'track') expect(fetcher).toHaveBeenCalledWith( 'https://api.segment.io/v1/track', diff --git a/packages/node/src/__tests__/test-helpers/sleep.ts b/packages/node/src/__tests__/test-helpers/sleep.ts new file mode 100644 index 000000000..44990d558 --- /dev/null +++ b/packages/node/src/__tests__/test-helpers/sleep.ts @@ -0,0 +1,3 @@ +export function sleep(timeoutInMs: number): Promise { + return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) +} diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts index 3baaae1df..04cb81735 100644 --- a/packages/node/src/app/analytics-node.ts +++ b/packages/node/src/app/analytics-node.ts @@ -14,9 +14,10 @@ import { bindAll, PriorityQueue, CoreEmitterContract, + pTimeout, } from '@segment/analytics-core' import { AnalyticsNodeSettings, validateSettings } from './settings' -import { analyticsNode, AnalyticsNodePluginSettings } from './plugin' +import { analyticsNode } from './plugin' import { version } from '../../package.json' import { NodeEmittedError } from './emitted-errors' @@ -42,6 +43,7 @@ export interface NodeSegmentEventOptions { */ type NodeEmitterEvents = CoreEmitterContract & { initialize: [AnalyticsNodeSettings] + drained: [] } class NodePriorityQueue extends PriorityQueue { @@ -70,25 +72,25 @@ export class AnalyticsNode implements CoreAnalytics { private _eventFactory: EventFactory + private _drainedTimeout?: ReturnType + private _drainedDelay: number + private _isClosed = false + private _pendingEvents = 0 queue: EventQueue ready: Promise constructor(settings: AnalyticsNodeSettings) { - validateSettings(settings) super() + validateSettings(settings) + this._drainedDelay = settings.drainedDelay ?? 500 this._eventFactory = new EventFactory() this.queue = new EventQueue(new NodePriorityQueue(3)) - const nodeSettings: AnalyticsNodePluginSettings = { - name: 'analytics-node-next', - type: 'after', - version: 'latest', - writeKey: settings.writeKey, - } - - this.ready = this.register(analyticsNode(nodeSettings, this)) + this.ready = this.register( + analyticsNode({ writeKey: settings.writeKey }, this) + ) .then(() => undefined) .catch((err) => { console.error(err) @@ -103,11 +105,57 @@ export class AnalyticsNode return version } + /** + * Call this method to stop collecting new events and flush all existing events. + * If a callback on an event call is incluced, this also waits for all callbacks to be called, and any of their subsequent promises to be resolved. + */ + public closeAndFlush({ + timeout, + }: { + /** Maximum time permitted to wait before resolving. */ + timeout?: number + } = {}): Promise { + this._isClosed = true + const promise = new Promise((resolve) => { + if (!this._pendingEvents) { + resolve() + } else { + this.once('drained', () => resolve()) + } + }) + if (timeout) { + return pTimeout(promise, timeout).catch(() => undefined) + } else { + return promise + } + } + private _dispatch(segmentEvent: CoreSegmentEvent, callback?: Callback) { + if (this._isClosed) { + return undefined + } + + this._pendingEvents++ + + if (this._drainedTimeout) { + clearTimeout(this._drainedTimeout) + } + dispatchAndEmit(segmentEvent, this.queue, this, { callback: callback, - }).catch((err) => err) // we ignore errors, since we have an event emitter + }) + .catch((ctx) => ctx) + .finally(() => { + this._pendingEvents-- + + if (!this._pendingEvents) { + this._drainedTimeout = setTimeout(() => { + this.emit('drained') + }, this._drainedDelay) + } + }) } + /** * Combines two unassociated user identities. * @link https://segment.com/docs/connections/sources/catalog/libraries/server/node/#alias diff --git a/packages/node/src/app/plugin.ts b/packages/node/src/app/plugin.ts index f0bf2c5c4..372d4123e 100644 --- a/packages/node/src/app/plugin.ts +++ b/packages/node/src/app/plugin.ts @@ -1,20 +1,12 @@ import { CorePlugin, CoreSegmentEvent, - PluginType, CoreContext, } from '@segment/analytics-core' import fetch, { Response } from 'node-fetch' import { version } from '../../package.json' import { AnalyticsNode } from './analytics-node' -export interface AnalyticsNodePluginSettings { - writeKey: string - name: string - type: PluginType - version: string -} - const btoa = (val: string): string => Buffer.from(val).toString('base64') export async function post( @@ -39,7 +31,7 @@ export async function post( } export function analyticsNode( - settings: AnalyticsNodePluginSettings, + settings: { writeKey: string }, analytics: AnalyticsNode ): CorePlugin { const send = async (ctx: CoreContext): Promise => { @@ -60,10 +52,9 @@ export function analyticsNode( } return { - name: settings.name, - type: settings.type, - version: settings.version, - + name: 'analytics-node-next', + type: 'after', + version: '1.0.0', load: (ctx) => Promise.resolve(ctx), isLoaded: () => true, diff --git a/packages/node/src/app/settings.ts b/packages/node/src/app/settings.ts index fc5a229b2..ef602e9e3 100644 --- a/packages/node/src/app/settings.ts +++ b/packages/node/src/app/settings.ts @@ -4,6 +4,8 @@ export interface AnalyticsNodeSettings { writeKey: string timeout?: number plugins?: CorePlugin[] + /** Number of ms to wait for the queue to be empty before emitting a 'drained' event */ + drainedDelay?: number } export const validateSettings = (settings: AnalyticsNodeSettings) => { diff --git a/yarn.lock b/yarn.lock index bfca2277a..6b3700e0f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -955,6 +955,15 @@ __metadata: languageName: unknown linkType: soft +"@internal/test-helpers@workspace:internal/test-helpers": + version: 0.0.0-use.local + resolution: "@internal/test-helpers@workspace:internal/test-helpers" + dependencies: + "@internal/config": 0.0.0 + tslib: ^2.4.0 + languageName: unknown + linkType: soft + "@istanbuljs/load-nyc-config@npm:^1.0.0": version: 1.1.0 resolution: "@istanbuljs/load-nyc-config@npm:1.1.0"