Skip to content

Commit

Permalink
add graceful shutdown
Browse files Browse the repository at this point in the history
 Conflicts:
	packages/node/src/app/analytics-node.ts
  • Loading branch information
silesky committed Sep 29, 2022
1 parent deadc7c commit 1ebb284
Show file tree
Hide file tree
Showing 16 changed files with 354 additions and 41 deletions.
7 changes: 7 additions & 0 deletions internal/test-helpers/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/** @type { import('eslint').Linter.Config } */
module.exports = {
extends: ['../../.eslintrc'],
env: {
node: true,
},
}
1 change: 1 addition & 0 deletions internal/test-helpers/.lintstagedrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require("@internal/config").lintStagedConfig
3 changes: 3 additions & 0 deletions internal/test-helpers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This is for code that will used as part of testing

There is no build step included because we use ts-jest, so this could gets compiled in-memory.
3 changes: 3 additions & 0 deletions internal/test-helpers/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const { createJestTSConfig } = require('@internal/config')

module.exports = createJestTSConfig()
21 changes: 21 additions & 0 deletions internal/test-helpers/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "@internal/test-helpers",
"version": "0.0.0",
"private": true,
"engines": {
"node": ">=12"
},
"scripts": {
"lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'",
"tsc": "yarn run -T tsc",
"eslint": "yarn run -T eslint",
"concurrently": "yarn run -T concurrently"
},
"dependencies": {
"tslib": "^2.4.0"
},
"packageManager": "[email protected]",
"devDependencies": {
"@internal/config": "0.0.0"
}
}
11 changes: 11 additions & 0 deletions internal/test-helpers/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"extends": "../../tsconfig.json",
"exclude": ["node_modules", "dist"],
"compilerOptions": {
"resolveJsonModule": true,
"module": "esnext",
"target": "ES5",
"moduleResolution": "node",
"lib": ["es2020"]
}
}
17 changes: 8 additions & 9 deletions packages/core/src/callback/index.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import { CoreContext } from '../context'
import type { Callback } from '../events'

export function pTimeout(
cb: Promise<unknown>,
timeout: number
): Promise<unknown> {
export function pTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(Error('Promise timed out'))
}, timeout)

cb.then((val) => {
clearTimeout(timeoutId)
return resolve(val)
}).catch(reject)
promise
.then((val) => {
clearTimeout(timeoutId)
return resolve(val)
})
.catch(reject)
})
}

function sleep(timeoutInMs: number): Promise<void> {
export function sleep(timeoutInMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, timeoutInMs))
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './plugins'
export * from './plugins/middleware'
export * from './events/interfaces'
export * from './events'
export * from './callback'
export * from './priority-queue'
export * from './context'
export * from './queue/event-queue'
Expand Down
38 changes: 32 additions & 6 deletions packages/node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ https://segment.com/docs/connections/sources/catalog/libraries/server/node/

NOTE: @segment/analytics-node is unstable! do not use.

## Basic Usage
## Quick Start
```ts
// analytics.ts
import { AnalyticsNode } from '@segment/analytics-node'
Expand All @@ -21,7 +21,34 @@ analytics.track('hello world', {}, { userId: "123456" })

```

# Event Emitter (Advanced Usage)
## Graceful Shutdown
### Avoid losing events on exit!
* Call `.closeAndFlush()` to stop collecting new events and flush all existing events.
* If a callback on an event call is included, this also waits for all callbacks to be called, and any of their subsequent promises to be resolved.
```ts
await analytics.closeAndFlush()
```
### Graceful Shutdown: Advanced Example
```ts
import express from 'express'
const app = express()

const server = app.listen(3000)
app.get('/', (req, res) => res.send('Hello World!'));

const onExit = async () => {
await analytics.closeAndFlush() // flush all existing events
setTimeout(() => {
server.close(() => process.exit())
}, 0);
};

process.on('SIGINT', onExit)
process.on('SIGTERM', onExit);

```

## Event Emitter
```ts
import { analytics } from './analytics'
import { ContextCancelation, CoreContext } from '@segment/analytics-node'
Expand All @@ -31,13 +58,12 @@ analytics.on('identify', (ctx) => console.log(ctx.event))

// listen for errors (if needed)
analytics.on('error', (err) => {
if (err instanceof ContextCancelation) {
console.error('event cancelled', err.logs())
} else if (err instanceof CoreContext) {
console.error('event failed', err.logs())
if (err.code === 'http_delivery') {
console.error(err.response)
} else {
console.error(err)
}
})
```


188 changes: 188 additions & 0 deletions packages/node/src/__tests__/graceful-shutdown.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import { createSuccess } from './test-helpers/factories'

const fetcher = jest.fn().mockReturnValue(createSuccess())
jest.mock('node-fetch', () => fetcher)

import { AnalyticsNode } from '../app/analytics-node'
import { sleep } from './test-helpers/sleep'
import { CoreContext, CorePlugin } from '@segment/analytics-core'

const testPlugin: CorePlugin = {
type: 'after',
load: () => Promise.resolve(),
name: 'foo',
version: 'bar',
isLoaded: () => true,
}

