From ffc30a6c242533fc71ad28dc5d838715af87f23f Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Sat, 13 Jul 2024 02:22:47 -0700 Subject: [PATCH] added streamer test, added streamer onInitUpdate, clearer logic --- packages/node/src/local/client.ts | 2 +- packages/node/src/local/poller.ts | 28 +- packages/node/src/local/stream-flag-api.ts | 30 +- packages/node/src/local/streamer.ts | 24 +- packages/node/src/remote/client.ts | 2 +- packages/node/src/util/config.ts | 15 +- .../node/test/local/flagConfigPoller.test.ts | 7 +- .../test/local/flagConfigStreamer.test.ts | 938 +++++++++--------- 8 files changed, 530 insertions(+), 516 deletions(-) diff --git a/packages/node/src/local/client.ts b/packages/node/src/local/client.ts index 7460b82..64de75d 100644 --- a/packages/node/src/local/client.ts +++ b/packages/node/src/local/client.ts @@ -68,7 +68,7 @@ export class LocalEvaluationClient { constructor( apiKey: string, - config: LocalEvaluationConfig, + config?: LocalEvaluationConfig, flagConfigCache?: FlagConfigCache, httpClient: HttpClient = new FetchHttpClient(config?.httpAgent), streamEventSourceFactory: StreamEventSourceFactory = (url, params) => diff --git a/packages/node/src/local/poller.ts b/packages/node/src/local/poller.ts index 6c98dbd..4ca1332 100644 --- a/packages/node/src/local/poller.ts +++ b/packages/node/src/local/poller.ts @@ -2,7 +2,7 @@ import { CohortStorage } from 'src/types/cohort'; import { LocalEvaluationDefaults } from '../types/config'; import { FlagConfigCache } from '../types/flag'; -import { doWithBackoff, BackoffPolicy } from '../util/backoff'; +import { BackoffPolicy, doWithBackoffFailLoudly } from '../util/backoff'; import { CohortFetcher } from './cohort/fetcher'; import { FlagConfigFetcher } from './fetcher'; @@ -61,18 +61,20 @@ export class FlagConfigPoller }, this.pollingIntervalMillis); // Fetch initial flag configs and await the result. - await doWithBackoff(async () => { - try { - const flagConfigs = await this.fetcher.fetch(); - await super._update(flagConfigs, true, onChange); - } catch (e) { - this.logger.error( - '[Experiment] flag config initial poll failed, stopping', - e, - ); - this.stop(); - } - }, BACKOFF_POLICY); + try { + const flagConfigs = await doWithBackoffFailLoudly( + async () => await this.fetcher.fetch(), + BACKOFF_POLICY, + ); + await super._update(flagConfigs, true, onChange); + } catch (e) { + this.logger.error( + '[Experiment] flag config initial poll failed, stopping', + e, + ); + this.stop(); + throw e; + } } } diff --git a/packages/node/src/local/stream-flag-api.ts b/packages/node/src/local/stream-flag-api.ts index d28add3..9e37ae8 100644 --- a/packages/node/src/local/stream-flag-api.ts +++ b/packages/node/src/local/stream-flag-api.ts @@ -63,6 +63,8 @@ export class SdkStreamFlagApi implements StreamFlagApi { // Flag for whether the stream is open and retrying or closed. This is to avoid calling connect() twice. private isClosedAndNotTrying = true; + // Callback for updating flag configs. Can be set or changed multiple times and effect immediately. + public onInitUpdate?: StreamFlagOnUpdateCallback; // Callback for updating flag configs. Can be set or changed multiple times and effect immediately. public onUpdate?: StreamFlagOnUpdateCallback; // Callback for notifying user of fatal errors. Can be set or changed multiple times and effect immediately. @@ -115,12 +117,16 @@ export class SdkStreamFlagApi implements StreamFlagApi { return reject(DEFAULT_STREAM_ERR_EVENTS.DATA_UNPARSABLE); } // Update the callbacks. - this.api.onUpdate = (data: string) => this.handleNewMsg(data); + this.api.onUpdate = (data: string) => this.handleNewMsg(data, false); this.api.onError = (err: StreamErrorEvent) => this.errorAndRetry(err); // Handoff data to application. Make sure it finishes processing initial new flag configs. - await this.handleNewMsg(data); - // Resolve promise which declares client ready. - resolve(); + try { + await this.handleNewMsg(data, true); + // Resolve promise which declares client ready. + resolve(); + } catch { + reject(); + } }; this.api.onUpdate = dealWithFlagUpdateInOneTry; @@ -230,7 +236,7 @@ export class SdkStreamFlagApi implements StreamFlagApi { } // Handles new messages, parse them, and handoff to application. Retries if have parsing error. - private async handleNewMsg(data: string) { + private async handleNewMsg(data: string, isInit: boolean) { let flagConfigs; try { flagConfigs = SdkStreamFlagApi.parseFlagConfigs(data); @@ -239,11 +245,17 @@ export class SdkStreamFlagApi implements StreamFlagApi { return; } // Put update outside try catch. onUpdate error doesn't mean stream error. - if (this.onUpdate) { + const updateFunc = + isInit && this.onInitUpdate ? this.onInitUpdate : this.onUpdate; + if (updateFunc) { try { - await this.onUpdate(flagConfigs); - // eslint-disable-next-line no-empty - } catch {} // Don't care about application errors after handoff. + await updateFunc(flagConfigs); + } catch (e) { + // Only care about application errors after handoff if initing. Ensure init is success. + if (isInit) { + throw e; + } + } } } diff --git a/packages/node/src/local/streamer.ts b/packages/node/src/local/streamer.ts index eca4d77..d9eaecf 100644 --- a/packages/node/src/local/streamer.ts +++ b/packages/node/src/local/streamer.ts @@ -67,27 +67,19 @@ export class FlagConfigStreamer this.stream.onError = (e) => { const err = e as StreamErrorEvent; this.logger.debug( - `[Experiment] streamer - onError, fallback to poller, err status: ${err.status}, err message: ${err.message}`, + `[Experiment] streamer - onError, fallback to poller, err status: ${err?.status}, err message: ${err?.message}, err ${err}`, ); this.poller.start(onChange); this.startRetryStreamInterval(); }; - let isInitUpdate = true; + this.stream.onInitUpdate = async (flagConfigs) => { + this.logger.debug('[Experiment] streamer - receives updates'); + await super._update(flagConfigs, true, onChange); + }; this.stream.onUpdate = async (flagConfigs) => { this.logger.debug('[Experiment] streamer - receives updates'); - if (isInitUpdate) { - isInitUpdate = false; - try { - super._update(flagConfigs, true, onChange); - } catch { - // Flag update failed on init, stop, fallback to poller. - await this.poller.start(onChange); - this.startRetryStreamInterval(); - } - } else { - super._update(flagConfigs, false, onChange); - } + await super._update(flagConfigs, false, onChange); }; try { @@ -102,11 +94,11 @@ export class FlagConfigStreamer libraryVersion: PACKAGE_VERSION, }); this.poller.stop(); - this.logger.debug('[Experiment] streamer - start stream success'); + this.logger.debug('[Experiment] streamer - start flags stream success'); } catch (e) { const err = e as StreamErrorEvent; this.logger.debug( - `[Experiment] streamer - start stream failed, fallback to poller, err status: ${err.status}, err message: ${err.message}`, + `[Experiment] streamer - start stream failed, fallback to poller, err status: ${err?.status}, err message: ${err?.message}, err ${err}`, ); await this.poller.start(onChange); this.startRetryStreamInterval(); diff --git a/packages/node/src/remote/client.ts b/packages/node/src/remote/client.ts index 5a3ac3f..9a62141 100644 --- a/packages/node/src/remote/client.ts +++ b/packages/node/src/remote/client.ts @@ -32,7 +32,7 @@ export class RemoteEvaluationClient { * @param apiKey The environment API Key * @param config See {@link ExperimentConfig} for config options */ - public constructor(apiKey: string, config: RemoteEvaluationConfig) { + public constructor(apiKey: string, config?: RemoteEvaluationConfig) { this.apiKey = apiKey; this.config = populateRemoteConfigDefaults(config); this.evaluationApi = new SdkEvaluationApi( diff --git a/packages/node/src/util/config.ts b/packages/node/src/util/config.ts index c86ab9f..11c0c47 100644 --- a/packages/node/src/util/config.ts +++ b/packages/node/src/util/config.ts @@ -11,12 +11,12 @@ import { } from '..'; export const populateRemoteConfigDefaults = ( - customConfig: RemoteEvaluationConfig, + customConfig?: RemoteEvaluationConfig, ): RemoteEvaluationConfig => { const config = { ...RemoteEvaluationDefaults, ...customConfig }; const isEu = config.serverZone.toLowerCase() === EU_SERVER_URLS.name; - if (!customConfig.serverUrl) { + if (!customConfig?.serverUrl) { config.serverUrl = isEu ? EU_SERVER_URLS.remote : RemoteEvaluationDefaults.serverUrl; @@ -25,22 +25,25 @@ export const populateRemoteConfigDefaults = ( }; export const populateLocalConfigDefaults = ( - customConfig: LocalEvaluationConfig, + customConfig?: LocalEvaluationConfig, ): LocalEvaluationConfig => { const config = { ...LocalEvaluationDefaults, ...customConfig }; const isEu = config.serverZone.toLowerCase() === EU_SERVER_URLS.name; - if (!customConfig.serverUrl) { + if (!customConfig?.serverUrl) { config.serverUrl = isEu ? EU_SERVER_URLS.flags : LocalEvaluationDefaults.serverUrl; } - if (!customConfig.streamServerUrl) { + if (!customConfig?.streamServerUrl) { config.streamServerUrl = isEu ? EU_SERVER_URLS.stream : LocalEvaluationDefaults.streamServerUrl; } - if (customConfig.cohortConfig && !customConfig.cohortConfig.cohortServerUrl) { + if ( + customConfig?.cohortConfig && + !customConfig?.cohortConfig.cohortServerUrl + ) { config.cohortConfig.cohortServerUrl = isEu ? EU_SERVER_URLS.cohort : CohortConfigDefaults.cohortServerUrl; diff --git a/packages/node/test/local/flagConfigPoller.test.ts b/packages/node/test/local/flagConfigPoller.test.ts index 8821c89..01bbdc2 100644 --- a/packages/node/test/local/flagConfigPoller.test.ts +++ b/packages/node/test/local/flagConfigPoller.test.ts @@ -288,7 +288,12 @@ test('flagConfig poller initial error', async () => { throw new Error(); }); // FLAG should be empty, as cohort failed. Poller should be stopped immediately and test exists cleanly. - await poller.start(); + try { + // Should throw when init failed. + await poller.start(); + fail(); + // eslint-disable-next-line no-empty + } catch {} expect(await poller.cache.getAll()).toStrictEqual({}); }); diff --git a/packages/node/test/local/flagConfigStreamer.test.ts b/packages/node/test/local/flagConfigStreamer.test.ts index c008615..a06176e 100644 --- a/packages/node/test/local/flagConfigStreamer.test.ts +++ b/packages/node/test/local/flagConfigStreamer.test.ts @@ -1,6 +1,7 @@ import assert from 'assert'; import { FlagConfigPoller, InMemoryFlagConfigCache } from 'src/index'; +import { SdkCohortApi } from 'src/local/cohort/cohort-api'; import { CohortFetcher } from 'src/local/cohort/fetcher'; import { InMemoryCohortStorage } from 'src/local/cohort/storage'; import { FlagConfigFetcher } from 'src/local/fetcher'; @@ -9,6 +10,16 @@ import { FlagConfigStreamer } from 'src/local/streamer'; import { MockHttpClient } from './util/mockHttpClient'; import { getNewClient } from './util/mockStreamEventSource'; +const FLAG_WITH_COHORT = `[{"key":"flag2","segments":[{ + "conditions":[[{"op":"set contains any","selector":["context","user","cohort_ids"],"values":["hahahaha2"]}]], + "metadata":{"segmentName": "Segment 1"},"variant": "off" + }],"variants": {}}]`; + +let updater; +afterEach(() => { + updater?.stop(); +}); + const getTestObjs = ({ pollingIntervalMillis = 1000, streamFlagConnTimeoutMillis = 1000, @@ -17,6 +28,10 @@ const getTestObjs = ({ streamFlagRetryDelayMillis = 15000, apiKey = 'client-xxxx', serverUrl = 'http://localhostxxxx:00000000', + fetcherData = [ + '[{"key": "fetcher-a", "variants": {}, "segments": []}]', + '[{"key": "fetcher-b", "variants": {}, "segments": []}]', + ], debug = false, }) => { const fetchObj = { @@ -30,15 +45,11 @@ const getTestObjs = ({ ), }; let dataI = 0; - const data = [ - '[{"key": "fetcher-a", "variants": {}, "segments": []}]', - '[{"key": "fetcher-b", "variants": {}, "segments": []}]', - ]; fetchObj.fetcher = new FlagConfigFetcher( apiKey, new MockHttpClient(async () => { fetchObj.fetchCalls++; - return { status: 200, body: data[dataI] }; + return { status: 200, body: fetcherData[dataI] }; }), ); const fetcherReturnNext = () => { @@ -46,7 +57,7 @@ const getTestObjs = ({ }; const cache = new InMemoryFlagConfigCache(); const mockClient = getNewClient(); - const updater = new FlagConfigStreamer( + updater = new FlagConfigStreamer( apiKey, new FlagConfigPoller( fetchObj.fetcher, @@ -80,325 +91,250 @@ test('FlagConfigUpdater.connect, success', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - assert(mockClient.numCreated == 1); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + assert(mockClient.numCreated == 1); + updater.stop(); }); test('FlagConfigUpdater.connect, start success, gets initial flag configs, gets subsequent flag configs', async () => { const { fetchObj, cache, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ - data: '[{"key": "a", "variants": {}, "segments": []}]', - }); - assert(fetchObj.fetchCalls == 0); - assert(mockClient.numCreated == 1); - await new Promise((r) => setTimeout(r, 200)); - assert((await cache.get('a')).key == 'a'); + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: '[{"key": "a", "variants": {}, "segments": []}]', + }); + assert(fetchObj.fetchCalls == 0); + assert(mockClient.numCreated == 1); + await new Promise((r) => setTimeout(r, 200)); + assert((await cache.get('a')).key == 'a'); - await mockClient.client.doMsg({ - data: '[{"key": "b", "variants": {}, "segments": []}]', - }); - await new Promise((r) => setTimeout(r, 200)); - assert((await cache.get('b')).key == 'b'); - assert((await cache.get('a')) == undefined); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + await mockClient.client.doMsg({ + data: '[{"key": "b", "variants": {}, "segments": []}]', + }); + await new Promise((r) => setTimeout(r, 200)); + assert((await cache.get('b')).key == 'b'); + assert((await cache.get('a')) == undefined); + + updater.stop(); }); test('FlagConfigUpdater.connect, stream start fail, only 1 attempt, fallback to poller, poller updates flag configs correctly', async () => { const { fetchObj, fetcherReturnNext, cache, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, streamFlagTryAttempts: 1 }); - try { - updater.start(); - await mockClient.client.doErr({ status: 503 }); // Send 503 non fatal to fallback to poller after single attempt. - await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. - assert(fetchObj.fetchCalls >= 1); - assert(mockClient.numCreated == 1); - assert((await cache.get('fetcher-a')).key == 'fetcher-a'); - - fetcherReturnNext(); - await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. - assert((await cache.get('fetcher-b')).key == 'fetcher-b'); - assert((await cache.get('fetcher-a')) == undefined); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doErr({ status: 503 }); // Send 503 non fatal to fallback to poller after single attempt. + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert(fetchObj.fetchCalls >= 1); + assert(mockClient.numCreated == 1); + assert((await cache.get('fetcher-a')).key == 'fetcher-a'); + + fetcherReturnNext(); + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert((await cache.get('fetcher-b')).key == 'fetcher-b'); + assert((await cache.get('fetcher-a')) == undefined); + + updater.stop(); }); test('FlagConfigUpdater.connect, stream start fail, fallback to poller, poller updates flag configs correctly', async () => { const { fetchObj, fetcherReturnNext, cache, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100 }); - try { - updater.start(); - await mockClient.client.doErr({ status: 501 }); // Send 501 fatal err to fallback to poller. - await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. - assert(fetchObj.fetchCalls >= 1); - assert(mockClient.numCreated == 1); - assert((await cache.get('fetcher-a')).key == 'fetcher-a'); - - fetcherReturnNext(); - await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. - assert((await cache.get('fetcher-b')).key == 'fetcher-b'); - assert((await cache.get('fetcher-a')) == undefined); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doErr({ status: 501 }); // Send 501 fatal err to fallback to poller. + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert(fetchObj.fetchCalls >= 1); + assert(mockClient.numCreated == 1); + assert((await cache.get('fetcher-a')).key == 'fetcher-a'); + + fetcherReturnNext(); + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert((await cache.get('fetcher-b')).key == 'fetcher-b'); + assert((await cache.get('fetcher-a')) == undefined); + + updater.stop(); }); test('FlagConfigUpdater.connect, start success, gets error initial flag configs, fallback to poller', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ - data: 'xxx', - }); // Initial error flag configs for first try. - await new Promise((r) => setTimeout(r, 1100)); // Wait try delay. - - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ - data: '[{"key: aaa}]', - }); // Another error flag configs for second try. - await new Promise((r) => setTimeout(r, 1100)); // Wait try delay. - - // Should fallbacked to poller. - assert(fetchObj.fetchCalls > 0); - assert(mockClient.numCreated == 2); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: 'xxx', + }); // Initial error flag configs for first try. + await new Promise((r) => setTimeout(r, 1100)); // Wait try delay. + + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: '[{"key: aaa}]', + }); // Another error flag configs for second try. + await new Promise((r) => setTimeout(r, 1100)); // Wait try delay. + + // Should fallbacked to poller. + assert(fetchObj.fetchCalls > 0); + assert(mockClient.numCreated == 2); + + updater.stop(); }); test('FlagConfigUpdater.connect, start success, gets ok initial flag configs, but gets error flag configs later, fallback to poller', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ - data: '[{"key": "a", "variants": {}, "segments": []}]', - }); // Initial flag configs are fine. - await new Promise((r) => setTimeout(r, 200)); - assert(fetchObj.fetchCalls == 0); - let n = mockClient.numCreated; - assert(n == 1); - - // Start error ones. - await mockClient.client.doMsg({ - data: 'hahaha', - }); // An error flag configs to start retry. - await new Promise((r) => setTimeout(r, 500)); - - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ - data: 'xxx', - }); // Error flag configs for first retry. - await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. - - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ - data: '[{"key: aaa}]', - }); // Error flag configs for second retry. - await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. - - assert(fetchObj.fetchCalls > 0); - n = mockClient.numCreated; - assert(n == 3); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: '[{"key": "a", "variants": {}, "segments": []}]', + }); // Initial flag configs are fine. + await new Promise((r) => setTimeout(r, 200)); + assert(fetchObj.fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Start error ones. + await mockClient.client.doMsg({ + data: 'hahaha', + }); // An error flag configs to start retry. + await new Promise((r) => setTimeout(r, 500)); + + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: 'xxx', + }); // Error flag configs for first retry. + await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. + + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: '[{"key: aaa}]', + }); // Error flag configs for second retry. + await new Promise((r) => setTimeout(r, 1000)); // Need to yield quite some time to start retry. + + assert(fetchObj.fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 3); + + updater.stop(); }); test('FlagConfigUpdater.connect, open but no initial flag configs', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await new Promise((r) => setTimeout(r, 1100)); - await mockClient.client.doOpen({ type: 'open' }); - await new Promise((r) => setTimeout(r, 2000)); - assert(fetchObj.fetchCalls > 0); - assert(mockClient.numCreated == 2); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await new Promise((r) => setTimeout(r, 1100)); + await mockClient.client.doOpen({ type: 'open' }); + await new Promise((r) => setTimeout(r, 2000)); + assert(fetchObj.fetchCalls > 0); + assert(mockClient.numCreated == 2); + updater.stop(); }); test('FlagConfigUpdater.connect, success and then fails and then reconnects', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - await mockClient.client.doErr({ status: 500 }); - await new Promise((r) => setTimeout(r, 500)); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - assert(mockClient.numCreated == 2); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + await mockClient.client.doErr({ status: 500 }); + await new Promise((r) => setTimeout(r, 500)); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + assert(mockClient.numCreated == 2); + updater.stop(); }); test('FlagConfigUpdater.connect, timeout first try, retry success', async () => { const { mockClient, updater } = getTestObjs({}); - try { - updater.start(); - await new Promise((r) => setTimeout(r, 2200)); // Wait at least 2 secs, at most 3 secs for first try timeout. - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(mockClient.numCreated == 2); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await new Promise((r) => setTimeout(r, 2200)); // Wait at least 2 secs, at most 3 secs for first try timeout. + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(mockClient.numCreated == 2); + updater.stop(); }); test('FlagConfigUpdater.connect, retry timeout, backoff to poll after 2 tries', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - await updater.start(); // Awaits start(), no data sent. - assert(fetchObj.fetchCalls >= 1); - assert(mockClient.numCreated == 2); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + await updater.start(); // Awaits start(), no data sent. + assert(fetchObj.fetchCalls >= 1); + assert(mockClient.numCreated == 2); + updater.stop(); }); test('FlagConfigUpdater.connect, 501, backoff to poll after 1 try', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doErr({ status: 501 }); // Send 501 fatal err. - await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. - assert(fetchObj.fetchCalls >= 1); - assert(mockClient.numCreated == 1); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doErr({ status: 501 }); // Send 501 fatal err. + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert(fetchObj.fetchCalls >= 1); + assert(mockClient.numCreated == 1); + updater.stop(); }); test('FlagConfigUpdater.connect, 404, backoff to poll after 2 tries', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await mockClient.client.doErr({ status: 404 }); // Send error for first try. - await new Promise((r) => setTimeout(r, 1100)); // Wait for poller to poll. - await mockClient.client.doErr({ status: 404 }); // Send error for second try. - await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. - assert(fetchObj.fetchCalls >= 1); - assert(mockClient.numCreated == 2); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doErr({ status: 404 }); // Send error for first try. + await new Promise((r) => setTimeout(r, 1100)); // Wait for poller to poll. + await mockClient.client.doErr({ status: 404 }); // Send error for second try. + await new Promise((r) => setTimeout(r, 200)); // Wait for poller to poll. + assert(fetchObj.fetchCalls >= 1); + assert(mockClient.numCreated == 2); + updater.stop(); }); test('FlagConfigUpdater.connect, two starts, second does nothing', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - await new Promise((r) => setTimeout(r, 2500)); // Wait for stream to init success. - assert(fetchObj.fetchCalls == 0); - assert(mockClient.numCreated == 1); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + await new Promise((r) => setTimeout(r, 2500)); // Wait for stream to init success. + assert(fetchObj.fetchCalls == 0); + assert(mockClient.numCreated == 1); + updater.stop(); }); test('FlagConfigUpdater.connect, start and immediately stop does not retry', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - updater.stop(); - await new Promise((r) => setTimeout(r, 1000)); - assert(fetchObj.fetchCalls == 0); - assert(mockClient.numCreated == 1); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + updater.stop(); + await new Promise((r) => setTimeout(r, 1000)); + assert(fetchObj.fetchCalls == 0); + assert(mockClient.numCreated == 1); }); test('FlagConfigUpdater.connect, start fail, retry and immediately stop, no poller start', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 100, }); - try { - updater.start(); - await new Promise((r) => setTimeout(r, 2100)); // Wait for timeout and try delay. - updater.stop(); - assert(fetchObj.fetchCalls == 0); - assert(mockClient.numCreated == 2); - - await new Promise((r) => setTimeout(r, 200)); // Wait to check poller start. - assert(fetchObj.fetchCalls == 0); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await new Promise((r) => setTimeout(r, 2100)); // Wait for timeout and try delay. + updater.stop(); + assert(fetchObj.fetchCalls == 0); + assert(mockClient.numCreated == 2); + + await new Promise((r) => setTimeout(r, 200)); // Wait to check poller start. + assert(fetchObj.fetchCalls == 0); }); test('FlagConfigUpdater.connect, test error after connection, poller starts, stream retry success, poller stops', async () => { @@ -408,94 +344,84 @@ test('FlagConfigUpdater.connect, test error after connection, poller starts, str pollingIntervalMillis: 200, streamFlagRetryDelayMillis, }); - try { - // Test error after normal close. - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - let n = mockClient.numCreated; - assert(n == 1); - // Pass errors to stop first stream. - await mockClient.client.doErr({ status: 500 }); - await new Promise((r) => setTimeout(r, 200)); // Wait for stream to init. - await mockClient.client.doErr({ status: 500 }); // Pass errors to make first retry fail. - n = mockClient.numCreated; - assert(n == 2); - await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to init. - await mockClient.client.doErr({ status: 500 }); // Pass error to make second retry fail. - await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. - // No stop() here. The streamRetryTimeout will still be running. - assert(fetchObj.fetchCalls > 0); - n = mockClient.numCreated; - assert(n == 3); - // Check retry. - await new Promise((r) => setTimeout(r, streamFlagRetryDelayMillis)); // Wait for retry. - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - n = mockClient.numCreated; - assert(n == 4); - // Check poller stop. - const prevFetchCalls = fetchObj.fetchCalls; - await new Promise((r) => setTimeout(r, 500)); // Wait to see if poller runs while waiting. - assert(fetchObj.fetchCalls == prevFetchCalls); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + // Test error after normal close. + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + let n = mockClient.numCreated; + assert(n == 1); + // Pass errors to stop first stream. + await mockClient.client.doErr({ status: 500 }); + await new Promise((r) => setTimeout(r, 200)); // Wait for stream to init. + await mockClient.client.doErr({ status: 500 }); // Pass errors to make first retry fail. + n = mockClient.numCreated; + assert(n == 2); + await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to init. + await mockClient.client.doErr({ status: 500 }); // Pass error to make second retry fail. + await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. + // No stop() here. The streamRetryTimeout will still be running. + assert(fetchObj.fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 3); + // Check retry. + await new Promise((r) => setTimeout(r, streamFlagRetryDelayMillis)); // Wait for retry. + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + n = mockClient.numCreated; + assert(n == 4); + // Check poller stop. + const prevFetchCalls = fetchObj.fetchCalls; + await new Promise((r) => setTimeout(r, 500)); // Wait to see if poller runs while waiting. + assert(fetchObj.fetchCalls == prevFetchCalls); + updater.stop(); }); test('FlagConfigUpdater.connect, test restarts', async () => { const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 200, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - let n = mockClient.numCreated; - assert(n == 1); - updater.stop(); - - // Test start after normal close. - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - n = mockClient.numCreated; - assert(n == 2); - updater.stop(); - - // Test error after normal close. - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - await mockClient.client.doErr({ status: 500 }); // Send error to stop current stream. - await new Promise((r) => setTimeout(r, 200)); // Wait for stream to init. - await mockClient.client.doErr({ status: 500 }); // Send error for first retry. - await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to timeout and start second try. - await mockClient.client.doErr({ status: 500 }); // Send error for second retry. - await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. - assert(fetchObj.fetchCalls > 0); - n = mockClient.numCreated; - assert(n == 5); - // No stop() here. The streamRetryTimeout will still be running. - - // Test normal start after error close. Poller should be stopped. - const prevFetchCalls = fetchObj.fetchCalls; - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. - assert(fetchObj.fetchCalls == prevFetchCalls); - n = mockClient.numCreated; - assert(n == 6); - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + updater.stop(); + + // Test start after normal close. + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 2); + updater.stop(); + + // Test error after normal close. + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + await mockClient.client.doErr({ status: 500 }); // Send error to stop current stream. + await new Promise((r) => setTimeout(r, 200)); // Wait for stream to init. + await mockClient.client.doErr({ status: 500 }); // Send error for first retry. + await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to timeout and start second try. + await mockClient.client.doErr({ status: 500 }); // Send error for second retry. + await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. + assert(fetchObj.fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 5); + // No stop() here. The streamRetryTimeout will still be running. + + // Test normal start after error close. Poller should be stopped. + const prevFetchCalls = fetchObj.fetchCalls; + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + await new Promise((r) => setTimeout(r, 500)); // Wait for stream to init. + assert(fetchObj.fetchCalls == prevFetchCalls); + n = mockClient.numCreated; + assert(n == 6); + updater.stop(); }); test('FlagConfigUpdater.connect, start success, keep alive success, no fallback to poller', async () => { @@ -503,31 +429,26 @@ test('FlagConfigUpdater.connect, start success, keep alive success, no fallback const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 200, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - let n = mockClient.numCreated; - assert(n == 1); - - // Test keep alive. - await new Promise((r) => setTimeout(r, 15000)); // Wait before keep alive timeouts. - await mockClient.client.doMsg({ data: ' ' }); - assert(fetchObj.fetchCalls == 0); - n = mockClient.numCreated; - assert(n == 1); - - await new Promise((r) => setTimeout(r, 3000)); // Wait for original keep alive timeout to reach. - assert(fetchObj.fetchCalls == 0); - n = mockClient.numCreated; - assert(n == 1); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Test keep alive. + await new Promise((r) => setTimeout(r, 15000)); // Wait before keep alive timeouts. + await mockClient.client.doMsg({ data: ' ' }); + assert(fetchObj.fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 1); + + await new Promise((r) => setTimeout(r, 3000)); // Wait for original keep alive timeout to reach. + assert(fetchObj.fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 1); + + updater.stop(); }); test('FlagConfigStreamer.connect, start success, keep alive fail, retry success', async () => { @@ -535,27 +456,22 @@ test('FlagConfigStreamer.connect, start success, keep alive fail, retry success' const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 200, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - let n = mockClient.numCreated; - assert(n == 1); - - // Test keep alive fail. - await new Promise((r) => setTimeout(r, 17500)); // Wait for keep alive to fail and enter retry. - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - n = mockClient.numCreated; - assert(n == 2); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Test keep alive fail. + await new Promise((r) => setTimeout(r, 17500)); // Wait for keep alive to fail and enter retry. + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + n = mockClient.numCreated; + assert(n == 2); + + updater.stop(); }); test('FlagConfigUpdater.connect, start success, keep alive fail, retry fail twice, fallback to poller', async () => { @@ -563,29 +479,24 @@ test('FlagConfigUpdater.connect, start success, keep alive fail, retry fail twic const { fetchObj, mockClient, updater } = getTestObjs({ pollingIntervalMillis: 200, }); - try { - updater.start(); - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == 0); - let n = mockClient.numCreated; - assert(n == 1); - - // Test keep alive fail. - await new Promise((r) => setTimeout(r, 17500)); // Wait for keep alive to fail and enter retry. - await mockClient.client.doErr({ status: 500 }); // Send error for first try. - await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to init. - await mockClient.client.doErr({ status: 500 }); // Send error for second try. - await new Promise((r) => setTimeout(r, 500)); // Wait for poller to init. - assert(fetchObj.fetchCalls > 0); - n = mockClient.numCreated; - assert(n == 3); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Test keep alive fail. + await new Promise((r) => setTimeout(r, 17500)); // Wait for keep alive to fail and enter retry. + await mockClient.client.doErr({ status: 500 }); // Send error for first try. + await new Promise((r) => setTimeout(r, 1200)); // Wait for stream to init. + await mockClient.client.doErr({ status: 500 }); // Send error for second try. + await new Promise((r) => setTimeout(r, 500)); // Wait for poller to init. + assert(fetchObj.fetchCalls > 0); + n = mockClient.numCreated; + assert(n == 3); + + updater.stop(); }); test('FlagConfigUpdater.connect, start fail, fallback to poller, retry stream success, stop poller, no more retry stream', async () => { @@ -594,39 +505,34 @@ test('FlagConfigUpdater.connect, start fail, fallback to poller, retry stream su pollingIntervalMillis: 200, streamFlagRetryDelayMillis: 2000, }); - try { - updater.start(); - await mockClient.client.doErr({ status: 501 }); // Fatal err to fail initial conn. - await new Promise((r) => setTimeout(r, 500)); // Wait for poller to start. - assert(fetchObj.fetchCalls > 0); - let n = mockClient.numCreated; - assert(n == 1); - - // Check for retry stream start. - await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. - n = mockClient.numCreated; - assert(n == 2); - - // Retry stream success. - const prevFetchCalls = fetchObj.fetchCalls; - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == prevFetchCalls); - - // Wait to check poller stopped. - await new Promise((r) => setTimeout(r, 500)); - assert(fetchObj.fetchCalls == prevFetchCalls); - - // Check there is no more retry stream. - await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. - n = mockClient.numCreated; - assert(n == 2); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doErr({ status: 501 }); // Fatal err to fail initial conn. + await new Promise((r) => setTimeout(r, 500)); // Wait for poller to start. + assert(fetchObj.fetchCalls > 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Check for retry stream start. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 2); + + // Retry stream success. + const prevFetchCalls = fetchObj.fetchCalls; + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == prevFetchCalls); + + // Wait to check poller stopped. + await new Promise((r) => setTimeout(r, 500)); + assert(fetchObj.fetchCalls == prevFetchCalls); + + // Check there is no more retry stream. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 2); + + updater.stop(); }); test('FlagConfigUpdater.connect, start fail, fallback to poller, retry stream fail, continue poller, retry stream success, stop poller', async () => { @@ -635,49 +541,143 @@ test('FlagConfigUpdater.connect, start fail, fallback to poller, retry stream fa pollingIntervalMillis: 200, streamFlagRetryDelayMillis: 2000, }); - try { - updater.start(); - await mockClient.client.doErr({ status: 501 }); // Fatal err to fail initial conn. - await new Promise((r) => setTimeout(r, 500)); // Wait for poller to start. - assert(fetchObj.fetchCalls > 0); - let n = mockClient.numCreated; - assert(n == 1); - - // Wait for retry stream start. - await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. - n = mockClient.numCreated; - assert(n == 2); - - // Retry stream fail. - let prevFetchCalls = fetchObj.fetchCalls; - await mockClient.client.doErr({ status: 500 }); // Fatal err to fail stream retry. - - // Wait to check poller continues to poll. - await new Promise((r) => setTimeout(r, 500)); - assert(fetchObj.fetchCalls > prevFetchCalls); - - // Wait for another retry stream start. - await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. - n = mockClient.numCreated; - assert(n == 3); - - // Retry stream success. - prevFetchCalls = fetchObj.fetchCalls; - await mockClient.client.doOpen({ type: 'open' }); - await mockClient.client.doMsg({ data: '[]' }); - assert(fetchObj.fetchCalls == prevFetchCalls); - - // Wait to check poller stopped. - await new Promise((r) => setTimeout(r, 500)); - assert(fetchObj.fetchCalls == prevFetchCalls); - - updater.stop(); - } catch (e) { - updater.stop(); - fail(e); - } + updater.start(); + await mockClient.client.doErr({ status: 501 }); // Fatal err to fail initial conn. + await new Promise((r) => setTimeout(r, 500)); // Wait for poller to start. + assert(fetchObj.fetchCalls > 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Wait for retry stream start. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 2); + + // Retry stream fail. + let prevFetchCalls = fetchObj.fetchCalls; + await mockClient.client.doErr({ status: 500 }); // Fatal err to fail stream retry. + + // Wait to check poller continues to poll. + await new Promise((r) => setTimeout(r, 500)); + assert(fetchObj.fetchCalls > prevFetchCalls); + + // Wait for another retry stream start. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 3); + + // Retry stream success. + prevFetchCalls = fetchObj.fetchCalls; + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ data: '[]' }); + assert(fetchObj.fetchCalls == prevFetchCalls); + + // Wait to check poller stopped. + await new Promise((r) => setTimeout(r, 500)); + assert(fetchObj.fetchCalls == prevFetchCalls); + + updater.stop(); }); test.todo( 'FlagConfigUpdater.connect, start and immediately stop and immediately start is an unhandled edge case', ); + +test('FlagConfigUpdater.connect, flag success, cohort success', async () => { + const { fetchObj, mockClient, updater } = getTestObjs({ + pollingIntervalMillis: 100, + }); + // Return cohort with their own cohortId. + jest + .spyOn(SdkCohortApi.prototype, 'getCohort') + .mockImplementation(async (options) => { + return { + cohortId: options.cohortId, + groupType: '', + groupTypeId: 0, + lastComputed: 0, + lastModified: 0, + size: 0, + memberIds: new Set([]), + }; + }); + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: `[{"key":"flag2","segments":[{ + "conditions":[[{"op":"set contains any","selector":["context","user","cohort_ids"],"values":["hahahaha2"]}]], + "metadata":{"segmentName": "Segment 1"},"variant": "off" + }],"variants": {}}]`, + }); + await new Promise((r) => setTimeout(r, 1000)); // Wait for poller to poll. + expect(fetchObj.fetchCalls).toBe(0); + expect(mockClient.numCreated).toBe(1); + updater.stop(); +}); + +test('FlagConfigUpdater.connect, flag success, cohort fail, retry fail, initialization fails, fallback to poller', async () => { + jest.setTimeout(20000); + jest + .spyOn(SdkCohortApi.prototype, 'getCohort') + .mockImplementation(async () => { + throw Error(); + }); + const { fetchObj, mockClient, updater } = getTestObjs({ + pollingIntervalMillis: 100, + streamFlagTryAttempts: 2, + streamFlagTryDelayMillis: 1000, + streamFlagRetryDelayMillis: 100000, + }); + // Return cohort with their own cohortId. + updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: FLAG_WITH_COHORT, + }); + // Second try + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: FLAG_WITH_COHORT, + }); + + expect(fetchObj.fetchCalls).toBeGreaterThanOrEqual(1); + expect(mockClient.numCreated).toBe(2); + updater.stop(); +}); + +test('FlagConfigUpdater.connect, flag success, cohort fail, initialization fails, fallback to poller, poller fails, streamer start error', async () => { + jest.setTimeout(10000); + jest + .spyOn(SdkCohortApi.prototype, 'getCohort') + .mockImplementation(async () => { + throw Error(); + }); + const { fetchObj, mockClient, updater } = getTestObjs({ + pollingIntervalMillis: 30000, + streamFlagTryAttempts: 1, + streamFlagTryDelayMillis: 1000, + streamFlagRetryDelayMillis: 100000, + fetcherData: [ + FLAG_WITH_COHORT, + FLAG_WITH_COHORT, + FLAG_WITH_COHORT, + FLAG_WITH_COHORT, + FLAG_WITH_COHORT, + ], + }); + // Return cohort with their own cohortId. + const startPromise = updater.start(); + await mockClient.client.doOpen({ type: 'open' }); + await mockClient.client.doMsg({ + data: FLAG_WITH_COHORT, + }); + // Stream failed, poller should fail as well given the flags and cohort mock. + expect(fetchObj.fetchCalls).toBeGreaterThanOrEqual(1); + expect(mockClient.numCreated).toBe(1); + // Test should exit cleanly as updater.start() failure should stop the streamer. + try { + await startPromise; + fail(); + // eslint-disable-next-line no-empty + } catch {} +});