Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stream support #42

Merged
merged 14 commits into from
Mar 15, 2024
3 changes: 2 additions & 1 deletion packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"dependencies": {
"@amplitude/analytics-node": "^1.3.4",
"@amplitude/analytics-types": "^1.3.1",
"@amplitude/experiment-core": "^0.7.2"
"@amplitude/experiment-core": "^0.7.2",
zhukaihan marked this conversation as resolved.
Show resolved Hide resolved
"eventsource": "^2.0.2"
}
}
39 changes: 30 additions & 9 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import {
EvaluationFlag,
topologicalSort,
} from '@amplitude/experiment-core';
import EventSource from 'eventsource';

import { Assignment, AssignmentService } from '../assignment/assignment';
import { InMemoryAssignmentFilter } from '../assignment/assignment-filter';
import { AmplitudeAssignmentService } from '../assignment/assignment-service';
import { FetchHttpClient } from '../transport/http';
import { StreamEventSourceFactory } from '../transport/stream';
import {
AssignmentConfig,
AssignmentConfigDefaults,
Expand All @@ -30,6 +32,8 @@ import {
import { InMemoryFlagConfigCache } from './cache';
import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';
import { FlagConfigStreamer } from './streamer';
import { FlagConfigUpdater } from './updater';

/**
* Experiment client for evaluating variants for a user locally.
Expand All @@ -38,7 +42,7 @@ import { FlagConfigPoller } from './poller';
export class LocalEvaluationClient {
private readonly logger: Logger;
private readonly config: LocalEvaluationConfig;
private readonly poller: FlagConfigPoller;
private readonly updater: FlagConfigUpdater;
private readonly assignmentService: AssignmentService;
private readonly evaluation: EvaluationEngine;

Expand All @@ -54,6 +58,8 @@ export class LocalEvaluationClient {
config: LocalEvaluationConfig,
flagConfigCache?: FlagConfigCache,
httpClient: HttpClient = new FetchHttpClient(config?.httpAgent),
streamEventSourceFactory: StreamEventSourceFactory = (url, params) =>
new EventSource(url, params),
) {
this.config = { ...LocalEvaluationDefaults, ...config };
const fetcher = new FlagConfigFetcher(
Expand All @@ -67,12 +73,27 @@ export class LocalEvaluationClient {
this.config.bootstrap,
);
this.logger = new ConsoleLogger(this.config.debug);
this.poller = new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
this.updater = this.config.streamUpdates
? new FlagConfigStreamer(
apiKey,
fetcher,
this.cache,
streamEventSourceFactory,
this.config.flagConfigPollingIntervalMillis,
this.config.streamConnTimeoutMillis,
this.config.streamFlagConnTimeoutMillis,
this.config.streamFlagTryAttempts,
this.config.streamFlagTryDelayMillis,
this.config.streamFlagRetryDelayMillis,
this.config.streamServerUrl,
this.config.debug,
)
: new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
if (this.config.assignmentConfig) {
this.config.assignmentConfig = {
...AssignmentConfigDefaults,
Expand Down Expand Up @@ -158,7 +179,7 @@ export class LocalEvaluationClient {
* Calling this function while the poller is already running does nothing.
*/
public async start(): Promise<void> {
return await this.poller.start();
return await this.updater.start();
}

/**
Expand All @@ -167,6 +188,6 @@ export class LocalEvaluationClient {
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
return this.poller.stop();
return this.updater.stop();
}
}
3 changes: 2 additions & 1 deletion packages/node/src/local/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConsoleLogger } from '../util/logger';
import { Logger } from '../util/logger';

import { FlagConfigFetcher } from './fetcher';
import { FlagConfigUpdater } from './updater';

const BACKOFF_POLICY: BackoffPolicy = {
attempts: 5,
Expand All @@ -13,7 +14,7 @@ const BACKOFF_POLICY: BackoffPolicy = {
scalar: 1,
};

export class FlagConfigPoller {
export class FlagConfigPoller implements FlagConfigUpdater {
private readonly logger: Logger;
private readonly pollingIntervalMillis: number;

Expand Down
261 changes: 261 additions & 0 deletions packages/node/src/local/stream-flag-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import { EvaluationFlag } from '@amplitude/experiment-core';

import {
StreamErrorEvent,
DEFAULT_STREAM_ERR_EVENTS,
StreamEventSourceFactory,
SdkStream,
StreamOnErrorCallback,
StreamOptions,
} from '../transport/stream';

const DEFAULT_INITIAL_CONN_TIMEOUT = 1000;
const DEFAULT_TRY_ATTEMPTS = 2;
const DEFAULT_TRY_WAIT_TIMEOUT = 1000;

export type StreamFlagOptions = StreamOptions;

export type StreamFlagOnUpdateCallback = (
flags: Record<string, EvaluationFlag>,
) => unknown;
export type StreamFlagOnErrorCallback = StreamOnErrorCallback;

export interface StreamFlagApi {
/**
* To connect to the stream flag endpoint.
* It will connect the stream and makes sure the initial flag configs are received and valid.
* The initial flag configs are delivered through onUpdate.
* It attempts to retry up to the attempts specified.
* If fatal error happens during connect() call, error will be thrown instead of delivered through onError.
* @param options Options for connection.
*/
connect(options?: StreamOptions): Promise<void>;
/**
* To close the stream.
* If application don't call this, the application may not exit as there are underlaying timers.
*/
close(): void;
/**
* Check if the stream is closed and no retry action is happening.
*/
isClosed: boolean;
/**
* Callback for receiving flag configs updates.
* Can set this value directly multiple times and effect immediately.
*/
onUpdate?: StreamFlagOnUpdateCallback;
/**
* Callback for receiving fatal errors.
* Fatal errors are defined as server returning 501 or retry has reached max attempts.
* This callback will not be called when error happens during connect() call. The error will be throwed in connect() instead.
* Can set this value directly multiple times and effect immediately.
*/
onError?: StreamFlagOnErrorCallback;
}

/**
* This class receives flag config updates from server.
* It also handles errors, retries, flag parsing, and initial flags on connection, in addition to SdkStreamApi.
*/
export class SdkStreamFlagApi implements StreamFlagApi {
// Underlaying SSE api.
private api: SdkStream;
// Flag for whether the stream is open and retrying or closed. This is to avoid calling connect() twice.
private isClosedAndNotTrying = true;

// Callback for updating flag configs. Can be set or changed multiple times and effect immediately.
public onUpdate?: StreamFlagOnUpdateCallback;
// Callback for notifying user of fatal errors. Can be set or changed multiple times and effect immediately.
public onError?: StreamFlagOnErrorCallback;

// Options for streaming.
private options?: StreamFlagOptions;
// Timeout for a single try of connection. Includes streamConnTimeoutMillis and time for receiving initial flag configs.
private streamFlagConnTimeoutMillis: number;
// Number of attempts for trying connection.
private streamFlagTryAttempts: number;
// The delay between attempts.
private streamFlagTryDelayMillis: number;

constructor(
deploymentKey: string,
serverUrl: string,
eventSourceFactory: StreamEventSourceFactory,
streamConnTimeoutMillis?: number,
streamFlagConnTimeoutMillis: number = DEFAULT_INITIAL_CONN_TIMEOUT,
streamFlagTryAttempts: number = DEFAULT_TRY_ATTEMPTS,
streamFlagTryDelayMillis: number = DEFAULT_TRY_WAIT_TIMEOUT,
) {
this.api = new SdkStream(
deploymentKey,
serverUrl + '/sdk/stream/v1/flags',
eventSourceFactory,
streamConnTimeoutMillis,
);
this.streamFlagConnTimeoutMillis = Math.max(0, streamFlagConnTimeoutMillis);
this.streamFlagTryAttempts = Math.max(1, streamFlagTryAttempts);
this.streamFlagTryDelayMillis = Math.max(0, streamFlagTryDelayMillis);
}

// A try:
// Try connect and receive at least one single flag update.
private connectTry(options?: StreamFlagOptions) {
// Timeout for initial connection. Makes sure the connection do not exceed a certain interval.
let timeout: NodeJS.Timeout | undefined = undefined;
return new Promise<void>((resolve, reject) => {
// On connection and receiving first update, success, set future flag update callback and error handling retries.
const dealWithFlagUpdateInOneTry = async (data: string) => {
if (timeout) {
clearTimeout(timeout);
}
try {
// Make sure valid flag configs.
SdkStreamFlagApi.parseFlagConfigs(data);
} catch (e) {
return reject(DEFAULT_STREAM_ERR_EVENTS.DATA_UNPARSABLE);
}
// Update the callbacks.
this.api.onUpdate = (data: string) => this.handleNewMsg(data);
this.api.onError = (err: StreamErrorEvent) => this.errorAndRetry(err);
// Handoff data to application. Make sure it finishes processing initial new flag configs.
await this.handleNewMsg(data);
// Resolve promise which declares client ready.
resolve();
};
this.api.onUpdate = dealWithFlagUpdateInOneTry;

// If it fails to connect, fails try.
// If it disconnects before flag update, fails try.
const dealWithErrorInOneTry = async (err: StreamErrorEvent) => {
if (timeout) {
clearTimeout(timeout);
}
reject(err); // Reject promise which will either retry or fatal err.
};
this.api.onError = dealWithErrorInOneTry;

// Try connect.
this.api.connect(options);

// If it fails to return flag update within limit time, fails try.
timeout = setTimeout(() => {
dealWithErrorInOneTry(DEFAULT_STREAM_ERR_EVENTS.TIMEOUT);
}, this.streamFlagConnTimeoutMillis);
});
}

// Do try up to 2 times. If any of error is fatal, stop any further tries.
// If trials times reached, fatal error.
public async connect(options?: StreamFlagOptions) {
// Makes sure there is no other connect running.
if (!this.isClosedAndNotTrying) {
return;
}
this.isClosedAndNotTrying = false;

this.options = options; // Save options for retries in case of errors.
const attempts = this.streamFlagTryAttempts;
const delay = this.streamFlagTryDelayMillis;
for (let i = 0; i < attempts; i++) {
try {
// Try.
return await this.connectTry(options);
} catch (e) {
if (this.isClosedAndNotTrying) {
// There's a call to close while waiting for connection.
return;
}

// connectTry() does not call close or closeForRetry on error.
const err = e as StreamErrorEvent;
if (this.isFatal(err) || i == attempts - 1) {
// We want to throw exception instead of call onError callback.
this.close();
throw err;
}

// Retry.
this.closeForRetry();
await new Promise((resolve) => setTimeout(resolve, delay));

if (this.isClosedAndNotTrying) {
// There's a call to close while waiting for retry.
return;
}
}
}
}

// Close stream.
public close() {
this.closeForRetry();
this.isClosedAndNotTrying = true;
}

// Close stream, but we know there will be another try happening very soon.
public closeForRetry() {
this.api.close();
}

get isClosed() {
return this.isClosedAndNotTrying;
}

// Fatal error if 501 Unimplemented.
private isFatal(err: StreamErrorEvent) {
return err && err?.status == 501;
}

// If error during normal operation, retry init connection up to 2 times.
private async errorAndRetry(err: StreamErrorEvent) {
if (this.isFatal(err)) {
this.close();
await this.fatalErr(err);
} else {
this.close(); // Not closeForRetry(), connect checks for isClosedAndNotTrying.
this.connect(this.options).catch((err) => {
this.fatalErr(err);
});
}
}

// No more retry, 501 unimplemented. Need fallback.
private async fatalErr(err: StreamErrorEvent) {
if (this.onError) {
try {
await this.onError(err);
// eslint-disable-next-line no-empty
} catch {} // Don't care about application errors after handoff.
}
}

// Handles new messages, parse them, and handoff to application. Retries if have parsing error.
private async handleNewMsg(data: string) {
let flagConfigs;
try {
flagConfigs = SdkStreamFlagApi.parseFlagConfigs(data);
} catch (e) {
this.errorAndRetry(DEFAULT_STREAM_ERR_EVENTS.DATA_UNPARSABLE);
return;
}
// Put update outside try catch. onUpdate error doesn't mean stream error.
if (this.onUpdate) {
try {
await this.onUpdate(flagConfigs);
// eslint-disable-next-line no-empty
} catch {} // Don't care about application errors after handoff.
}
}

// Parse message. Throws if unparsable.
private static parseFlagConfigs(data: string) {
const flagsArray: EvaluationFlag[] = JSON.parse(data) as EvaluationFlag[];
return flagsArray.reduce(
(map: Record<string, EvaluationFlag>, flag: EvaluationFlag) => {
map[flag.key] = flag;
return map;
},
{},
);
}
}
Loading
Loading