diff --git a/packages/node/package.json b/packages/node/package.json index ba21872..96a8931 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -32,6 +32,11 @@ "dependencies": { "@amplitude/analytics-node": "^1.3.4", "@amplitude/analytics-types": "^1.3.1", - "@amplitude/experiment-core": "^0.7.2" + "@amplitude/experiment-core": "^0.7.2", + "eventsource": "^2.0.2" + }, + "devDependencies": { + "@types/eventsource": "^1.1.15", + "@types/node": "18.7.23" } -} +} \ No newline at end of file diff --git a/packages/node/src/local/client.ts b/packages/node/src/local/client.ts index 363e15d..d674168 100644 --- a/packages/node/src/local/client.ts +++ b/packages/node/src/local/client.ts @@ -2,8 +2,10 @@ import * as amplitude from '@amplitude/analytics-node'; import { EvaluationEngine, EvaluationFlag, + StreamEventSourceClass, topologicalSort, } from '@amplitude/experiment-core'; +import EventSource from 'eventsource'; import { Assignment, AssignmentService } from '../assignment/assignment'; import { InMemoryAssignmentFilter } from '../assignment/assignment-filter'; @@ -30,6 +32,8 @@ import { import { InMemoryFlagConfigCache } from './cache'; import { FlagConfigFetcher } from './fetcher'; import { FlagConfigPoller } from './poller'; +import { FlagConfigStreamer } from './streamer'; +import { FlagConfigUpdater } from './updater'; /** * Experiment client for evaluating variants for a user locally. @@ -38,7 +42,7 @@ import { FlagConfigPoller } from './poller'; export class LocalEvaluationClient { private readonly logger: Logger; private readonly config: LocalEvaluationConfig; - private readonly poller: FlagConfigPoller; + private readonly updater: FlagConfigUpdater; private readonly assignmentService: AssignmentService; private readonly evaluation: EvaluationEngine; @@ -54,6 +58,7 @@ export class LocalEvaluationClient { config: LocalEvaluationConfig, flagConfigCache?: FlagConfigCache, httpClient: HttpClient = new FetchHttpClient(config?.httpAgent), + streamEventSourceClass: StreamEventSourceClass = EventSource, ) { this.config = { ...LocalEvaluationDefaults, ...config }; const fetcher = new FlagConfigFetcher( @@ -67,12 +72,27 @@ export class LocalEvaluationClient { this.config.bootstrap, ); this.logger = new ConsoleLogger(this.config.debug); - this.poller = new FlagConfigPoller( - fetcher, - this.cache, - this.config.flagConfigPollingIntervalMillis, - this.config.debug, - ); + this.updater = this.config.getFlagConfigUpdateWithStream + ? new FlagConfigStreamer( + apiKey, + fetcher, + this.cache, + streamEventSourceClass, + this.config.flagConfigPollingIntervalMillis, + this.config.streamConnTimeoutMillis, + this.config.streamFlagConnTimeoutMillis, + this.config.streamFlagTryAttempts, + this.config.streamFlagTryDelayMillis, + this.config.retryStreamFlagDelayMillis, + this.config.streamServerUrl, + this.config.debug, + ) + : new FlagConfigPoller( + fetcher, + this.cache, + this.config.flagConfigPollingIntervalMillis, + this.config.debug, + ); if (this.config.assignmentConfig) { this.config.assignmentConfig = { ...AssignmentConfigDefaults, @@ -158,7 +178,7 @@ export class LocalEvaluationClient { * Calling this function while the poller is already running does nothing. */ public async start(): Promise { - return await this.poller.start(); + return await this.updater.start(); } /** @@ -167,6 +187,6 @@ export class LocalEvaluationClient { * Calling this function while the poller is not running will do nothing. */ public stop(): void { - return this.poller.stop(); + return this.updater.stop(); } } diff --git a/packages/node/src/local/poller.ts b/packages/node/src/local/poller.ts index 80b066d..cb2463c 100644 --- a/packages/node/src/local/poller.ts +++ b/packages/node/src/local/poller.ts @@ -5,6 +5,7 @@ import { ConsoleLogger } from '../util/logger'; import { Logger } from '../util/logger'; import { FlagConfigFetcher } from './fetcher'; +import { FlagConfigUpdater } from './updater'; const BACKOFF_POLICY: BackoffPolicy = { attempts: 5, @@ -13,7 +14,7 @@ const BACKOFF_POLICY: BackoffPolicy = { scalar: 1, }; -export class FlagConfigPoller { +export class FlagConfigPoller implements FlagConfigUpdater { private readonly logger: Logger; private readonly pollingIntervalMillis: number; diff --git a/packages/node/src/local/streamer.ts b/packages/node/src/local/streamer.ts new file mode 100644 index 0000000..b535fc4 --- /dev/null +++ b/packages/node/src/local/streamer.ts @@ -0,0 +1,169 @@ +import { + SdkStreamFlagApi, + StreamErrorEvent, + StreamEventSourceClass, +} from '@amplitude/experiment-core'; + +import { version as PACKAGE_VERSION } from '../../gen/version'; +import { LocalEvaluationDefaults } from '../types/config'; +import { FlagConfigCache } from '../types/flag'; +import { ConsoleLogger } from '../util/logger'; +import { Logger } from '../util/logger'; + +import { FlagConfigFetcher } from './fetcher'; +import { FlagConfigPoller } from './poller'; + +export class FlagConfigStreamer { + private readonly logger: Logger; + + private readonly poller: FlagConfigPoller; + private readonly stream: SdkStreamFlagApi; + private readonly retryStreamFlagDelayMillis: number; + + private streamRetryTimeout?: NodeJS.Timeout; + + public readonly cache: FlagConfigCache; + + constructor( + apiKey: string, + fetcher: FlagConfigFetcher, + cache: FlagConfigCache, + streamEventSourceClass: StreamEventSourceClass, + pollingIntervalMillis = LocalEvaluationDefaults.flagConfigPollingIntervalMillis, + streamConnTimeoutMillis = LocalEvaluationDefaults.streamConnTimeoutMillis, + streamFlagConnTimeoutMillis = LocalEvaluationDefaults.streamFlagConnTimeoutMillis, + streamFlagTryAttempts = LocalEvaluationDefaults.streamFlagTryAttempts, + streamFlagTryDelayMillis = LocalEvaluationDefaults.streamFlagTryDelayMillis, + retryStreamFlagDelayMillis = LocalEvaluationDefaults.retryStreamFlagDelayMillis, + serverUrl: string = LocalEvaluationDefaults.serverUrl, + debug = false, + ) { + this.logger = new ConsoleLogger(debug); + this.logger.debug('[Experiment] streamer - init'); + this.cache = cache; + this.poller = new FlagConfigPoller( + fetcher, + cache, + pollingIntervalMillis, + debug, + ); + this.stream = new SdkStreamFlagApi( + apiKey, + serverUrl, + streamEventSourceClass, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + ); + this.retryStreamFlagDelayMillis = retryStreamFlagDelayMillis; + } + + /** + * Fetch initial flag configurations and start polling for updates. + * + * You must call this function to begin polling for flag config updates. + * The promise returned by this function is resolved when the initial call + * to fetch the flag configuration completes. + * + * Calling this function while the poller is already running does nothing. + */ + public async start( + onChange?: (cache: FlagConfigCache) => Promise, + ): Promise { + this.stream.onError = (e) => { + const err = e as StreamErrorEvent; + this.logger.debug( + `[Experiment] streamer - onError, fallback to poller, err status: ${err.status}, err message: ${err.message}`, + ); + this.poller.start(onChange); + this.startRetryStreamTimeout(); + }; + + this.stream.onUpdate = async (flagConfigs) => { + this.logger.debug('[Experiment] streamer - receives updates'); + let changed = false; + if (onChange) { + const current = await this.cache.getAll(); + if (!Object.is(current, flagConfigs)) { + changed = true; + } + } + await this.cache.clear(); + await this.cache.putAll(flagConfigs); + if (changed) { + await onChange(this.cache); + } + }; + + try { + // Clear retry timeout. If stream isn't connected, we're trying now. + // If stream is connected, timeout will be undefined and connect will do nothing. + if (this.streamRetryTimeout) { + clearTimeout(this.streamRetryTimeout); + } + // stream connect error will be raised, not through calling onError. + // So onError won't be called. + await this.stream.connect({ + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + }); + this.poller.stop(); + this.logger.debug('[Experiment] streamer - start stream success'); + } catch (e) { + const err = e as StreamErrorEvent; + this.logger.debug( + `[Experiment] streamer - start stream failed, fallback to poller, err status: ${err.status}, err message: ${err.message}`, + ); + await this.poller.start(onChange); + this.startRetryStreamTimeout(); + } + } + + /** + * Stop polling for flag configurations. + * + * Calling this function while the poller is not running will do nothing. + */ + public stop(): void { + this.logger.debug('[Experiment] streamer - stop'); + if (this.streamRetryTimeout) { + clearTimeout(this.streamRetryTimeout); + } + this.poller.stop(); + this.stream.close(); + } + + /** + * Force a flag config fetch and cache the update with an optional callback + * which gets called if the flag configs change in any way. + * + * @param onChange optional callback which will get called if the flag configs + * in the cache have changed. + */ + public async update( + onChange?: (cache: FlagConfigCache) => Promise, + ): Promise { + this.poller.update(onChange); + } + + // Retry stream after a while. + private startRetryStreamTimeout() { + if (this.streamRetryTimeout) { + clearTimeout(this.streamRetryTimeout); + } + this.streamRetryTimeout = setTimeout(() => { + this.logger.debug('[Experiment] streamer - retry stream'); + this.stream + .connect() + .then(() => { + this.logger.debug('[Experiment] streamer - retry stream success'); + // Stop poller. + this.poller.stop(); + }) + // No need to set timeout here. onError handles calling startRetryStreamInterval(). + // eslint-disable-next-line @typescript-eslint/no-empty-function + .catch(() => {}); + }, this.retryStreamFlagDelayMillis); + } +} diff --git a/packages/node/src/local/updater.ts b/packages/node/src/local/updater.ts new file mode 100644 index 0000000..3674785 --- /dev/null +++ b/packages/node/src/local/updater.ts @@ -0,0 +1,26 @@ +import { FlagConfigCache } from '..'; + +export interface FlagConfigUpdater { + /** + * Fetch initial flag configurations and start watching for updates. + * + * You must call this function to begin watching for flag config updates. + * The promise returned by this function is resolved when the initial call + * to fetch the flag configuration completes. + */ + start(onChange?: (cache: FlagConfigCache) => Promise): Promise; + + /** + * Stop updating flag configurations. + */ + stop(): void; + + /** + * Force a flag config fetch and cache the update with an optional callback + * which gets called if the flag configs change in any way. + * + * @param onChange optional callback which will get called if the flag configs + * in the cache have changed. + */ + update(onChange?: (cache: FlagConfigCache) => Promise): Promise; +} diff --git a/packages/node/src/types/config.ts b/packages/node/src/types/config.ts index 65b20ad..7581041 100644 --- a/packages/node/src/types/config.ts +++ b/packages/node/src/types/config.ts @@ -148,6 +148,44 @@ export type LocalEvaluationConfig = { * evaluation. */ assignmentConfig?: AssignmentConfig; + + /** + * To use streaming API or polling. With streaming, flag config updates are + * received immediately, no polling is necessary. If stream fails, it will + * fallback to polling automatically. + */ + getFlagConfigUpdateWithStream?: boolean; + + /** + * The stream server endpoint from which to stream data. + */ + streamServerUrl?: string; + + /** + * To use with streaming. The timeout for connecting an server-side event stream. Aka, the timeout for http connection. + */ + streamConnTimeoutMillis?: number; + + /** + * To use with streaming. The timeout for a single attempt of establishing a valid stream of flag configs. + * This includes streamConnTimeoutMillis and time for receiving initial flag configs. + */ + streamFlagConnTimeoutMillis?: number; + + /** + * To use with streaming. The number attempts to connect before declaring streaming fatal error. + */ + streamFlagTryAttempts?: number; + + /** + * To use with streaming. The delay between attempts to connect. + */ + streamFlagTryDelayMillis?: number; + + /** + * To use with streaming. The delay to retry streaming after stream fatal error and fallbacked to poller. + */ + retryStreamFlagDelayMillis?: number; }; export type AssignmentConfig = { @@ -181,6 +219,13 @@ export const LocalEvaluationDefaults: LocalEvaluationConfig = { bootstrap: {}, flagConfigPollingIntervalMillis: 30000, httpAgent: null, + getFlagConfigUpdateWithStream: false, + streamServerUrl: 'https://stream.lab.amplitude.com', + streamConnTimeoutMillis: 1000, + streamFlagConnTimeoutMillis: 1000, + streamFlagTryAttempts: 2, + streamFlagTryDelayMillis: 1000, + retryStreamFlagDelayMillis: 15000, }; export const AssignmentConfigDefaults: Omit = { diff --git a/packages/node/test/local/flagConfigStreamer.test.ts b/packages/node/test/local/flagConfigStreamer.test.ts new file mode 100644 index 0000000..228552e --- /dev/null +++ b/packages/node/test/local/flagConfigStreamer.test.ts @@ -0,0 +1,880 @@ +/* eslint-disable @typescript-eslint/no-non-null-assertion */ +import assert from 'assert'; + +import { InMemoryFlagConfigCache } from 'src/index'; +import { FlagConfigFetcher } from 'src/local/fetcher'; +import { FlagConfigStreamer } from 'src/local/streamer'; + +import { MockHttpClient } from './util/mockHttpClient'; +import { getNewClient } from './util/mockStreamEventSource'; + +const apiKey = 'client-xxxx'; +const serverUrl = 'http://localhostxxxx:799999999'; +const streamConnTimeoutMillis = 1000; +const streamFlagConnTimeoutMillis = 1000; +const streamFlagTryAttempts = 2; +const streamFlagTryDelayMillis = 1000; +const retryStreamFlagDelayMillis = 15000; + +// Following values may not be used in all tests. +const pollingIntervalMillis = 1000; +const fetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + return { status: 500, body: undefined }; + }), +); +const cache = new InMemoryFlagConfigCache(); + +test('FlagConfigUpdater.connect, success', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + assert(mockClient.numCreated == 1); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, start success, gets initial flag configs, gets subsequent flag configs', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const cache = new InMemoryFlagConfigCache(); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ + data: '[{"key": "a", "variants": {}, "segments": []}]', + }); + assert(fetchCalls == 0); + assert(mockClient.numCreated == 1); + await new Promise((r) => setTimeout(r, 200)); + assert((await cache.get('a')).key == 'a'); + + await mockClient.client!.doMsg({ + data: '[{"key": "b", "variants": {}, "segments": []}]', + }); + await new Promise((r) => setTimeout(r, 200)); + assert((await cache.get('b')).key == 'b'); + assert((await cache.get('a')) == undefined); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, stream start fail, fallback to poller, poller updates flag configs correctly', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + let dataI = 0; + const data = [ + '[{"key": "a", "variants": {}, "segments": []}]', + '[{"key": "b", "variants": {}, "segments": []}]', + ]; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: data[dataI] }; + }), + ); + const cache = new InMemoryFlagConfigCache(); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doErr({ status: 501 }); // Send 501 fatal err to fallback to poller. + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert(fetchCalls >= 1); + assert(mockClient.numCreated == 1); + assert((await cache.get('a')).key == 'a'); + + dataI++; + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert((await cache.get('b')).key == 'b'); + assert((await cache.get('a')) == undefined); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, start success, gets error initial flag configs, fallback to poller', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ + data: 'xxx', + }); // Initial error flag configs for first try. + await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. + + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ + data: '[{"key: aaa}]', + }); // Another error flag configs for second try. + await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. + + // Should fallbacked to poller. + assert(fetchCalls > 0); + assert(mockClient.numCreated == 2); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, start success, gets ok initial flag configs, but gets error flag configs later, fallback to poller', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const cache = new InMemoryFlagConfigCache(); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ + data: '[{"key": "a", "variants": {}, "segments": []}]', + }); // Initial flag configs are fine. + await new Promise((r) => setTimeout(r, 200)); + assert(fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Start error ones. + await mockClient.client!.doMsg({ + data: 'hahaha', + }); // An error flag configs to start retry. + await new Promise((r) => setTimeout(r, 500)); + + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ + data: 'xxx', + }); // Error flag configs for first retry. + await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. + + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ + data: '[{"key: aaa}]', + }); // Error flag configs for second retry. + await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. + + assert(fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 3); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, open but no initial flag configs', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await new Promise((r) => setTimeout(r, 1100)); + await mockClient.client!.doOpen({ type: 'open' }); + await new Promise((r) => setTimeout(r, 2000)); + assert(fetchCalls > 0); + assert(mockClient.numCreated == 2); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, success and then fails and then reconnects', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + await mockClient.client!.doErr({ status: 500 }); + await new Promise((r) => setTimeout(r, 500)); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + assert(mockClient.numCreated == 2); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, timeout first try, retry success', async () => { + const mockClient = getNewClient(); + const updater = new FlagConfigStreamer( + apiKey, + fetcher, + cache, + mockClient.clientClass, + pollingIntervalMillis, + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await new Promise((r) => setTimeout(r, 2200)); // Wait at least 2 secs, at most 3 secs for first try timeout. + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(mockClient.numCreated == 2); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, retry timeout, backoff to poll after 2 tries', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + await updater.start(); // Awaits start(), no data sent. + assert(fetchCalls >= 1); + assert(mockClient.numCreated == 2); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, 501, backoff to poll after 1 try', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doErr({ status: 501 }); // Send 501 fatal err. + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert(fetchCalls >= 1); + assert(mockClient.numCreated == 1); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, 404, backoff to poll after 2 tries', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doErr({ status: 404 }); // Send error for first try. + await new Promise((r) => setTimeout(r, 1100)); // Wait for poller to poll. + await mockClient.client!.doErr({ status: 404 }); // Send error for second try. + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert(fetchCalls >= 1); + assert(mockClient.numCreated == 2); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, two starts, second does nothing', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + await new Promise((r) => setTimeout(r, 2500)); // Wait for stream to init success. + assert(fetchCalls == 0); + assert(mockClient.numCreated == 1); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, start and immediately stop does not retry', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 100, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + updater.stop(); + await new Promise((r) => setTimeout(r, 1000)); + assert(fetchCalls == 0); + assert(mockClient.numCreated == 1); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, test error after connection, poller starts, stream retry success, poller stops', async () => { + jest.setTimeout(25000); + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 200, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + // Test error after normal close. + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + let n = mockClient.numCreated; + assert(n == 1); + // Pass errors to stop first stream. + await mockClient.client!.doErr({ status: 500 }); + await new Promise((r) => setTimeout(r, 200)); // Wait for stream to init. + await mockClient.client!.doErr({ status: 500 }); // Pass errors to make first retry fail. + n = mockClient.numCreated; + assert(n == 2); + await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to init. + await mockClient.client!.doErr({ status: 500 }); // Pass error to make second retry fail. + await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. + // No stop() here. The streamRetryTimeout will still be running. + assert(fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 3); + // Check retry. + await new Promise((r) => setTimeout(r, retryStreamFlagDelayMillis)); // Wait for retry. + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + n = mockClient.numCreated; + assert(n == 4); + // Check poller stop. + const prevFetchCalls = fetchCalls; + await new Promise((r) => setTimeout(r, 500)); // Wait to see if poller runs while waiting. + assert(fetchCalls == prevFetchCalls); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, test restarts', async () => { + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 200, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + await updater.stop(); + + // Test start after normal close. + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 2); + await updater.stop(); + + // Test error after normal close. + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + await mockClient.client!.doErr({ status: 500 }); // Send error to stop current stream. + await new Promise((r) => setTimeout(r, 200)); // Wait for stream to init. + await mockClient.client!.doErr({ status: 500 }); // Send error for first retry. + await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to timeout and start second try. + await mockClient.client!.doErr({ status: 500 }); // Send error for second retry. + await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. + assert(fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 5); + // No stop() here. The streamRetryTimeout will still be running. + + // Test normal start after error close. Poller should be stopped. + const prevFetchCalls = fetchCalls; + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. + assert(fetchCalls == prevFetchCalls); + n = mockClient.numCreated; + assert(n == 6); + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, start success, keep alive success, no fallback to poller', async () => { + jest.setTimeout(20000); + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 200, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Test keep alive. + await new Promise((r) => setTimeout(r, 15000)); // Wait before keep alive timeouts. + await mockClient.client!.doMsg({ data: ' ' }); + assert(fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 1); + + await new Promise((r) => setTimeout(r, 3000)); // Wait for original keep alive timeout to reach. + assert(fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 1); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigStreamer.connect, start success, keep alive fail, retry success', async () => { + jest.setTimeout(20000); + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 200, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Test keep alive fail. + await new Promise((r) => setTimeout(r, 17500)); // Wait for keep alive to fail and enter retry. + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 2); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, start success, keep alive fail, retry fail twice, fallback to poller', async () => { + jest.setTimeout(20000); + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 200, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + retryStreamFlagDelayMillis, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Test keep alive fail. + await new Promise((r) => setTimeout(r, 17500)); // Wait for keep alive to fail and enter retry. + await mockClient.client!.doErr({ status: 500 }); // Send error for first try. + await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to init. + await mockClient.client!.doErr({ status: 500 }); // Send error for second try. + await new Promise((r) => setTimeout(r, 500)); // Wait for poller to init. + assert(fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 3); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test.todo( + 'FlagConfigUpdater.connect, start and immediately stop and immediately start is an unhandled edge case', +); diff --git a/packages/node/test/local/util/mockStreamEventSource.ts b/packages/node/test/local/util/mockStreamEventSource.ts new file mode 100644 index 0000000..8b3f38f --- /dev/null +++ b/packages/node/test/local/util/mockStreamEventSource.ts @@ -0,0 +1,93 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +/* eslint-disable @typescript-eslint/no-unused-vars */ +/* eslint-disable @typescript-eslint/no-empty-function */ +import { + StreamEventSource, + StreamOpenEvent, + StreamMessageEvent, + StreamErrorEvent, + StreamEvent, + StreamEventSourceClass, +} from '@amplitude/experiment-core'; + +export interface MockStreamEventSourceClient extends StreamEventSource { + // Methods for test. + doOpen(evt: StreamOpenEvent): Promise; + doMsg(evt: StreamMessageEvent): Promise; + doErr(evt: StreamErrorEvent): Promise; +} + +export function getNewClient(): { + client: MockStreamEventSourceClient | undefined; + numCreated: number; + clientClass: StreamEventSourceClass; +} { + const clientObj = { + client: undefined, + numCreated: 0, + clientClass: undefined, + }; + class AClientClass implements MockStreamEventSourceClient { + static readonly CLOSED: number = 0; + static readonly CONNECTING: number = 1; + static readonly OPEN: number = 2; + + constructor(url: string, initDict: Record) { + clientObj.client = this; + clientObj.numCreated++; + + this.url = url; + this._readyState = AClientClass.CONNECTING; + // this.withCredentials = params['withCredentials']; + } + CLOSED: number = AClientClass.CLOSED; + CONNECTING: number = AClientClass.CONNECTING; + OPEN: number = AClientClass.OPEN; + + // Variables + readonly url: string; + private _readyState: number; + get readyState(): number { + return this._readyState; + } + readonly withCredentials: boolean; + + // Handlers. + onopen: (evt: StreamOpenEvent) => any; + onmessage: (evt: StreamMessageEvent) => any; + onerror: (evt: StreamErrorEvent) => any; + + // Other unused methods. + addEventListener( + type: string, + listener: (evt: StreamEvent) => void, + ): void {} // No implementation. + dispatchEvent(evt: Event): boolean { + return false; + } + removeEventListener( + type: string, + listener: (evt: StreamEvent) => void, + ): void {} // No implementation. + + // Close. + close(): void { + this._readyState = AClientClass.CLOSED; + } + + // Methods for test. + async doOpen(evt: StreamOpenEvent): Promise { + this._readyState = AClientClass.OPEN; + await this.onopen(evt); + } + async doMsg(evt: StreamMessageEvent): Promise { + await this.onmessage(evt); + } + async doErr(evt: StreamErrorEvent): Promise { + this._readyState = AClientClass.CLOSED; + await this.onerror(evt); + } + } + clientObj.clientClass = AClientClass; + return clientObj; +} diff --git a/yarn.lock b/yarn.lock index a1f030d..b62f6a3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1933,6 +1933,11 @@ dependencies: "@babel/types" "^7.3.0" +"@types/eventsource@^1.1.15": + version "1.1.15" + resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.15.tgz#949383d3482e20557cbecbf3b038368d94b6be27" + integrity sha512-XQmGcbnxUNa06HR3VBVkc9+A2Vpi9ZyLJcdS5dwaQQ/4ZMWFO+5c90FnMUpbtMZwB/FChoYHwuVg8TvkECacTA== + "@types/graceful-fs@^4.1.2": version "4.1.6" resolved "https://registry.yarnpkg.com/@types/graceful-fs/-/graceful-fs-4.1.6.tgz#e14b2576a1c25026b7f02ede1de3b84c3a1efeae" @@ -1992,6 +1997,11 @@ resolved "https://registry.yarnpkg.com/@types/node/-/node-18.15.11.tgz#b3b790f09cb1696cffcec605de025b088fa4225f" integrity sha512-E5Kwq2n4SbMzQOn6wnmBjuK9ouqlURrcZDVfbo9ftDDTFt3nk7ZKK4GMOzoYgnpQJKcxwQw+lGaBvvlMo0qN/Q== +"@types/node@18.7.23": + version "18.7.23" + resolved "https://registry.yarnpkg.com/@types/node/-/node-18.7.23.tgz#75c580983846181ebe5f4abc40fe9dfb2d65665f" + integrity sha512-DWNcCHolDq0ZKGizjx2DZjR/PqsYwAcYUJmfMWqtVU2MBMG5Mo+xFZrhGId5r/O5HOuMPyQEcM6KUBp5lBZZBg== + "@types/node@^14.11.8": version "14.18.42" resolved "https://registry.yarnpkg.com/@types/node/-/node-14.18.42.tgz#fa39b2dc8e0eba61bdf51c66502f84e23b66e114" @@ -3819,6 +3829,11 @@ events@^3.3.0: resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400" integrity sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q== +eventsource@^2.0.2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/eventsource/-/eventsource-2.0.2.tgz#76dfcc02930fb2ff339520b6d290da573a9e8508" + integrity sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA== + exec-sh@^0.3.2: version "0.3.6" resolved "https://registry.yarnpkg.com/exec-sh/-/exec-sh-0.3.6.tgz#ff264f9e325519a60cb5e273692943483cca63bc"