[8.x] Hook up discovery service to Task Manager health (elastic#194113) (elastic#194685)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Hook up discovery service to Task Manager health (elastic#194113)](elastic#194113)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool documentation](https://github.com/sqren/backport).

<!--BACKPORT [{"author":{"name":"Mike
Côté","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-10-02T11:19:06Z","message":"Hook
up discovery service to Task Manager health (elastic#194113)\n\nResolves
https://github.com/elastic/kibana/issues/192568\r\n\r\nIn this PR, I'm
solving the issue where the task manager health API is\r\nunable to
determine how many Kibana nodes are running. I'm doing so
by\r\nleveraging the Kibana discovery service to get a count instead
of\r\ncalculating it based on an aggregation on the
`.kibana_task_manager`\r\nindex where we count the unique number of
`ownerId`, which requires\r\ntasks to be running and a sufficient
distribution across the Kibana\r\nnodes to determine the number
properly.\r\n\r\nNote: This will only work when mget is the task claim
strategy\r\n\r\n## To verify\r\n1. Set
`xpack.task_manager.claim_strategy: mget` in kibana.yml\r\n2. Startup
the PR locally with Elasticsearch and Kibana running\r\n3. Navigate to
the `/api/task_manager/_health` route and
confirm\r\n`observed_kibana_instances` is `1`\r\n4. Apply the following
code and restart Kibana\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\nindex
090847032bf..69dfb6d1b36 100644\r\n---
a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\n+++
b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\n@@
-59,6 +59,7 @@ export class KibanaDiscoveryService {\r\n const lastSeen
= lastSeenDate.toISOString();\r\n try {\r\n await
this.upsertCurrentNode({ id: this.currentNode, lastSeen });\r\n+ await
this.upsertCurrentNode({ id: `${this.currentNode}-2`, lastSeen });\r\n
if (!this.started) {\r\n this.logger.info('Kibana Discovery Service has
been started');\r\n this.started = true;\r\n```\r\n5. Navigate to the
`/api/task_manager/_health` route and
confirm\r\n`observed_kibana_instances` is
`2`\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"d0d2032f18a37e4c458a26d92092665453b737b0","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task
Manager","Team:ResponseOps","v9.0.0","backport:prev-minor","ci:cloud-deploy","v8.16.0"],"title":"Hook
up discovery service to Task Manager
health","number":194113,"url":"https://github.com/elastic/kibana/pull/194113","mergeCommit":{"message":"Hook
up discovery service to Task Manager health (elastic#194113)\n\nResolves
https://github.com/elastic/kibana/issues/192568\r\n\r\nIn this PR, I'm
solving the issue where the task manager health API is\r\nunable to
determine how many Kibana nodes are running. I'm doing so
by\r\nleveraging the Kibana discovery service to get a count instead
of\r\ncalculating it based on an aggregation on the
`.kibana_task_manager`\r\nindex where we count the unique number of
`ownerId`, which requires\r\ntasks to be running and a sufficient
distribution across the Kibana\r\nnodes to determine the number
properly.\r\n\r\nNote: This will only work when mget is the task claim
strategy\r\n\r\n## To verify\r\n1. Set
`xpack.task_manager.claim_strategy: mget` in kibana.yml\r\n2. Startup
the PR locally with Elasticsearch and Kibana running\r\n3. Navigate to
the `/api/task_manager/_health` route and
confirm\r\n`observed_kibana_instances` is `1`\r\n4. Apply the following
code and restart Kibana\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\nindex
090847032bf..69dfb6d1b36 100644\r\n---
a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\n+++
b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\n@@
-59,6 +59,7 @@ export class KibanaDiscoveryService {\r\n const lastSeen
= lastSeenDate.toISOString();\r\n try {\r\n await
this.upsertCurrentNode({ id: this.currentNode, lastSeen });\r\n+ await
this.upsertCurrentNode({ id: `${this.currentNode}-2`, lastSeen });\r\n
if (!this.started) {\r\n this.logger.info('Kibana Discovery Service has
been started');\r\n this.started = true;\r\n```\r\n5. Navigate to the
`/api/task_manager/_health` route and
confirm\r\n`observed_kibana_instances` is
`2`\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"d0d2032f18a37e4c458a26d92092665453b737b0"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/194113","number":194113,"mergeCommit":{"message":"Hook
up discovery service to Task Manager health (elastic#194113)\n\nResolves
https://github.com/elastic/kibana/issues/192568\r\n\r\nIn this PR, I'm
solving the issue where the task manager health API is\r\nunable to
determine how many Kibana nodes are running. I'm doing so
by\r\nleveraging the Kibana discovery service to get a count instead
of\r\ncalculating it based on an aggregation on the
`.kibana_task_manager`\r\nindex where we count the unique number of
`ownerId`, which requires\r\ntasks to be running and a sufficient
distribution across the Kibana\r\nnodes to determine the number
properly.\r\n\r\nNote: This will only work when mget is the task claim
strategy\r\n\r\n## To verify\r\n1. Set
`xpack.task_manager.claim_strategy: mget` in kibana.yml\r\n2. Startup
the PR locally with Elasticsearch and Kibana running\r\n3. Navigate to
the `/api/task_manager/_health` route and
confirm\r\n`observed_kibana_instances` is `1`\r\n4. Apply the following
code and restart Kibana\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts
b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\nindex
090847032bf..69dfb6d1b36 100644\r\n---
a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\n+++
b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts\r\n@@
-59,6 +59,7 @@ export class KibanaDiscoveryService {\r\n const lastSeen
= lastSeenDate.toISOString();\r\n try {\r\n await
this.upsertCurrentNode({ id: this.currentNode, lastSeen });\r\n+ await
this.upsertCurrentNode({ id: `${this.currentNode}-2`, lastSeen });\r\n
if (!this.started) {\r\n this.logger.info('Kibana Discovery Service has
been started');\r\n this.started = true;\r\n```\r\n5. Navigate to the
`/api/task_manager/_health` route and
confirm\r\n`observed_kibana_instances` is
`2`\r\n\r\n---------\r\n\r\nCo-authored-by: Elastic Machine
<[email protected]>","sha":"d0d2032f18a37e4c458a26d92092665453b737b0"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->
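A minimal TypeScript sketch of the before/after counting approaches. The aggregation object matches the expectation removed in `workload_statistics.test.ts` below, and `getActiveKibanaNodes` is the service method visible in the diffs; the wrapper function, export names, and import path are illustrative only.

```ts
import type { KibanaDiscoveryService } from './kibana_discovery_service';

// Before: infer the node count from a cardinality aggregation over
// `task.ownerId` on the `.kibana_task_manager` index. This only observes
// nodes that recently ran tasks, so idle or lightly loaded nodes are missed.
export const ownerIdsAggregation = {
  ownerIds: {
    filter: { range: { 'task.startedAt': { gte: 'now-1w/w' } } },
    aggs: { ownerIds: { cardinality: { field: 'task.ownerId' } } },
  },
};

// After: count the nodes the discovery service has seen recently,
// regardless of whether they have claimed any tasks.
export async function countActiveKibanaNodes(
  discovery: KibanaDiscoveryService
): Promise<number> {
  const activeNodes = await discovery.getActiveKibanaNodes();
  return activeNodes.length;
}
```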

Co-authored-by: Mike Côté <[email protected]>
kibanamachine and mikecote authored Oct 2, 2024
1 parent 9f500cd commit c24fee0
Showing 12 changed files with 171 additions and 90 deletions.
#### x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts

```diff
@@ -246,6 +246,7 @@ describe('KibanaDiscoveryService', () => {
       savedObjectsRepository.find.mockResolvedValueOnce(createFindResponse(mockActiveNodes));
 
       it('returns the active kibana nodes', async () => {
+        const onNodesCounted = jest.fn();
         const kibanaDiscoveryService = new KibanaDiscoveryService({
           savedObjectsRepository,
           logger,
@@ -254,6 +255,7 @@ describe('KibanaDiscoveryService', () => {
             active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
             interval: DEFAULT_DISCOVERY_INTERVAL_MS,
           },
+          onNodesCounted,
         });
 
         const activeNodes = await kibanaDiscoveryService.getActiveKibanaNodes();
@@ -265,6 +267,7 @@
           type: BACKGROUND_TASK_NODE_SO_NAME,
         });
         expect(activeNodes).toEqual(mockActiveNodes);
+        expect(onNodesCounted).toHaveBeenCalledWith(mockActiveNodes.length);
       });
     });
 
```
#### x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts

```diff
@@ -16,6 +16,7 @@ interface DiscoveryServiceParams {
   currentNode: string;
   savedObjectsRepository: ISavedObjectsRepository;
   logger: Logger;
+  onNodesCounted?: (numOfNodes: number) => void;
 }
 
 interface DiscoveryServiceUpsertParams {
@@ -34,13 +35,15 @@ export class KibanaDiscoveryService {
   private logger: Logger;
   private stopped = false;
   private timer: NodeJS.Timeout | undefined;
+  private onNodesCounted?: (numOfNodes: number) => void;
 
-  constructor({ config, currentNode, savedObjectsRepository, logger }: DiscoveryServiceParams) {
-    this.activeNodesLookBack = config.active_nodes_lookback;
-    this.discoveryInterval = config.interval;
-    this.savedObjectsRepository = savedObjectsRepository;
-    this.logger = logger;
-    this.currentNode = currentNode;
+  constructor(opts: DiscoveryServiceParams) {
+    this.activeNodesLookBack = opts.config.active_nodes_lookback;
+    this.discoveryInterval = opts.config.interval;
+    this.savedObjectsRepository = opts.savedObjectsRepository;
+    this.logger = opts.logger;
+    this.currentNode = opts.currentNode;
+    this.onNodesCounted = opts.onNodesCounted;
   }
 
   private async upsertCurrentNode({ id, lastSeen }: DiscoveryServiceUpsertParams) {
@@ -106,6 +109,10 @@ export class KibanaDiscoveryService {
       filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${this.activeNodesLookBack}`,
     });
 
+    if (this.onNodesCounted) {
+      this.onNodesCounted(activeNodes.length);
+    }
+
     return activeNodes;
   }
```
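The call site that passes `onNodesCounted` lives in the task manager plugin setup, which is among the changed files this page does not show. A minimal sketch of how it could be wired, with every name outside `KibanaDiscoveryService` (including the import paths) assumed for illustration:

```ts
import type { ISavedObjectsRepository, Logger } from '@kbn/core/server';
import { KibanaDiscoveryService } from './kibana_discovery_service';

// Declared only so the sketch type-checks; in Kibana these come from the
// plugin's setup/start contracts and config schema.
declare const savedObjectsRepository: ISavedObjectsRepository;
declare const logger: Logger;
declare const currentNode: string; // this Kibana node's UUID
declare const discoveryConfig: { active_nodes_lookback: string; interval: number };

// Keep the latest count somewhere the health monitoring code can read it.
let lastObservedNodeCount = 1;

const discoveryService = new KibanaDiscoveryService({
  config: discoveryConfig,
  currentNode,
  savedObjectsRepository,
  logger,
  // New in this commit: invoked on every discovery poll with the number of
  // active nodes found.
  onNodesCounted: (numOfNodes) => {
    lastObservedNodeCount = numOfNodes;
  },
});
```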
#### x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts

```diff
@@ -63,7 +63,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -119,7 +120,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -158,7 +160,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -214,7 +217,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -271,7 +275,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
@@ -327,7 +332,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
          },
         }
-      )
+      ),
+      3
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 3,
@@ -396,7 +402,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      2
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: provisionedKibanaInstances,
@@ -477,7 +484,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      2
     ).value
   ).toMatchObject({
     observed: {
@@ -561,7 +569,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -626,7 +635,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -691,7 +701,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -755,7 +766,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -831,7 +843,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     )
   ).toMatchObject({
     status: 'OK',
@@ -905,7 +918,8 @@ describe('estimateCapacity', () => {
             result_frequency_percent_as_number: {},
           },
         }
-      )
+      ),
+      1
     ).value.observed
   ).toMatchObject({
     observed_kibana_instances: 1,
```
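The same one-line change repeats through every test above, so it is easier to read consolidated. A sketch of the new call shape; the fixture variable and the exported type name are assumptions, since the diff fragments omit the surrounding scaffolding:

```ts
import type { Logger } from '@kbn/core/server';
import { estimateCapacity } from './capacity_estimation';
import type { CapacityEstimationParams } from './capacity_estimation';

// Declared only so the sketch type-checks; built by test helpers in the file.
declare const logger: Logger;
declare const capacityStats: CapacityEstimationParams;

// estimateCapacity(logger, capacityStats) became
// estimateCapacity(logger, capacityStats, assumedKibanaInstances):
const result = estimateCapacity(logger, capacityStats, 1);
// result.value.observed.observed_kibana_instances is now expected to be 1
```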
#### x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts

```diff
@@ -46,11 +46,10 @@ function isCapacityEstimationParams(
 
 export function estimateCapacity(
   logger: Logger,
-  capacityStats: CapacityEstimationParams
+  capacityStats: CapacityEstimationParams,
+  assumedKibanaInstances: number
 ): RawMonitoredStat<CapacityEstimationStat> {
   const workload = capacityStats.workload.value;
-  // if there are no active owners right now, assume there's at least 1
-  const assumedKibanaInstances = Math.max(workload.owner_ids, 1);
 
   const {
     load: { p90: averageLoadPercentage },
@@ -262,12 +261,13 @@
 
 export function withCapacityEstimate(
   logger: Logger,
-  monitoredStats: RawMonitoringStats['stats']
+  monitoredStats: RawMonitoringStats['stats'],
+  assumedKibanaInstances: number
 ): RawMonitoringStats['stats'] {
   if (isCapacityEstimationParams(monitoredStats)) {
     return {
       ...monitoredStats,
-      capacity_estimation: estimateCapacity(logger, monitoredStats),
+      capacity_estimation: estimateCapacity(logger, monitoredStats, assumedKibanaInstances),
     };
   }
   return monitoredStats;
```
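With the in-function fallback removed, the clamping responsibility moves to the caller, whose code is in a file not shown on this page. A minimal sketch of what that derivation might look like; the function name is an assumption, only the `Math.max(..., 1)` floor is taken from the removed code:

```ts
// The old behavior inside estimateCapacity was:
//   const assumedKibanaInstances = Math.max(workload.owner_ids, 1);
// A caller deriving the value from the discovery-service count would
// plausibly keep the same floor of 1, e.g. before the first discovery
// poll has completed.
function deriveAssumedKibanaInstances(observedNodeCount: number): number {
  return Math.max(observedNodeCount, 1);
}
```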
#### x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts

```diff
@@ -158,42 +158,47 @@ export function summarizeMonitoringStats(
     last_update,
     stats: { runtime, workload, configuration, ephemeral, utilization },
   }: MonitoringStats,
-  config: TaskManagerConfig
+  config: TaskManagerConfig,
+  assumedKibanaInstances: number
 ): RawMonitoringStats {
-  const summarizedStats = withCapacityEstimate(logger, {
-    ...(configuration
-      ? {
-          configuration: {
-            ...configuration,
-            status: HealthStatus.OK,
-          },
-        }
-      : {}),
-    ...(runtime
-      ? {
-          runtime: {
-            timestamp: runtime.timestamp,
-            ...summarizeTaskRunStat(logger, runtime.value, config),
-          },
-        }
-      : {}),
-    ...(workload
-      ? {
-          workload: {
-            timestamp: workload.timestamp,
-            ...summarizeWorkloadStat(workload.value),
-          },
-        }
-      : {}),
-    ...(ephemeral
-      ? {
-          ephemeral: {
-            timestamp: ephemeral.timestamp,
-            ...summarizeEphemeralStat(ephemeral.value),
-          },
-        }
-      : {}),
-  });
+  const summarizedStats = withCapacityEstimate(
+    logger,
+    {
+      ...(configuration
+        ? {
+            configuration: {
+              ...configuration,
+              status: HealthStatus.OK,
+            },
+          }
+        : {}),
+      ...(runtime
+        ? {
+            runtime: {
+              timestamp: runtime.timestamp,
+              ...summarizeTaskRunStat(logger, runtime.value, config),
+            },
+          }
+        : {}),
+      ...(workload
+        ? {
+            workload: {
+              timestamp: workload.timestamp,
+              ...summarizeWorkloadStat(workload.value),
+            },
+          }
+        : {}),
+      ...(ephemeral
+        ? {
+            ephemeral: {
+              timestamp: ephemeral.timestamp,
+              ...summarizeEphemeralStat(ephemeral.value),
+            },
+          }
+        : {}),
+    },
+    assumedKibanaInstances
+  );
 
   return {
     last_update,
```
#### x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts

```diff
@@ -169,10 +169,6 @@ describe('Workload Statistics Aggregator', () => {
           missing: { field: 'task.schedule.interval' },
           aggs: { taskType: { terms: { size: 3, field: 'task.taskType' } } },
         },
-        ownerIds: {
-          filter: { range: { 'task.startedAt': { gte: 'now-1w/w' } } },
-          aggs: { ownerIds: { cardinality: { field: 'task.ownerId' } } },
-        },
         idleTasks: {
           filter: { term: { 'task.status': 'idle' } },
           aggs: {
```
(Six of the 12 changed files are shown above; the remaining diffs did not load.)
