From ecb4b8db0194e06a3ee3c8cae57d4f327d15dc02 Mon Sep 17 00:00:00 2001 From: Seth Silesky <5115498+silesky@users.noreply.github.com> Date: Mon, 28 Nov 2022 12:59:45 -0600 Subject: [PATCH] Update cb behavior (#692) --- .changeset/cyan-rocks-happen.md | 5 + packages/core/src/analytics/dispatch-emit.ts | 34 ---- packages/core/src/emitter/interface.ts | 34 ++-- packages/core/src/index.ts | 1 - packages/node/README.md | 100 ++++------ packages/node/src/__tests__/callback.test.ts | 49 +++++ .../graceful-shutdown-integration.test.ts | 21 +- .../node/src/__tests__/integration.test.ts | 2 +- packages/node/src/app/analytics-node.ts | 181 +++++++++--------- packages/node/src/app/dispatch-emit.ts | 43 +++++ packages/node/src/app/emitter.ts | 14 ++ 11 files changed, 262 insertions(+), 222 deletions(-) create mode 100644 .changeset/cyan-rocks-happen.md delete mode 100644 packages/core/src/analytics/dispatch-emit.ts create mode 100644 packages/node/src/__tests__/callback.test.ts create mode 100644 packages/node/src/app/dispatch-emit.ts create mode 100644 packages/node/src/app/emitter.ts diff --git a/.changeset/cyan-rocks-happen.md b/.changeset/cyan-rocks-happen.md new file mode 100644 index 000000000..e2c0f93c1 --- /dev/null +++ b/.changeset/cyan-rocks-happen.md @@ -0,0 +1,5 @@ +--- +'@segment/analytics-core': patch +--- + +Move code out of core and into analytics-node. Tweak emitter error contract. diff --git a/packages/core/src/analytics/dispatch-emit.ts b/packages/core/src/analytics/dispatch-emit.ts deleted file mode 100644 index 7d91f4c17..000000000 --- a/packages/core/src/analytics/dispatch-emit.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { CoreContext } from '../context' -import { dispatch } from './dispatch' - -type DispatchAndEmitFn = ( - ...args: Parameters -) => Promise - -/* Dispatch function, but swallow promise rejections and use event emitter instead */ -export const dispatchAndEmit: DispatchAndEmitFn = async ( - event, - queue, - emitter, - options -) => { - try { - const ctx = await dispatch(event, queue, emitter, options) - if (ctx.failedDelivery()) { - emitter.emit('error', { - code: 'delivery_failure', - message: 'failed to deliver event', - ctx: ctx, - }) - } else { - emitter.emit(event.type, ctx) - return ctx - } - } catch (err) { - emitter.emit('error', { - code: 'unknown', - message: 'an unknown error occurred when dispatching an event.', - err: err, - }) - } -} diff --git a/packages/core/src/emitter/interface.ts b/packages/core/src/emitter/interface.ts index e256941aa..2c7b345b5 100644 --- a/packages/core/src/emitter/interface.ts +++ b/packages/core/src/emitter/interface.ts @@ -1,29 +1,23 @@ import { CoreContext } from '../context' -export type EmittedUnknownError = { - code: 'unknown' - message: string - ctx?: Ctx - err?: any -} - -export type EmittedDeliveryFailureError = { - code: 'delivery_failure' - message: string - ctx: Ctx - data?: any -} - /** - * Discriminated of all errors with a discriminant key of "code" + * This is the base contract for all emitted errors. This interface may be extended. */ -export type CoreEmittedError = - | EmittedUnknownError - | EmittedDeliveryFailureError +export interface CoreEmittedError { + /** + * e.g. 'delivery_failure' + */ + code: string + /** + * Why the error occurred. This can be an actual error object or a just a message. + */ + reason?: unknown + ctx?: Ctx +} export type CoreEmitterContract< Ctx extends CoreContext, - AdditionalErrors = CoreEmittedError + Err extends CoreEmittedError = CoreEmittedError > = { alias: [ctx: Ctx] track: [ctx: Ctx] @@ -33,5 +27,5 @@ export type CoreEmitterContract< group: [ctx: Ctx] register: [pluginNames: string[]] deregister: [pluginNames: string[]] - error: [CoreEmittedError | AdditionalErrors] + error: [error: Err] } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index d91feb256..c743819e5 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -10,7 +10,6 @@ export * from './context' export * from './queue/event-queue' export * from './analytics' export * from './analytics/dispatch' -export * from './analytics/dispatch-emit' export * from './validation/helpers' export * from './validation/assertions' export * from './utils/bind-all' diff --git a/packages/node/README.md b/packages/node/README.md index 4f831121e..4b4025ec0 100644 --- a/packages/node/README.md +++ b/packages/node/README.md @@ -36,7 +36,7 @@ app.post('/cart', (req, res) => { event: 'Add to cart', properties: { productId: '123456' } }) - res.sendStatus(200) + res.sendStatus(201) }); ``` ## Regional configuration @@ -48,8 +48,9 @@ Dublin — events.eu1.segmentapis.com An example of setting the host to the EU endpoint using the Node library would be: ```ts -const analytics = new Analytics('YOUR_WRITE_KEY', { - host: "https://events.eu1.segmentapis.com" +const analytics = new Analytics({ + ... + host: "https://events.eu1.segmentapis.com" }); ``` @@ -66,7 +67,6 @@ const analytics = new Analytics({ flushInterval: 10000, // ... and more! }) - ``` ## Batching @@ -84,16 +84,22 @@ There is a maximum of 500KB per batch request and 32KB per call. If you don’t want to batch messages, you can turn batching off by setting the `maxEventsInBatch` setting to 1, like so: ```ts -const analytics = new Analytics({ '', { maxEventsInBatch: 1 }); +const analytics = new Analytics({ + ... + maxEventsInBatch: 1 +}); ``` Batching means that your message might not get sent right away. But every method call takes an optional callback, which you can use to know when a particular message is flushed from the queue, like so: ```ts analytics.track({ - userId: '019mr8mf4r', - event: 'Ultimate Played' - callback: (ctx) => console.log(ctx) -}) + userId: '019mr8mf4r', + event: 'Ultimate Played', + }, + (err, ctx) => { + ... + } +) ``` ## Error Handling Subscribe and log all event delivery errors. @@ -143,7 +149,6 @@ const onExit = async () => { } ['SIGINT', 'SIGTERM'].forEach((code) => process.on(code, onExit)) - ``` #### Collecting unflushed events @@ -175,8 +180,8 @@ Different parts of your application may require different types of batching, or ```ts import { Analytics } from '@segment/analytics-node' -const marketingAnalytics = new Analytics('MARKETING_WRITE_KEY'); -const appAnalytics = new Analytics('APP_WRITE_KEY'); +const marketingAnalytics = new Analytics({ writeKey: 'MARKETING_WRITE_KEY' }); +const appAnalytics = new Analytics({ writeKey: 'APP_WRITE_KEY' }); ``` ## Troubleshooting @@ -184,7 +189,7 @@ const appAnalytics = new Analytics('APP_WRITE_KEY'); 2. Make sure that you’re calling a Segment API method once the library is successfully installed: identify, track, etc. -3. Log events and errors the event emitter: +3. Log events. ```js ['initialize', 'call_after_close', 'screen', 'identify', 'group', @@ -194,6 +199,20 @@ const appAnalytics = new Analytics('APP_WRITE_KEY'); ``` +## Development: Disabling Analytics for Tests +- If you want to intercept / disable analytics for integration tests, you can use something like [nock](https://github.com/nock/nock) + +```ts +// Note: nock will _not_ work if polyfill fetch with something like undici, as nock uses the http module. Undici has its own interception method. +import nock from 'nock' + +nock('https://api.segment.io') + .post('/v1/batch') + .reply(201) + .persist() +``` + + ## Differences from legacy analytics-node / Migration Guide @@ -206,15 +225,13 @@ import Analytics from 'analytics-node' import { Analytics } from '@segment/analytics-next' ``` -- Instantiation requires an object +- Instantiation now requires an _object_ as the first argument. ```ts // old +var analytics = new Analytics('YOUR_WRITE_KEY'); // not supported -var analytics = new Analytics('YOUR_WRITE_KEY'); - -// new -const analytics = new Analytics({ writeKey: 'YOUR_WRITE_KEY' }); - +// new! +const analytics = new Analytics({ writeKey: '' }) ``` - Graceful shutdown (See Graceful Shutdown section) ```ts @@ -232,49 +249,10 @@ Other Differences: - The `enable` configuration option has been removed-- see "Disabling Analytics" section - the `errorHandler` configuration option has been remove -- see "Error Handling" section - `flushAt` configuration option -> `maxEventsInBatch`. -- `callback` option is moved to configuration +- `callback` call signature is different ```ts // old -analytics.track({ - userId: '019mr8mf4r', - event: 'Ultimate Played' -}), function(err, batch){ - if (err) { - console.error(err) - } -}); - +(err, batch) => void // new -analytics.track({ - userId: '019mr8mf4r', - event: 'Ultimate Played', - callback: (ctx) => { - if (ctx.failedDelivery()) { - console.error(ctx) - } - } -}) - -``` - - -## Development / Disabling Analytics -- If you want to disable analytics for unit tests, you can use something like [nock](https://github.com/nock/nock) or [jest mocks](https://jestjs.io/docs/manual-mocks). - -You should prefer mocking. However, if you need to intercept the request, you can do: - -```ts - // Note: nock will _not_ work if polyfill fetch with something like undici, as nock uses the http module. Undici has its own interception method. - import nock from 'nock' - - const mockApiHost = 'https://foo.bar' - const mockPath = '/foo' - - nock(mockApiHost) // using regex matching in nock changes the perf profile quite a bit - .post(mockPath, (body) => true) - .reply(201) - .persist() - -const analytics = new Analytics({ host: mockApiHost, path: mockPath }) - +(err, ctx) => void ``` diff --git a/packages/node/src/__tests__/callback.test.ts b/packages/node/src/__tests__/callback.test.ts new file mode 100644 index 000000000..3ab61c7cd --- /dev/null +++ b/packages/node/src/__tests__/callback.test.ts @@ -0,0 +1,49 @@ +const fetcher = jest.fn() +jest.mock('../lib/fetch', () => ({ fetch: fetcher })) + +import { createError, createSuccess } from './test-helpers/factories' +import { createTestAnalytics } from './test-helpers/create-test-analytics' +import { Context } from '../app/analytics-node' + +describe('Callback behavior', () => { + beforeEach(() => { + fetcher.mockReturnValue(createSuccess()) + }) + + it('should handle success', async () => { + const ajs = createTestAnalytics({ maxEventsInBatch: 1 }) + const ctx = await new Promise((resolve, reject) => + ajs.track( + { + anonymousId: 'bar', + event: 'event name', + }, + (err, ctx) => { + if (err) reject('test fail') + resolve(ctx!) + } + ) + ) + expect(ctx.event.event).toBe('event name') + expect(ctx.event.anonymousId).toBe('bar') + }) + + it('should handle errors', async () => { + fetcher.mockReturnValue(createError()) + const ajs = createTestAnalytics({ maxEventsInBatch: 1 }) + const [err, ctx] = await new Promise<[any, Context]>((resolve) => + ajs.track( + { + anonymousId: 'bar', + event: 'event name', + }, + (err, ctx) => { + resolve([err!, ctx!]) + } + ) + ) + expect(ctx.event.event).toBe('event name') + expect(ctx.event.anonymousId).toBe('bar') + expect(err).toEqual(new Error('[404] Not Found')) + }) +}) diff --git a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts index f7b909c6c..637c7ad16 100644 --- a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts +++ b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts @@ -26,7 +26,7 @@ describe('Ability for users to exit without losing events', () => { }) const _helpers = { makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => { - analytics.track({ userId: 'foo', event: 'Thing Updated', callback: cb }) + analytics.track({ userId: 'foo', event: 'Thing Updated' }, cb) }, } @@ -52,22 +52,23 @@ describe('Ability for users to exit without losing events', () => { test('all callbacks should be called ', async () => { const cb = jest.fn() - ajs.track({ userId: 'foo', event: 'bar', callback: cb }) + ajs.track({ userId: 'foo', event: 'bar' }, cb) expect(cb).not.toHaveBeenCalled() await ajs.closeAndFlush() expect(cb).toBeCalled() }) test('all async callbacks should be called', async () => { - const trackCall = new Promise((resolve) => - ajs.track({ - userId: 'abc', - event: 'def', - callback: (ctx) => { - return sleep(100).then(() => resolve(ctx)) + const trackCall = new Promise((resolve) => { + ajs.track( + { + userId: 'abc', + event: 'def', }, - }) - ) + (_, ctx) => sleep(200).then(() => resolve(ctx!)) + ) + }) + const res = await Promise.race([ajs.closeAndFlush(), trackCall]) expect(res instanceof CoreContext).toBe(true) }) diff --git a/packages/node/src/__tests__/integration.test.ts b/packages/node/src/__tests__/integration.test.ts index 6c8085a2f..9d1bf08af 100644 --- a/packages/node/src/__tests__/integration.test.ts +++ b/packages/node/src/__tests__/integration.test.ts @@ -62,7 +62,7 @@ describe('Error handling', () => { await promise throw new Error('fail') } catch (err: any) { - expect(err.message).toMatch(/fail/) + expect(err.reason).toEqual(new Error('[503] Service Unavailable')) expect(err.code).toMatch(/delivery_failure/) } }) diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts index a777f91a3..b25980d02 100644 --- a/packages/node/src/app/analytics-node.ts +++ b/packages/node/src/app/analytics-node.ts @@ -1,25 +1,23 @@ import { EventProperties, Traits, - Emitter, CoreAnalytics, CoreContext, CorePlugin, EventFactory, EventQueue, - dispatchAndEmit, CoreOptions, - Callback, CoreSegmentEvent, bindAll, PriorityQueue, - CoreEmitterContract, pTimeout, } from '@segment/analytics-core' import { AnalyticsSettings, validateSettings } from './settings' import { version } from '../../package.json' import { configureNodePlugin } from '../plugins/segmentio' import { createNodeEventFactory } from '../lib/create-node-event-factory' +import { Callback, dispatchAndEmit } from './dispatch-emit' +import { NodeEmitter } from './emitter' // create a derived class since we may want to add node specific things to Context later export class Context extends CoreContext {} @@ -37,15 +35,6 @@ export interface SegmentEventOptions { timestamp?: CoreOptions['timestamp'] } -/** - * Map of emitter event names to method args. - */ -type NodeEmitterEvents = CoreEmitterContract & { - initialize: [AnalyticsSettings] - call_after_close: [SegmentEvent] // any event that did not get dispatched due to close - drained: [] -} - class NodePriorityQueue extends PriorityQueue { constructor() { super(1, []) @@ -67,10 +56,7 @@ export interface SegmentEvent extends CoreSegmentEvent { options?: SegmentEventOptions } -export class Analytics - extends Emitter - implements CoreAnalytics -{ +export class Analytics extends NodeEmitter implements CoreAnalytics { private _eventFactory: EventFactory private _isClosed = false private _pendingEvents = 0 @@ -141,9 +127,7 @@ export class Analytics this._pendingEvents++ - dispatchAndEmit(segmentEvent, this.queue, this, { - callback: callback, - }) + dispatchAndEmit(segmentEvent, this.queue, this, callback) .catch((ctx) => ctx) .finally(() => { this._pendingEvents-- @@ -158,19 +142,20 @@ export class Analytics * Combines two unassociated user identities. * @link https://segment.com/docs/connections/sources/catalog/libraries/server/node/#alias */ - alias({ - userId, - previousId, - options, - callback, - }: { - /* The new user id you want to associate with the user. */ - userId: string - /* The previous id that the user was recognized by (this can be either a userId or an anonymousId). */ - previousId: string - options?: SegmentEventOptions + alias( + { + userId, + previousId, + options, + }: { + /* The new user id you want to associate with the user. */ + userId: string + /* The previous id that the user was recognized by (this can be either a userId or an anonymousId). */ + previousId: string + options?: SegmentEventOptions + }, callback?: Callback - }): void { + ): void { const segmentEvent = this._eventFactory.alias(userId, previousId, options) this._dispatch(segmentEvent, callback) } @@ -179,19 +164,20 @@ export class Analytics * Associates an identified user with a collective. * @link https://segment.com/docs/connections/sources/catalog/libraries/server/node/#group */ - group({ - groupId, - userId, - anonymousId, - traits = {}, - options = {}, - callback, - }: IdentityOptions & { - groupId: string - traits?: Traits - options?: SegmentEventOptions + group( + { + groupId, + userId, + anonymousId, + traits = {}, + options = {}, + }: IdentityOptions & { + groupId: string + traits?: Traits + options?: SegmentEventOptions + }, callback?: Callback - }): void { + ): void { const segmentEvent = this._eventFactory.group(groupId, traits, { ...options, anonymousId, @@ -205,17 +191,18 @@ export class Analytics * Includes a unique userId and (maybe anonymousId) and any optional traits you know about them. * @link https://segment.com/docs/connections/sources/catalog/libraries/server/node/#identify */ - identify({ - userId, - anonymousId, - traits = {}, - options, - callback, - }: IdentityOptions & { - traits?: Traits - options?: SegmentEventOptions + identify( + { + userId, + anonymousId, + traits = {}, + options, + }: IdentityOptions & { + traits?: Traits + options?: SegmentEventOptions + }, callback?: Callback - }): void { + ): void { const segmentEvent = this._eventFactory.identify(userId, traits, { ...options, anonymousId, @@ -228,26 +215,27 @@ export class Analytics * The page method lets you record page views on your website, along with optional extra information about the page being viewed. * @link https://segment.com/docs/connections/sources/catalog/libraries/server/node/#page */ - page({ - userId, - anonymousId, - category, - name, - properties, - options, - timestamp, - callback, - }: IdentityOptions & { - /* The category of the page. Useful for cases like ecommerce where many pages might live under a single category. */ - category?: string - /* The name of the page.*/ - name?: string - /* A dictionary of properties of the page. */ - properties?: EventProperties + page( + { + userId, + anonymousId, + category, + name, + properties, + options, + timestamp, + }: IdentityOptions & { + /* The category of the page. Useful for cases like ecommerce where many pages might live under a single category. */ + category?: string + /* The name of the page.*/ + name?: string + /* A dictionary of properties of the page. */ + properties?: EventProperties + timestamp?: string | Date + options?: SegmentEventOptions + }, callback?: Callback - timestamp?: string | Date - options?: SegmentEventOptions - }): void { + ): void { const segmentEvent = this._eventFactory.page( category ?? null, name ?? null, @@ -263,16 +251,18 @@ export class Analytics * * TODO: This is not documented on the segment docs ATM (for node). */ - screen({ - userId, - anonymousId, - category, - name, - properties, - options, - callback, - timestamp, - }: Parameters[0]): void { + screen( + { + userId, + anonymousId, + category, + name, + properties, + options, + timestamp, + }: Parameters[0], + callback?: Callback + ): void { const segmentEvent = this._eventFactory.screen( category ?? null, name ?? null, @@ -287,19 +277,20 @@ export class Analytics * Records actions your users perform. * @link https://segment.com/docs/connections/sources/catalog/libraries/server/node/#track */ - track({ - userId, - anonymousId, - event, - properties, - options, - callback, - }: IdentityOptions & { - event: string - properties?: EventProperties - options?: SegmentEventOptions + track( + { + userId, + anonymousId, + event, + properties, + options, + }: IdentityOptions & { + event: string + properties?: EventProperties + options?: SegmentEventOptions + }, callback?: Callback - }): void { + ): void { const segmentEvent = this._eventFactory.track(event, properties, { ...options, userId, diff --git a/packages/node/src/app/dispatch-emit.ts b/packages/node/src/app/dispatch-emit.ts new file mode 100644 index 000000000..45665f239 --- /dev/null +++ b/packages/node/src/app/dispatch-emit.ts @@ -0,0 +1,43 @@ +import { dispatch } from '@segment/analytics-core' +import type { + CoreContext, + CoreSegmentEvent, + EventQueue, +} from '@segment/analytics-core' +import type { NodeEmitter } from './emitter' + +export type Callback = (err?: unknown, ctx?: CoreContext) => void + +const normalizeDispatchCb = (cb: Callback) => (ctx: CoreContext) => { + const failedDelivery = ctx.failedDelivery() + return failedDelivery ? cb(failedDelivery.reason, ctx) : cb(undefined, ctx) +} + +/* Dispatch function, but swallow promise rejections and use event emitter instead */ +export const dispatchAndEmit = async ( + event: CoreSegmentEvent, + queue: EventQueue, + emitter: NodeEmitter, + callback?: Callback +): Promise => { + try { + const ctx = await dispatch(event, queue, emitter, { + ...(callback ? { callback: normalizeDispatchCb(callback) } : {}), + }) + const failedDelivery = ctx.failedDelivery() + if (failedDelivery) { + emitter.emit('error', { + code: 'delivery_failure', + reason: failedDelivery.reason, + ctx: ctx, + }) + } else { + emitter.emit(event.type, ctx) + } + } catch (err) { + emitter.emit('error', { + code: 'unknown', + reason: err, + }) + } +} diff --git a/packages/node/src/app/emitter.ts b/packages/node/src/app/emitter.ts new file mode 100644 index 000000000..15c8b4704 --- /dev/null +++ b/packages/node/src/app/emitter.ts @@ -0,0 +1,14 @@ +import { CoreEmitterContract, Emitter } from '@segment/analytics-core' +import type { SegmentEvent, Context } from './analytics-node' +import type { AnalyticsSettings } from './settings' + +/** + * Map of emitter event names to method args. + */ +type NodeEmitterEvents = CoreEmitterContract & { + initialize: [AnalyticsSettings] + call_after_close: [SegmentEvent] // any event that did not get dispatched due to close + drained: [] +} + +export class NodeEmitter extends Emitter {}