diff --git a/internal/test-helpers/.eslintrc.js b/internal/test-helpers/.eslintrc.js new file mode 100644 index 000000000..7e8674ea7 --- /dev/null +++ b/internal/test-helpers/.eslintrc.js @@ -0,0 +1,7 @@ +/** @type { import('eslint').Linter.Config } */ +module.exports = { + extends: ['../../.eslintrc'], + env: { + node: true, + }, +} diff --git a/internal/test-helpers/.lintstagedrc.js b/internal/test-helpers/.lintstagedrc.js new file mode 100644 index 000000000..bc1f1c780 --- /dev/null +++ b/internal/test-helpers/.lintstagedrc.js @@ -0,0 +1 @@ +module.exports = require("@internal/config").lintStagedConfig diff --git a/internal/test-helpers/README.md b/internal/test-helpers/README.md new file mode 100644 index 000000000..7aff58836 --- /dev/null +++ b/internal/test-helpers/README.md @@ -0,0 +1,3 @@ +# This is for code that will used as part of testing + +There is no build step included because we use ts-jest, so this could gets compiled in-memory. diff --git a/internal/test-helpers/jest.config.js b/internal/test-helpers/jest.config.js new file mode 100644 index 000000000..43505965a --- /dev/null +++ b/internal/test-helpers/jest.config.js @@ -0,0 +1,3 @@ +const { createJestTSConfig } = require('@internal/config') + +module.exports = createJestTSConfig() diff --git a/internal/test-helpers/package.json b/internal/test-helpers/package.json new file mode 100644 index 000000000..88ddf7fcf --- /dev/null +++ b/internal/test-helpers/package.json @@ -0,0 +1,21 @@ +{ + "name": "@internal/test-helpers", + "version": "0.0.0", + "private": true, + "engines": { + "node": ">=12" + }, + "scripts": { + "lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'", + "tsc": "yarn run -T tsc", + "eslint": "yarn run -T eslint", + "concurrently": "yarn run -T concurrently" + }, + "dependencies": { + "tslib": "^2.4.0" + }, + "packageManager": "yarn@3.2.1", + "devDependencies": { + "@internal/config": "0.0.0" + } +} diff --git a/internal/test-helpers/tsconfig.json b/internal/test-helpers/tsconfig.json new file mode 100644 index 000000000..a79f0e174 --- /dev/null +++ b/internal/test-helpers/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "exclude": ["node_modules", "dist"], + "compilerOptions": { + "resolveJsonModule": true, + "module": "esnext", + "target": "ES5", + "moduleResolution": "node", + "lib": ["es2020"] + } +} diff --git a/packages/core/src/callback/index.ts b/packages/core/src/callback/index.ts index 1fca70a94..864f74802 100644 --- a/packages/core/src/callback/index.ts +++ b/packages/core/src/callback/index.ts @@ -1,23 +1,22 @@ import { CoreContext } from '../context' import type { Callback } from '../events' -export function pTimeout( - cb: Promise, - timeout: number -): Promise { +export function pTimeout(promise: Promise, timeout: number): Promise { return new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { reject(Error('Promise timed out')) }, timeout) - cb.then((val) => { - clearTimeout(timeoutId) - return resolve(val) - }).catch(reject) + promise + .then((val) => { + clearTimeout(timeoutId) + return resolve(val) + }) + .catch(reject) }) } -function sleep(timeoutInMs: number): Promise { +export function sleep(timeoutInMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 525ff7d97..8e6a384d4 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -4,6 +4,7 @@ export * from './plugins' export * from './plugins/middleware' export * from './events/interfaces' export * from './events' +export * from './callback' export * from './priority-queue' export * from './context' export * from './queue/event-queue' diff --git a/packages/node/README.md b/packages/node/README.md index 0173ebc25..52711242b 100644 --- a/packages/node/README.md +++ b/packages/node/README.md @@ -1,27 +1,94 @@ -# TODO: API Documentation is out of date -https://segment.com/docs/connections/sources/catalog/libraries/server/node/ +## Warning: Until 1.x release, use this library at your own risk! +While the API is very similar, the documentation for the legacy SDK (`analytics-node`) is here: https://segment.com/docs/connections/sources/catalog/libraries/server/node/ -NOTE: @segment/analytics-node is unstable! do not use. +## Quick Start +### Install library +```bash +# npm +npm install @segment/analytics-node +# yarn +yarn add @segment/analytics-node +# pnpm +pnpm install @segment/analytics-node +``` -## Basic Usage +### Usage (assuming some express-like web framework) ```ts -// analytics.ts import { AnalyticsNode } from '@segment/analytics-node' -export const analytics = new AnalyticsNode({ writeKey: '' }) +const analytics = new AnalyticsNode({ writeKey: '' }) -// app.ts -import { analytics } from './analytics' +app.post('/login', (req, res) => { + analytics.identify({ + userId: req.body.userId, + previousId: req.body.previousId + }) +}) + +app.post('/cart', (req, res) => { + analytics.track({ + userId: req.body.userId, + event: 'Add to cart', + properties: { productId: '123456' } + }) +}); +``` + +## Graceful Shutdown +### Avoid losing events on exit! + * Call `.closeAndFlush()` to stop collecting new events and flush all existing events. + * If a callback on an event call is included, this also waits for all callbacks to be called, and any of their subsequent promises to be resolved. +```ts +await analytics.closeAndFlush() +// or +await analytics.closeAndFlush({ timeout: 5000 }) // force resolve after 5000ms +``` +### Graceful Shutdown: Advanced Example +```ts +import { AnalyticsNode } from '@segment/analytics-node' +import express from 'express' + +const analytics = new AnalyticsNode({ writeKey: '' }) -analytics.identify('Test User', { loggedIn: true }, { userId: "123456" }) -analytics.track('hello world', {}, { userId: "123456" }) +const app = express() +app.post('/cart', (req, res) => { + analytics.track({ + userId: req.body.userId, + event: 'Add to cart', + properties: { productId: '123456' } + }) +}); +const server = app.listen(3000) + + +const onExit = async () => { + console.log("Gracefully closing server..."); + await analytics.closeAndFlush() // flush all existing events + server.close(() => process.exit()); +}; + +process.on("SIGINT", onExit); +process.on("SIGTERM", onExit); ``` -# Event Emitter (Advanced Usage) +#### Collecting unflushed events +If you absolutely need to preserve all possible events in the event of a forced timeout, even ones that came in after `analytics.closeAndFlush()` was called, you can collect those events. +```ts +const unflushedEvents = [] + +analytics.on('call_after_close', (event) => unflushedEvents.push(events)) +await analytics.closeAndFlush() + +console.log(unflushedEvents) // all events that came in after closeAndFlush was called + +``` + + +## Event Emitter ```ts import { analytics } from './analytics' import { ContextCancelation, CoreContext } from '@segment/analytics-node' @@ -31,13 +98,12 @@ analytics.on('identify', (ctx) => console.log(ctx.event)) // listen for errors (if needed) analytics.on('error', (err) => { - if (err instanceof ContextCancelation) { - console.error('event cancelled', err.logs()) - } else if (err instanceof CoreContext) { - console.error('event failed', err.logs()) + if (err.code === 'http_delivery') { + console.error(err.response) } else { console.error(err) } }) ``` + diff --git a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts new file mode 100644 index 000000000..1a0cac06d --- /dev/null +++ b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts @@ -0,0 +1,165 @@ +import { createSuccess } from './test-helpers/factories' + +const fetcher = jest.fn().mockReturnValue(createSuccess()) +jest.mock('node-fetch', () => fetcher) + +import { AnalyticsNode, NodeSegmentEvent } from '../app/analytics-node' +import { sleep } from './test-helpers/sleep' +import { CoreContext, CorePlugin } from '@segment/analytics-core' + +const testPlugin: CorePlugin = { + type: 'after', + load: () => Promise.resolve(), + name: 'foo', + version: 'bar', + isLoaded: () => true, +} + +describe('Ability for users to exit without losing events', () => { + let ajs!: AnalyticsNode + beforeEach(async () => { + jest.resetAllMocks() + ajs = new AnalyticsNode({ + writeKey: 'abc123', + }) + }) + const _helpers = { + makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => { + analytics.track({ userId: 'foo', event: 'Thing Updated', callback: cb }) + }, + } + + describe('drained emitted event', () => { + test('emits a drained event if only one event is dispatched', async () => { + _helpers.makeTrackCall() + return expect( + new Promise((resolve) => ajs.once('drained', () => resolve(undefined))) + ).resolves.toBe(undefined) + }) + + test('emits a drained event if multiple events are dispatched', async () => { + let drainedCalls = 0 + ajs.on('drained', () => { + drainedCalls++ + }) + _helpers.makeTrackCall() + _helpers.makeTrackCall() + _helpers.makeTrackCall() + await sleep(200) + expect(drainedCalls).toBe(1) + }) + + test('all callbacks should be called ', async () => { + const cb = jest.fn() + ajs.track({ userId: 'foo', event: 'bar', callback: cb }) + expect(cb).not.toHaveBeenCalled() + await ajs.closeAndFlush() + expect(cb).toBeCalled() + }) + + test('all async callbacks should be called', async () => { + const trackCall = new Promise((resolve) => + ajs.track({ + userId: 'abc', + event: 'def', + callback: (ctx) => { + return sleep(100).then(() => resolve(ctx)) + }, + }) + ) + const res = await Promise.race([ajs.closeAndFlush(), trackCall]) + expect(res instanceof CoreContext).toBe(true) + }) + }) + + describe('.closeAndFlush()', () => { + test('should force resolve if method call execution time exceeds specified timeout', async () => { + const TIMEOUT = 300 + await ajs.register({ + ...testPlugin, + track: async (ctx) => { + await sleep(1000) + return ctx + }, + }) + _helpers.makeTrackCall(ajs) + const startTime = Date.now() + await ajs.closeAndFlush({ timeout: TIMEOUT }) + const elapsedTime = Math.round(Date.now() - startTime) + expect(elapsedTime).toBeLessThanOrEqual(TIMEOUT + 10) + expect(elapsedTime).toBeGreaterThan(TIMEOUT - 10) + }) + + test('no new events should be accepted (but existing ones should be flushed)', async () => { + let trackCallCount = 0 + ajs.on('track', () => { + // track should only happen after successful dispatch + trackCallCount += 1 + }) + _helpers.makeTrackCall() + const closed = ajs.closeAndFlush() + _helpers.makeTrackCall() // should not trigger + _helpers.makeTrackCall() // should not trigger + await closed + expect(fetcher).toBeCalledTimes(1) + expect(trackCallCount).toBe(1) + }) + test('any events created after close should be emitted', async () => { + const events: NodeSegmentEvent[] = [] + ajs.on('call_after_close', (event) => { + events.push(event) + }) + _helpers.makeTrackCall() + const closed = ajs.closeAndFlush() + _helpers.makeTrackCall() // should be emitted + _helpers.makeTrackCall() // should be emitted + expect(events.length).toBe(2) + expect(events.every((e) => e.type === 'track')).toBeTruthy() + await closed + }) + + test('if queue has multiple track events, all of those items should be dispatched, and drain and track events should be emitted', async () => { + let drainedCalls = 0 + ajs.on('drained', () => { + drainedCalls++ + }) + let trackCalls = 0 + ajs.on('track', () => { + trackCalls++ + }) + await ajs.register({ + ...testPlugin, + track: async (ctx) => { + await sleep(300) + return ctx + }, + }) + _helpers.makeTrackCall() + _helpers.makeTrackCall() + + await ajs.closeAndFlush() + + expect(fetcher.mock.calls.length).toBe(2) + + expect(trackCalls).toBe(2) + + expect(drainedCalls).toBe(1) + }) + + test('if no pending events, resolves immediately', async () => { + const startTime = Date.now() + await ajs.closeAndFlush() + const elapsedTime = startTime - Date.now() + expect(elapsedTime).toBeLessThan(20) + }) + + test('if no pending events, drained should not be emitted an extra time when close is called', async () => { + let called = false + ajs.on('drained', () => { + called = true + }) + await ajs.closeAndFlush() + expect(called).toBeFalsy() + }) + }) +}) diff --git a/packages/node/src/__tests__/http-integration.test.ts b/packages/node/src/__tests__/http-integration.test.ts index 0ef162210..8a7737b97 100644 --- a/packages/node/src/__tests__/http-integration.test.ts +++ b/packages/node/src/__tests__/http-integration.test.ts @@ -88,8 +88,8 @@ describe('Analytics Node', () => { }) test('Track: Fires http requests to the correct endoint', async () => { - ajs.track({ event: 'track', userId: 'foo' }) - ajs.track({ event: 'track', userId: 'foo', properties: { foo: 'bar' } }) + ajs.track({ event: 'foo', userId: 'foo' }) + ajs.track({ event: 'bar', userId: 'foo', properties: { foo: 'bar' } }) await resolveCtx(ajs, 'track') expect(fetcher).toHaveBeenCalledWith( 'https://api.segment.io/v1/track', diff --git a/packages/node/src/__tests__/test-helpers/sleep.ts b/packages/node/src/__tests__/test-helpers/sleep.ts new file mode 100644 index 000000000..44990d558 --- /dev/null +++ b/packages/node/src/__tests__/test-helpers/sleep.ts @@ -0,0 +1,3 @@ +export function sleep(timeoutInMs: number): Promise { + return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) +} diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts index 3baaae1df..5d8f3760f 100644 --- a/packages/node/src/app/analytics-node.ts +++ b/packages/node/src/app/analytics-node.ts @@ -14,9 +14,10 @@ import { bindAll, PriorityQueue, CoreEmitterContract, + pTimeout, } from '@segment/analytics-core' import { AnalyticsNodeSettings, validateSettings } from './settings' -import { analyticsNode, AnalyticsNodePluginSettings } from './plugin' +import { analyticsNode } from './plugin' import { version } from '../../package.json' import { NodeEmittedError } from './emitted-errors' @@ -42,6 +43,8 @@ export interface NodeSegmentEventOptions { */ type NodeEmitterEvents = CoreEmitterContract & { initialize: [AnalyticsNodeSettings] + call_after_close: [NodeSegmentEvent] // any event that did not get dispatched due to close + drained: [] } class NodePriorityQueue extends PriorityQueue { @@ -70,25 +73,22 @@ export class AnalyticsNode implements CoreAnalytics { private _eventFactory: EventFactory + private _isClosed = false + private _pendingEvents = 0 queue: EventQueue ready: Promise constructor(settings: AnalyticsNodeSettings) { - validateSettings(settings) super() + validateSettings(settings) this._eventFactory = new EventFactory() this.queue = new EventQueue(new NodePriorityQueue(3)) - const nodeSettings: AnalyticsNodePluginSettings = { - name: 'analytics-node-next', - type: 'after', - version: 'latest', - writeKey: settings.writeKey, - } - - this.ready = this.register(analyticsNode(nodeSettings, this)) + this.ready = this.register( + analyticsNode({ writeKey: settings.writeKey }, this) + ) .then(() => undefined) .catch((err) => { console.error(err) @@ -103,11 +103,49 @@ export class AnalyticsNode return version } + /** + * Call this method to stop collecting new events and flush all existing events. + * This method also waits for any event method-specific callbacks to be triggered, + * and any of their subsequent promises to be resolved/rejected. + */ + public closeAndFlush({ + timeout, + }: { + /** Set a maximum time permitted to wait before resolving. Default = no maximum. */ + timeout?: number + } = {}): Promise { + this._isClosed = true + const promise = new Promise((resolve) => { + if (!this._pendingEvents) { + resolve() + } else { + this.once('drained', () => resolve()) + } + }) + return timeout ? pTimeout(promise, timeout).catch(() => undefined) : promise + } + private _dispatch(segmentEvent: CoreSegmentEvent, callback?: Callback) { + if (this._isClosed) { + this.emit('call_after_close', segmentEvent as NodeSegmentEvent) + return undefined + } + + this._pendingEvents++ + dispatchAndEmit(segmentEvent, this.queue, this, { callback: callback, - }).catch((err) => err) // we ignore errors, since we have an event emitter + }) + .catch((ctx) => ctx) + .finally(() => { + this._pendingEvents-- + + if (!this._pendingEvents) { + this.emit('drained') + } + }) } + /** * Combines two unassociated user identities. * @link https://segment.com/docs/connections/sources/catalog/libraries/server/node/#alias diff --git a/packages/node/src/app/plugin.ts b/packages/node/src/app/plugin.ts index f0bf2c5c4..372d4123e 100644 --- a/packages/node/src/app/plugin.ts +++ b/packages/node/src/app/plugin.ts @@ -1,20 +1,12 @@ import { CorePlugin, CoreSegmentEvent, - PluginType, CoreContext, } from '@segment/analytics-core' import fetch, { Response } from 'node-fetch' import { version } from '../../package.json' import { AnalyticsNode } from './analytics-node' -export interface AnalyticsNodePluginSettings { - writeKey: string - name: string - type: PluginType - version: string -} - const btoa = (val: string): string => Buffer.from(val).toString('base64') export async function post( @@ -39,7 +31,7 @@ export async function post( } export function analyticsNode( - settings: AnalyticsNodePluginSettings, + settings: { writeKey: string }, analytics: AnalyticsNode ): CorePlugin { const send = async (ctx: CoreContext): Promise => { @@ -60,10 +52,9 @@ export function analyticsNode( } return { - name: settings.name, - type: settings.type, - version: settings.version, - + name: 'analytics-node-next', + type: 'after', + version: '1.0.0', load: (ctx) => Promise.resolve(ctx), isLoaded: () => true, diff --git a/yarn.lock b/yarn.lock index bfca2277a..6b3700e0f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -955,6 +955,15 @@ __metadata: languageName: unknown linkType: soft +"@internal/test-helpers@workspace:internal/test-helpers": + version: 0.0.0-use.local + resolution: "@internal/test-helpers@workspace:internal/test-helpers" + dependencies: + "@internal/config": 0.0.0 + tslib: ^2.4.0 + languageName: unknown + linkType: soft + "@istanbuljs/load-nyc-config@npm:^1.0.0": version: 1.1.0 resolution: "@istanbuljs/load-nyc-config@npm:1.1.0"