diff --git a/packages/node/src/local/streamer.ts b/packages/node/src/local/streamer.ts index b535fc4..95a1140 100644 --- a/packages/node/src/local/streamer.ts +++ b/packages/node/src/local/streamer.ts @@ -20,7 +20,7 @@ export class FlagConfigStreamer { private readonly stream: SdkStreamFlagApi; private readonly retryStreamFlagDelayMillis: number; - private streamRetryTimeout?: NodeJS.Timeout; + private streamRetryInterval?: NodeJS.Timeout; public readonly cache: FlagConfigCache; @@ -77,7 +77,7 @@ export class FlagConfigStreamer { `[Experiment] streamer - onError, fallback to poller, err status: ${err.status}, err message: ${err.message}`, ); this.poller.start(onChange); - this.startRetryStreamTimeout(); + this.startRetryStreamInterval(); }; this.stream.onUpdate = async (flagConfigs) => { @@ -99,9 +99,7 @@ export class FlagConfigStreamer { try { // Clear retry timeout. If stream isn't connected, we're trying now. // If stream is connected, timeout will be undefined and connect will do nothing. - if (this.streamRetryTimeout) { - clearTimeout(this.streamRetryTimeout); - } + this.clearRetryStreamInterval(); // stream connect error will be raised, not through calling onError. // So onError won't be called. await this.stream.connect({ @@ -116,7 +114,7 @@ export class FlagConfigStreamer { `[Experiment] streamer - start stream failed, fallback to poller, err status: ${err.status}, err message: ${err.message}`, ); await this.poller.start(onChange); - this.startRetryStreamTimeout(); + this.startRetryStreamInterval(); } } @@ -127,9 +125,7 @@ export class FlagConfigStreamer { */ public stop(): void { this.logger.debug('[Experiment] streamer - stop'); - if (this.streamRetryTimeout) { - clearTimeout(this.streamRetryTimeout); - } + this.clearRetryStreamInterval(); this.poller.stop(); this.stream.close(); } @@ -148,22 +144,29 @@ export class FlagConfigStreamer { } // Retry stream after a while. - private startRetryStreamTimeout() { - if (this.streamRetryTimeout) { - clearTimeout(this.streamRetryTimeout); - } - this.streamRetryTimeout = setTimeout(() => { + private startRetryStreamInterval() { + this.clearRetryStreamInterval(); + this.streamRetryInterval = setInterval(() => { this.logger.debug('[Experiment] streamer - retry stream'); this.stream .connect() .then(() => { this.logger.debug('[Experiment] streamer - retry stream success'); + // Clear interval. + this.clearRetryStreamInterval(); // Stop poller. this.poller.stop(); }) - // No need to set timeout here. onError handles calling startRetryStreamInterval(). // eslint-disable-next-line @typescript-eslint/no-empty-function .catch(() => {}); }, this.retryStreamFlagDelayMillis); } + + // Clear retry interval. + private clearRetryStreamInterval() { + if (this.streamRetryInterval) { + clearInterval(this.streamRetryInterval); + this.streamRetryInterval = undefined; + } + } } diff --git a/packages/node/test/local/flagConfigStreamer.test.ts b/packages/node/test/local/flagConfigStreamer.test.ts index 228552e..1ddfe9d 100644 --- a/packages/node/test/local/flagConfigStreamer.test.ts +++ b/packages/node/test/local/flagConfigStreamer.test.ts @@ -875,6 +875,136 @@ test('FlagConfigUpdater.connect, start success, keep alive fail, retry fail twic } }); +test('FlagConfigUpdater.connect, start fail, fallback to poller, retry stream success, stop poller, no more retry stream', async () => { + jest.setTimeout(10000); + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 200, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + 2000, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doErr({ status: 501 }); // Fatal err to fail initial conn. + await new Promise((r) => setTimeout(r, 500)); // Wait for poller to start. + assert(fetchCalls > 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Check for retry stream start. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 2); + + // Retry stream success. + const prevFetchCalls = fetchCalls; + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == prevFetchCalls); + + // Wait to check poller stopped. + await new Promise((r) => setTimeout(r, 500)); + assert(fetchCalls == prevFetchCalls); + + // Check there is no more retry stream. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 2); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + +test('FlagConfigUpdater.connect, start fail, fallback to poller, retry stream fail, continue poller, retry stream success, stop poller', async () => { + jest.setTimeout(10000); + const mockClient = getNewClient(); + let fetchCalls = 0; + const mockFetcher = new FlagConfigFetcher( + apiKey, + new MockHttpClient(async () => { + fetchCalls++; + return { status: 200, body: '[]' }; + }), + ); + const updater = new FlagConfigStreamer( + apiKey, + mockFetcher, + cache, + mockClient.clientClass, + 200, // poller fetch every 100ms. + streamConnTimeoutMillis, + streamFlagConnTimeoutMillis, + streamFlagTryAttempts, + streamFlagTryDelayMillis, + 2000, + serverUrl, + false, + ); + try { + updater.start(); + await mockClient.client!.doErr({ status: 501 }); // Fatal err to fail initial conn. + await new Promise((r) => setTimeout(r, 500)); // Wait for poller to start. + assert(fetchCalls > 0); + let n = mockClient.numCreated; + assert(n == 1); + + // Wait for retry stream start. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 2); + + // Retry stream fail. + let prevFetchCalls = fetchCalls; + await mockClient.client!.doErr({ status: 500 }); // Fatal err to fail stream retry. + + // Wait to check poller continues to poll. + await new Promise((r) => setTimeout(r, 500)); + assert(fetchCalls > prevFetchCalls); + + // Wait for another retry stream start. + await new Promise((r) => setTimeout(r, 2000)); // Wait for retry. + n = mockClient.numCreated; + assert(n == 3); + + // Retry stream success. + prevFetchCalls = fetchCalls; + await mockClient.client!.doOpen({ type: 'open' }); + await mockClient.client!.doMsg({ data: '[]' }); + assert(fetchCalls == prevFetchCalls); + + // Wait to check poller stopped. + await new Promise((r) => setTimeout(r, 500)); + assert(fetchCalls == prevFetchCalls); + + await updater.stop(); + // Pass + } catch (e) { + updater.stop(); + fail(e); + } +}); + test.todo( 'FlagConfigUpdater.connect, start and immediately stop and immediately start is an unhandled edge case', );