From 5f5036332a3b21d5eb5324c2ed332190b42b2318 Mon Sep 17 00:00:00 2001 From: Seth Silesky <5115498+silesky@users.noreply.github.com> Date: Thu, 13 Jul 2023 11:23:12 -0500 Subject: [PATCH] Add ability to pass custom http client into node (#880) --- .changeset/nervous-chefs-talk.md | 5 + packages/node/package.json | 3 +- packages/node/src/__tests__/callback.test.ts | 20 ++- .../src/__tests__/disable.integration.test.ts | 25 ++-- .../src/__tests__/emitter.integration.test.ts | 21 +-- .../graceful-shutdown-integration.test.ts | 29 ++-- .../__tests__/http-client.integration.test.ts | 69 +++++++++ .../src/__tests__/http-integration.test.ts | 13 +- .../node/src/__tests__/integration.test.ts | 30 ++-- packages/node/src/__tests__/plugins.test.ts | 8 -- .../assert-shape/http-request-event.ts | 2 +- .../test-helpers/assert-shape/index.ts | 1 + .../assert-shape/segment-http-api.ts | 43 ++++++ .../test-helpers/create-test-analytics.ts | 38 ++++- packages/node/src/__tests__/typedef-tests.ts | 63 ++++++++- packages/node/src/app/analytics-node.ts | 5 + packages/node/src/app/emitter.ts | 2 +- packages/node/src/app/settings.ts | 8 +- packages/node/src/index.ts | 9 ++ packages/node/src/lib/__tests__/abort.test.ts | 4 +- packages/node/src/lib/abort.ts | 2 +- packages/node/src/lib/fetch.ts | 10 +- packages/node/src/lib/http-client.ts | 95 +++++++++++++ .../segmentio/__tests__/methods.test.ts | 132 ++++++++---------- .../segmentio/__tests__/publisher.test.ts | 84 ++++------- .../test-helpers/segment-http-api.ts | 41 ------ .../node/src/plugins/segmentio/publisher.ts | 41 +++--- yarn.lock | 16 ++- 28 files changed, 543 insertions(+), 276 deletions(-) create mode 100644 .changeset/nervous-chefs-talk.md create mode 100644 packages/node/src/__tests__/http-client.integration.test.ts create mode 100644 packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts create mode 100644 packages/node/src/lib/http-client.ts delete mode 100644 packages/node/src/plugins/segmentio/__tests__/test-helpers/segment-http-api.ts diff --git a/.changeset/nervous-chefs-talk.md b/.changeset/nervous-chefs-talk.md new file mode 100644 index 000000000..0a410b893 --- /dev/null +++ b/.changeset/nervous-chefs-talk.md @@ -0,0 +1,5 @@ +--- +'@segment/analytics-node': minor +--- + +Add `httpClient` setting. This allow users to override default HTTP client with a custom one. diff --git a/packages/node/package.json b/packages/node/package.json index 47d411b5c..3bebc6d96 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -42,7 +42,8 @@ }, "devDependencies": { "@internal/config": "0.0.0", - "@types/node": "^16" + "@types/node": "^16", + "axios": "^1.4.0" }, "packageManager": "yarn@3.4.1" } diff --git a/packages/node/src/__tests__/callback.test.ts b/packages/node/src/__tests__/callback.test.ts index 426171ec8..c9aa6d80f 100644 --- a/packages/node/src/__tests__/callback.test.ts +++ b/packages/node/src/__tests__/callback.test.ts @@ -1,17 +1,11 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - -import { createError, createSuccess } from './test-helpers/factories' import { createTestAnalytics } from './test-helpers/create-test-analytics' import { Context } from '../app/context' describe('Callback behavior', () => { - beforeEach(() => { - fetcher.mockReturnValue(createSuccess()) - }) - it('should handle success', async () => { - const ajs = createTestAnalytics({ maxEventsInBatch: 1 }) + const ajs = createTestAnalytics({ + maxEventsInBatch: 1, + }) const ctx = await new Promise((resolve, reject) => ajs.track( { @@ -29,8 +23,12 @@ describe('Callback behavior', () => { }) it('should handle errors', async () => { - fetcher.mockReturnValue(createError()) - const ajs = createTestAnalytics({ maxEventsInBatch: 1 }) + const ajs = createTestAnalytics( + { + maxEventsInBatch: 1, + }, + { withError: true } + ) const [err, ctx] = await new Promise<[any, Context]>((resolve) => ajs.track( { diff --git a/packages/node/src/__tests__/disable.integration.test.ts b/packages/node/src/__tests__/disable.integration.test.ts index 730170069..c5d1dba41 100644 --- a/packages/node/src/__tests__/disable.integration.test.ts +++ b/packages/node/src/__tests__/disable.integration.test.ts @@ -1,10 +1,13 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - -import { createTestAnalytics } from './test-helpers/create-test-analytics' +import { + createTestAnalytics, + TestFetchClient, +} from './test-helpers/create-test-analytics' describe('disable', () => { - it('should dispatch callbacks and emit an http request, even if disabled', async () => { + const httpClient = new TestFetchClient() + const makeReqSpy = jest.spyOn(httpClient, 'makeRequest') + + it('should not emit an http request if disabled', async () => { const analytics = createTestAnalytics({ disable: true, }) @@ -13,25 +16,27 @@ describe('disable', () => { await new Promise((resolve) => analytics.track({ anonymousId: 'foo', event: 'bar' }, resolve) ) - expect(emitterCb).toBeCalledTimes(1) + expect(emitterCb).not.toBeCalled() }) - it('should call fetch if disabled is false', async () => { + it('should call .send if disabled is false', async () => { const analytics = createTestAnalytics({ disable: false, + httpClient: httpClient, }) await new Promise((resolve) => analytics.track({ anonymousId: 'foo', event: 'bar' }, resolve) ) - expect(fetcher).toBeCalled() + expect(makeReqSpy).toBeCalledTimes(1) }) - it('should not call fetch if disabled is true', async () => { + it('should not call .send if disabled is true', async () => { const analytics = createTestAnalytics({ disable: true, + httpClient: httpClient, }) await new Promise((resolve) => analytics.track({ anonymousId: 'foo', event: 'bar' }, resolve) ) - expect(fetcher).not.toBeCalled() + expect(makeReqSpy).not.toBeCalled() }) }) diff --git a/packages/node/src/__tests__/emitter.integration.test.ts b/packages/node/src/__tests__/emitter.integration.test.ts index 0282e29d8..3f86e59ac 100644 --- a/packages/node/src/__tests__/emitter.integration.test.ts +++ b/packages/node/src/__tests__/emitter.integration.test.ts @@ -1,13 +1,8 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - -import { createError, createSuccess } from './test-helpers/factories' import { createTestAnalytics } from './test-helpers/create-test-analytics' import { assertHttpRequestEmittedEvent } from './test-helpers/assert-shape' describe('http_request', () => { it('emits an http_request event if success', async () => { - fetcher.mockReturnValue(createSuccess()) const analytics = createTestAnalytics() const fn = jest.fn() analytics.on('http_request', fn) @@ -19,8 +14,12 @@ describe('http_request', () => { }) it('emits an http_request event if error', async () => { - fetcher.mockReturnValue(createError()) - const analytics = createTestAnalytics({ maxRetries: 0 }) + const analytics = createTestAnalytics( + { + maxRetries: 0, + }, + { withError: true } + ) const fn = jest.fn() analytics.on('http_request', fn) await new Promise((resolve) => @@ -30,8 +29,12 @@ describe('http_request', () => { }) it('if error, emits an http_request event on every retry', async () => { - fetcher.mockReturnValue(createError()) - const analytics = createTestAnalytics({ maxRetries: 2 }) + const analytics = createTestAnalytics( + { + maxRetries: 2, + }, + { withError: true } + ) const fn = jest.fn() analytics.on('http_request', fn) await new Promise((resolve) => diff --git a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts index bd942ab3a..e82889c99 100644 --- a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts +++ b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts @@ -1,9 +1,5 @@ -import { createSuccess } from './test-helpers/factories' +import { TestFetchClient } from './test-helpers/create-test-analytics' import { performance as perf } from 'perf_hooks' - -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - import { Analytics } from '../app/analytics-node' import { sleep } from './test-helpers/sleep' import { Plugin, SegmentEvent } from '../app/types' @@ -17,22 +13,26 @@ const testPlugin: Plugin = { isLoaded: () => true, } +let testClient: TestFetchClient + describe('Ability for users to exit without losing events', () => { let ajs!: Analytics + testClient = new TestFetchClient() + const makeReqSpy = jest.spyOn(testClient, 'makeRequest') beforeEach(async () => { - fetcher.mockReturnValue(createSuccess()) ajs = new Analytics({ writeKey: 'abc123', maxEventsInBatch: 1, + httpClient: testClient, }) }) const _helpers = { - getFetchCalls: (mockedFetchFn = fetcher) => - mockedFetchFn.mock.calls.map(([url, request]) => ({ + getFetchCalls: () => + makeReqSpy.mock.calls.map(([{ url, method, data, headers }]) => ({ url, - method: request.method, - headers: request.headers, - body: JSON.parse(request.body), + method, + headers, + data, })), makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => { analytics.track({ userId: 'foo', event: 'Thing Updated' }, cb) @@ -89,6 +89,7 @@ describe('Ability for users to exit without losing events', () => { ajs = new Analytics({ writeKey: 'abc123', flushInterval, + httpClient: testClient, }) const closeAndFlushTimeout = ajs['_closeAndFlushDefaultTimeout'] expect(closeAndFlushTimeout).toBe(flushInterval * 1.25) @@ -190,6 +191,7 @@ describe('Ability for users to exit without losing events', () => { writeKey: 'foo', flushInterval: 10000, maxEventsInBatch: 15, + httpClient: testClient, }) _helpers.makeTrackCall(analytics) _helpers.makeTrackCall(analytics) @@ -204,7 +206,7 @@ describe('Ability for users to exit without losing events', () => { expect(elapsedTime).toBeLessThan(100) const calls = _helpers.getFetchCalls() expect(calls.length).toBe(1) - expect(calls[0].body.batch.length).toBe(2) + expect(calls[0].data.batch.length).toBe(2) }) test('should wait to flush if close is called and an event has not made it to the segment.io plugin yet', async () => { @@ -220,6 +222,7 @@ describe('Ability for users to exit without losing events', () => { writeKey: 'foo', flushInterval: 10000, maxEventsInBatch: 15, + httpClient: testClient, }) await analytics.register(_testPlugin) _helpers.makeTrackCall(analytics) @@ -235,7 +238,7 @@ describe('Ability for users to exit without losing events', () => { expect(elapsedTime).toBeLessThan(TRACK_DELAY * 2) const calls = _helpers.getFetchCalls() expect(calls.length).toBe(1) - expect(calls[0].body.batch.length).toBe(2) + expect(calls[0].data.batch.length).toBe(2) }) }) }) diff --git a/packages/node/src/__tests__/http-client.integration.test.ts b/packages/node/src/__tests__/http-client.integration.test.ts new file mode 100644 index 000000000..d84562fd2 --- /dev/null +++ b/packages/node/src/__tests__/http-client.integration.test.ts @@ -0,0 +1,69 @@ +import { FetchHTTPClient, HTTPFetchFn } from '..' +import { AbortSignal as AbortSignalShim } from '../lib/abort' +import { httpClientOptionsBodyMatcher } from './test-helpers/assert-shape/segment-http-api' +import { createTestAnalytics } from './test-helpers/create-test-analytics' +import { createSuccess } from './test-helpers/factories' + +const testFetch: jest.MockedFn = jest + .fn() + .mockResolvedValue(createSuccess()) + +let analytics: ReturnType + +const helpers = { + makeTrackCall: () => + new Promise((resolve) => + analytics.track({ event: 'foo', userId: 'bar' }, resolve) + ), + assertFetchCallRequest: ( + ...[url, options]: typeof testFetch['mock']['lastCall'] + ) => { + expect(url).toBe('https://api.segment.io/v1/batch') + expect(options.headers).toEqual({ + Authorization: 'Basic Zm9vOg==', + 'Content-Type': 'application/json', + 'User-Agent': 'analytics-node-next/latest', + }) + expect(options.method).toBe('POST') + const getLastBatch = (): object[] => { + const [, options] = testFetch.mock.lastCall + const batch = JSON.parse(options.body!).batch + return batch + } + const batch = getLastBatch() + expect(batch.length).toBe(1) + expect(batch[0]).toEqual({ + ...httpClientOptionsBodyMatcher, + timestamp: expect.stringMatching( + /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/ + ), + properties: {}, + event: 'foo', + type: 'track', + userId: 'bar', + }) + // @ts-ignore + expect(options.signal).toBeInstanceOf( + typeof AbortSignal !== 'undefined' ? AbortSignal : AbortSignalShim + ) + }, +} + +describe('httpClient option', () => { + it('can be a regular custom HTTP client', async () => { + analytics = createTestAnalytics({ + httpClient: new FetchHTTPClient(testFetch), + }) + expect(testFetch).toHaveBeenCalledTimes(0) + await helpers.makeTrackCall() + expect(testFetch).toHaveBeenCalledTimes(1) + helpers.assertFetchCallRequest(...testFetch.mock.lastCall) + }) + it('can be a simple function that matches the fetch interface', async () => { + analytics = createTestAnalytics({ httpClient: testFetch }) + expect(testFetch).toHaveBeenCalledTimes(0) + await helpers.makeTrackCall() + expect(testFetch).toHaveBeenCalledTimes(1) + helpers.assertFetchCallRequest(...testFetch.mock.lastCall) + }) +}) diff --git a/packages/node/src/__tests__/http-integration.test.ts b/packages/node/src/__tests__/http-integration.test.ts index e23e360dd..f52884dec 100644 --- a/packages/node/src/__tests__/http-integration.test.ts +++ b/packages/node/src/__tests__/http-integration.test.ts @@ -33,7 +33,7 @@ describe('Method Smoke Tests', () => { let scope: nock.Scope let ajs: Analytics beforeEach(async () => { - ajs = createTestAnalytics() + ajs = createTestAnalytics({}, { useRealHTTPClient: true }) }) describe('Metadata', () => { @@ -333,10 +333,13 @@ describe('Client: requestTimeout', () => { }) it('should timeout immediately if request timeout is set to 0', async () => { jest.useRealTimers() - const ajs = createTestAnalytics({ - maxEventsInBatch: 1, - httpRequestTimeout: 0, - }) + const ajs = createTestAnalytics( + { + maxEventsInBatch: 1, + httpRequestTimeout: 0, + }, + { useRealHTTPClient: true } + ) ajs.track({ event: 'foo', userId: 'foo', properties: { hello: 'world' } }) try { await resolveCtx(ajs, 'track') diff --git a/packages/node/src/__tests__/integration.test.ts b/packages/node/src/__tests__/integration.test.ts index 9c5ef61d0..1169dab0b 100644 --- a/packages/node/src/__tests__/integration.test.ts +++ b/packages/node/src/__tests__/integration.test.ts @@ -1,19 +1,18 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - import { Plugin } from '../app/types' import { resolveCtx } from './test-helpers/resolve-ctx' import { testPlugin } from './test-helpers/test-plugin' -import { createSuccess, createError } from './test-helpers/factories' -import { createTestAnalytics } from './test-helpers/create-test-analytics' +import { createError } from './test-helpers/factories' +import { + createTestAnalytics, + TestFetchClient, +} from './test-helpers/create-test-analytics' const writeKey = 'foo' jest.setTimeout(10000) const timestamp = new Date() -beforeEach(() => { - fetcher.mockReturnValue(createSuccess()) -}) +const testClient = new TestFetchClient() +const makeReqSpy = jest.spyOn(testClient, 'makeRequest') describe('Settings / Configuration Init', () => { it('throws if no writeKey', () => { @@ -28,11 +27,12 @@ describe('Settings / Configuration Init', () => { const analytics = createTestAnalytics({ host: 'http://foo.com', path: '/bar', + httpClient: testClient, }) const track = resolveCtx(analytics, 'track') analytics.track({ event: 'foo', userId: 'sup' }) await track - expect(fetcher.mock.calls[0][0]).toBe('http://foo.com/bar') + expect(makeReqSpy.mock.calls[0][0].url).toBe('http://foo.com/bar') }) it('throws if host / path is bad', async () => { @@ -53,10 +53,14 @@ describe('Error handling', () => { }) it('should emit on an error', async () => { - const analytics = createTestAnalytics({ maxRetries: 0 }) - fetcher.mockReturnValue( - createError({ statusText: 'Service Unavailable', status: 503 }) - ) + const err = createError({ + statusText: 'Service Unavailable', + status: 503, + }) + const analytics = createTestAnalytics({ + maxRetries: 0, + httpClient: new TestFetchClient({ response: err }), + }) try { const promise = resolveCtx(analytics, 'track') analytics.track({ event: 'foo', userId: 'sup' }) diff --git a/packages/node/src/__tests__/plugins.test.ts b/packages/node/src/__tests__/plugins.test.ts index 4efa6b95e..18a3ef8c9 100644 --- a/packages/node/src/__tests__/plugins.test.ts +++ b/packages/node/src/__tests__/plugins.test.ts @@ -1,14 +1,6 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - -import { createSuccess } from './test-helpers/factories' import { createTestAnalytics } from './test-helpers/create-test-analytics' describe('Plugins', () => { - beforeEach(() => { - fetcher.mockReturnValue(createSuccess()) - }) - describe('Initialize', () => { it('loads analytics-node-next plugin', async () => { const analytics = createTestAnalytics() diff --git a/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts b/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts index 7c0b733ce..684a32dc2 100644 --- a/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts +++ b/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts @@ -4,7 +4,7 @@ type HttpRequestEmitterEvent = NodeEmitterEvents['http_request'][0] export const assertHttpRequestEmittedEvent = ( event: HttpRequestEmitterEvent ) => { - const body = JSON.parse(event.body) + const body = event.body expect(Array.isArray(body.batch)).toBeTruthy() expect(body.batch.length).toBe(1) expect(typeof event.headers).toBe('object') diff --git a/packages/node/src/__tests__/test-helpers/assert-shape/index.ts b/packages/node/src/__tests__/test-helpers/assert-shape/index.ts index 91df20a72..0ae1c3172 100644 --- a/packages/node/src/__tests__/test-helpers/assert-shape/index.ts +++ b/packages/node/src/__tests__/test-helpers/assert-shape/index.ts @@ -1 +1,2 @@ export * from './http-request-event' +export * from './segment-http-api' diff --git a/packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts b/packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts new file mode 100644 index 000000000..4c6a02743 --- /dev/null +++ b/packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts @@ -0,0 +1,43 @@ +import { Context } from '../../../app/context' +import { HTTPClientRequest } from '../../../lib/http-client' + +/** + * These map to the data properties of the HTTPClient options (the input value of 'makeRequest') + */ +export const httpClientOptionsBodyMatcher = { + messageId: expect.stringMatching(/^node-next-\d*-\w*-\w*-\w*-\w*-\w*/), + timestamp: expect.any(Date), + _metadata: expect.any(Object), + context: { + library: { + name: '@segment/analytics-node', + version: expect.any(String), + }, + }, + integrations: {}, +} + +export function assertHTTPRequestOptions( + { data, headers, method, url }: HTTPClientRequest, + contexts: Context[] +) { + expect(url).toBe('https://api.segment.io/v1/batch') + expect(method).toBe('POST') + expect(headers).toMatchInlineSnapshot(` + Object { + "Authorization": "Basic Og==", + "Content-Type": "application/json", + "User-Agent": "analytics-node-next/latest", + } + `) + + expect(data.batch).toHaveLength(contexts.length) + let idx = 0 + for (const context of contexts) { + expect(data.batch[idx]).toEqual({ + ...context.event, + ...httpClientOptionsBodyMatcher, + }) + idx += 1 + } +} diff --git a/packages/node/src/__tests__/test-helpers/create-test-analytics.ts b/packages/node/src/__tests__/test-helpers/create-test-analytics.ts index e97e35f5a..6d91535a2 100644 --- a/packages/node/src/__tests__/test-helpers/create-test-analytics.ts +++ b/packages/node/src/__tests__/test-helpers/create-test-analytics.ts @@ -1,8 +1,42 @@ import { Analytics } from '../../app/analytics-node' import { AnalyticsSettings } from '../../app/settings' +import { FetchHTTPClient, HTTPFetchFn } from '../../lib/http-client' +import { createError, createSuccess } from './factories' export const createTestAnalytics = ( - settings: Partial = {} + settings: Partial = {}, + { + withError, + useRealHTTPClient, + }: TestFetchClientOptions & { useRealHTTPClient?: boolean } = {} ) => { - return new Analytics({ writeKey: 'foo', flushInterval: 100, ...settings }) + return new Analytics({ + writeKey: 'foo', + flushInterval: 100, + ...(useRealHTTPClient + ? {} + : { httpClient: new TestFetchClient({ withError }) }), + ...settings, + }) +} + +export type TestFetchClientOptions = { + withError?: boolean + /** override response (if needed) */ + response?: Response | Promise +} + +/** + * Test http client. By default, it will return a successful response. + */ +export class TestFetchClient extends FetchHTTPClient { + constructor({ withError, response }: TestFetchClientOptions = {}) { + const _mockFetch: HTTPFetchFn = (..._args) => { + if (response) { + return Promise.resolve(response) + } + return Promise.resolve(withError ? createError() : createSuccess()) + } + super(_mockFetch) + } } diff --git a/packages/node/src/__tests__/typedef-tests.ts b/packages/node/src/__tests__/typedef-tests.ts index 01baa9564..2f8e626f6 100644 --- a/packages/node/src/__tests__/typedef-tests.ts +++ b/packages/node/src/__tests__/typedef-tests.ts @@ -1,4 +1,16 @@ -import { Analytics, Context, Plugin, UserTraits, GroupTraits } from '../' +/* eslint-disable @typescript-eslint/no-var-requires */ +import axios from 'axios' +import { + Analytics, + Context, + Plugin, + UserTraits, + GroupTraits, + HTTPClient, + FetchHTTPClient, + HTTPFetchFn, + HTTPClientRequest, +} from '../' /** * These are general typescript definition tests; @@ -14,6 +26,27 @@ export default { analytics.VERSION = 'foo' }, + 'Analytics should accept an entire HTTP Client': () => { + class CustomClient implements HTTPClient { + makeRequest = () => Promise.resolve({} as Response) + } + + new Analytics({ + writeKey: 'foo', + httpClient: new CustomClient(), + }) + + new Analytics({ + writeKey: 'foo', + httpClient: new FetchHTTPClient(globalThis.fetch), + }) + + new Analytics({ + writeKey: 'foo', + httpClient: new FetchHTTPClient(), + }) + }, + 'track/id/pg/screen/grp calls should require either userId or anonymousId': () => { const analytics = new Analytics({ writeKey: 'abc' }) @@ -56,4 +89,32 @@ export default { console.log({} as GroupTraits) console.log({} as UserTraits) }, + + 'HTTPFetchFn should be compatible with standard fetch and node-fetch interface, as well as functions': + () => { + const fetch: HTTPFetchFn = require('node-fetch') + new Analytics({ writeKey: 'foo', httpClient: fetch }) + new Analytics({ writeKey: 'foo', httpClient: globalThis.fetch }) + }, + + 'HTTPFetchFn options should be the expected type': () => { + type BadFetch = (url: string, requestInit: { _bad_object?: string }) => any + + // @ts-expect-error + new Analytics({ writeKey: 'foo', httpClient: {} as BadFetch }) + }, + + 'httpClient setting should be compatible with axios': () => { + new (class implements HTTPClient { + async makeRequest(options: HTTPClientRequest) { + return axios({ + url: options.url, + method: options.method, + data: options.data, + headers: options.headers, + timeout: options.httpRequestTimeout, + }) + } + })() + }, } diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts index 25f7f4d42..35edf8ffb 100644 --- a/packages/node/src/app/analytics-node.ts +++ b/packages/node/src/app/analytics-node.ts @@ -16,6 +16,7 @@ import { } from './types' import { Context } from './context' import { NodeEventQueue } from './event-queue' +import { FetchHTTPClient } from '../lib/http-client' export class Analytics extends NodeEmitter implements CoreAnalytics { private readonly _eventFactory: NodeEventFactory @@ -51,6 +52,10 @@ export class Analytics extends NodeEmitter implements CoreAnalytics { httpRequestTimeout: settings.httpRequestTimeout, disable: settings.disable, flushInterval, + httpClient: + typeof settings.httpClient === 'function' + ? new FetchHTTPClient(settings.httpClient) + : settings.httpClient ?? new FetchHTTPClient(), }, this as NodeEmitter ) diff --git a/packages/node/src/app/emitter.ts b/packages/node/src/app/emitter.ts index f68f9d19a..b1d9abf2f 100644 --- a/packages/node/src/app/emitter.ts +++ b/packages/node/src/app/emitter.ts @@ -14,7 +14,7 @@ export type NodeEmitterEvents = CoreEmitterContract & { url: string method: string headers: Record - body: string + body: Record } ] drained: [] diff --git a/packages/node/src/app/settings.ts b/packages/node/src/app/settings.ts index 8088bc56e..0f2ee02f6 100644 --- a/packages/node/src/app/settings.ts +++ b/packages/node/src/app/settings.ts @@ -1,11 +1,11 @@ import { ValidationError } from '@segment/analytics-core' +import { HTTPClient, HTTPFetchFn } from '../lib/http-client' export interface AnalyticsSettings { /** * Key that corresponds to your Segment.io project */ writeKey: string - /** /** * The base URL of the API. Default: "https://api.segment.io" */ @@ -34,6 +34,12 @@ export interface AnalyticsSettings { * Disable the analytics library. All calls will be a noop. Default: false. */ disable?: boolean + /** + * Supply a default http client implementation (such as one supporting proxy). + * Accepts either an HTTPClient instance or a fetch function. + * Default: an HTTP client that uses globalThis.fetch, with node-fetch as a fallback. + */ + httpClient?: HTTPFetchFn | HTTPClient } export const validateSettings = (settings: AnalyticsSettings) => { diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index d6294449a..e1facda3a 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -1,5 +1,14 @@ export { Analytics } from './app/analytics-node' export { Context } from './app/context' +export { + HTTPClient, + FetchHTTPClient, + HTTPFetchRequest, + HTTPResponse, + HTTPFetchFn, + HTTPClientRequest, +} from './lib/http-client' + export type { Plugin, GroupTraits, diff --git a/packages/node/src/lib/__tests__/abort.test.ts b/packages/node/src/lib/__tests__/abort.test.ts index ecc4af764..54f350252 100644 --- a/packages/node/src/lib/__tests__/abort.test.ts +++ b/packages/node/src/lib/__tests__/abort.test.ts @@ -1,7 +1,9 @@ import { abortSignalAfterTimeout } from '../abort' import nock from 'nock' -import { fetch } from '../fetch' import { sleep } from '@segment/analytics-core' +import { fetch as _fetch } from '../fetch' + +const fetch = _fetch as typeof globalThis.fetch describe(abortSignalAfterTimeout, () => { const HOST = 'https://foo.com' diff --git a/packages/node/src/lib/abort.ts b/packages/node/src/lib/abort.ts index 3a60b3cd4..dae0dd341 100644 --- a/packages/node/src/lib/abort.ts +++ b/packages/node/src/lib/abort.ts @@ -7,7 +7,7 @@ import { detectRuntime } from './env' /** * adapted from: https://www.npmjs.com/package/node-abort-controller */ -class AbortSignal { +export class AbortSignal { onabort: globalThis.AbortSignal['onabort'] = null aborted = false eventEmitter = new Emitter() diff --git a/packages/node/src/lib/fetch.ts b/packages/node/src/lib/fetch.ts index 35e65d69f..46b764cb1 100644 --- a/packages/node/src/lib/fetch.ts +++ b/packages/node/src/lib/fetch.ts @@ -1,11 +1,13 @@ -export const fetch: typeof globalThis.fetch = async (...args) => { +import type { HTTPFetchFn } from './http-client' + +export const fetch: HTTPFetchFn = async (...args) => { if (globalThis.fetch) { return globalThis.fetch(...args) - } // @ts-ignore + } // This guard causes is important, as it causes dead-code elimination to be enabled inside this block. + // @ts-ignore else if (typeof EdgeRuntime !== 'string') { - // @ts-ignore - return (await import('node-fetch')).default(...args) as Response + return (await import('node-fetch')).default(...args) } else { throw new Error( 'Invariant: an edge runtime that does not support fetch should not exist' diff --git a/packages/node/src/lib/http-client.ts b/packages/node/src/lib/http-client.ts new file mode 100644 index 000000000..93c1c5a96 --- /dev/null +++ b/packages/node/src/lib/http-client.ts @@ -0,0 +1,95 @@ +import { abortSignalAfterTimeout } from './abort' +import { fetch as defaultFetch } from './fetch' + +/** + * This interface is meant to be compatible with different fetch implementations (node and browser). + * Using the ambient fetch type is not possible because the AbortSignal type is not compatible with node-fetch. + * @link https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API + */ +export interface HTTPFetchFn { + (url: string, requestInit: HTTPFetchRequest): Promise +} + +/** + * This interface is meant to be compatible with the Request interface. + * @link https://developer.mozilla.org/en-US/docs/Web/API/Request + */ +export interface HTTPFetchRequest { + headers: Record + body: string + method: HTTPClientRequest['method'] + signal: any // AbortSignal type does not play nicely with node-fetch +} + +/** + * This interface is meant to very minimally conform to the Response interface. + * @link https://developer.mozilla.org/en-US/docs/Web/API/Response + */ +export interface HTTPResponse { + status: number + statusText: string +} + +/** + * This interface is meant to be a generic interface for making HTTP requests. + * While it may overlap with fetch's Request interface, it is not coupled to it. + */ +export interface HTTPClientRequest { + /** + * URL to be used for the request + * @example 'https://api.segment.io/v1/batch' + */ + url: string + /** + * HTTP method to be used for the request. This will always be a 'POST' request. + **/ + method: 'POST' + /** + * Headers to be sent with the request + */ + headers: Record + /** + * JSON data to be sent with the request (will be stringified) + * @example { "batch": [ ... ]} + */ + data: Record + /** + * Specifies the timeout (in milliseconds) for an HTTP client to get an HTTP response from the server + * @example 10000 + */ + httpRequestTimeout: number +} + +/** + * HTTP client interface for making requests + */ +export interface HTTPClient { + makeRequest(_options: HTTPClientRequest): Promise +} + +/** + * Default HTTP client implementation using fetch + */ +export class FetchHTTPClient implements HTTPClient { + private _fetch: HTTPFetchFn + constructor(fetchFn?: HTTPFetchFn) { + this._fetch = fetchFn ?? defaultFetch + } + async makeRequest(options: HTTPClientRequest): Promise { + const [signal, timeoutId] = abortSignalAfterTimeout( + options.httpRequestTimeout + ) + + const requestInit = { + url: options.url, + method: options.method, + headers: options.headers, + body: JSON.stringify(options.data), + signal: signal, + } + + return this._fetch(options.url, requestInit).finally(() => + clearTimeout(timeoutId) + ) + } +} diff --git a/packages/node/src/plugins/segmentio/__tests__/methods.test.ts b/packages/node/src/plugins/segmentio/__tests__/methods.test.ts index beedd411f..f1ed369b3 100644 --- a/packages/node/src/plugins/segmentio/__tests__/methods.test.ts +++ b/packages/node/src/plugins/segmentio/__tests__/methods.test.ts @@ -1,5 +1,3 @@ -const fetcher = jest.fn() -jest.mock('../../../lib/fetch', () => ({ fetch: fetcher })) import { NodeEventFactory } from '../../../app/event-factory' import { createSuccess } from '../../../__tests__/test-helpers/factories' import { createConfiguredNodePlugin } from '../index' @@ -7,17 +5,31 @@ import { PublisherProps } from '../publisher' import { Context } from '../../../app/context' import { Emitter } from '@segment/analytics-core' import { - bodyPropertyMatchers, - assertSegmentApiBody, -} from './test-helpers/segment-http-api' + assertHTTPRequestOptions, + httpClientOptionsBodyMatcher, +} from '../../../__tests__/test-helpers/assert-shape' +import { TestFetchClient } from '../../../__tests__/test-helpers/create-test-analytics' let emitter: Emitter -const createTestNodePlugin = (props: PublisherProps) => - createConfiguredNodePlugin(props, emitter) +const testClient = new TestFetchClient() +const fetcher = jest.spyOn(testClient, 'makeRequest') + +const createTestNodePlugin = (props: Partial = {}) => + createConfiguredNodePlugin( + { + maxRetries: 3, + maxEventsInBatch: 1, + flushInterval: 1000, + writeKey: '', + httpClient: testClient, + ...props, + }, + emitter + ) const validateFetcherInputs = (...contexts: Context[]) => { - const [url, request] = fetcher.mock.lastCall - return assertSegmentApiBody(url, request, contexts) + const [request] = fetcher.mock.lastCall + return assertHTTPRequestOptions(request, contexts) } const eventFactory = new NodeEventFactory() @@ -29,12 +41,7 @@ beforeEach(() => { }) test('alias', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.alias('to', 'from') const context = new Context(event) @@ -45,12 +52,12 @@ test('alias', async () => { expect(fetcher).toHaveBeenCalledTimes(1) validateFetcherInputs(context) - const [, request] = fetcher.mock.lastCall - const body = JSON.parse(request.body) + const [request] = fetcher.mock.lastCall + const data = request.data - expect(body.batch).toHaveLength(1) - expect(body.batch[0]).toEqual({ - ...bodyPropertyMatchers, + expect(data.batch).toHaveLength(1) + expect(data.batch[0]).toEqual({ + ...httpClientOptionsBodyMatcher, type: 'alias', previousId: 'from', userId: 'to', @@ -58,12 +65,7 @@ test('alias', async () => { }) test('group', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.group( 'foo-group-id', @@ -80,12 +82,12 @@ test('group', async () => { expect(fetcher).toHaveBeenCalledTimes(1) validateFetcherInputs(context) - const [, request] = fetcher.mock.lastCall - const body = JSON.parse(request.body) + const [request] = fetcher.mock.lastCall + const data = request.data - expect(body.batch).toHaveLength(1) - expect(body.batch[0]).toEqual({ - ...bodyPropertyMatchers, + expect(data.batch).toHaveLength(1) + expect(data.batch[0]).toEqual({ + ...httpClientOptionsBodyMatcher, traits: { name: 'libraries', }, @@ -96,12 +98,7 @@ test('group', async () => { }) test('identify', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.identify('foo-user-id', { name: 'Chris Radek', @@ -114,11 +111,11 @@ test('identify', async () => { expect(fetcher).toHaveBeenCalledTimes(1) validateFetcherInputs(context) - const [, request] = fetcher.mock.lastCall - const body = JSON.parse(request.body) - expect(body.batch).toHaveLength(1) - expect(body.batch[0]).toEqual({ - ...bodyPropertyMatchers, + const [request] = fetcher.mock.lastCall + const data = request.data + expect(data.batch).toHaveLength(1) + expect(data.batch[0]).toEqual({ + ...httpClientOptionsBodyMatcher, traits: { name: 'Chris Radek', }, @@ -128,12 +125,7 @@ test('identify', async () => { }) test('page', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.page( 'Category', @@ -149,12 +141,12 @@ test('page', async () => { expect(fetcher).toHaveBeenCalledTimes(1) validateFetcherInputs(context) - const [, request] = fetcher.mock.lastCall - const body = JSON.parse(request.body) + const [request] = fetcher.mock.lastCall + const data = request.data - expect(body.batch).toHaveLength(1) - expect(body.batch[0]).toEqual({ - ...bodyPropertyMatchers, + expect(data.batch).toHaveLength(1) + expect(data.batch[0]).toEqual({ + ...httpClientOptionsBodyMatcher, type: 'page', userId: 'foo-user-id', name: 'Home', @@ -167,12 +159,7 @@ test('page', async () => { }) test('screen', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.screen( 'Category', @@ -188,12 +175,12 @@ test('screen', async () => { expect(fetcher).toHaveBeenCalledTimes(1) validateFetcherInputs(context) - const [, request] = fetcher.mock.lastCall - const body = JSON.parse(request.body) + const [request] = fetcher.mock.lastCall + const data = request.data - expect(body.batch).toHaveLength(1) - expect(body.batch[0]).toEqual({ - ...bodyPropertyMatchers, + expect(data.batch).toHaveLength(1) + expect(data.batch[0]).toEqual({ + ...httpClientOptionsBodyMatcher, type: 'screen', userId: 'foo-user-id', name: 'Home', @@ -205,12 +192,7 @@ test('screen', async () => { }) test('track', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.track( 'test event', @@ -225,12 +207,12 @@ test('track', async () => { expect(fetcher).toHaveBeenCalledTimes(1) validateFetcherInputs(context) - const [, request] = fetcher.mock.lastCall - const body = JSON.parse(request.body) + const [request] = fetcher.mock.lastCall + const data = request.data - expect(body.batch).toHaveLength(1) - expect(body.batch[0]).toEqual({ - ...bodyPropertyMatchers, + expect(data.batch).toHaveLength(1) + expect(data.batch[0]).toEqual({ + ...httpClientOptionsBodyMatcher, type: 'track', event: 'test event', userId: 'foo-user-id', diff --git a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts index 149d6ccb3..c95462976 100644 --- a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts +++ b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts @@ -1,5 +1,3 @@ -const fetcher = jest.fn() -jest.mock('../../../lib/fetch', () => ({ fetch: fetcher })) import { Emitter } from '@segment/analytics-core' import { range } from 'lodash' import { createConfiguredNodePlugin } from '..' @@ -10,16 +8,30 @@ import { createSuccess, createError, } from '../../../__tests__/test-helpers/factories' +import { TestFetchClient } from '../../../__tests__/test-helpers/create-test-analytics' import { PublisherProps } from '../publisher' -import { assertSegmentApiBody } from './test-helpers/segment-http-api' +import { assertHTTPRequestOptions } from '../../../__tests__/test-helpers/assert-shape/segment-http-api' let emitter: Emitter -const createTestNodePlugin = (props: PublisherProps) => - createConfiguredNodePlugin(props, emitter) +const testClient = new TestFetchClient() +const fetcher = jest.spyOn(testClient, 'makeRequest') + +const createTestNodePlugin = (props: Partial = {}) => + createConfiguredNodePlugin( + { + maxEventsInBatch: 1, + httpClient: testClient, + writeKey: '', + flushInterval: 1000, + maxRetries: 3, + ...props, + }, + emitter + ) const validateFetcherInputs = (...contexts: Context[]) => { - const [url, request] = fetcher.mock.lastCall - return assertSegmentApiBody(url, request, contexts) + const [request] = fetcher.mock.lastCall + return assertHTTPRequestOptions(request, contexts) } const eventFactory = new NodeEventFactory() @@ -34,8 +46,6 @@ it('supports multiple events in a batch', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 3, maxEventsInBatch: 3, - flushInterval: 1000, - writeKey: '', }) // Create 3 events of mixed types to send. @@ -62,8 +72,6 @@ it('supports waiting a max amount of time before sending', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 3, maxEventsInBatch: 3, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -90,8 +98,6 @@ it('sends as soon as batch fills up or max time is reached', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 3, maxEventsInBatch: 2, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -127,7 +133,6 @@ it('sends if batch will exceed max size in bytes when adding event', async () => maxRetries: 3, maxEventsInBatch: 20, flushInterval: 100, - writeKey: '', }) const contexts: Context[] = [] @@ -180,10 +185,7 @@ describe('flushAfterClose', () => { ) const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 20, - flushInterval: 1000, - writeKey: '', }) publisher.flushAfterClose(3) @@ -197,10 +199,7 @@ describe('flushAfterClose', () => { it('continues to flush on each event if batch size is 1', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) publisher.flushAfterClose(3) @@ -213,10 +212,7 @@ describe('flushAfterClose', () => { it('sends immediately once there are no pending items, even if pending events exceeds batch size', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 3, - flushInterval: 1000, - writeKey: '', }) publisher.flushAfterClose(5) @@ -228,10 +224,7 @@ describe('flushAfterClose', () => { it('works if there are previous items in the batch', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 7, - flushInterval: 1000, - writeKey: '', }) range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush @@ -244,10 +237,7 @@ describe('flushAfterClose', () => { it('works if there are previous items in the batch AND pending items > batch size', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 7, - flushInterval: 1000, - writeKey: '', }) range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush @@ -266,10 +256,7 @@ describe('flushAfterClose', () => { describe('error handling', () => { it('excludes events that are too large', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context( @@ -302,10 +289,7 @@ describe('error handling', () => { ) const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -324,19 +308,19 @@ describe('error handling', () => { `) }) - it('retries non-400 errors', async () => { + it.each([ + { status: 500, statusText: 'Internal Server Error' }, + { status: 300, statusText: 'Multiple Choices' }, + { status: 100, statusText: 'Continue' }, + ])('retries non-400 errors: %p', async (response) => { // Jest kept timing out when using fake timers despite advancing time. jest.useRealTimers() - fetcher.mockReturnValue( - createError({ status: 500, statusText: 'Internal Server Error' }) - ) + fetcher.mockReturnValue(createError(response)) const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 2, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -349,13 +333,12 @@ describe('error handling', () => { expect(updatedContext).toBe(context) expect(updatedContext.failedDelivery()).toBeTruthy() - expect(updatedContext.failedDelivery()).toMatchInlineSnapshot(` - Object { - "reason": [Error: [500] Internal Server Error], - } - `) + const err = updatedContext.failedDelivery()?.reason as Error + expect(err).toBeInstanceOf(Error) + expect(err.message).toEqual( + expect.stringContaining(response.status.toString()) + ) }) - it('retries fetch errors', async () => { // Jest kept timing out when using fake timers despite advancing time. jest.useRealTimers() @@ -365,8 +348,6 @@ describe('error handling', () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 2, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('my', 'from')) @@ -394,8 +375,6 @@ describe('error handling', () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 0, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const fn = jest.fn() @@ -417,10 +396,7 @@ describe('error handling', () => { describe('http_request emitter event', () => { it('should emit an http_request object', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) fetcher.mockReturnValueOnce(createSuccess()) diff --git a/packages/node/src/plugins/segmentio/__tests__/test-helpers/segment-http-api.ts b/packages/node/src/plugins/segmentio/__tests__/test-helpers/segment-http-api.ts deleted file mode 100644 index 97847ccb5..000000000 --- a/packages/node/src/plugins/segmentio/__tests__/test-helpers/segment-http-api.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { Context } from '../../../../app/context' - -export const bodyPropertyMatchers = { - messageId: expect.stringMatching(/^node-next-\d*-\w*-\w*-\w*-\w*-\w*/), - timestamp: expect.stringMatching( - /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/ - ), - _metadata: expect.any(Object), - context: { - library: { - name: '@segment/analytics-node', - version: expect.any(String), - }, - }, - integrations: {}, -} - -export function assertSegmentApiBody( - url: string, - request: RequestInit, - contexts: Context[] -) { - const body = JSON.parse(request.body as string) - expect(url).toBe('https://api.segment.io/v1/batch') - expect(request.method).toBe('POST') - expect(request.headers).toMatchInlineSnapshot(` - Object { - "Authorization": "Basic Og==", - "Content-Type": "application/json", - "User-Agent": "analytics-node-next/latest", - } - `) - - expect(body.batch).toHaveLength(contexts.length) - for (let i = 0; i < contexts.length; i++) { - expect(body.batch[i]).toEqual({ - ...contexts[i].event, - ...bodyPropertyMatchers, - }) - } -} diff --git a/packages/node/src/plugins/segmentio/publisher.ts b/packages/node/src/plugins/segmentio/publisher.ts index 72f359ce9..8ad57934d 100644 --- a/packages/node/src/plugins/segmentio/publisher.ts +++ b/packages/node/src/plugins/segmentio/publisher.ts @@ -1,12 +1,11 @@ import { backoff } from '@segment/analytics-core' -import { abortSignalAfterTimeout } from '../../lib/abort' import type { Context } from '../../app/context' import { tryCreateFormattedUrl } from '../../lib/create-url' import { extractPromiseParts } from '../../lib/extract-promise-parts' -import { fetch } from '../../lib/fetch' import { ContextBatch } from './context-batch' import { NodeEmitter } from '../../app/emitter' import { b64encode } from '../../lib/base-64-encode' +import { HTTPClient, HTTPClientRequest } from '../../lib/http-client' function sleep(timeoutInMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) @@ -28,6 +27,7 @@ export interface PublisherProps { writeKey: string httpRequestTimeout?: number disable?: boolean + httpClient: HTTPClient } /** @@ -46,6 +46,7 @@ export class Publisher { private _httpRequestTimeout: number private _emitter: NodeEmitter private _disable: boolean + private _httpClient: HTTPClient constructor( { host, @@ -55,6 +56,7 @@ export class Publisher { flushInterval, writeKey, httpRequestTimeout, + httpClient, disable, }: PublisherProps, emitter: NodeEmitter @@ -70,6 +72,7 @@ export class Publisher { ) this._httpRequestTimeout = httpRequestTimeout ?? 10000 this._disable = Boolean(disable) + this._httpClient = httpClient } private createBatch(): ContextBatch { @@ -183,7 +186,6 @@ export class Publisher { this._closeAndFlushPendingItemsCount -= batch.length } const events = batch.getEvents() - const payload = JSON.stringify({ batch: events }) const maxAttempts = this._maxRetries + 1 let currentAttempt = 0 @@ -191,38 +193,33 @@ export class Publisher { currentAttempt++ let failureReason: unknown - const [signal, timeoutId] = abortSignalAfterTimeout( - this._httpRequestTimeout - ) try { - const requestInit = { - signal: signal, + if (this._disable) { + return batch.resolveEvents() + } + + const request: HTTPClientRequest = { + url: this._url, method: 'POST', headers: { 'Content-Type': 'application/json', Authorization: `Basic ${this._auth}`, 'User-Agent': 'analytics-node-next/latest', }, - body: payload, + data: { batch: events }, + httpRequestTimeout: this._httpRequestTimeout, } this._emitter.emit('http_request', { - url: this._url, - method: requestInit.method, - headers: requestInit.headers, - body: requestInit.body, + body: request.data, + method: request.method, + url: request.url, + headers: request.headers, }) - if (this._disable) { - clearTimeout(timeoutId) - return batch.resolveEvents() - } - - const response = await fetch(this._url, requestInit) - - clearTimeout(timeoutId) + const response = await this._httpClient.makeRequest(request) - if (response.ok) { + if (response.status >= 200 && response.status < 300) { // Successfully sent events, so exit! batch.resolveEvents() return diff --git a/yarn.lock b/yarn.lock index fa031fa72..39cf2547c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1929,6 +1929,7 @@ __metadata: "@lukeed/uuid": ^2.0.0 "@segment/analytics-core": 1.3.0 "@types/node": ^16 + axios: ^1.4.0 buffer: ^6.0.3 node-fetch: ^2.6.7 tslib: ^2.4.1 @@ -5407,6 +5408,17 @@ __metadata: languageName: node linkType: hard +"axios@npm:^1.4.0": + version: 1.4.0 + resolution: "axios@npm:1.4.0" + dependencies: + follow-redirects: ^1.15.0 + form-data: ^4.0.0 + proxy-from-env: ^1.1.0 + checksum: 7fb6a4313bae7f45e89d62c70a800913c303df653f19eafec88e56cea2e3821066b8409bc68be1930ecca80e861c52aa787659df0ffec6ad4d451c7816b9386b + languageName: node + linkType: hard + "axobject-query@npm:^2.2.0": version: 2.2.0 resolution: "axobject-query@npm:2.2.0" @@ -9073,7 +9085,7 @@ __metadata: languageName: node linkType: hard -"follow-redirects@npm:^1.0.0, follow-redirects@npm:^1.14.9": +"follow-redirects@npm:^1.0.0, follow-redirects@npm:^1.14.9, follow-redirects@npm:^1.15.0": version: 1.15.2 resolution: "follow-redirects@npm:1.15.2" peerDependenciesMeta: @@ -14408,7 +14420,7 @@ __metadata: languageName: node linkType: hard -"proxy-from-env@npm:1.1.0": +"proxy-from-env@npm:1.1.0, proxy-from-env@npm:^1.1.0": version: 1.1.0 resolution: "proxy-from-env@npm:1.1.0" checksum: ed7fcc2ba0a33404958e34d95d18638249a68c430e30fcb6c478497d72739ba64ce9810a24f53a7d921d0c065e5b78e3822759800698167256b04659366ca4d4