Skip to content

Commit

Permalink
added stream flag configs updates
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Feb 25, 2024
1 parent aba5334 commit 9220241
Show file tree
Hide file tree
Showing 9 changed files with 1,266 additions and 12 deletions.
9 changes: 7 additions & 2 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
"dependencies": {
"@amplitude/analytics-node": "^1.3.4",
"@amplitude/analytics-types": "^1.3.1",
"@amplitude/experiment-core": "^0.7.2"
"@amplitude/experiment-core": "^0.7.2",
"eventsource": "^2.0.2"
},
"devDependencies": {
"@types/eventsource": "^1.1.15",
"@types/node": "18.7.23"
}
}
}
38 changes: 29 additions & 9 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import * as amplitude from '@amplitude/analytics-node';
import {
EvaluationEngine,
EvaluationFlag,
StreamEventSourceClass,
topologicalSort,
} from '@amplitude/experiment-core';
import EventSource from 'eventsource';

import { Assignment, AssignmentService } from '../assignment/assignment';
import { InMemoryAssignmentFilter } from '../assignment/assignment-filter';
Expand All @@ -30,6 +32,8 @@ import {
import { InMemoryFlagConfigCache } from './cache';
import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';
import { FlagConfigStreamer } from './streamer';
import { FlagConfigUpdater } from './updater';

/**
* Experiment client for evaluating variants for a user locally.
Expand All @@ -38,7 +42,7 @@ import { FlagConfigPoller } from './poller';
export class LocalEvaluationClient {
private readonly logger: Logger;
private readonly config: LocalEvaluationConfig;
private readonly poller: FlagConfigPoller;
private readonly updater: FlagConfigUpdater;
private readonly assignmentService: AssignmentService;
private readonly evaluation: EvaluationEngine;

Expand All @@ -54,6 +58,7 @@ export class LocalEvaluationClient {
config: LocalEvaluationConfig,
flagConfigCache?: FlagConfigCache,
httpClient: HttpClient = new FetchHttpClient(config?.httpAgent),
streamEventSourceClass: StreamEventSourceClass = EventSource,
) {
this.config = { ...LocalEvaluationDefaults, ...config };
const fetcher = new FlagConfigFetcher(
Expand All @@ -67,12 +72,27 @@ export class LocalEvaluationClient {
this.config.bootstrap,
);
this.logger = new ConsoleLogger(this.config.debug);
this.poller = new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
this.updater = this.config.getFlagConfigUpdateWithStream
? new FlagConfigStreamer(
apiKey,
fetcher,
this.cache,
streamEventSourceClass,
this.config.flagConfigPollingIntervalMillis,
this.config.streamConnTimeoutMillis,
this.config.streamFlagConnTimeoutMillis,
this.config.streamFlagTryAttempts,
this.config.streamFlagTryDelayMillis,
this.config.retryStreamFlagDelayMillis,
this.config.streamServerUrl,
this.config.debug,
)
: new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
if (this.config.assignmentConfig) {
this.config.assignmentConfig = {
...AssignmentConfigDefaults,
Expand Down Expand Up @@ -158,7 +178,7 @@ export class LocalEvaluationClient {
* Calling this function while the poller is already running does nothing.
*/
public async start(): Promise<void> {
return await this.poller.start();
return await this.updater.start();
}

/**
Expand All @@ -167,6 +187,6 @@ export class LocalEvaluationClient {
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
return this.poller.stop();
return this.updater.stop();
}
}
3 changes: 2 additions & 1 deletion packages/node/src/local/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConsoleLogger } from '../util/logger';
import { Logger } from '../util/logger';

import { FlagConfigFetcher } from './fetcher';
import { FlagConfigUpdater } from './updater';

const BACKOFF_POLICY: BackoffPolicy = {
attempts: 5,
Expand All @@ -13,7 +14,7 @@ const BACKOFF_POLICY: BackoffPolicy = {
scalar: 1,
};

export class FlagConfigPoller {
export class FlagConfigPoller implements FlagConfigUpdater {
private readonly logger: Logger;
private readonly pollingIntervalMillis: number;

Expand Down
169 changes: 169 additions & 0 deletions packages/node/src/local/streamer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import {
SdkStreamFlagApi,
StreamErrorEvent,
StreamEventSourceClass,
} from '@amplitude/experiment-core';

import { version as PACKAGE_VERSION } from '../../gen/version';
import { LocalEvaluationDefaults } from '../types/config';
import { FlagConfigCache } from '../types/flag';
import { ConsoleLogger } from '../util/logger';
import { Logger } from '../util/logger';

import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';

export class FlagConfigStreamer {
private readonly logger: Logger;

private readonly poller: FlagConfigPoller;
private readonly stream: SdkStreamFlagApi;
private readonly retryStreamFlagDelayMillis: number;

private streamRetryTimeout?: NodeJS.Timeout;

public readonly cache: FlagConfigCache;

constructor(
apiKey: string,
fetcher: FlagConfigFetcher,
cache: FlagConfigCache,
streamEventSourceClass: StreamEventSourceClass,
pollingIntervalMillis = LocalEvaluationDefaults.flagConfigPollingIntervalMillis,
streamConnTimeoutMillis = LocalEvaluationDefaults.streamConnTimeoutMillis,
streamFlagConnTimeoutMillis = LocalEvaluationDefaults.streamFlagConnTimeoutMillis,
streamFlagTryAttempts = LocalEvaluationDefaults.streamFlagTryAttempts,
streamFlagTryDelayMillis = LocalEvaluationDefaults.streamFlagTryDelayMillis,
retryStreamFlagDelayMillis = LocalEvaluationDefaults.retryStreamFlagDelayMillis,
serverUrl: string = LocalEvaluationDefaults.serverUrl,
debug = false,
) {
this.logger = new ConsoleLogger(debug);
this.logger.debug('[Experiment] streamer - init');
this.cache = cache;
this.poller = new FlagConfigPoller(
fetcher,
cache,
pollingIntervalMillis,
debug,
);
this.stream = new SdkStreamFlagApi(
apiKey,
serverUrl,
streamEventSourceClass,
streamConnTimeoutMillis,
streamFlagConnTimeoutMillis,
streamFlagTryAttempts,
streamFlagTryDelayMillis,
);
this.retryStreamFlagDelayMillis = retryStreamFlagDelayMillis;
}

/**
* Fetch initial flag configurations and start polling for updates.
*
* You must call this function to begin polling for flag config updates.
* The promise returned by this function is resolved when the initial call
* to fetch the flag configuration completes.
*
* Calling this function while the poller is already running does nothing.
*/
public async start(
onChange?: (cache: FlagConfigCache) => Promise<void>,
): Promise<void> {
this.stream.onError = (e) => {
const err = e as StreamErrorEvent;
this.logger.debug(
`[Experiment] streamer - onError, fallback to poller, err status: ${err.status}, err message: ${err.message}`,
);
this.poller.start(onChange);
this.startRetryStreamTimeout();
};

this.stream.onUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
let changed = false;
if (onChange) {
const current = await this.cache.getAll();
if (!Object.is(current, flagConfigs)) {
changed = true;
}
}
await this.cache.clear();
await this.cache.putAll(flagConfigs);
if (changed) {
await onChange(this.cache);
}
};

try {
// Clear retry timeout. If stream isn't connected, we're trying now.
// If stream is connected, timeout will be undefined and connect will do nothing.
if (this.streamRetryTimeout) {
clearTimeout(this.streamRetryTimeout);
}
// stream connect error will be raised, not through calling onError.
// So onError won't be called.
await this.stream.connect({
libraryName: 'experiment-node-server',
libraryVersion: PACKAGE_VERSION,
});
this.poller.stop();
this.logger.debug('[Experiment] streamer - start stream success');
} catch (e) {
const err = e as StreamErrorEvent;
this.logger.debug(
`[Experiment] streamer - start stream failed, fallback to poller, err status: ${err.status}, err message: ${err.message}`,
);
await this.poller.start(onChange);
this.startRetryStreamTimeout();
}
}

