Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add graceful shutdown #604

Merged
merged 19 commits into from
Oct 4, 2022
Merged
7 changes: 7 additions & 0 deletions internal/test-helpers/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/** @type { import('eslint').Linter.Config } */
module.exports = {
extends: ['../../.eslintrc'],
env: {
node: true,
},
}
1 change: 1 addition & 0 deletions internal/test-helpers/.lintstagedrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require("@internal/config").lintStagedConfig
3 changes: 3 additions & 0 deletions internal/test-helpers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This is for code that will used as part of testing

There is no build step included because we use ts-jest, so this could gets compiled in-memory.
3 changes: 3 additions & 0 deletions internal/test-helpers/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const { createJestTSConfig } = require('@internal/config')

module.exports = createJestTSConfig()
21 changes: 21 additions & 0 deletions internal/test-helpers/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "@internal/test-helpers",
"version": "0.0.0",
"private": true,
"engines": {
"node": ">=12"
},
"scripts": {
"lint": "yarn concurrently 'yarn:eslint .' 'yarn:tsc --noEmit'",
"tsc": "yarn run -T tsc",
"eslint": "yarn run -T eslint",
"concurrently": "yarn run -T concurrently"
},
"dependencies": {
"tslib": "^2.4.0"
},
"packageManager": "[email protected]",
"devDependencies": {
"@internal/config": "0.0.0"
}
}
11 changes: 11 additions & 0 deletions internal/test-helpers/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"extends": "../../tsconfig.json",
"exclude": ["node_modules", "dist"],
"compilerOptions": {
"resolveJsonModule": true,
"module": "esnext",
"target": "ES5",
"moduleResolution": "node",
"lib": ["es2020"]
}
}
17 changes: 8 additions & 9 deletions packages/core/src/callback/index.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import { CoreContext } from '../context'
import type { Callback } from '../events'

export function pTimeout(
cb: Promise<unknown>,
timeout: number
): Promise<unknown> {
export function pTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(Error('Promise timed out'))
}, timeout)

cb.then((val) => {
clearTimeout(timeoutId)
return resolve(val)
}).catch(reject)
promise
.then((val) => {
clearTimeout(timeoutId)
return resolve(val)
})
.catch(reject)
})
}

