Skip to content

Commit

Permalink
add graceful shutdown (#604)
Browse files Browse the repository at this point in the history
* add graceful shutdown

Co-authored-by: Seth Silesky <[email protected]>
  • Loading branch information
silesky and silesky authored Oct 4, 2022
1 parent 9486cff commit a23ccee
Show file tree
Hide file tree
Showing 15 changed files with 368 additions and 50 deletions.
7 changes: 7 additions & 0 deletions internal/test-helpers/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/** @type { import('eslint').Linter.Config } */
module.exports = {
extends: ['../../.eslintrc'],
env: {
node: true,
},
}
1 change: 1 addition & 0 deletions internal/test-helpers/.lintstagedrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require("@internal/config").lintStagedConfig
3 changes: 3 additions & 0 deletions internal/test-helpers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This is for code that will used as part of testing

There is no build step included because we use ts-jest, so this could gets compiled in-memory.
3 changes: 3 additions & 0 deletions internal/test-helpers/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const { createJestTSConfig } = require('@internal/config')

module.exports = createJestTSConfig()
21 changes: 21 additions & 0 deletions internal/test-helpers/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "@internal/test-helpers",
"version": "0.0.0",
"private": true,
"engines": {
"node": ">=12"
},
"scripts": {
"lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'",
"tsc": "yarn run -T tsc",
"eslint": "yarn run -T eslint",
"concurrently": "yarn run -T concurrently"
},
"dependencies": {
"tslib": "^2.4.0"
},
"packageManager": "[email protected]",
"devDependencies": {
"@internal/config": "0.0.0"
}
}
11 changes: 11 additions & 0 deletions internal/test-helpers/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"extends": "../../tsconfig.json",
"exclude": ["node_modules", "dist"],
"compilerOptions": {
"resolveJsonModule": true,
"module": "esnext",
"target": "ES5",
"moduleResolution": "node",
"lib": ["es2020"]
}
}
17 changes: 8 additions & 9 deletions packages/core/src/callback/index.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import { CoreContext } from '../context'
import type { Callback } from '../events'

export function pTimeout(
cb: Promise<unknown>,
timeout: number
): Promise<unknown> {
export function pTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(Error('Promise timed out'))
}, timeout)

cb.then((val) => {
clearTimeout(timeoutId)
return resolve(val)
}).catch(reject)
promise
.then((val) => {
clearTimeout(timeoutId)
return resolve(val)
})
.catch(reject)
})
}

function sleep(timeoutInMs: number): Promise<void> {
export function sleep(timeoutInMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, timeoutInMs))
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './plugins'
export * from './plugins/middleware'
export * from './events/interfaces'
export * from './events'
export * from './callback'
export * from './priority-queue'
export * from './context'
export * from './queue/event-queue'
Expand Down
96 changes: 81 additions & 15 deletions packages/node/README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,94 @@
# TODO: API Documentation is out of date

https://segment.com/docs/connections/sources/catalog/libraries/server/node/
## Warning: Until 1.x release, use this library at your own risk!
While the API is very similar, the documentation for the legacy SDK (`analytics-node`) is here: https://segment.com/docs/connections/sources/catalog/libraries/server/node/


NOTE: @segment/analytics-node is unstable! do not use.
## Quick Start
### Install library
```bash
# npm
npm install @segment/analytics-node
# yarn
yarn add @segment/analytics-node
# pnpm
pnpm install @segment/analytics-node
```

## Basic Usage
### Usage (assuming some express-like web framework)
```ts
// analytics.ts
import { AnalyticsNode } from '@segment/analytics-node'

export const analytics = new AnalyticsNode({ writeKey: '<MY_WRITE_KEY>' })
const analytics = new AnalyticsNode({ writeKey: '<MY_WRITE_KEY>' })


// app.ts
import { analytics } from './analytics'
app.post('/login', (req, res) => {
analytics.identify({
userId: req.body.userId,
previousId: req.body.previousId
})
})

app.post('/cart', (req, res) => {
analytics.track({
userId: req.body.userId,
event: 'Add to cart',
properties: { productId: '123456' }
})
});
```