/**
* Stop polling for flag configurations.
*
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
this.logger.debug('[Experiment] streamer - stop');
if (this.streamRetryTimeout) {
clearTimeout(this.streamRetryTimeout);
}
this.poller.stop();
this.stream.close();
}

/**
* Force a flag config fetch and cache the update with an optional callback
* which gets called if the flag configs change in any way.
*
* @param onChange optional callback which will get called if the flag configs
* in the cache have changed.
*/
public async update(
onChange?: (cache: FlagConfigCache) => Promise<void>,
): Promise<void> {
this.poller.update(onChange);
}

// Retry stream after a while.
private startRetryStreamTimeout() {
if (this.streamRetryTimeout) {
clearTimeout(this.streamRetryTimeout);
}
this.streamRetryTimeout = setTimeout(() => {
this.logger.debug('[Experiment] streamer - retry stream');
this.stream
.connect()
.then(() => {
this.logger.debug('[Experiment] streamer - retry stream success');
// Stop poller.
this.poller.stop();
})
// No need to set timeout here. onError handles calling startRetryStreamInterval().
// eslint-disable-next-line @typescript-eslint/no-empty-function
.catch(() => {});
}, this.retryStreamFlagDelayMillis);
}
}
26 changes: 26 additions & 0 deletions packages/node/src/local/updater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { FlagConfigCache } from '..';

