From b30b4f3b0307613edeab0856482c5d63e3f453b4 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 6 May 2024 14:23:12 -0400 Subject: [PATCH 01/13] In progress tests for batch and fetch retry improvements --- .../__tests__/batched-dispatcher.test.ts | 21 ++++++++ .../segmentio/__tests__/retries.test.ts | 53 +++++++++++++++++++ .../plugins/segmentio/batched-dispatcher.ts | 41 +++++++++++--- .../src/plugins/segmentio/fetch-dispatcher.ts | 16 ++++++ .../browser/src/plugins/segmentio/index.ts | 10 +++- .../src/plugins/segmentio/ratelimit-error.ts | 9 ++++ packages/core/src/priority-queue/index.ts | 23 ++++++++ 7 files changed, 165 insertions(+), 8 deletions(-) create mode 100644 packages/browser/src/plugins/segmentio/ratelimit-error.ts diff --git a/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts b/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts index d45ddd32a..2c920e2d6 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts @@ -4,6 +4,7 @@ jest.mock('unfetch', () => { return fetch }) +import { createError, createSuccess } from '../../../test-helpers/factories' import batch from '../batched-dispatcher' const fatEvent = { @@ -52,6 +53,7 @@ describe('Batching', () => { jest.useFakeTimers({ now: new Date('9 Jun 1993 00:00:00Z').getTime(), }) + fetch.mockReturnValue(createSuccess({})) }) afterEach(() => { @@ -292,3 +294,22 @@ describe('Batching', () => { }) }) }) +// describe('on error', () => { +// it('retries on 500', async () => { +// fetch.mockReturnValueOnce( +// createError({ +// status: 500, +// }) +// ) +// }) +// it('retries on 429 after delay', async () => { +// fetch.mockReturnValueOnce( +// createError({ +// status: 429, +// headers: { +// 'x-ratelimit-reset': '123', +// }, +// }) +// ) +// }) +// }) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 0360b8aa1..55ae79675 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -8,11 +8,64 @@ import { scheduleFlush } from '../schedule-flush' import * as PPQ from '../../../lib/priority-queue/persisted' import * as PQ from '../../../lib/priority-queue' import { Context } from '../../../core/context' +import { createError, createSuccess } from '../../../test-helpers/factories' +import unfetch from 'unfetch' jest.mock('../schedule-flush') type QueueType = 'priority' | 'persisted' +jest.mock('unfetch', () => { + return jest.fn() +}) + +test('retries on 500', async () => { + const fetched = jest + .mocked(unfetch) + .mockImplementation(() => createError({ status: 500 })) + + const options = { apiKey: 'foo' } + const analytics = new Analytics( + { writeKey: options.apiKey }, + { + retryQueue: true, + } + ) + + const ctx = await analytics.track('event') + expect(ctx.attempts).toBe(1) + expect(fetched).toHaveBeenCalledTimes(2) +}) + +test('delays retry on 429', async () => { + const headers = new Headers() + const resetTime = 0.35 + headers.set('x-ratelimit-reset', resetTime.toString()) + const fetched = jest + .mocked(unfetch) + .mockReturnValueOnce( + createError({ + status: 429, + statusText: 'Too Many Requests', + ...headers, + }) + ) + .mockReturnValue(createSuccess({})) + + const options = { apiKey: 'foo' } + const analytics = new Analytics( + { writeKey: options.apiKey }, + { + retryQueue: true, + } + ) + + const ctx = await analytics.track('event') + expect(ctx.attempts).toBe(1) + + expect(fetched).toHaveBeenCalledTimes(2) +}) + describe('Segment.io retries', () => { let options: SegmentioSettings let analytics: Analytics diff --git a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts index 1fb172d6a..abc7cd85d 100644 --- a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts @@ -1,6 +1,7 @@ import { SegmentEvent } from '../../core/events' import { fetch } from '../../lib/fetch' import { onPageChange } from '../../lib/on-page-change' +import { RateLimitError } from './ratelimit-error' export type BatchingDispatchConfig = { size?: number @@ -52,6 +53,7 @@ export default function batch( const limit = config?.size ?? 10 const timeout = config?.timeout ?? 5000 + let ratelimittimeout = 0 function sendBatch(batch: object[]) { if (batch.length === 0) { @@ -65,7 +67,7 @@ export default function batch( const { sentAt, ...newEvent } = event as SegmentEvent return newEvent }) - + console.log('sending batch', updatedBatch) return fetch(`https://${apiHost}/b`, { keepalive: pageUnloaded, headers: { @@ -77,6 +79,20 @@ export default function batch( batch: updatedBatch, sentAt: new Date().toISOString(), }), + }).then((res) => { + if (res.status >= 500) { + throw new Error(`Bad response from server: ${res.status}`) + } + if (res.status == 429) { + const retryTimeoutStringSecs = res.headers.get('x-ratelimit-reset') + const retryTimeoutMS = retryTimeoutStringSecs + ? parseInt(retryTimeoutStringSecs) * 1000 + : 0 + throw new RateLimitError( + `Rate limit exceeded: ${res.status}`, + retryTimeoutMS + ) + } }) } @@ -84,7 +100,15 @@ export default function batch( if (buffer.length) { const batch = buffer buffer = [] - return sendBatch(batch) + return sendBatch(batch)?.catch((error) => { + if (error instanceof RateLimitError) { + ratelimittimeout = error.retryTimeout + buffer.push(batch) + } else { + buffer.push(batch) + } + scheduleFlush() + }) } } @@ -95,10 +119,14 @@ export default function batch( return } - schedule = setTimeout(() => { - schedule = undefined - flush().catch(console.error) - }, timeout) + schedule = setTimeout( + () => { + schedule = undefined + flush().catch(console.error) + }, + ratelimittimeout ? ratelimittimeout : timeout + ) + ratelimittimeout = 0 } onPageChange((unloaded) => { @@ -107,6 +135,7 @@ export default function batch( if (pageUnloaded && buffer.length) { const reqs = chunks(buffer).map(sendBatch) Promise.all(reqs).catch(console.error) + // can we handle a retry here? } }) diff --git a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts index d4118680c..a644388c9 100644 --- a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts @@ -1,4 +1,5 @@ import { fetch } from '../../lib/fetch' +import { RateLimitError } from './ratelimit-error' export type Dispatcher = (url: string, body: object) => Promise @@ -10,11 +11,26 @@ export default function (config?: StandardDispatcherConfig): { dispatch: Dispatcher } { function dispatch(url: string, body: object): Promise { + console.log('dispatching', url, body) return fetch(url, { keepalive: config?.keepalive, headers: { 'Content-Type': 'text/plain' }, method: 'post', body: JSON.stringify(body), + }).then((res) => { + if (res.status >= 500) { + throw new Error(`Bad response from server: ${res.status}`) + } + if (res.status == 429) { + const retryTimeoutStringSecs = res.headers.get('x-ratelimit-reset') + const retryTimeoutMS = retryTimeoutStringSecs + ? parseInt(retryTimeoutStringSecs) * 1000 + : 0 + throw new RateLimitError( + `Rate limit exceeded: ${res.status}`, + retryTimeoutMS + ) + } }) } diff --git a/packages/browser/src/plugins/segmentio/index.ts b/packages/browser/src/plugins/segmentio/index.ts index b5b0bdd7f..b338764e3 100644 --- a/packages/browser/src/plugins/segmentio/index.ts +++ b/packages/browser/src/plugins/segmentio/index.ts @@ -7,6 +7,7 @@ import { Plugin } from '../../core/plugin' import { PriorityQueue } from '../../lib/priority-queue' import { PersistedPriorityQueue } from '../../lib/priority-queue/persisted' import { toFacade } from '../../lib/to-facade' +import { RateLimitError } from './ratelimit-error' import batch, { BatchingDispatchConfig } from './batched-dispatcher' import standard, { StandardDispatcherConfig } from './fetch-dispatcher' import { normalize } from './normalize' @@ -112,8 +113,13 @@ export function segmentio( normalize(analytics, json, settings, integrations) ) .then(() => ctx) - .catch(() => { - buffer.pushWithBackoff(ctx) + .catch((error) => { + if (error instanceof RateLimitError) { + const timeout = error.retryTimeout + buffer.pushWithTimeout(ctx, timeout) + } else { + buffer.pushWithBackoff(ctx) + } // eslint-disable-next-line @typescript-eslint/no-use-before-define scheduleFlush(flushing, buffer, segmentio, scheduleFlush) return ctx diff --git a/packages/browser/src/plugins/segmentio/ratelimit-error.ts b/packages/browser/src/plugins/segmentio/ratelimit-error.ts new file mode 100644 index 000000000..040bf91db --- /dev/null +++ b/packages/browser/src/plugins/segmentio/ratelimit-error.ts @@ -0,0 +1,9 @@ +export class RateLimitError extends Error { + retryTimeout: number + + constructor(message: string, retryTimeout: number) { + super(message) + this.retryTimeout = retryTimeout + this.name = 'RateLimitError' + } +} diff --git a/packages/core/src/priority-queue/index.ts b/packages/core/src/priority-queue/index.ts index 3be5eccf2..48bad8644 100644 --- a/packages/core/src/priority-queue/index.ts +++ b/packages/core/src/priority-queue/index.ts @@ -71,6 +71,29 @@ export class PriorityQueue extends Emitter { return true } + pushWithTimeout(item: Item, timeout: number): boolean { + if (this.getAttempts(item) === 0) { + return this.push(item)[0] + } + + const attempt = this.updateAttempts(item) + + if (attempt > this.maxAttempts || this.includes(item)) { + return false + } + + setTimeout(() => { + this.queue.push(item) + // remove from future list + this.future = this.future.filter((f) => f.id !== item.id) + // Lets listeners know that a 'future' message is now available in the queue + this.emit(ON_REMOVE_FROM_FUTURE) + }, timeout) + + this.future.push(item) + return true + } + public getAttempts(item: Item): number { return this.seen[item.id] ?? 0 } From af2947b4f0c89d1562192c97e33ae0d23e0829a8 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 7 May 2024 18:51:22 -0400 Subject: [PATCH 02/13] Adding requested field for number of retries --- packages/browser/src/plugins/segmentio/index.ts | 2 +- .../browser/src/plugins/segmentio/normalize.ts | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/index.ts b/packages/browser/src/plugins/segmentio/index.ts index b338764e3..a0031c253 100644 --- a/packages/browser/src/plugins/segmentio/index.ts +++ b/packages/browser/src/plugins/segmentio/index.ts @@ -110,7 +110,7 @@ export function segmentio( return client .dispatch( `${remote}/${path}`, - normalize(analytics, json, settings, integrations) + normalize(analytics, json, settings, integrations, ctx) ) .then(() => ctx) .catch((error) => { diff --git a/packages/browser/src/plugins/segmentio/normalize.ts b/packages/browser/src/plugins/segmentio/normalize.ts index afb22cf3d..be27b40b1 100644 --- a/packages/browser/src/plugins/segmentio/normalize.ts +++ b/packages/browser/src/plugins/segmentio/normalize.ts @@ -2,12 +2,14 @@ import { Analytics } from '../../core/analytics' import { LegacySettings } from '../../browser' import { SegmentFacade } from '../../lib/to-facade' import { SegmentioSettings } from './index' +import { Context } from '../../core/context' export function normalize( analytics: Analytics, json: ReturnType, settings?: SegmentioSettings, - integrations?: LegacySettings['integrations'] + integrations?: LegacySettings['integrations'], + ctx?: Context ): object { const user = analytics.user() @@ -25,6 +27,16 @@ export function normalize( json._metadata = { failedInitializations: failed } } + if (ctx != null) { + const retryCount = analytics.queue.queue.getAttempts(ctx) + if (retryCount > 1) { + json._metadata = { + ...json._metadata, + retryCount, + } + } + } + const bundled: string[] = [] const unbundled: string[] = [] From 59fd2946e568de7631706a69d99dcad98c6311af Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 8 May 2024 17:13:11 -0400 Subject: [PATCH 03/13] Adding segmentio plugin to analytics in retry tests --- .../segmentio/__tests__/retries.test.ts | 92 ++++++++++--------- 1 file changed, 51 insertions(+), 41 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 55ae79675..14253e5f5 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -19,51 +19,61 @@ jest.mock('unfetch', () => { return jest.fn() }) -test('retries on 500', async () => { - const fetched = jest - .mocked(unfetch) - .mockImplementation(() => createError({ status: 500 })) - - const options = { apiKey: 'foo' } - const analytics = new Analytics( - { writeKey: options.apiKey }, - { - retryQueue: true, - } - ) - - const ctx = await analytics.track('event') - expect(ctx.attempts).toBe(1) - expect(fetched).toHaveBeenCalledTimes(2) -}) - -test('delays retry on 429', async () => { - const headers = new Headers() - const resetTime = 0.35 - headers.set('x-ratelimit-reset', resetTime.toString()) - const fetched = jest - .mocked(unfetch) - .mockReturnValueOnce( - createError({ - status: 429, - statusText: 'Too Many Requests', - ...headers, - }) +describe('Segment.io retries 500s and 429', () => { + let options: SegmentioSettings + let analytics: Analytics + let segment: Plugin + beforeEach(async () => { + jest.resetAllMocks() + jest.restoreAllMocks() + + // @ts-expect-error reassign import + isOffline = jest.fn().mockImplementation(() => true) + + options = { apiKey: 'foo' } + analytics = new Analytics( + { writeKey: options.apiKey }, + { + retryQueue: true, + } ) - .mockReturnValue(createSuccess({})) + segment = await segmentio(analytics, options, {}) - const options = { apiKey: 'foo' } - const analytics = new Analytics( - { writeKey: options.apiKey }, - { - retryQueue: true, - } - ) + await analytics.register(segment, envEnrichment) + }) + + test('retries on 500', async () => { + const fetched = jest + .mocked(unfetch) + .mockImplementation(() => createError({ status: 500 })) - const ctx = await analytics.track('event') - expect(ctx.attempts).toBe(1) + const ctx = await analytics.track('event') - expect(fetched).toHaveBeenCalledTimes(2) + expect(ctx.attempts).toBe(1) + expect(analytics.queue.queue.getAttempts(ctx)).toBe(1) + expect(fetched).toHaveBeenCalledTimes(2) + }) + + test('delays retry on 429', async () => { + const headers = new Headers() + const resetTime = 0.35 + headers.set('x-ratelimit-reset', resetTime.toString()) + const fetched = jest + .mocked(unfetch) + .mockReturnValueOnce( + createError({ + status: 429, + statusText: 'Too Many Requests', + ...headers, + }) + ) + .mockReturnValue(createSuccess({})) + + const ctx = await analytics.track('event') + expect(ctx.attempts).toBe(1) + + expect(fetched).toHaveBeenCalledTimes(2) + }) }) describe('Segment.io retries', () => { From 15dbeac40bbaa6ac7199fe11464ea5eba0788d89 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Wed, 8 May 2024 14:29:15 -0700 Subject: [PATCH 04/13] fix mocking --- .../segmentio/__tests__/retries.test.ts | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 14253e5f5..11d1a74a8 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -1,7 +1,10 @@ +const fetch = jest.fn() +jest.mock('unfetch', () => { + return fetch +}) + import { segmentio, SegmentioSettings } from '..' import { Analytics } from '../../../core/analytics' -// @ts-ignore isOffline mocked dependency is accused as unused -import { isOffline } from '../../../core/connection' import { Plugin } from '../../../core/plugin' import { envEnrichment } from '../../env-enrichment' import { scheduleFlush } from '../schedule-flush' @@ -9,16 +12,11 @@ import * as PPQ from '../../../lib/priority-queue/persisted' import * as PQ from '../../../lib/priority-queue' import { Context } from '../../../core/context' import { createError, createSuccess } from '../../../test-helpers/factories' -import unfetch from 'unfetch' jest.mock('../schedule-flush') type QueueType = 'priority' | 'persisted' -jest.mock('unfetch', () => { - return jest.fn() -}) - describe('Segment.io retries 500s and 429', () => { let options: SegmentioSettings let analytics: Analytics @@ -27,9 +25,6 @@ describe('Segment.io retries 500s and 429', () => { jest.resetAllMocks() jest.restoreAllMocks() - // @ts-expect-error reassign import - isOffline = jest.fn().mockImplementation(() => true) - options = { apiKey: 'foo' } analytics = new Analytics( { writeKey: options.apiKey }, @@ -38,28 +33,24 @@ describe('Segment.io retries 500s and 429', () => { } ) segment = await segmentio(analytics, options, {}) - await analytics.register(segment, envEnrichment) }) test('retries on 500', async () => { - const fetched = jest - .mocked(unfetch) - .mockImplementation(() => createError({ status: 500 })) + fetch.mockImplementation(() => createError({ status: 500 })) const ctx = await analytics.track('event') expect(ctx.attempts).toBe(1) expect(analytics.queue.queue.getAttempts(ctx)).toBe(1) - expect(fetched).toHaveBeenCalledTimes(2) + expect(fetch).toHaveBeenCalledTimes(2) }) test('delays retry on 429', async () => { const headers = new Headers() const resetTime = 0.35 headers.set('x-ratelimit-reset', resetTime.toString()) - const fetched = jest - .mocked(unfetch) + fetch .mockReturnValueOnce( createError({ status: 429, @@ -72,7 +63,7 @@ describe('Segment.io retries 500s and 429', () => { const ctx = await analytics.track('event') expect(ctx.attempts).toBe(1) - expect(fetched).toHaveBeenCalledTimes(2) + expect(fetch).toHaveBeenCalledTimes(2) }) }) From 1250f53837e32390e4563148802046cd576f6301 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Wed, 8 May 2024 14:31:04 -0700 Subject: [PATCH 05/13] skip broken tests --- .../browser/src/plugins/segmentio/__tests__/retries.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 11d1a74a8..bfbf68c8c 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -5,6 +5,8 @@ jest.mock('unfetch', () => { import { segmentio, SegmentioSettings } from '..' import { Analytics } from '../../../core/analytics' +// @ts-ignore isOffline mocked dependency is accused as unused +import { isOffline } from '../../../core/connection' import { Plugin } from '../../../core/plugin' import { envEnrichment } from '../../env-enrichment' import { scheduleFlush } from '../schedule-flush' @@ -17,7 +19,7 @@ jest.mock('../schedule-flush') type QueueType = 'priority' | 'persisted' -describe('Segment.io retries 500s and 429', () => { +describe.skip('Segment.io retries 500s and 429', () => { let options: SegmentioSettings let analytics: Analytics let segment: Plugin From d4d002bc8aacdf5dc6d35403642e7c4505e2e8d1 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Fri, 17 May 2024 11:57:47 -0400 Subject: [PATCH 06/13] Batch tests and fixes --- .../__tests__/batched-dispatcher.test.ts | 19 ----- .../segmentio/__tests__/retries.test.ts | 76 ++++++++++++++++++- .../plugins/segmentio/batched-dispatcher.ts | 26 ++++--- .../src/plugins/segmentio/fetch-dispatcher.ts | 2 +- 4 files changed, 89 insertions(+), 34 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts b/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts index 2c920e2d6..b292a4755 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts @@ -294,22 +294,3 @@ describe('Batching', () => { }) }) }) -// describe('on error', () => { -// it('retries on 500', async () => { -// fetch.mockReturnValueOnce( -// createError({ -// status: 500, -// }) -// ) -// }) -// it('retries on 429 after delay', async () => { -// fetch.mockReturnValueOnce( -// createError({ -// status: 429, -// headers: { -// 'x-ratelimit-reset': '123', -// }, -// }) -// ) -// }) -// }) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index bfbf68c8c..7be22761d 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -19,7 +19,7 @@ jest.mock('../schedule-flush') type QueueType = 'priority' | 'persisted' -describe.skip('Segment.io retries 500s and 429', () => { +describe('Segment.io retries 500s and 429', () => { let options: SegmentioSettings let analytics: Analytics let segment: Plugin @@ -45,7 +45,8 @@ describe.skip('Segment.io retries 500s and 429', () => { expect(ctx.attempts).toBe(1) expect(analytics.queue.queue.getAttempts(ctx)).toBe(1) - expect(fetch).toHaveBeenCalledTimes(2) + expect(fetch).toHaveBeenCalledTimes(1) + expect(scheduleFlush).toHaveBeenCalled() }) test('delays retry on 429', async () => { @@ -64,8 +65,79 @@ describe.skip('Segment.io retries 500s and 429', () => { const ctx = await analytics.track('event') expect(ctx.attempts).toBe(1) + expect(fetch).toHaveBeenCalledTimes(1) + expect(scheduleFlush).toHaveBeenCalled() + }) +}) + +describe('Batches retry 500s and 429', () => { + let options: SegmentioSettings + let analytics: Analytics + let segment: Plugin + beforeEach(async () => { + jest.resetAllMocks() + jest.restoreAllMocks() + + options = { + apiKey: 'foo', + deliveryStrategy: { + strategy: 'batching', + // timeout is set very low to get consistent behavior out of scheduleflush + config: { size: 3, timeout: 1, retryattempts: 3 }, + }, + } + analytics = new Analytics( + { writeKey: options.apiKey }, + { + retryQueue: true, + } + ) + segment = await segmentio(analytics, options, {}) + await analytics.register(segment, envEnrichment) + }) + + test('retries on 500', async () => { + fetch + .mockReturnValueOnce(() => createError({ status: 500 })) + .mockReturnValue(createSuccess({})) + + const ctx1 = await analytics.track('event1') + const ctx2 = await analytics.track('event2') + // wait a bit for retries - timeout is only 1 ms + await new Promise((resolve) => setTimeout(resolve, 100)) + + expect(ctx1.attempts).toBe(1) + expect(analytics.queue.queue.getAttempts(ctx1)).toBe(1) + expect(fetch).toHaveBeenCalledTimes(2) + }) + + test('delays retry on 429', async () => { + const headers = new Headers() + const resetTime = 1 + headers.set('x-ratelimit-reset', resetTime.toString()) + fetch.mockReturnValue( + createError({ + status: 429, + statusText: 'Too Many Requests', + headers: headers, + }) + ) + + const ctx1 = await analytics.track('event1') + const ctx2 = await analytics.track('event2') + + await new Promise((resolve) => setTimeout(resolve, 100)) + + expect(ctx1.attempts).toBe(1) + expect(fetch).toHaveBeenCalledTimes(1) + expect(fetch).toHaveBeenCalledTimes(1) + await new Promise((resolve) => setTimeout(resolve, 1000)) expect(fetch).toHaveBeenCalledTimes(2) + await new Promise((resolve) => setTimeout(resolve, 1000)) + expect(fetch).toHaveBeenCalledTimes(3) + await new Promise((resolve) => setTimeout(resolve, 1000)) + expect(fetch).toHaveBeenCalledTimes(3) // capped at 3 retries }) }) diff --git a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts index abc7cd85d..781cfe559 100644 --- a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts @@ -6,6 +6,7 @@ import { RateLimitError } from './ratelimit-error' export type BatchingDispatchConfig = { size?: number timeout?: number + retryattempts?: number } const MAX_PAYLOAD_SIZE = 500 @@ -84,10 +85,11 @@ export default function batch( throw new Error(`Bad response from server: ${res.status}`) } if (res.status == 429) { - const retryTimeoutStringSecs = res.headers.get('x-ratelimit-reset') - const retryTimeoutMS = retryTimeoutStringSecs - ? parseInt(retryTimeoutStringSecs) * 1000 - : 0 + const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset') + const retryTimeoutMS = + retryTimeoutStringSecs != null + ? parseInt(retryTimeoutStringSecs) * 1000 + : timeout throw new RateLimitError( `Rate limit exceeded: ${res.status}`, retryTimeoutMS @@ -96,25 +98,25 @@ export default function batch( }) } - async function flush(): Promise { + async function flush(attempt = 1): Promise { if (buffer.length) { const batch = buffer buffer = [] return sendBatch(batch)?.catch((error) => { - if (error instanceof RateLimitError) { - ratelimittimeout = error.retryTimeout - buffer.push(batch) - } else { + if (attempt < (config?.retryattempts ?? 10)) { + if (error.name == 'RateLimitError') { + ratelimittimeout = error.retryTimeout + } buffer.push(batch) + scheduleFlush(attempt + 1) } - scheduleFlush() }) } } let schedule: NodeJS.Timeout | undefined - function scheduleFlush(): void { + function scheduleFlush(attempt = 1): void { if (schedule) { return } @@ -122,7 +124,7 @@ export default function batch( schedule = setTimeout( () => { schedule = undefined - flush().catch(console.error) + flush(attempt).catch(console.error) }, ratelimittimeout ? ratelimittimeout : timeout ) diff --git a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts index a644388c9..ec63ff5fc 100644 --- a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts @@ -25,7 +25,7 @@ export default function (config?: StandardDispatcherConfig): { const retryTimeoutStringSecs = res.headers.get('x-ratelimit-reset') const retryTimeoutMS = retryTimeoutStringSecs ? parseInt(retryTimeoutStringSecs) * 1000 - : 0 + : 5000 throw new RateLimitError( `Rate limit exceeded: ${res.status}`, retryTimeoutMS From ec1773f904b2fa10843c82b63efe3a46b8ecd3e8 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Fri, 17 May 2024 12:30:52 -0400 Subject: [PATCH 07/13] Fixed some bugs, removed redundant code --- .../segmentio/__tests__/retries.test.ts | 4 +-- .../plugins/segmentio/batched-dispatcher.ts | 1 + .../src/plugins/segmentio/fetch-dispatcher.ts | 2 +- .../browser/src/plugins/segmentio/index.ts | 6 ++-- packages/core/src/priority-queue/index.ts | 28 +++---------------- 5 files changed, 11 insertions(+), 30 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 7be22761d..4057bab9e 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -51,14 +51,14 @@ describe('Segment.io retries 500s and 429', () => { test('delays retry on 429', async () => { const headers = new Headers() - const resetTime = 0.35 + const resetTime = 1 headers.set('x-ratelimit-reset', resetTime.toString()) fetch .mockReturnValueOnce( createError({ status: 429, statusText: 'Too Many Requests', - ...headers, + headers: headers, }) ) .mockReturnValue(createSuccess({})) diff --git a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts index 781cfe559..96098b047 100644 --- a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts @@ -103,6 +103,7 @@ export default function batch( const batch = buffer buffer = [] return sendBatch(batch)?.catch((error) => { + console.error('Error sending batch', error) if (attempt < (config?.retryattempts ?? 10)) { if (error.name == 'RateLimitError') { ratelimittimeout = error.retryTimeout diff --git a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts index ec63ff5fc..c2f52e015 100644 --- a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts @@ -22,7 +22,7 @@ export default function (config?: StandardDispatcherConfig): { throw new Error(`Bad response from server: ${res.status}`) } if (res.status == 429) { - const retryTimeoutStringSecs = res.headers.get('x-ratelimit-reset') + const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset') const retryTimeoutMS = retryTimeoutStringSecs ? parseInt(retryTimeoutStringSecs) * 1000 : 5000 diff --git a/packages/browser/src/plugins/segmentio/index.ts b/packages/browser/src/plugins/segmentio/index.ts index a0031c253..322ecebd3 100644 --- a/packages/browser/src/plugins/segmentio/index.ts +++ b/packages/browser/src/plugins/segmentio/index.ts @@ -7,7 +7,6 @@ import { Plugin } from '../../core/plugin' import { PriorityQueue } from '../../lib/priority-queue' import { PersistedPriorityQueue } from '../../lib/priority-queue/persisted' import { toFacade } from '../../lib/to-facade' -import { RateLimitError } from './ratelimit-error' import batch, { BatchingDispatchConfig } from './batched-dispatcher' import standard, { StandardDispatcherConfig } from './fetch-dispatcher' import { normalize } from './normalize' @@ -114,9 +113,10 @@ export function segmentio( ) .then(() => ctx) .catch((error) => { - if (error instanceof RateLimitError) { + console.error('Error sending event', error) + if (error.name == 'RateLimitError') { const timeout = error.retryTimeout - buffer.pushWithTimeout(ctx, timeout) + buffer.pushWithBackoff(ctx, timeout) } else { buffer.pushWithBackoff(ctx) } diff --git a/packages/core/src/priority-queue/index.ts b/packages/core/src/priority-queue/index.ts index 48bad8644..d4a436ef8 100644 --- a/packages/core/src/priority-queue/index.ts +++ b/packages/core/src/priority-queue/index.ts @@ -46,7 +46,7 @@ export class PriorityQueue extends Emitter { return accepted } - pushWithBackoff(item: Item): boolean { + pushWithBackoff(item: Item, minTimeout = 0): boolean { if (this.getAttempts(item) === 0) { return this.push(item)[0] } @@ -57,29 +57,9 @@ export class PriorityQueue extends Emitter { return false } - const timeout = backoff({ attempt: attempt - 1 }) - - setTimeout(() => { - this.queue.push(item) - // remove from future list - this.future = this.future.filter((f) => f.id !== item.id) - // Lets listeners know that a 'future' message is now available in the queue - this.emit(ON_REMOVE_FROM_FUTURE) - }, timeout) - - this.future.push(item) - return true - } - - pushWithTimeout(item: Item, timeout: number): boolean { - if (this.getAttempts(item) === 0) { - return this.push(item)[0] - } - - const attempt = this.updateAttempts(item) - - if (attempt > this.maxAttempts || this.includes(item)) { - return false + let timeout = backoff({ attempt: attempt - 1 }) + if (timeout < minTimeout) { + timeout = minTimeout } setTimeout(() => { From d22acb4853f38962ea22fa9c9ba3873a6daf8f1c Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 20 May 2024 14:55:21 -0400 Subject: [PATCH 08/13] Cleaning up --- .../src/plugins/segmentio/__tests__/retries.test.ts | 2 +- .../src/plugins/segmentio/batched-dispatcher.ts | 11 +++++------ .../browser/src/plugins/segmentio/fetch-dispatcher.ts | 3 +-- packages/browser/src/plugins/segmentio/index.ts | 2 +- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 4057bab9e..cbba40ca4 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -83,7 +83,7 @@ describe('Batches retry 500s and 429', () => { deliveryStrategy: { strategy: 'batching', // timeout is set very low to get consistent behavior out of scheduleflush - config: { size: 3, timeout: 1, retryattempts: 3 }, + config: { size: 3, timeout: 1, retryAttempts: 3 }, }, } analytics = new Analytics( diff --git a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts index 96098b047..77760ddcf 100644 --- a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts @@ -6,7 +6,7 @@ import { RateLimitError } from './ratelimit-error' export type BatchingDispatchConfig = { size?: number timeout?: number - retryattempts?: number + retryAttempts?: number } const MAX_PAYLOAD_SIZE = 500 @@ -68,7 +68,7 @@ export default function batch( const { sentAt, ...newEvent } = event as SegmentEvent return newEvent }) - console.log('sending batch', updatedBatch) + return fetch(`https://${apiHost}/b`, { keepalive: pageUnloaded, headers: { @@ -84,7 +84,7 @@ export default function batch( if (res.status >= 500) { throw new Error(`Bad response from server: ${res.status}`) } - if (res.status == 429) { + if (res.status === 429) { const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset') const retryTimeoutMS = retryTimeoutStringSecs != null @@ -104,8 +104,8 @@ export default function batch( buffer = [] return sendBatch(batch)?.catch((error) => { console.error('Error sending batch', error) - if (attempt < (config?.retryattempts ?? 10)) { - if (error.name == 'RateLimitError') { + if (attempt < (config?.retryAttempts ?? 10)) { + if (error.name === 'RateLimitError') { ratelimittimeout = error.retryTimeout } buffer.push(batch) @@ -138,7 +138,6 @@ export default function batch( if (pageUnloaded && buffer.length) { const reqs = chunks(buffer).map(sendBatch) Promise.all(reqs).catch(console.error) - // can we handle a retry here? } }) diff --git a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts index c2f52e015..0beddcc10 100644 --- a/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/fetch-dispatcher.ts @@ -11,7 +11,6 @@ export default function (config?: StandardDispatcherConfig): { dispatch: Dispatcher } { function dispatch(url: string, body: object): Promise { - console.log('dispatching', url, body) return fetch(url, { keepalive: config?.keepalive, headers: { 'Content-Type': 'text/plain' }, @@ -21,7 +20,7 @@ export default function (config?: StandardDispatcherConfig): { if (res.status >= 500) { throw new Error(`Bad response from server: ${res.status}`) } - if (res.status == 429) { + if (res.status === 429) { const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset') const retryTimeoutMS = retryTimeoutStringSecs ? parseInt(retryTimeoutStringSecs) * 1000 diff --git a/packages/browser/src/plugins/segmentio/index.ts b/packages/browser/src/plugins/segmentio/index.ts index 322ecebd3..b9332f0c4 100644 --- a/packages/browser/src/plugins/segmentio/index.ts +++ b/packages/browser/src/plugins/segmentio/index.ts @@ -114,7 +114,7 @@ export function segmentio( .then(() => ctx) .catch((error) => { console.error('Error sending event', error) - if (error.name == 'RateLimitError') { + if (error.name === 'RateLimitError') { const timeout = error.retryTimeout buffer.pushWithBackoff(ctx, timeout) } else { From 89993fb2f2a858aba89e9d7692a0586b0b33bee5 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 20 May 2024 15:08:03 -0400 Subject: [PATCH 09/13] Fixing lint problems --- .../segmentio/__tests__/batched-dispatcher.test.ts | 2 +- .../plugins/segmentio/__tests__/retries.test.ts | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts b/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts index b292a4755..2942b4c56 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts @@ -4,7 +4,7 @@ jest.mock('unfetch', () => { return fetch }) -import { createError, createSuccess } from '../../../test-helpers/factories' +import { createSuccess } from '../../../test-helpers/factories' import batch from '../batched-dispatcher' const fatEvent = { diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index cbba40ca4..7cf091356 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -101,13 +101,13 @@ describe('Batches retry 500s and 429', () => { .mockReturnValueOnce(() => createError({ status: 500 })) .mockReturnValue(createSuccess({})) - const ctx1 = await analytics.track('event1') - const ctx2 = await analytics.track('event2') + await analytics.track('event1') + const ctx = await analytics.track('event2') // wait a bit for retries - timeout is only 1 ms await new Promise((resolve) => setTimeout(resolve, 100)) - expect(ctx1.attempts).toBe(1) - expect(analytics.queue.queue.getAttempts(ctx1)).toBe(1) + expect(ctx.attempts).toBe(1) + expect(analytics.queue.queue.getAttempts(ctx)).toBe(1) expect(fetch).toHaveBeenCalledTimes(2) }) @@ -123,12 +123,12 @@ describe('Batches retry 500s and 429', () => { }) ) - const ctx1 = await analytics.track('event1') - const ctx2 = await analytics.track('event2') + await analytics.track('event1') + const ctx = await analytics.track('event2') await new Promise((resolve) => setTimeout(resolve, 100)) - expect(ctx1.attempts).toBe(1) + expect(ctx.attempts).toBe(1) expect(fetch).toHaveBeenCalledTimes(1) expect(fetch).toHaveBeenCalledTimes(1) From f4d089f38bbe6d7d097e5dffe67ec006a48616c3 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Fri, 2 Aug 2024 10:38:40 -0400 Subject: [PATCH 10/13] Adding changeset --- .changeset/flat-dryers-wink.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/flat-dryers-wink.md diff --git a/.changeset/flat-dryers-wink.md b/.changeset/flat-dryers-wink.md new file mode 100644 index 000000000..940d3b178 --- /dev/null +++ b/.changeset/flat-dryers-wink.md @@ -0,0 +1,6 @@ +--- +'@segment/analytics-next': minor +'@segment/analytics-core': minor +--- + +Adding support for 429 response from the server From 8b2eca9e8d792670d589930e4820bf1db7ec5d5e Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 12 Sep 2024 13:13:53 -0400 Subject: [PATCH 11/13] PR fixes and attempting to fully test retries and retry count reporting --- .../segmentio/__tests__/retries.test.ts | 43 +++++++++++-------- .../plugins/segmentio/batched-dispatcher.ts | 31 +++++++++---- .../browser/src/plugins/segmentio/index.ts | 2 +- .../src/plugins/segmentio/normalize.ts | 6 +-- packages/core/src/priority-queue/index.ts | 5 ++- 5 files changed, 55 insertions(+), 32 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 7cf091356..108399be1 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -15,7 +15,7 @@ import * as PQ from '../../../lib/priority-queue' import { Context } from '../../../core/context' import { createError, createSuccess } from '../../../test-helpers/factories' -jest.mock('../schedule-flush') +//jest.mock('../schedule-flush') type QueueType = 'priority' | 'persisted' @@ -24,6 +24,7 @@ describe('Segment.io retries 500s and 429', () => { let analytics: Analytics let segment: Plugin beforeEach(async () => { + jest.useRealTimers() jest.resetAllMocks() jest.restoreAllMocks() @@ -39,20 +40,25 @@ describe('Segment.io retries 500s and 429', () => { }) test('retries on 500', async () => { - fetch.mockImplementation(() => createError({ status: 500 })) - + jest.useFakeTimers({ advanceTimers: true }) + fetch.mockReturnValue(createError({ status: 500 })) + // .mockReturnValue(createSuccess({})) const ctx = await analytics.track('event') + jest.runAllTimers() + jest.runAllTimers() + jest.runAllTimers() - expect(ctx.attempts).toBe(1) + expect(ctx.attempts).toBe(3) expect(analytics.queue.queue.getAttempts(ctx)).toBe(1) - expect(fetch).toHaveBeenCalledTimes(1) - expect(scheduleFlush).toHaveBeenCalled() + expect(fetch).toHaveBeenCalledTimes(2) + expect(fetch.mock.lastCall[1].body).toContain('"retryCount":2') }) test('delays retry on 429', async () => { const headers = new Headers() const resetTime = 1 headers.set('x-ratelimit-reset', resetTime.toString()) + jest.useFakeTimers({ advanceTimers: true }) fetch .mockReturnValueOnce( createError({ @@ -64,9 +70,11 @@ describe('Segment.io retries 500s and 429', () => { .mockReturnValue(createSuccess({})) const ctx = await analytics.track('event') - expect(ctx.attempts).toBe(1) - expect(fetch).toHaveBeenCalledTimes(1) - expect(scheduleFlush).toHaveBeenCalled() + + jest.runAllTimers() + + expect(ctx.attempts).toBe(3) + expect(fetch).toHaveBeenCalledTimes(2) }) }) @@ -75,6 +83,7 @@ describe('Batches retry 500s and 429', () => { let analytics: Analytics let segment: Plugin beforeEach(async () => { + jest.useRealTimers() jest.resetAllMocks() jest.restoreAllMocks() @@ -83,7 +92,7 @@ describe('Batches retry 500s and 429', () => { deliveryStrategy: { strategy: 'batching', // timeout is set very low to get consistent behavior out of scheduleflush - config: { size: 3, timeout: 1, retryAttempts: 3 }, + config: { size: 3, timeout: 1, maxRetries: 2 }, }, } analytics = new Analytics( @@ -98,7 +107,7 @@ describe('Batches retry 500s and 429', () => { test('retries on 500', async () => { fetch - .mockReturnValueOnce(() => createError({ status: 500 })) + .mockReturnValueOnce(createError({ status: 500 })) .mockReturnValue(createSuccess({})) await analytics.track('event1') @@ -106,7 +115,7 @@ describe('Batches retry 500s and 429', () => { // wait a bit for retries - timeout is only 1 ms await new Promise((resolve) => setTimeout(resolve, 100)) - expect(ctx.attempts).toBe(1) + expect(ctx.attempts).toBe(2) expect(analytics.queue.queue.getAttempts(ctx)).toBe(1) expect(fetch).toHaveBeenCalledTimes(2) }) @@ -128,16 +137,16 @@ describe('Batches retry 500s and 429', () => { await new Promise((resolve) => setTimeout(resolve, 100)) - expect(ctx.attempts).toBe(1) - expect(fetch).toHaveBeenCalledTimes(1) - + expect(ctx.attempts).toBe(2) expect(fetch).toHaveBeenCalledTimes(1) await new Promise((resolve) => setTimeout(resolve, 1000)) expect(fetch).toHaveBeenCalledTimes(2) await new Promise((resolve) => setTimeout(resolve, 1000)) expect(fetch).toHaveBeenCalledTimes(3) await new Promise((resolve) => setTimeout(resolve, 1000)) - expect(fetch).toHaveBeenCalledTimes(3) // capped at 3 retries + expect(fetch).toHaveBeenCalledTimes(3) // capped at 2 retries (+ intial attempt) + // Check the metadata about retry count + expect(fetch.mock.lastCall[1].body).toContain('"retryCount":2') }) }) @@ -151,6 +160,7 @@ describe('Segment.io retries', () => { ;[false, true].forEach((persistenceIsDisabled) => { describe(`disableClientPersistence: ${persistenceIsDisabled}`, () => { beforeEach(async () => { + jest.useRealTimers() jest.resetAllMocks() jest.restoreAllMocks() @@ -186,7 +196,6 @@ describe('Segment.io retries', () => { } segment = await segmentio(analytics, options, {}) - await analytics.register(segment, envEnrichment) }) diff --git a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts index cc3b0e494..81d8e933a 100644 --- a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts @@ -1,12 +1,15 @@ +import { event } from 'jquery' import { SegmentEvent } from '../../core/events' import { fetch } from '../../lib/fetch' import { onPageChange } from '../../lib/on-page-change' +import { SegmentFacade } from '../../lib/to-facade' import { RateLimitError } from './ratelimit-error' +import { Context } from '../../core/context' export type BatchingDispatchConfig = { size?: number timeout?: number - retryAttempts?: number + maxRetries?: number keepalive?: boolean } @@ -65,7 +68,7 @@ export default function batch( const limit = config?.size ?? 10 const timeout = config?.timeout ?? 5000 - let ratelimittimeout = 0 + let rateLimitTimeout = 0 function sendBatch(batch: object[]) { if (batch.length === 0) { @@ -98,7 +101,7 @@ export default function batch( if (res.status === 429) { const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset') const retryTimeoutMS = - retryTimeoutStringSecs != null + typeof retryTimeoutStringSecs == 'string' ? parseInt(retryTimeoutStringSecs) * 1000 : timeout throw new RateLimitError( @@ -114,12 +117,22 @@ export default function batch( const batch = buffer buffer = [] return sendBatch(batch)?.catch((error) => { - console.error('Error sending batch', error) - if (attempt < (config?.retryAttempts ?? 10)) { + const ctx = Context.system() + ctx.log('error', 'Error sending batch', error) + if (attempt <= (config?.maxRetries ?? 10)) { if (error.name === 'RateLimitError') { - ratelimittimeout = error.retryTimeout + rateLimitTimeout = error.retryTimeout } - buffer.push(batch) + buffer.push(...batch) + buffer.map((event) => { + if ('_metadata' in event) { + const segmentEvent = event as ReturnType + segmentEvent._metadata = { + ...segmentEvent._metadata, + retryCount: attempt, + } + } + }) scheduleFlush(attempt + 1) } }) @@ -138,9 +151,9 @@ export default function batch( schedule = undefined flush(attempt).catch(console.error) }, - ratelimittimeout ? ratelimittimeout : timeout + rateLimitTimeout ? rateLimitTimeout : timeout ) - ratelimittimeout = 0 + rateLimitTimeout = 0 } onPageChange((unloaded) => { diff --git a/packages/browser/src/plugins/segmentio/index.ts b/packages/browser/src/plugins/segmentio/index.ts index e48eb46b1..08116b923 100644 --- a/packages/browser/src/plugins/segmentio/index.ts +++ b/packages/browser/src/plugins/segmentio/index.ts @@ -113,7 +113,7 @@ export function segmentio( ) .then(() => ctx) .catch((error) => { - console.error('Error sending event', error) + ctx.log('error', 'Error sending event', error) if (error.name === 'RateLimitError') { const timeout = error.retryTimeout buffer.pushWithBackoff(ctx, timeout) diff --git a/packages/browser/src/plugins/segmentio/normalize.ts b/packages/browser/src/plugins/segmentio/normalize.ts index fa5732dc0..0182e4b40 100644 --- a/packages/browser/src/plugins/segmentio/normalize.ts +++ b/packages/browser/src/plugins/segmentio/normalize.ts @@ -28,13 +28,13 @@ export function normalize( } if (ctx != null) { - const retryCount = analytics.queue.queue.getAttempts(ctx) - if (retryCount > 1) { + if (ctx.attempts > 1) { json._metadata = { ...json._metadata, - retryCount, + retryCount: ctx.attempts, } } + ctx.attempts++ } const bundled: string[] = [] diff --git a/packages/core/src/priority-queue/index.ts b/packages/core/src/priority-queue/index.ts index d4a436ef8..272c67cba 100644 --- a/packages/core/src/priority-queue/index.ts +++ b/packages/core/src/priority-queue/index.ts @@ -47,7 +47,8 @@ export class PriorityQueue extends Emitter { } pushWithBackoff(item: Item, minTimeout = 0): boolean { - if (this.getAttempts(item) === 0) { + // One immediate retry unless we have a minimum timeout (e.g. for rate limiting) + if (minTimeout == 0 && this.getAttempts(item) === 0) { return this.push(item)[0] } @@ -58,7 +59,7 @@ export class PriorityQueue extends Emitter { } let timeout = backoff({ attempt: attempt - 1 }) - if (timeout < minTimeout) { + if (minTimeout > 0 && timeout < minTimeout) { timeout = minTimeout } From 6fd593cbc868c50524c41e39ab95595eef6d066c Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 12 Sep 2024 19:09:08 -0400 Subject: [PATCH 12/13] Cleaning up tests --- .../segmentio/__tests__/retries.test.ts | 26 +++++++------------ .../plugins/segmentio/batched-dispatcher.ts | 1 - 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 108399be1..507ed9939 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -45,20 +45,16 @@ describe('Segment.io retries 500s and 429', () => { // .mockReturnValue(createSuccess({})) const ctx = await analytics.track('event') jest.runAllTimers() - jest.runAllTimers() - jest.runAllTimers() - expect(ctx.attempts).toBe(3) - expect(analytics.queue.queue.getAttempts(ctx)).toBe(1) - expect(fetch).toHaveBeenCalledTimes(2) - expect(fetch.mock.lastCall[1].body).toContain('"retryCount":2') + expect(ctx.attempts).toBeGreaterThanOrEqual(3) // Gets incremented after use + expect(fetch.mock.calls.length).toBeGreaterThanOrEqual(2) + expect(fetch.mock.lastCall[1].body).toContain('"retryCount":') }) - test('delays retry on 429', async () => { + test.only('delays retry on 429', async () => { const headers = new Headers() - const resetTime = 1 + const resetTime = 1234 headers.set('x-ratelimit-reset', resetTime.toString()) - jest.useFakeTimers({ advanceTimers: true }) fetch .mockReturnValueOnce( createError({ @@ -68,13 +64,9 @@ describe('Segment.io retries 500s and 429', () => { }) ) .mockReturnValue(createSuccess({})) - - const ctx = await analytics.track('event') - - jest.runAllTimers() - - expect(ctx.attempts).toBe(3) - expect(fetch).toHaveBeenCalledTimes(2) + const spy = jest.spyOn(PQ.PriorityQueue.prototype, 'pushWithBackoff') + await analytics.track('event') + expect(spy).toHaveBeenLastCalledWith(expect.anything(), resetTime * 1000) }) }) @@ -166,6 +158,8 @@ describe('Segment.io retries', () => { // @ts-expect-error reassign import isOffline = jest.fn().mockImplementation(() => true) + // @ts-expect-error reassign import + scheduleFlush = jest.fn().mockImplementation(() => {}) options = { apiKey: 'foo' } analytics = new Analytics( diff --git a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts index 81d8e933a..6ab8b8769 100644 --- a/packages/browser/src/plugins/segmentio/batched-dispatcher.ts +++ b/packages/browser/src/plugins/segmentio/batched-dispatcher.ts @@ -1,4 +1,3 @@ -import { event } from 'jquery' import { SegmentEvent } from '../../core/events' import { fetch } from '../../lib/fetch' import { onPageChange } from '../../lib/on-page-change' From a6352bb4ba1d87d7e7ccf86fcd389b66c34ced58 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 12 Sep 2024 20:23:45 -0400 Subject: [PATCH 13/13] Left in a .only that doesn't belong --- .../browser/src/plugins/segmentio/__tests__/retries.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts index 507ed9939..a3ed608f0 100644 --- a/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts +++ b/packages/browser/src/plugins/segmentio/__tests__/retries.test.ts @@ -51,7 +51,7 @@ describe('Segment.io retries 500s and 429', () => { expect(fetch.mock.lastCall[1].body).toContain('"retryCount":') }) - test.only('delays retry on 429', async () => { + test('delays retry on 429', async () => { const headers = new Headers() const resetTime = 1234 headers.set('x-ratelimit-reset', resetTime.toString())