diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts index 2289c00b6405e..cd37c6661ec00 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts @@ -201,26 +201,104 @@ describe('Workload Statistics Aggregator', () => { }); }); - const mockAggregatedResult = () => - asApiResponse({ - hits: { hits: [], max_score: 0, total: { value: 4, relation: 'eq' } }, - took: 1, - timed_out: false, - _shards: { total: 1, successful: 1, skipped: 1, failed: 0 }, - aggregations: { - schedule: { + const mockResult = (overrides = {}): ResponseWithAggs => ({ + hits: { hits: [], max_score: 0, total: { value: 4, relation: 'eq' } }, + took: 1, + timed_out: false, + _shards: { total: 1, successful: 1, skipped: 1, failed: 0 }, + aggregations: { + schedule: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [ + { key: '3600s', doc_count: 1 }, + { key: '60s', doc_count: 1 }, + { key: '720m', doc_count: 1 }, + ], + }, + taskType: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [ + { + key: 'foo', + doc_count: 2, + status: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [{ key: 'idle', doc_count: 2 }], + }, + }, + { + key: 'bar', + doc_count: 1, + status: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [{ key: 'idle', doc_count: 1 }], + }, + }, + { + key: 'report', + doc_count: 1, + status: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [{ key: 'idle', doc_count: 1 }], + }, + }, + ], + }, + nonRecurringTasks: { + doc_count: 1, + taskType: { + buckets: [{ key: 'report', doc_count: 1 }], doc_count_error_upper_bound: 0, sum_other_doc_count: 0, + }, + }, + ownerIds: { ownerIds: { value: 1 } }, + // The `FiltersAggregate` doesn't cover the case of a nested `AggregationsAggregationContainer`, in which `FiltersAggregate` + // would not have a `buckets` property, but rather a keyed property that's inferred from the request. + // @ts-expect-error + idleTasks: { + doc_count: 3, + overdue: { + doc_count: 2, + nonRecurring: { doc_count: 1 }, + taskTypes: { + buckets: [{ key: 'foo', doc_count: 1 }], + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + }, + }, + scheduleDensity: { buckets: [ - { key: '3600s', doc_count: 1 }, - { key: '60s', doc_count: 1 }, - { key: '720m', doc_count: 1 }, + mockHistogram(0, 7 * 3000 + 500, 60 * 1000, 3000, [2, 2, 5, 0, 0, 0, 0, 0, 0, 1]), ], }, + }, + ...overrides, + }, + }); + + const mockAggregatedResult = () => asApiResponse(mockResult()); + const mockAggregatedResultWithUnknownTaskType = () => + asApiResponse( + mockResult({ taskType: { doc_count_error_upper_bound: 0, sum_other_doc_count: 0, buckets: [ + { + key: 'unknownType', + doc_count: 1, + status: { + doc_count_error_upper_bound: 0, + sum_other_doc_count: 0, + buckets: [{ key: 'idle', doc_count: 1 }], + }, + }, { key: 'foo', doc_count: 2, @@ -250,37 +328,8 @@ describe('Workload Statistics Aggregator', () => { }, ], }, - nonRecurringTasks: { - doc_count: 1, - taskType: { - buckets: [{ key: 'report', doc_count: 1 }], - doc_count_error_upper_bound: 0, - sum_other_doc_count: 0, - }, - }, - ownerIds: { ownerIds: { value: 1 } }, - // The `FiltersAggregate` doesn't cover the case of a nested `AggregationsAggregationContainer`, in which `FiltersAggregate` - // would not have a `buckets` property, but rather a keyed property that's inferred from the request. - // @ts-expect-error - idleTasks: { - doc_count: 3, - overdue: { - doc_count: 2, - nonRecurring: { doc_count: 1 }, - taskTypes: { - buckets: [{ key: 'foo', doc_count: 1 }], - doc_count_error_upper_bound: 0, - sum_other_doc_count: 0, - }, - }, - scheduleDensity: { - buckets: [ - mockHistogram(0, 7 * 3000 + 500, 60 * 1000, 3000, [2, 2, 5, 0, 0, 0, 0, 0, 0, 1]), - ], - }, - }, - }, - }); + }) + ); test('returns a summary of the workload by task type', async () => { const taskStore = taskStoreMock.create({}); @@ -312,6 +361,36 @@ describe('Workload Statistics Aggregator', () => { }); }); + test('excludes unregistered task types from the summary', async () => { + const taskStore = taskStoreMock.create({}); + taskStore.aggregate.mockResolvedValue(mockAggregatedResultWithUnknownTaskType()); + + const workloadAggregator = createWorkloadAggregator({ + taskStore, + elasticsearchAndSOAvailability$: of(true), + refreshInterval: 10, + pollInterval: 3000, + logger, + taskDefinitions: definitions, + }); + + return new Promise((resolve) => { + workloadAggregator.pipe(first()).subscribe((result) => { + expect(result.key).toEqual('workload'); + expect(result.value).toMatchObject({ + count: 4, + cost: 15, + task_types: { + foo: { count: 2, cost: 4, status: { idle: 2 } }, + bar: { count: 1, cost: 1, status: { idle: 1 } }, + report: { count: 1, cost: 10, status: { idle: 1 } }, + }, + }); + resolve(); + }); + }); + }); + test('skips summary of the workload when services are unavailable', async () => { const taskStore = taskStoreMock.create({}); taskStore.aggregate.mockResolvedValue(mockAggregatedResult()); diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index e437b420c04f5..228ae207d2b8d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -273,16 +273,22 @@ export function createWorkloadAggregator({ let totalCost = 0; const taskTypeSummary = taskTypes.reduce((acc, bucket) => { const value = bucket as TaskTypeWithStatusBucket; - const cost = - value.doc_count * taskDefinitions.get(value.key as string)?.cost ?? TaskCost.Normal; - totalCost += cost; - return Object.assign(acc, { - [value.key as string]: { - count: value.doc_count, - cost, - status: mapValues(keyBy(value.status.buckets, 'key'), 'doc_count'), - }, - }); + try { + const taskDef = taskDefinitions.get(value.key as string); + const cost = value.doc_count * taskDef?.cost ?? TaskCost.Normal; + + totalCost += cost; + return Object.assign(acc, { + [value.key as string]: { + count: value.doc_count, + cost, + status: mapValues(keyBy(value.status.buckets, 'key'), 'doc_count'), + }, + }); + } catch (err) { + // task type is not registered with dictionary, do not add to summary + return acc; + } }, {}); const summary: WorkloadStat = { @@ -550,7 +556,12 @@ interface DateRangeBucket { function getTotalCost(taskTypeBuckets: TaskTypeBucket[], definitions: TaskTypeDictionary): number { let cost = 0; for (const bucket of taskTypeBuckets) { - cost += bucket.doc_count * definitions.get(bucket.key as string)?.cost ?? TaskCost.Normal; + try { + const taskDef = definitions.get(bucket.key as string); + cost += bucket.doc_count * taskDef?.cost ?? TaskCost.Normal; + } catch (err) { + // task type is not registered with dictionary, do not add to cost + } } return cost; }