## Graceful Shutdown
### Avoid losing events on exit!
* Call `.closeAndFlush()` to stop collecting new events and flush all existing events.
* If a callback on an event call is included, this also waits for all callbacks to be called, and any of their subsequent promises to be resolved.
```ts
await analytics.closeAndFlush()
// or
await analytics.closeAndFlush({ timeout: 5000 }) // force resolve after 5000ms
```
### Graceful Shutdown: Advanced Example
```ts
import { AnalyticsNode } from '@segment/analytics-node'
import express from 'express'

const analytics = new AnalyticsNode({ writeKey: '<MY_WRITE_KEY>' })

analytics.identify('Test User', { loggedIn: true }, { userId: "123456" })
analytics.track('hello world', {}, { userId: "123456" })
const app = express()
app.post('/cart', (req, res) => {
analytics.track({
userId: req.body.userId,
event: 'Add to cart',
properties: { productId: '123456' }
})
});

const server = app.listen(3000)


const onExit = async () => {
console.log("Gracefully closing server...");
await analytics.closeAndFlush() // flush all existing events
server.close(() => process.exit());
};

process.on("SIGINT", onExit);
process.on("SIGTERM", onExit);
```

# Event Emitter (Advanced Usage)
#### Collecting unflushed events
If you absolutely need to preserve all possible events in the event of a forced timeout, even ones that came in after `analytics.closeAndFlush()` was called, you can collect those events.
```ts
const unflushedEvents = []

analytics.on('call_after_close', (event) => unflushedEvents.push(events))
await analytics.closeAndFlush()

console.log(unflushedEvents) // all events that came in after closeAndFlush was called

```


## Event Emitter
```ts
import { analytics } from './analytics'
import { ContextCancelation, CoreContext } from '@segment/analytics-node'
Expand All @@ -31,13 +98,12 @@ analytics.on('identify', (ctx) => console.log(ctx.event))

// listen for errors (if needed)
analytics.on('error', (err) => {
if (err instanceof ContextCancelation) {
console.error('event cancelled', err.logs())
} else if (err instanceof CoreContext) {
console.error('event failed', err.logs())
if (err.code === 'http_delivery') {
console.error(err.response)
} else {
console.error(err)
}
})
```


165 changes: 165 additions & 0 deletions packages/node/src/__tests__/graceful-shutdown-integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { createSuccess } from './test-helpers/factories'

const fetcher = jest.fn().mockReturnValue(createSuccess())
jest.mock('node-fetch', () => fetcher)

import { AnalyticsNode, NodeSegmentEvent } from '../app/analytics-node'
import { sleep } from './test-helpers/sleep'
import { CoreContext, CorePlugin } from '@segment/analytics-core'

const testPlugin: CorePlugin = {
type: 'after',
load: () => Promise.resolve(),
name: 'foo',
version: 'bar',
isLoaded: () => true,
}

