Skip to content

Commit

Permalink
Not showing unrecognized task types in the health summary
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Jul 30, 2024
1 parent a9ae3e5 commit 58eb2b1
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,26 +201,104 @@ describe('Workload Statistics Aggregator', () => {
});
});

const mockAggregatedResult = () =>
asApiResponse({
hits: { hits: [], max_score: 0, total: { value: 4, relation: 'eq' } },
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 1, failed: 0 },
aggregations: {
schedule: {
const mockResult = (overrides = {}): ResponseWithAggs => ({
hits: { hits: [], max_score: 0, total: { value: 4, relation: 'eq' } },
took: 1,
timed_out: false,
_shards: { total: 1, successful: 1, skipped: 1, failed: 0 },
aggregations: {
schedule: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [
{ key: '3600s', doc_count: 1 },
{ key: '60s', doc_count: 1 },
{ key: '720m', doc_count: 1 },
],
},
taskType: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [
{
key: 'foo',
doc_count: 2,
status: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [{ key: 'idle', doc_count: 2 }],
},
},
{
key: 'bar',
doc_count: 1,
status: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [{ key: 'idle', doc_count: 1 }],
},
},
{
key: 'report',
doc_count: 1,
status: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [{ key: 'idle', doc_count: 1 }],
},
},
],
},
nonRecurringTasks: {
doc_count: 1,
taskType: {
buckets: [{ key: 'report', doc_count: 1 }],
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
},
},
ownerIds: { ownerIds: { value: 1 } },
// The `FiltersAggregate` doesn't cover the case of a nested `AggregationsAggregationContainer`, in which `FiltersAggregate`
// would not have a `buckets` property, but rather a keyed property that's inferred from the request.
// @ts-expect-error
idleTasks: {
doc_count: 3,
overdue: {
doc_count: 2,
nonRecurring: { doc_count: 1 },
taskTypes: {
buckets: [{ key: 'foo', doc_count: 1 }],
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
},
},
scheduleDensity: {
buckets: [
{ key: '3600s', doc_count: 1 },
{ key: '60s', doc_count: 1 },
{ key: '720m', doc_count: 1 },
mockHistogram(0, 7 * 3000 + 500, 60 * 1000, 3000, [2, 2, 5, 0, 0, 0, 0, 0, 0, 1]),
],
},
},
...overrides,
},
});

const mockAggregatedResult = () => asApiResponse(mockResult());
const mockAggregatedResultWithUnknownTaskType = () =>
asApiResponse(
mockResult({
taskType: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [
{
key: 'unknownType',
doc_count: 1,
status: {
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
buckets: [{ key: 'idle', doc_count: 1 }],
},
},
{
key: 'foo',
doc_count: 2,
Expand Down Expand Up @@ -250,37 +328,8 @@ describe('Workload Statistics Aggregator', () => {
},
],
},
nonRecurringTasks: {
doc_count: 1,
taskType: {
buckets: [{ key: 'report', doc_count: 1 }],
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
},
},
ownerIds: { ownerIds: { value: 1 } },
// The `FiltersAggregate` doesn't cover the case of a nested `AggregationsAggregationContainer`, in which `FiltersAggregate`
// would not have a `buckets` property, but rather a keyed property that's inferred from the request.
// @ts-expect-error
idleTasks: {
doc_count: 3,
overdue: {
doc_count: 2,
nonRecurring: { doc_count: 1 },
taskTypes: {
buckets: [{ key: 'foo', doc_count: 1 }],
doc_count_error_upper_bound: 0,
sum_other_doc_count: 0,
},
},
scheduleDensity: {
buckets: [
mockHistogram(0, 7 * 3000 + 500, 60 * 1000, 3000, [2, 2, 5, 0, 0, 0, 0, 0, 0, 1]),
],
},
},
},
});
})
);

test('returns a summary of the workload by task type', async () => {
const taskStore = taskStoreMock.create({});
Expand Down Expand Up @@ -312,6 +361,36 @@ describe('Workload Statistics Aggregator', () => {
});
});

test('excludes unregistered task types from the summary', async () => {
const taskStore = taskStoreMock.create({});
taskStore.aggregate.mockResolvedValue(mockAggregatedResultWithUnknownTaskType());

const workloadAggregator = createWorkloadAggregator({
taskStore,
elasticsearchAndSOAvailability$: of(true),
refreshInterval: 10,
pollInterval: 3000,
logger,
taskDefinitions: definitions,
});

return new Promise<void>((resolve) => {
workloadAggregator.pipe(first()).subscribe((result) => {
expect(result.key).toEqual('workload');
expect(result.value).toMatchObject({
count: 4,
cost: 15,
task_types: {
foo: { count: 2, cost: 4, status: { idle: 2 } },
bar: { count: 1, cost: 1, status: { idle: 1 } },
report: { count: 1, cost: 10, status: { idle: 1 } },
},
});
resolve();
});
});
});

test('skips summary of the workload when services are unavailable', async () => {
const taskStore = taskStoreMock.create({});
taskStore.aggregate.mockResolvedValue(mockAggregatedResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,22 @@ export function createWorkloadAggregator({
let totalCost = 0;
const taskTypeSummary = taskTypes.reduce((acc, bucket) => {
const value = bucket as TaskTypeWithStatusBucket;
const cost =
value.doc_count * taskDefinitions.get(value.key as string)?.cost ?? TaskCost.Normal;
totalCost += cost;
return Object.assign(acc, {
[value.key as string]: {
count: value.doc_count,
cost,
status: mapValues(keyBy(value.status.buckets, 'key'), 'doc_count'),
},
});
try {
const taskDef = taskDefinitions.get(value.key as string);
const cost = value.doc_count * taskDef?.cost ?? TaskCost.Normal;

totalCost += cost;
return Object.assign(acc, {
[value.key as string]: {
count: value.doc_count,
cost,
status: mapValues(keyBy(value.status.buckets, 'key'), 'doc_count'),
},
});
} catch (err) {
// task type is not registered with dictionary, do not add to summary
return acc;
}
}, {});

const summary: WorkloadStat = {
Expand Down Expand Up @@ -550,7 +556,12 @@ interface DateRangeBucket {
function getTotalCost(taskTypeBuckets: TaskTypeBucket[], definitions: TaskTypeDictionary): number {
let cost = 0;
for (const bucket of taskTypeBuckets) {
cost += bucket.doc_count * definitions.get(bucket.key as string)?.cost ?? TaskCost.Normal;
try {
const taskDef = definitions.get(bucket.key as string);
cost += bucket.doc_count * taskDef?.cost ?? TaskCost.Normal;
} catch (err) {
// task type is not registered with dictionary, do not add to cost
}
}
return cost;
}

0 comments on commit 58eb2b1

Please sign in to comment.