Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AJS Retry improvements for 500 and 429, normal and batch #1084

Merged
merged 17 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/flat-dryers-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@segment/analytics-next': minor
'@segment/analytics-core': minor
---

Add support for 429 (Too Many Requests) responses from the server
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ jest.mock('unfetch', () => {
return fetch
})

import { createSuccess } from '../../../test-helpers/factories'
import batch from '../batched-dispatcher'

const fatEvent = {
Expand Down Expand Up @@ -52,6 +53,7 @@ describe('Batching', () => {
jest.useFakeTimers({
now: new Date('9 Jun 1993 00:00:00Z').getTime(),
})
fetch.mockReturnValue(createSuccess({}))
})

afterEach(() => {
Expand Down
135 changes: 133 additions & 2 deletions packages/browser/src/plugins/segmentio/__tests__/retries.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
const fetch = jest.fn()
jest.mock('unfetch', () => {
return fetch
})

import { segmentio, SegmentioSettings } from '..'
import { Analytics } from '../../../core/analytics'
// @ts-ignore isOffline mocked dependency is accused as unused
Expand All @@ -8,11 +13,135 @@ import { scheduleFlush } from '../schedule-flush'
import * as PPQ from '../../../lib/priority-queue/persisted'
import * as PQ from '../../../lib/priority-queue'
import { Context } from '../../../core/context'
import { createError, createSuccess } from '../../../test-helpers/factories'

jest.mock('../schedule-flush')
//jest.mock('../schedule-flush')

type QueueType = 'priority' | 'persisted'

describe('Segment.io retries 500s and 429', () => {
let options: SegmentioSettings
let analytics: Analytics
let segment: Plugin
beforeEach(async () => {
jest.useRealTimers()
jest.resetAllMocks()
jest.restoreAllMocks()

options = { apiKey: 'foo' }
analytics = new Analytics(
{ writeKey: options.apiKey },
{
retryQueue: true,
}
)
segment = await segmentio(analytics, options, {})
await analytics.register(segment, envEnrichment)
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
})

test('retries on 500', async () => {
jest.useFakeTimers({ advanceTimers: true })
fetch.mockReturnValue(createError({ status: 500 }))
// .mockReturnValue(createSuccess({}))
const ctx = await analytics.track('event')
jest.runAllTimers()

expect(ctx.attempts).toBeGreaterThanOrEqual(3) // Gets incremented after use
expect(fetch.mock.calls.length).toBeGreaterThanOrEqual(2)
expect(fetch.mock.lastCall[1].body).toContain('"retryCount":')
})

test('delays retry on 429', async () => {
const headers = new Headers()
const resetTime = 1234
headers.set('x-ratelimit-reset', resetTime.toString())
fetch
.mockReturnValueOnce(
createError({
status: 429,
statusText: 'Too Many Requests',
headers: headers,
})
)
.mockReturnValue(createSuccess({}))
const spy = jest.spyOn(PQ.PriorityQueue.prototype, 'pushWithBackoff')
await analytics.track('event')
expect(spy).toHaveBeenLastCalledWith(expect.anything(), resetTime * 1000)
})
})

// Exercises the retry path of the 'batching' delivery strategy against a
// mocked `fetch`: server errors (500) and rate limiting (429).
// These tests run on real timers and wait with real sleeps.
describe('Batches retry 500s and 429', () => {
let options: SegmentioSettings
let analytics: Analytics
let segment: Plugin
beforeEach(async () => {
// Start every test from real timers and a clean mock state.
jest.useRealTimers()
jest.resetAllMocks()
jest.restoreAllMocks()

options = {
apiKey: 'foo',
deliveryStrategy: {
strategy: 'batching',
// timeout is set very low to get consistent behavior out of scheduleflush
// maxRetries: 2 is the retry cap asserted by the 429 test below
config: { size: 3, timeout: 1, maxRetries: 2 },
},
}
analytics = new Analytics(
{ writeKey: options.apiKey },
{
retryQueue: true,
}
)
segment = await segmentio(analytics, options, {})
await analytics.register(segment, envEnrichment)
})

test('retries on 500', async () => {
// First request fails with a 500, every later request succeeds.
fetch
.mockReturnValueOnce(createError({ status: 500 }))
.mockReturnValue(createSuccess({}))

await analytics.track('event1')
const ctx = await analytics.track('event2')
// wait a bit for retries - timeout is only 1 ms
await new Promise((resolve) => setTimeout(resolve, 100))

expect(ctx.attempts).toBe(2)
expect(analytics.queue.queue.getAttempts(ctx)).toBe(1)
// One failed request plus one successful retry.
expect(fetch).toHaveBeenCalledTimes(2)
})

test('delays retry on 429', async () => {
const headers = new Headers()
// x-ratelimit-reset is expressed in seconds; the batched dispatcher
// multiplies it by 1000 to get the retry delay in milliseconds.
const resetTime = 1
headers.set('x-ratelimit-reset', resetTime.toString())
// Every request is rate limited, so retries continue until the cap is hit.
fetch.mockReturnValue(
createError({
status: 429,
statusText: 'Too Many Requests',
headers: headers,
})
)

await analytics.track('event1')
const ctx = await analytics.track('event2')

await new Promise((resolve) => setTimeout(resolve, 100))

expect(ctx.attempts).toBe(2)
// Only the initial attempt so far: the retry waits for the rate-limit delay.
expect(fetch).toHaveBeenCalledTimes(1)
// Each further 1 s sleep lets one rate-limited retry go out.
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(2)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(3)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(3) // capped at 2 retries (+ initial attempt)
// Check the metadata about retry count
expect(fetch.mock.lastCall[1].body).toContain('"retryCount":2')
})
})

describe('Segment.io retries', () => {
let options: SegmentioSettings
let analytics: Analytics
Expand All @@ -23,11 +152,14 @@ describe('Segment.io retries', () => {
;[false, true].forEach((persistenceIsDisabled) => {
describe(`disableClientPersistence: ${persistenceIsDisabled}`, () => {
beforeEach(async () => {
jest.useRealTimers()
jest.resetAllMocks()
jest.restoreAllMocks()

// @ts-expect-error reassign import
isOffline = jest.fn().mockImplementation(() => true)
// @ts-expect-error reassign import
scheduleFlush = jest.fn().mockImplementation(() => {})

options = { apiKey: 'foo' }
analytics = new Analytics(
Expand Down Expand Up @@ -58,7 +190,6 @@ describe('Segment.io retries', () => {
}

segment = await segmentio(analytics, options, {})

await analytics.register(segment, envEnrichment)
})

Expand Down
57 changes: 50 additions & 7 deletions packages/browser/src/plugins/segmentio/batched-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { SegmentEvent } from '../../core/events'
import { fetch } from '../../lib/fetch'
import { onPageChange } from '../../lib/on-page-change'
import { SegmentFacade } from '../../lib/to-facade'
import { RateLimitError } from './ratelimit-error'
import { Context } from '../../core/context'

export type BatchingDispatchConfig = {
size?: number
timeout?: number
maxRetries?: number
keepalive?: boolean
}

Expand Down Expand Up @@ -63,6 +67,7 @@ export default function batch(

const limit = config?.size ?? 10
const timeout = config?.timeout ?? 5000
let rateLimitTimeout = 0

function sendBatch(batch: object[]) {
if (batch.length === 0) {
Expand All @@ -88,28 +93,66 @@ export default function batch(
batch: updatedBatch,
sentAt: new Date().toISOString(),
}),
}).then((res) => {
if (res.status >= 500) {
throw new Error(`Bad response from server: ${res.status}`)
}
if (res.status === 429) {
const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
const retryTimeoutMS =
typeof retryTimeoutStringSecs == 'string'
? parseInt(retryTimeoutStringSecs) * 1000
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
: timeout
throw new RateLimitError(
`Rate limit exceeded: ${res.status}`,
retryTimeoutMS
)
}
})
}

async function flush(): Promise<unknown> {
async function flush(attempt = 1): Promise<unknown> {
if (buffer.length) {
const batch = buffer
buffer = []
return sendBatch(batch)
return sendBatch(batch)?.catch((error) => {
const ctx = Context.system()
ctx.log('error', 'Error sending batch', error)
if (attempt <= (config?.maxRetries ?? 10)) {
if (error.name === 'RateLimitError') {
rateLimitTimeout = error.retryTimeout
}
buffer.push(...batch)
buffer.map((event) => {
if ('_metadata' in event) {
const segmentEvent = event as ReturnType<SegmentFacade['json']>
segmentEvent._metadata = {
...segmentEvent._metadata,
retryCount: attempt,
}
}
})
scheduleFlush(attempt + 1)
}
})
}
}

let schedule: NodeJS.Timeout | undefined

function scheduleFlush(): void {
function scheduleFlush(attempt = 1): void {
if (schedule) {
return
}

schedule = setTimeout(() => {
schedule = undefined
flush().catch(console.error)
}, timeout)
schedule = setTimeout(
() => {
schedule = undefined
flush(attempt).catch(console.error)
},
rateLimitTimeout ? rateLimitTimeout : timeout
)
rateLimitTimeout = 0
}

onPageChange((unloaded) => {
Expand Down
15 changes: 15 additions & 0 deletions packages/browser/src/plugins/segmentio/fetch-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { fetch } from '../../lib/fetch'
import { RateLimitError } from './ratelimit-error'

export type Dispatcher = (url: string, body: object) => Promise<unknown>

Expand All @@ -15,6 +16,20 @@ export default function (config?: StandardDispatcherConfig): {
headers: { 'Content-Type': 'text/plain' },
method: 'post',
body: JSON.stringify(body),
}).then((res) => {
if (res.status >= 500) {
throw new Error(`Bad response from server: ${res.status}`)
}
if (res.status === 429) {
const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
const retryTimeoutMS = retryTimeoutStringSecs
? parseInt(retryTimeoutStringSecs) * 1000
: 5000
throw new RateLimitError(
`Rate limit exceeded: ${res.status}`,
retryTimeoutMS
)
}
})
}

Expand Down
12 changes: 9 additions & 3 deletions packages/browser/src/plugins/segmentio/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,17 @@ export function segmentio(
return client
.dispatch(
`${remote}/${path}`,
normalize(analytics, json, settings, integrations)
normalize(analytics, json, settings, integrations, ctx)
)
.then(() => ctx)
.catch(() => {
buffer.pushWithBackoff(ctx)
.catch((error) => {
ctx.log('error', 'Error sending event', error)
if (error.name === 'RateLimitError') {
const timeout = error.retryTimeout
buffer.pushWithBackoff(ctx, timeout)
} else {
buffer.pushWithBackoff(ctx)
}
// eslint-disable-next-line @typescript-eslint/no-use-before-define
scheduleFlush(flushing, buffer, segmentio, scheduleFlush)
return ctx
Expand Down
14 changes: 13 additions & 1 deletion packages/browser/src/plugins/segmentio/normalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import { Analytics } from '../../core/analytics'
import { CDNSettings } from '../../browser'
import { SegmentFacade } from '../../lib/to-facade'
import { SegmentioSettings } from './index'
import { Context } from '../../core/context'

export function normalize(
analytics: Analytics,
json: ReturnType<SegmentFacade['json']>,
settings?: SegmentioSettings,
integrations?: CDNSettings['integrations']
integrations?: CDNSettings['integrations'],
ctx?: Context
): object {
const user = analytics.user()

Expand All @@ -25,6 +27,16 @@ export function normalize(
json._metadata = { failedInitializations: failed }
}

if (ctx != null) {
if (ctx.attempts > 1) {
json._metadata = {
...json._metadata,
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
retryCount: ctx.attempts,
}
}
ctx.attempts++
}

const bundled: string[] = []
const unbundled: string[] = []

Expand Down
9 changes: 9 additions & 0 deletions packages/browser/src/plugins/segmentio/ratelimit-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
 * Error raised when the server answers a request with HTTP 429.
 *
 * It carries the delay the caller should wait before retrying. The `name`
 * property is always `'RateLimitError'`, so the error can be recognised by
 * name as well as by `instanceof`.
 */
export class RateLimitError extends Error {
  /** Delay to wait before retrying; the dispatchers pass milliseconds. */
  retryTimeout: number

  /**
   * @param message - Human-readable description of the failure.
   * @param retryTimeout - Delay before the next attempt should be made.
   */
  constructor(message: string, retryTimeout: number) {
    super(message)
    // When compiled to ES5, calling `super()` on the built-in Error yields a
    // plain Error instance, which breaks `instanceof` and subclass methods.
    // Restoring the prototype keeps the chain intact on every target.
    Object.setPrototypeOf(this, new.target.prototype)
    this.retryTimeout = retryTimeout
    this.name = 'RateLimitError'
  }
}
Loading
Loading