function sleep(timeoutInMs: number): Promise<void> {
export function sleep(timeoutInMs: number): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, didn't realize we had sleep defined here too. In the browser package we had it in lib/sleep.ts as well so wouldn't need it defined it its version of this core file like we do currently 😄

Copy link
Contributor Author

@silesky silesky Sep 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we define sleep everywhere!
I figured we would make core have the canonical version and remove the other ones — thoughts? Or maybe we create a utils package.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good to me!

return new Promise((resolve) => setTimeout(resolve, timeoutInMs))
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './plugins'
export * from './plugins/middleware'
export * from './events/interfaces'
export * from './events'
export * from './callback'
export * from './priority-queue'
export * from './context'
export * from './queue/event-queue'
Expand Down
96 changes: 81 additions & 15 deletions packages/node/README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,94 @@
# TODO: API Documentation is out of date

https://segment.com/docs/connections/sources/catalog/libraries/server/node/
## Warning: Until 1.x release, use this library at your own risk!
While the API is very similar, the documentation for the legacy SDK (`analytics-node`) is here: https://segment.com/docs/connections/sources/catalog/libraries/server/node/


NOTE: @segment/analytics-node is unstable! do not use.
## Quick Start
### Install library
```bash
# npm
npm install @segment/analytics-node
# yarn
yarn add @segment/analytics-node
# pnpm
pnpm install @segment/analytics-node
```

## Basic Usage
### Usage (assuming some express-like web framework)
```ts
// analytics.ts
import { AnalyticsNode } from '@segment/analytics-node'

export const analytics = new AnalyticsNode({ writeKey: '<MY_WRITE_KEY>' })
const analytics = new AnalyticsNode({ writeKey: '<MY_WRITE_KEY>' })


// app.ts
import { analytics } from './analytics'
app.post('/login', (req, res) => {
analytics.identify({
userId: req.body.userId,
previousId: req.body.previousId
})
})

app.post('/cart', (req, res) => {
analytics.track({
userId: req.body.userId,
event: 'Add to cart',
properties: { productId: '123456' }
})
});
```

## Graceful Shutdown
### Avoid losing events on exit!
* Call `.closeAndFlush()` to stop collecting new events and flush all existing events.
* If a callback on an event call is included, this also waits for all callbacks to be called, and any of their subsequent promises to be resolved.
```ts
await analytics.closeAndFlush()
// or
await analytics.closeAndFlush({ timeout: 5000 }) // force resolve after 5000ms
```
### Graceful Shutdown: Advanced Example
```ts
import { AnalyticsNode } from '@segment/analytics-node'
import express from 'express'

const analytics = new AnalyticsNode({ writeKey: '<MY_WRITE_KEY>' })

analytics.identify('Test User', { loggedIn: true }, { userId: "123456" })
analytics.track('hello world', {}, { userId: "123456" })
const app = express()
app.post('/cart', (req, res) => {
analytics.track({
userId: req.body.userId,
event: 'Add to cart',
properties: { productId: '123456' }
})
});

const server = app.listen(3000)


const onExit = async () => {
console.log("Gracefully closing server...");
await analytics.closeAndFlush() // flush all existing events
server.close(() => process.exit());
};

process.on("SIGINT", onExit);
process.on("SIGTERM", onExit);
```

# Event Emitter (Advanced Usage)
#### Collecting unflushed events
If you absolutely need to preserve all possible events in the event of a forced timeout, even ones that came in after `analytics.closeAndFlush()` was called, you can collect those events.
```ts
const unflushedEvents = []

analytics.on('call_after_close', (event) => unflushedEvents.push(events))
await analytics.closeAndFlush()

console.log(unflushedEvents) // all events that came in after closeAndFlush was called

```


## Event Emitter
```ts
import { analytics } from './analytics'
import { ContextCancelation, CoreContext } from '@segment/analytics-node'
Expand All @@ -31,13 +98,12 @@ analytics.on('identify', (ctx) => console.log(ctx.event))

// listen for errors (if needed)
analytics.on('error', (err) => {
if (err instanceof ContextCancelation) {
console.error('event cancelled', err.logs())
} else if (err instanceof CoreContext) {
console.error('event failed', err.logs())
if (err.code === 'http_delivery') {
console.error(err.response)
} else {
console.error(err)
}
})
```


165 changes: 165 additions & 0 deletions packages/node/src/__tests__/graceful-shutdown-integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { createSuccess } from './test-helpers/factories'

const fetcher = jest.fn().mockReturnValue(createSuccess())
jest.mock('node-fetch', () => fetcher)

import { AnalyticsNode, NodeSegmentEvent } from '../app/analytics-node'
import { sleep } from './test-helpers/sleep'
import { CoreContext, CorePlugin } from '@segment/analytics-core'

const testPlugin: CorePlugin = {
type: 'after',
load: () => Promise.resolve(),
name: 'foo',
version: 'bar',
isLoaded: () => true,
}

describe('Ability for users to exit without losing events', () => {
let ajs!: AnalyticsNode
beforeEach(async () => {
jest.resetAllMocks()
ajs = new AnalyticsNode({
writeKey: 'abc123',
})
})
const _helpers = {
makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => {
analytics.track({ userId: 'foo', event: 'Thing Updated', callback: cb })
},
}

describe('drained emitted event', () => {
test('emits a drained event if only one event is dispatched', async () => {
_helpers.makeTrackCall()
return expect(
new Promise((resolve) => ajs.once('drained', () => resolve(undefined)))
).resolves.toBe(undefined)
})

test('emits a drained event if multiple events are dispatched', async () => {
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
_helpers.makeTrackCall()
_helpers.makeTrackCall()
_helpers.makeTrackCall()
await sleep(200)
expect(drainedCalls).toBe(1)
})

test('all callbacks should be called ', async () => {
const cb = jest.fn()
ajs.track({ userId: 'foo', event: 'bar', callback: cb })
expect(cb).not.toHaveBeenCalled()
await ajs.closeAndFlush()
expect(cb).toBeCalled()
})

test('all async callbacks should be called', async () => {
const trackCall = new Promise<CoreContext>((resolve) =>
ajs.track({
userId: 'abc',
event: 'def',
callback: (ctx) => {
return sleep(100).then(() => resolve(ctx))
},
})
)
const res = await Promise.race([ajs.closeAndFlush(), trackCall])
expect(res instanceof CoreContext).toBe(true)
})
})

describe('.closeAndFlush()', () => {
test('should force resolve if method call execution time exceeds specified timeout', async () => {
const TIMEOUT = 300
await ajs.register({
...testPlugin,
track: async (ctx) => {
await sleep(1000)
return ctx
},
})
_helpers.makeTrackCall(ajs)
const startTime = Date.now()
await ajs.closeAndFlush({ timeout: TIMEOUT })
const elapsedTime = Math.round(Date.now() - startTime)
expect(elapsedTime).toBeLessThanOrEqual(TIMEOUT + 10)
expect(elapsedTime).toBeGreaterThan(TIMEOUT - 10)
})

test('no new events should be accepted (but existing ones should be flushed)', async () => {
let trackCallCount = 0
ajs.on('track', () => {
// track should only happen after successful dispatch
trackCallCount += 1
})
_helpers.makeTrackCall()
const closed = ajs.closeAndFlush()
_helpers.makeTrackCall() // should not trigger
_helpers.makeTrackCall() // should not trigger
await closed
expect(fetcher).toBeCalledTimes(1)
expect(trackCallCount).toBe(1)
})
test('any events created after close should be emitted', async () => {
const events: NodeSegmentEvent[] = []
ajs.on('call_after_close', (event) => {
events.push(event)
})
_helpers.makeTrackCall()
const closed = ajs.closeAndFlush()
_helpers.makeTrackCall() // should be emitted
_helpers.makeTrackCall() // should be emitted
expect(events.length).toBe(2)
expect(events.every((e) => e.type === 'track')).toBeTruthy()
await closed
})

test('if queue has multiple track events, all of those items should be dispatched, and drain and track events should be emitted', async () => {
let drainedCalls = 0
ajs.on('drained', () => {
drainedCalls++
})
let trackCalls = 0
ajs.on('track', () => {
trackCalls++
})
await ajs.register({
...testPlugin,
track: async (ctx) => {
await sleep(300)
return ctx
},
})
_helpers.makeTrackCall()
_helpers.makeTrackCall()

await ajs.closeAndFlush()

expect(fetcher.mock.calls.length).toBe(2)

expect(trackCalls).toBe(2)

expect(drainedCalls).toBe(1)
})

test('if no pending events, resolves immediately', async () => {
const startTime = Date.now()
await ajs.closeAndFlush()
const elapsedTime = startTime - Date.now()
expect(elapsedTime).toBeLessThan(20)
})

test('if no pending events, drained should not be emitted an extra time when close is called', async () => {
let called = false
ajs.on('drained', () => {
called = true
})
await ajs.closeAndFlush()
expect(called).toBeFalsy()
})
})
})
4 changes: 2 additions & 2 deletions packages/node/src/__tests__/http-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ describe('Analytics Node', () => {
})

test('Track: Fires http requests to the correct endoint', async () => {
ajs.track({ event: 'track', userId: 'foo' })
ajs.track({ event: 'track', userId: 'foo', properties: { foo: 'bar' } })
ajs.track({ event: 'foo', userId: 'foo' })
ajs.track({ event: 'bar', userId: 'foo', properties: { foo: 'bar' } })
await resolveCtx(ajs, 'track')
expect(fetcher).toHaveBeenCalledWith(
'https://api.segment.io/v1/track',
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/__tests__/test-helpers/sleep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function sleep(timeoutInMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, timeoutInMs))
}
Loading