Skip to content

Commit

Permalink
feat: Add cohort sync (#49)
Browse files Browse the repository at this point in the history
* move poller outside

* added cohort fetches

* remove unused imports

* always update cohort and fix test

* added cohort server url

* remove streamTest file

* added fetch key to base64 and memberId convert to set

* fixed type

* added cohort to eval context

* fixed bugs, moved configs, surface cohort errors to flag pollers

* storage update only after all cohort loads

* added tests

* added tests

* add ci test with secrets from env

* fix cohortPoller.test.ts

* not use environment

* added .env instr, added tests

* cleanup unnecessary check

* lint

* fix test node version matrix

* polish test and add macos-13

* updated gh action node versions to current lts's

* remove unsupported node v24

* added serverZone config option

* parameterize test

* moved config util code under util

* added eu test

* increase cohort fetch timeout

* update to flag poller loads new cohort, cohort updater polls updates

* fix client start and stop, cleanup

* fixed tests

* fixed typo, env, and err msg

* fix gh action

* added streamer test, added streamer onInitUpdate, clearer logic

* add test, add return types, move a util func

* fix null cohortUpdater when no cohort configs

* fix poller interval and comments

* fix relative imports

* add cohortRequestDelayMillis, use sleep util, skip retry if maxCohortSize error

* unused imports

* fix relative imports attempt 2

* Log error on eval, dont init fail on if cohort fail, add tests

* fix lint

* add no config integration test

* change default maxCohortSize

* changed configs

* fix lint

* add test, fix comment
  • Loading branch information
zhukaihan authored Aug 27, 2024
1 parent 824b87b commit 74221bf
Show file tree
Hide file tree
Showing 40 changed files with 4,135 additions and 696 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
strategy:
fail-fast: false
matrix:
node-version: ['14', '16', '18']
node-version: ['16', '18', '20', '22']
os: [macos-latest, ubuntu-latest]
runs-on: ${{ matrix.os }}

Expand All @@ -38,3 +38,8 @@ jobs:

- name: Test
run: yarn test --testPathIgnorePatterns "benchmark.test.ts"
env:
API_KEY: ${{ secrets.API_KEY }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
EU_API_KEY: ${{ secrets.EU_API_KEY }}
EU_SECRET_KEY: ${{ secrets.EU_SECRET_KEY }}
1 change: 1 addition & 0 deletions packages/node/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Ignore generated files
gen
.env*
7 changes: 7 additions & 0 deletions packages/node/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
To setup for running test on local, create a `.env` file with following
contents, and replace `{API_KEY}` and `{SECRET_KEY}` for the project in test:

```
API_KEY={API_KEY}
SECRET_KEY={SECRET_KEY}
```
152 changes: 138 additions & 14 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ import { InMemoryAssignmentFilter } from '../assignment/assignment-filter';
import { AmplitudeAssignmentService } from '../assignment/assignment-service';
import { FetchHttpClient } from '../transport/http';
import { StreamEventSourceFactory } from '../transport/stream';
import { USER_GROUP_TYPE } from '../types/cohort';
import {
AssignmentConfig,
AssignmentConfigDefaults,
LocalEvaluationConfig,
LocalEvaluationDefaults,
} from '../types/config';
import { FlagConfigCache } from '../types/flag';
import { HttpClient } from '../types/transport';
import { ExperimentUser } from '../types/user';
import { Variant, Variants } from '../types/variant';
import { CohortUtils } from '../util/cohort';
import { populateLocalConfigDefaults } from '../util/config';
import { ConsoleLogger } from '../util/logger';
import { Logger } from '../util/logger';
import { convertUserToEvaluationContext } from '../util/user';
Expand All @@ -30,6 +32,10 @@ import {
} from '../util/variant';

import { InMemoryFlagConfigCache } from './cache';
import { CohortFetcher } from './cohort/fetcher';
import { CohortPoller } from './cohort/poller';
import { InMemoryCohortStorage } from './cohort/storage';
import { CohortUpdater } from './cohort/updater';
import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';
import { FlagConfigStreamer } from './streamer';
Expand All @@ -40,33 +46,37 @@ const STREAM_RETRY_JITTER_MAX_MILLIS = 2000; // The jitter to add to delay after
const STREAM_ATTEMPTS = 1; // Number of attempts before fallback to poller.
const STREAM_TRY_DELAY_MILLIS = 1000; // The delay between attempts.

const COHORT_POLLING_INTERVAL_MILLIS_MIN = 60000;

/**
* Experiment client for evaluating variants for a user locally.
* @category Core Usage
*/
export class LocalEvaluationClient {
private readonly logger: Logger;
private readonly config: LocalEvaluationConfig;
protected readonly config: LocalEvaluationConfig;
private readonly updater: FlagConfigUpdater;
private readonly assignmentService: AssignmentService;
private readonly evaluation: EvaluationEngine;
private readonly cohortUpdater?: CohortUpdater;

/**
* Directly access the client's flag config cache.
*
* Used for directly manipulating the flag configs used for evaluation.
*/
public readonly cache: InMemoryFlagConfigCache;
public readonly cohortStorage: InMemoryCohortStorage;

constructor(
apiKey: string,
config: LocalEvaluationConfig,
config?: LocalEvaluationConfig,
flagConfigCache?: FlagConfigCache,
httpClient: HttpClient = new FetchHttpClient(config?.httpAgent),
streamEventSourceFactory: StreamEventSourceFactory = (url, params) =>
new EventSource(url, params),
) {
this.config = { ...LocalEvaluationDefaults, ...config };
this.config = populateLocalConfigDefaults(config);
const fetcher = new FlagConfigFetcher(
apiKey,
httpClient,
Expand All @@ -78,27 +88,57 @@ export class LocalEvaluationClient {
this.config.bootstrap,
);
this.logger = new ConsoleLogger(this.config.debug);

this.cohortStorage = new InMemoryCohortStorage();
let cohortFetcher: CohortFetcher = undefined;
if (this.config.cohortSyncConfig) {
cohortFetcher = new CohortFetcher(
this.config.cohortSyncConfig.apiKey,
this.config.cohortSyncConfig.secretKey,
httpClient,
this.config.cohortSyncConfig?.cohortServerUrl,
this.config.cohortSyncConfig?.maxCohortSize,
undefined,
this.config.debug,
);
this.cohortUpdater = new CohortPoller(
cohortFetcher,
this.cohortStorage,
this.cache,
Math.max(
COHORT_POLLING_INTERVAL_MILLIS_MIN,
this.config.cohortSyncConfig?.cohortPollingIntervalMillis,
),
this.config.debug,
);
}

const flagsPoller = new FlagConfigPoller(
fetcher,
this.cache,
this.cohortStorage,
cohortFetcher,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
this.updater = this.config.streamUpdates
? new FlagConfigStreamer(
apiKey,
fetcher,
flagsPoller,
this.cache,
streamEventSourceFactory,
this.config.flagConfigPollingIntervalMillis,
this.config.streamFlagConnTimeoutMillis,
STREAM_ATTEMPTS,
STREAM_TRY_DELAY_MILLIS,
STREAM_RETRY_DELAY_MILLIS +
Math.floor(Math.random() * STREAM_RETRY_JITTER_MAX_MILLIS),
this.config.streamServerUrl,
this.cohortStorage,
cohortFetcher,
this.config.debug,
)
: new FlagConfigPoller(
fetcher,
this.cache,
this.config.flagConfigPollingIntervalMillis,
this.config.debug,
);
: flagsPoller;

if (this.config.assignmentConfig) {
this.config.assignmentConfig = {
...AssignmentConfigDefaults,
Expand Down Expand Up @@ -144,6 +184,7 @@ export class LocalEvaluationClient {
flagKeys?: string[],
): Record<string, Variant> {
const flags = this.cache.getAllCached() as Record<string, EvaluationFlag>;
this.enrichUserWithCohorts(user, flags);
this.logger.debug('[Experiment] evaluate - user:', user, 'flags:', flags);
const context = convertUserToEvaluationContext(user);
const sortedFlags = topologicalSort(flags, flagKeys);
Expand All @@ -153,6 +194,87 @@ export class LocalEvaluationClient {
return evaluationVariantsToVariants(results);
}

protected checkFlagsCohortsAvailable(
cohortIdsByFlag: Record<string, Set<string>>,
): boolean {
const availableCohortIds = this.cohortStorage.getAllCohortIds();
for (const key in cohortIdsByFlag) {
const flagCohortIds = cohortIdsByFlag[key];
const unavailableCohortIds = CohortUtils.setSubtract(
flagCohortIds,
availableCohortIds,
);
if (unavailableCohortIds.size > 0) {
this.logger.error(
`[Experiment] Flag ${key} has cohort ids ${[
...unavailableCohortIds,
]} unavailable, evaluation may be incorrect`,
);
return false;
}
}
return true;
}

protected enrichUserWithCohorts(
user: ExperimentUser,
flags: Record<string, EvaluationFlag>,
): void {
const cohortIdsByFlag: Record<string, Set<string>> = {};
const cohortIdsByGroup = {};
for (const key in flags) {
const cohortIdsByGroupOfFlag =
CohortUtils.extractCohortIdsByGroupFromFlag(flags[key]);

CohortUtils.mergeValuesOfBIntoValuesOfA(
cohortIdsByGroup,
cohortIdsByGroupOfFlag,
);

cohortIdsByFlag[key] = CohortUtils.mergeAllValues(cohortIdsByGroupOfFlag);
}

this.checkFlagsCohortsAvailable(cohortIdsByFlag);

// Enrich cohorts with user group type.
const userCohortIds = cohortIdsByGroup[USER_GROUP_TYPE];
if (user.user_id && userCohortIds && userCohortIds.size != 0) {
user.cohort_ids = Array.from(
this.cohortStorage.getCohortsForUser(user.user_id, userCohortIds),
);
}

// Enrich other group types for this user.
if (user.groups) {
for (const groupType in user.groups) {
const groupNames = user.groups[groupType];
if (groupNames.length == 0) {
continue;
}
const groupName = groupNames[0];

const cohortIds = cohortIdsByGroup[groupType];
if (!cohortIds || cohortIds.size == 0) {
continue;
}

if (!user.group_cohort_ids) {
user.group_cohort_ids = {};
}
if (!(groupType in user.group_cohort_ids)) {
user.group_cohort_ids[groupType] = {};
}
user.group_cohort_ids[groupType][groupName] = Array.from(
this.cohortStorage.getCohortsForGroup(
groupType,
groupName,
cohortIds,
),
);
}
}
}

/**
* Locally evaluates flag variants for a user.
*
Expand Down Expand Up @@ -184,7 +306,8 @@ export class LocalEvaluationClient {
* Calling this function while the poller is already running does nothing.
*/
public async start(): Promise<void> {
return await this.updater.start();
await this.updater.start();
await this.cohortUpdater?.start();
}

/**
Expand All @@ -193,6 +316,7 @@ export class LocalEvaluationClient {
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
return this.updater.stop();
this.updater.stop();
this.cohortUpdater?.stop();
}
}
97 changes: 97 additions & 0 deletions packages/node/src/local/cohort/cohort-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { HttpClient } from '@amplitude/experiment-core';

import { Cohort } from '../../types/cohort';

export type GetCohortOptions = {
libraryName: string;
libraryVersion: string;
cohortId: string;
maxCohortSize: number;
lastModified?: number;
timeoutMillis?: number;
};

export interface CohortApi {
/**
* Calls /sdk/v1/cohort/<cohortId> with query params maxCohortSize and lastModified if specified.
* Returns a promise that
* resolves to a
* Cohort if the cohort downloads successfully or
* undefined if cohort has no change since lastModified timestamp and
* throws an error if download failed.
* @param options
*/
getCohort(options?: GetCohortOptions): Promise<Cohort>;
}

export class CohortClientRequestError extends Error {} // 4xx errors except 429
export class CohortMaxSizeExceededError extends CohortClientRequestError {} // 413 error
export class CohortDownloadError extends Error {} // All other errors

export class SdkCohortApi implements CohortApi {
private readonly cohortApiKey;
private readonly serverUrl;
private readonly httpClient;

constructor(cohortApiKey: string, serverUrl: string, httpClient: HttpClient) {
this.cohortApiKey = cohortApiKey;
this.serverUrl = serverUrl;
this.httpClient = httpClient;
}

public async getCohort(
options?: GetCohortOptions,
): Promise<Cohort | undefined> {
const headers: Record<string, string> = {
Authorization: `Basic ${this.cohortApiKey}`,
};
if (options?.libraryName && options?.libraryVersion) {
headers[
'X-Amp-Exp-Library'
] = `${options.libraryName}/${options.libraryVersion}`;
}

const reqUrl = `${this.serverUrl}/sdk/v1/cohort/${
options.cohortId
}?maxCohortSize=${options.maxCohortSize}${
options.lastModified ? `&lastModified=${options.lastModified}` : ''
}`;
const response = await this.httpClient.request({
requestUrl: reqUrl,
method: 'GET',
headers: headers,
timeoutMillis: options?.timeoutMillis,
});

// Check status code.
// 200: download success.
// 204: no change.
// 413: cohort larger than maxCohortSize
if (response.status == 200) {
const cohort: Cohort = JSON.parse(response.body) as Cohort;
if (Array.isArray(cohort.memberIds)) {
cohort.memberIds = new Set<string>(cohort.memberIds);
}
return cohort;
} else if (response.status == 204) {
return undefined;
} else if (response.status == 413) {
throw new CohortMaxSizeExceededError(
`Cohort size > ${options.maxCohortSize}`,
);
} else if (
400 <= response.status &&
response.status < 500 &&
response.status != 429
) {
// Any 4xx other than 429.
throw new CohortClientRequestError(
`Cohort client error response status ${response.status}, body ${response.body}`,
);
} else {
throw new CohortDownloadError(
`Cohort error response status ${response.status}, body ${response.body}`,
);
}
}
}
Loading

0 comments on commit 74221bf

Please sign in to comment.