Skip to content

Commit

Permalink
Log error on eval, dont init fail on if cohort fail, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Aug 1, 2024
1 parent a1267d9 commit d11f76e
Show file tree
Hide file tree
Showing 15 changed files with 1,010 additions and 792 deletions.
41 changes: 39 additions & 2 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class LocalEvaluationClient {
this.logger = new ConsoleLogger(this.config.debug);

this.cohortStorage = new InMemoryCohortStorage();
let cohortFetcher = undefined;
let cohortFetcher: CohortFetcher = undefined;
if (this.config.cohortConfig) {
cohortFetcher = new CohortFetcher(
this.config.cohortConfig.apiKey,
Expand All @@ -102,6 +102,7 @@ export class LocalEvaluationClient {
this.cohortUpdater = new CohortPoller(
cohortFetcher,
this.cohortStorage,
this.cache,
60000,
this.config.debug,
);
Expand Down Expand Up @@ -188,11 +189,47 @@ export class LocalEvaluationClient {
return evaluationVariantsToVariants(results);
}

protected checkFlagsCohortsAvailable(
cohortIdsByFlag: Record<string, Set<string>>,
): boolean {
const availableCohortIds = this.cohortStorage.getAllCohortIds();
for (const key in cohortIdsByFlag) {
const flagCohortIds = cohortIdsByFlag[key];
const unavailableCohortIds = CohortUtils.setSubtract(
flagCohortIds,
availableCohortIds,
);
if (unavailableCohortIds.size > 0) {
this.logger.error(
`[Experiment] Flag ${key} has cohort ids ${[
...unavailableCohortIds,
]} unavailable, evaluation may be incorrect`,
);
return false;
}
}
return true;
}

protected enrichUserWithCohorts(
user: ExperimentUser,
flags: Record<string, EvaluationFlag>,
): void {
const cohortIdsByGroup = CohortUtils.extractCohortIdsByGroup(flags);
const cohortIdsByFlag: Record<string, Set<string>> = {};
const cohortIdsByGroup = {};
for (const key in flags) {
const cohortIdsByGroupOfFlag =
CohortUtils.extractCohortIdsByGroupFromFlag(flags[key]);

CohortUtils.mergeValuesOfBIntoValuesOfA(
cohortIdsByGroup,
cohortIdsByGroupOfFlag,
);

cohortIdsByFlag[key] = CohortUtils.mergeAllValues(cohortIdsByGroupOfFlag);
}

this.checkFlagsCohortsAvailable(cohortIdsByFlag);

// Enrich cohorts with user group type.
const userCohortIds = cohortIdsByGroup[USER_GROUP_TYPE];
Expand Down
4 changes: 2 additions & 2 deletions packages/node/src/local/cohort/cohort-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ export class SdkCohortApi implements CohortApi {
return undefined;
} else if (response.status == 413) {
throw new CohortMaxSizeExceededError(
`Cohort error response: size > ${options.maxCohortSize}`,
`Cohort size > ${options.maxCohortSize}`,
);
} else {
throw new CohortDownloadError(
`Cohort error response: status ${response.status}, body ${response.body}`,
`Cohort error response status ${response.status}, body ${response.body}`,
);
}
}
Expand Down
10 changes: 9 additions & 1 deletion packages/node/src/local/cohort/poller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { CohortStorage } from '../../types/cohort';
import { FlagConfigCache } from '../../types/flag';
import { CohortUtils } from '../../util/cohort';
import { ConsoleLogger } from '../../util/logger';
import { Logger } from '../../util/logger';

Expand All @@ -10,18 +12,21 @@ export class CohortPoller implements CohortUpdater {

public readonly fetcher: CohortFetcher;
public readonly storage: CohortStorage;
public readonly flagCache: FlagConfigCache;

private poller: NodeJS.Timeout;
private pollingIntervalMillis: number;

constructor(
fetcher: CohortFetcher,
storage: CohortStorage,
flagCache: FlagConfigCache,
pollingIntervalMillis = 60000,
debug = false,
) {
this.fetcher = fetcher;
this.storage = storage;
this.flagCache = flagCache;
this.pollingIntervalMillis = pollingIntervalMillis;
this.logger = new ConsoleLogger(debug);
}
Expand Down Expand Up @@ -64,8 +69,11 @@ export class CohortPoller implements CohortUpdater {
): Promise<void> {
let changed = false;
const promises = [];
const cohortIds = CohortUtils.extractCohortIds(
await this.flagCache.getAll(),
);

for (const cohortId of this.storage.getAllCohortIds()) {
for (const cohortId of cohortIds) {
this.logger.debug(`[Experiment] updating cohort ${cohortId}`);

// Get existing cohort and lastModified.
Expand Down
4 changes: 2 additions & 2 deletions packages/node/src/local/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class FlagConfigPoller
async () => await this.fetcher.fetch(),
BACKOFF_POLICY,
);
await super._update(flagConfigs, true, onChange);
await super._update(flagConfigs, onChange);
} catch (e) {
this.logger.error(
'[Experiment] flag config initial poll failed, stopping',
Expand Down Expand Up @@ -95,6 +95,6 @@ export class FlagConfigPoller
): Promise<void> {
this.logger.debug('[Experiment] updating flag configs');
const flagConfigs = await this.fetcher.fetch();
await super._update(flagConfigs, false, onChange);
await super._update(flagConfigs, onChange);
}
}
4 changes: 2 additions & 2 deletions packages/node/src/local/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ export class FlagConfigStreamer

this.stream.onInitUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
await super._update(flagConfigs, true, onChange);
await super._update(flagConfigs, onChange);
this.logger.debug('[Experiment] streamer - start flags stream success');
};
this.stream.onUpdate = async (flagConfigs) => {
this.logger.debug('[Experiment] streamer - receives updates');
await super._update(flagConfigs, false, onChange);
await super._update(flagConfigs, onChange);
};

try {
Expand Down
62 changes: 9 additions & 53 deletions packages/node/src/local/updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ export class FlagConfigUpdaterBase {

protected async _update(
flagConfigs: Record<string, FlagConfig>,
isInit: boolean,
onChange?: (cache: FlagConfigCache) => Promise<void>,
): Promise<void> {
let changed = false;
Expand All @@ -66,30 +65,20 @@ export class FlagConfigUpdaterBase {
// Get all cohort needs update.
const cohortIds = CohortUtils.extractCohortIds(flagConfigs);
if (cohortIds && cohortIds.size > 0 && !this.cohortFetcher) {
throw Error(
'cohort found in flag configs but no cohort download configured',
this.logger.error(
'Cohorts found in flag configs but no cohort download configured',
);
} else {
// Download new cohorts into cohortStorage.
await this.downloadNewCohorts(cohortIds);
}

// Download new cohorts into cohortStorage.
const failedCohortIds = await this.downloadNewCohorts(cohortIds);
if (isInit && failedCohortIds.size > 0) {
throw Error('Cohort download failed');
}

// Update the flags that has all cohorts successfully updated into flags cache.
const newFlagConfigs = await this.filterFlagConfigsWithFullCohorts(
flagConfigs,
);

// Update the flags with new flags.
await this.cache.clear();
await this.cache.putAll(newFlagConfigs);
await this.cache.putAll(flagConfigs);

// Remove cohorts not used by new flags.
await this.removeUnusedCohorts(
CohortUtils.extractCohortIds(newFlagConfigs),
);
await this.removeUnusedCohorts(cohortIds);

if (changed) {
await onChange(this.cache);
Expand All @@ -111,8 +100,8 @@ export class FlagConfigUpdaterBase {
}
})
.catch((err) => {
this.logger.warn(
`[Experiment] Cohort download failed ${cohortId}, using existing cohort if exist`,
this.logger.error(
`[Experiment] Cohort download failed ${cohortId}`,
err,
);
failedCohortIds.add(cohortId);
Expand All @@ -122,39 +111,6 @@ export class FlagConfigUpdaterBase {
return failedCohortIds;
}

protected async filterFlagConfigsWithFullCohorts(
flagConfigs: Record<string, FlagConfig>,
): Promise<Record<string, FlagConfig>> {
const newFlagConfigs = {};
const availableCohortIds = this.cohortStorage.getAllCohortIds();
for (const flagKey in flagConfigs) {
// Get cohorts for this flag.
const cohortIds = CohortUtils.extractCohortIdsFromFlag(
flagConfigs[flagKey],
);

// Check if all cohorts for this flag has downloaded.
// If any cohort failed, don't use the new flag.
const updateFlag =
cohortIds.size === 0 ||
CohortUtils.setSubtract(cohortIds, availableCohortIds).size === 0;

if (updateFlag) {
newFlagConfigs[flagKey] = flagConfigs[flagKey];
} else {
this.logger.warn(
`[Experiment] Flag ${flagKey} failed to update due to cohort update failure`,
);
const existingFlag = await this.cache.get(flagKey);
if (existingFlag) {
newFlagConfigs[flagKey] = existingFlag;
}
}
}

return newFlagConfigs;
}

protected async removeUnusedCohorts(
validCohortIds: Set<string>,
): Promise<void> {
Expand Down
23 changes: 4 additions & 19 deletions packages/node/src/util/cohort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,13 @@ export class CohortUtils {
public static extractCohortIds(
flagConfigs: Record<string, FlagConfig>,
): Set<string> {
return CohortUtils.mergeAllValues(
CohortUtils.extractCohortIdsByGroup(flagConfigs),
);
}

public static extractCohortIdsByGroup(
flagConfigs: Record<string, FlagConfig>,
): Record<string, Set<string>> {
const cohortIdsByGroup = {};
const cohortIdsByFlag = {};
for (const key in flagConfigs) {
CohortUtils.mergeBIntoA(
cohortIdsByGroup,
cohortIdsByFlag[key] = CohortUtils.mergeAllValues(
CohortUtils.extractCohortIdsByGroupFromFlag(flagConfigs[key]),
);
}
return cohortIdsByGroup;
}

public static extractCohortIdsFromFlag(flag: FlagConfig): Set<string> {
return CohortUtils.mergeAllValues(
CohortUtils.extractCohortIdsByGroupFromFlag(flag),
);
return CohortUtils.mergeAllValues(cohortIdsByFlag);
}

public static extractCohortIdsByGroupFromFlag(
Expand Down Expand Up @@ -85,7 +70,7 @@ export class CohortUtils {
return cohortIdsByGroup;
}

public static mergeBIntoA(
public static mergeValuesOfBIntoValuesOfA(
a: Record<string, Set<string>>,
b: Record<string, Set<string>>,
): void {
Expand Down
Loading

0 comments on commit d11f76e

Please sign in to comment.