export interface FlagConfigUpdater {
/**
* Fetch initial flag configurations and start watching for updates.
*
* You must call this function to begin watching for flag config updates.
* The promise returned by this function is resolved when the initial call
* to fetch the flag configuration completes.
*/
start(onChange?: (cache: FlagConfigCache) => Promise<void>): Promise<void>;

/**
* Stop updating flag configurations.
*/
stop(): void;

/**
* Force a flag config fetch and cache the update with an optional callback
* which gets called if the flag configs change in any way.
*
* @param onChange optional callback which will get called if the flag configs
* in the cache have changed.
*/
update(onChange?: (cache: FlagConfigCache) => Promise<void>): Promise<void>;
}
45 changes: 45 additions & 0 deletions packages/node/src/types/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,44 @@ export type LocalEvaluationConfig = {
* evaluation.
*/
assignmentConfig?: AssignmentConfig;

/**
* To use streaming API or polling. With streaming, flag config updates are
* received immediately, no polling is necessary. If stream fails, it will
* fallback to polling automatically.
*/
getFlagConfigUpdateWithStream?: boolean;

/**
* The stream server endpoint from which to stream data.
*/
streamServerUrl?: string;

/**
* To use with streaming. The timeout for connecting an server-side event stream. Aka, the timeout for http connection.
*/
streamConnTimeoutMillis?: number;

/**
* To use with streaming. The timeout for a single attempt of establishing a valid stream of flag configs.
* This includes streamConnTimeoutMillis and time for receiving initial flag configs.
*/
streamFlagConnTimeoutMillis?: number;

/**
* To use with streaming. The number attempts to connect before declaring streaming fatal error.
*/
streamFlagTryAttempts?: number;

/**
* To use with streaming. The delay between attempts to connect.
*/
streamFlagTryDelayMillis?: number;

/**
* To use with streaming. The delay to retry streaming after stream fatal error and fallbacked to poller.
*/
retryStreamFlagDelayMillis?: number;
};

export type AssignmentConfig = {
Expand Down Expand Up @@ -181,6 +219,13 @@ export const LocalEvaluationDefaults: LocalEvaluationConfig = {
bootstrap: {},
flagConfigPollingIntervalMillis: 30000,
httpAgent: null,
getFlagConfigUpdateWithStream: false,
streamServerUrl: 'https://stream.lab.amplitude.com',
streamConnTimeoutMillis: 1000,
streamFlagConnTimeoutMillis: 1000,
streamFlagTryAttempts: 2,
streamFlagTryDelayMillis: 1000,
retryStreamFlagDelayMillis: 15000,
};

export const AssignmentConfigDefaults: Omit<AssignmentConfig, 'apiKey'> = {
Expand Down
Loading

0 comments on commit 9220241

Please sign in to comment.