describe('Ability for users to exit without losing events', () => {
let ajs!: AnalyticsNode
beforeEach(async () => {
jest.resetAllMocks()
ajs = new AnalyticsNode({
writeKey: 'abc123',
})
})
const _helpers = {
makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => {
analytics.track({ userId: 'foo', event: 'Thing Updated', callback: cb })
},
}

describe('drained emitted event', () => {
test('emits a drained event if only one event is dispatched', async () => {
_helpers.makeTrackCall()
return expect(
new Promise((resolve) => ajs.once('drained', () => resolve(undefined)))
).resolves.toBe(undefined)
})

test('emits a drained event if multiple events are dispatched', async () => {
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
_helpers.makeTrackCall()
_helpers.makeTrackCall()
_helpers.makeTrackCall()
await sleep(200)
expect(drainedCalls).toBe(1)
})

test('all callbacks should be called ', async () => {
const cb = jest.fn()
ajs.track({ userId: 'foo', event: 'bar', callback: cb })
expect(cb).not.toHaveBeenCalled()
await ajs.closeAndFlush()
expect(cb).toBeCalled()
})

test('all async callbacks should be called', async () => {
const trackCall = new Promise<CoreContext>((resolve) =>
ajs.track({
userId: 'abc',
event: 'def',
callback: (ctx) => {
return sleep(100).then(() => resolve(ctx))
},
})
)
const res = await Promise.race([ajs.closeAndFlush(), trackCall])
expect(res instanceof CoreContext).toBe(true)
})
})

describe('.closeAndFlush()', () => {
test('should force resolve if method call execution time exceeds specified timeout', async () => {
const TIMEOUT = 300
await ajs.register({
...testPlugin,
track: async (ctx) => {
await sleep(1000)
return ctx
},
})
_helpers.makeTrackCall(ajs)
const startTime = Date.now()
await ajs.closeAndFlush({ timeout: TIMEOUT })
const elapsedTime = Math.round(Date.now() - startTime)
expect(elapsedTime).toBeLessThanOrEqual(TIMEOUT + 10)
expect(elapsedTime).toBeGreaterThan(TIMEOUT - 10)
})

test('no new events should be accepted (but existing ones should be flushed)', async () => {
let trackCallCount = 0
ajs.on('track', () => {
// track should only happen after successful dispatch
trackCallCount += 1
})
_helpers.makeTrackCall()
const closed = ajs.closeAndFlush()
_helpers.makeTrackCall() // should not trigger
_helpers.makeTrackCall() // should not trigger
await closed
expect(fetcher).toBeCalledTimes(1)
expect(trackCallCount).toBe(1)
})
test('any events created after close should be emitted', async () => {
const events: NodeSegmentEvent[] = []
ajs.on('call_after_close', (event) => {
events.push(event)
})
_helpers.makeTrackCall()
const closed = ajs.closeAndFlush()
_helpers.makeTrackCall() // should be emitted
_helpers.makeTrackCall() // should be emitted
expect(events.length).toBe(2)
expect(events.every((e) => e.type === 'track')).toBeTruthy()
await closed
})

test('if queue has multiple track events, all of those items should be dispatched, and drain and track events should be emitted', async () => {
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
let trackCalls = 0
ajs.on('track', () => {
trackCalls++
})
await ajs.register({
...testPlugin,
track: async (ctx) => {
await sleep(300)
return ctx
},
})
_helpers.makeTrackCall()
_helpers.makeTrackCall()

await ajs.closeAndFlush()

expect(fetcher.mock.calls.length).toBe(2)

expect(trackCalls).toBe(2)

expect(drainedCalls).toBe(1)
})

test('if no pending events, resolves immediately', async () => {
const startTime = Date.now()
await ajs.closeAndFlush()
const elapsedTime = startTime - Date.now()
expect(elapsedTime).toBeLessThan(20)
})

test('if no pending events, drained should not be emitted an extra time when close is called', async () => {
let called = false
ajs.on('drained', () => {
called = true
})
await ajs.closeAndFlush()
expect(called).toBeFalsy()
})
})
})
4 changes: 2 additions & 2 deletions packages/node/src/__tests__/http-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ describe('Analytics Node', () => {
})

test('Track: Fires http requests to the correct endoint', async () => {
ajs.track({ event: 'track', userId: 'foo' })
ajs.track({ event: 'track', userId: 'foo', properties: { foo: 'bar' } })
ajs.track({ event: 'foo', userId: 'foo' })
ajs.track({ event: 'bar', userId: 'foo', properties: { foo: 'bar' } })
await resolveCtx(ajs, 'track')
expect(fetcher).toHaveBeenCalledWith(
'https://api.segment.io/v1/track',
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/__tests__/test-helpers/sleep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function sleep(timeoutInMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, timeoutInMs))
}
Loading

0 comments on commit a23ccee

Please sign in to comment.