describe('Ability for users to exit without losing events', () => {
let ajs!: AnalyticsNode
beforeEach(async () => {
jest.resetAllMocks()
ajs = new AnalyticsNode({
writeKey: 'abc123',
drainedDelay: 200,
})
await ajs.ready
})
const _helpers = {
makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => {
analytics.track({ userId: 'foo', event: 'Thing Updated', callback: cb })
},
listenOnDrain: (): Promise<undefined> => {
return new Promise((resolve) => {
ajs.once('drained', () => resolve(undefined))
})
},
}

describe('drained emitted event', () => {
test('Analytics should emit a drained event and respect the drained delay', async () => {
_helpers.makeTrackCall()
const startTime = Date.now()
const drainedCbArgs = await _helpers.listenOnDrain()
const drainedTime = Date.now() - startTime
expect(drainedTime).toBeGreaterThan(200)
expect(drainedTime).toBeLessThan(250)

expect(drainedCbArgs).toBeUndefined()
})

test('delay should be customizable', async () => {
ajs = new AnalyticsNode({
writeKey: 'abc123',
drainedDelay: 500,
})
_helpers.makeTrackCall(undefined)
const startTime = Date.now()
await _helpers.listenOnDrain()
const drainedTime = Date.now() - startTime
expect(drainedTime).toBeGreaterThan(500)
expect(drainedTime).toBeLessThan(550)
})

test('every time a new event enters the queue, the timeout should be reset (like debounce)', async () => {
const DRAINED_DELAY = 250
ajs = new AnalyticsNode({
writeKey: 'abc123',
drainedDelay: DRAINED_DELAY,
})
await ajs.register({
...testPlugin,
track: async (ctx) => {
await sleep(50) // should be
return ctx
},
})
await new Promise((resolve) =>
_helpers.makeTrackCall(undefined, () => resolve(undefined))
)
_helpers.makeTrackCall()

const startTime = Date.now()
await _helpers.listenOnDrain()
const drainedTime = Date.now() - startTime
expect(drainedTime).toBeGreaterThan(DRAINED_DELAY)
expect(drainedTime).toBeLessThan(DRAINED_DELAY + 200)
})

test('all callbacks should be called ', async () => {
const cb = jest.fn()
ajs.track({ userId: 'foo', event: 'bar', callback: cb })
expect(cb).not.toHaveBeenCalled()
await ajs.closeAndFlush()
expect(cb).toBeCalled()
})

test('all async callbacks should be called', async () => {
const trackCall = new Promise<CoreContext>((resolve) =>
ajs.track({
userId: 'abc',
event: 'def',
callback: (ctx) => {
return sleep(100).then(() => resolve(ctx))
},
})
)
const res = await Promise.race([ajs.closeAndFlush(), trackCall])
expect(res instanceof CoreContext).toBe(true)
})
})

describe('.closeAndFlush()', () => {
test('should auto resolve after a certain timeout', async () => {
await ajs.register({
...testPlugin,
track: async (ctx) => {
await sleep(1000)
return ctx
},
})
_helpers.makeTrackCall(ajs)
const startTime = Date.now()
await ajs.closeAndFlush({ timeout: 500 })
const elapsedTime = Math.round(Date.now() - startTime)
expect(elapsedTime).toBeLessThanOrEqual(510)
expect(elapsedTime).toBeGreaterThan(490)
})

test('no new events should be accepted (but existing ones should be flushed)', async () => {
let trackCallCount = 0
ajs.on('track', () => {
// track should only happen after successful dispatch
trackCallCount += 1
})
_helpers.makeTrackCall()
const closed = ajs.closeAndFlush()
_helpers.makeTrackCall() // should not trigger
_helpers.makeTrackCall() // should not trigger
await closed
expect(fetcher).toBeCalledTimes(1)
expect(trackCallCount).toBe(1)
})

test('if queue has multiple track events, all of those items should be dispatched, and drain and track events should be emitted', async () => {
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
let trackCalls = 0
ajs.on('track', () => {
trackCalls++
})
await ajs.register({
...testPlugin,
track: async (ctx) => {
await sleep(300)
return ctx
},
})
_helpers.makeTrackCall()
_helpers.makeTrackCall()

await ajs.closeAndFlush()

expect(fetcher.mock.calls.length).toBe(2)

expect(trackCalls).toBe(2)

expect(drainedCalls).toBe(1)
})

test('if no pending events, resolves immediately', async () => {
const startTime = Date.now()
await ajs.closeAndFlush()
const elapsedTime = startTime - Date.now()
expect(elapsedTime).toBeLessThan(20)
})

test('if no pending events, drained should not be emitted an extra time when close is called', async () => {
let called = false
ajs.on('drained', () => {
called = true
})
await ajs.closeAndFlush()
expect(called).toBeFalsy()
})
})
})
4 changes: 2 additions & 2 deletions packages/node/src/__tests__/http-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ describe('Analytics Node', () => {
})

test('Track: Fires http requests to the correct endoint', async () => {
ajs.track({ event: 'track', userId: 'foo' })
ajs.track({ event: 'track', userId: 'foo', properties: { foo: 'bar' } })
ajs.track({ event: 'foo', userId: 'foo' })
ajs.track({ event: 'bar', userId: 'foo', properties: { foo: 'bar' } })
await resolveCtx(ajs, 'track')
expect(fetcher).toHaveBeenCalledWith(
'https://api.segment.io/v1/track',
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/__tests__/test-helpers/sleep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function sleep(timeoutInMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, timeoutInMs))
}
Loading

0 comments on commit 1ebb284

Please sign in to comment.