Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] Task to publish Agent metrics #168435

Merged
merged 27 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
aa8f2a6
fleet metrics task
juliaElastic Oct 10, 2023
0627b36
added upgrading_step aggregation
juliaElastic Oct 10, 2023
bf746e9
[CI] Auto-commit changed files from 'node scripts/lint_ts_projects --…
kibanamachine Oct 10, 2023
c302b2f
fixed test
juliaElastic Oct 10, 2023
82f89b9
Merge branch 'main' into fleet-metrics
juliaElastic Oct 10, 2023
6ac0546
calculating unhealthy_reason
juliaElastic Oct 10, 2023
a6345e9
enabled write to es, populating generic fields
juliaElastic Oct 11, 2023
0389572
fix import
juliaElastic Oct 11, 2023
3fe4073
fixed mock
juliaElastic Oct 11, 2023
7e3ce02
Merge branch 'main' into fleet-metrics
juliaElastic Oct 11, 2023
df49e79
added unit tests
juliaElastic Oct 11, 2023
29e8671
removed unhealthy_reason for now
juliaElastic Oct 11, 2023
fbd503b
Merge branch 'main' into fleet-metrics
juliaElastic Oct 12, 2023
aaa9b66
changed to agent fields
juliaElastic Oct 12, 2023
5ed395c
retry, using bulk
juliaElastic Oct 12, 2023
725a688
added test for bulk error
juliaElastic Oct 12, 2023
d63f3df
added fleet_server package to serverless, allow to be installed
juliaElastic Oct 12, 2023
b1c3240
updated task version to 1.0.0
juliaElastic Oct 12, 2023
c4ba9d2
Merge branch 'main' into fleet-metrics
juliaElastic Oct 16, 2023
93a6e7e
Merge branch 'main' into fleet-metrics
juliaElastic Oct 17, 2023
eeae806
fixed test
juliaElastic Oct 17, 2023
e866a71
Merge branch 'main' into fleet-metrics
juliaElastic Oct 17, 2023
b038d5d
adding fleet_server package in oblt and security projects only
juliaElastic Oct 17, 2023
f1de97b
disable fleet task in functional tests
juliaElastic Oct 17, 2023
8d439cc
disable fleet task
juliaElastic Oct 17, 2023
2af5949
Merge branch 'main' into fleet-metrics
juliaElastic Oct 18, 2023
b0add0c
revert serverless disable task, skip failing test
juliaElastic Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions x-pack/plugins/fleet/server/collectors/agent_collectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ export const getAgentUsage = async (
};
};

export interface AgentPerVersion {
version: string;
count: number;
}

export interface AgentData {
agents_per_version: Array<
{
version: string;
count: number;
} & AgentStatus
>;
agents_per_version: Array<AgentPerVersion & AgentStatus>;
agent_checkin_status: {
error: number;
degraded: number;
Expand Down
8 changes: 8 additions & 0 deletions x-pack/plugins/fleet/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ import { FleetActionsClient, type FleetActionsClientInterface } from './services
import type { FilesClientFactory } from './services/files/types';
import { PolicyWatcher } from './services/agent_policy_watch';
import { getPackageSpecTagId } from './services/epm/kibana/assets/tag_assets';
import { FleetMetricsTask } from './services/metrics/fleet_metrics_task';
import { fetchAgentMetrics } from './services/metrics/fetch_agent_metrics';

export interface FleetSetupDeps {
security: SecurityPluginSetup;
Expand Down Expand Up @@ -259,6 +261,7 @@ export class FleetPlugin
private bulkActionsResolver?: BulkActionsResolver;
private fleetUsageSender?: FleetUsageSender;
private checkDeletedFilesTask?: CheckDeletedFilesTask;
private fleetMetricsTask?: FleetMetricsTask;

private agentService?: AgentService;
private packageService?: PackageService;
Expand Down Expand Up @@ -440,6 +443,10 @@ export class FleetPlugin
this.fleetUsageSender = new FleetUsageSender(deps.taskManager, core, fetch);
registerFleetUsageLogger(deps.taskManager, async () => fetchAgentsUsage(core, config));

const fetchAgents = async (abortController: AbortController) =>
await fetchAgentMetrics(core, abortController);
this.fleetMetricsTask = new FleetMetricsTask(deps.taskManager, fetchAgents);

const router: FleetRouter = core.http.createRouter<FleetRequestHandlerContext>();
// Allow read-only users access to endpoints necessary for Integrations UI
// Only some endpoints require superuser so we pass a raw IRouter here
Expand Down Expand Up @@ -504,6 +511,7 @@ export class FleetPlugin
this.fleetUsageSender?.start(plugins.taskManager);
this.checkDeletedFilesTask?.start({ taskManager: plugins.taskManager });
startFleetUsageLogger(plugins.taskManager);
this.fleetMetricsTask?.start(plugins.taskManager, core.elasticsearch.client.asInternalUser);

const logger = appContextService.getLogger();

Expand Down
260 changes: 260 additions & 0 deletions x-pack/plugins/fleet/server/services/metrics/fetch_agent_metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { CoreSetup } from '@kbn/core-lifecycle-server';

import { AGENTS_INDEX } from '../../../common';

import type { AgentPerVersion, AgentUsage } from '../../collectors/agent_collectors';
import { getAgentUsage } from '../../collectors/agent_collectors';
import { getInternalClients } from '../../collectors/helpers';
import { appContextService } from '../app_context';

export interface AgentMetrics {
agents: AgentUsage;
agents_per_version: AgentPerVersion[];
upgrading_step: UpgradingSteps;
unhealthy_reason: UnhealthyReason;
}

export interface UpgradingSteps {
requested: number;
scheduled: number;
downloading: number;
extracting: number;
replacing: number;
restarting: number;
watching: number;
rollback: number;
failed: number;
}

export interface UnhealthyReason {
input: number;
output: number;
other: number;
}

export const fetchAgentMetrics = async (
core: CoreSetup,
abortController: AbortController
): Promise<AgentMetrics | undefined> => {
const [soClient, esClient] = await getInternalClients(core);
if (!soClient || !esClient) {
return;
}
const usage = {
agents: await getAgentUsage(soClient, esClient),
agents_per_version: await getAgentsPerVersion(esClient, abortController),
upgrading_step: await getUpgradingSteps(esClient, abortController),
unhealthy_reason: await getUnhealthyReason(esClient, abortController),
};
return usage;
};

export const getAgentsPerVersion = async (
esClient: ElasticsearchClient,
abortController: AbortController
): Promise<AgentPerVersion[]> => {
try {
const response = await esClient.search(
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
{
index: AGENTS_INDEX,
query: {
bool: {
filter: [
{
term: {
active: 'true',
},
},
],
},
},
size: 0,
aggs: {
versions: {
terms: { field: 'agent.version' },
},
},
},
{ signal: abortController.signal }
);
return ((response?.aggregations?.versions as any).buckets ?? []).map((bucket: any) => ({
version: bucket.key,
count: bucket.doc_count,
}));
} catch (error) {
if (error.statusCode === 404) {
appContextService.getLogger().debug('Index .fleet-agents does not exist yet.');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth logging at another level so we can see it in serverless dashboards, etc? Not sure how common this is or if it's helpful in production debugging to know when we're swallowing these errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to avoid logging too much, as this task runs every minute. I think we can leave as debug for now and change later if needed. Probably we would realize anyway if there are no agents.

} else {
throw error;
}
return [];
}
};

export const getUpgradingSteps = async (
esClient: ElasticsearchClient,
abortController: AbortController
): Promise<UpgradingSteps> => {
const upgradingSteps = {
requested: 0,
scheduled: 0,
downloading: 0,
extracting: 0,
replacing: 0,
restarting: 0,
watching: 0,
rollback: 0,
failed: 0,
};
try {
const response = await esClient.search(
{
index: AGENTS_INDEX,
query: {
bool: {
filter: [
{
term: {
active: 'true',
},
},
],
},
},
size: 0,
aggs: {
upgrade_details: {
terms: { field: 'upgrade_details.state' },
},
},
},
{ signal: abortController.signal }
);
((response?.aggregations?.upgrade_details as any).buckets ?? []).forEach((bucket: any) => {
switch (bucket.key) {
case 'UPG_REQUESTED':
upgradingSteps.requested = bucket.doc_count;
break;
case 'UPG_SCHEDULED':
upgradingSteps.scheduled = bucket.doc_count;
break;
case 'UPG_DOWNLOADING':
upgradingSteps.downloading = bucket.doc_count;
break;
case 'UPG_EXTRACTING':
upgradingSteps.extracting = bucket.doc_count;
break;
case 'UPG_REPLACING':
upgradingSteps.replacing = bucket.doc_count;
break;
case 'UPG_RESTARTING':
upgradingSteps.restarting = bucket.doc_count;
break;
case 'UPG_WATCHING':
upgradingSteps.watching = bucket.doc_count;
break;
case 'UPG_ROLLBACK':
upgradingSteps.rollback = bucket.doc_count;
break;
case 'UPG_FAILED':
upgradingSteps.failed = bucket.doc_count;
break;
default:
break;
}
});
return upgradingSteps;
} catch (error) {
if (error.statusCode === 404) {
appContextService.getLogger().debug('Index .fleet-agents does not exist yet.');
} else {
throw error;
}
return upgradingSteps;
}
};

export const getUnhealthyReason = async (
esClient: ElasticsearchClient,
abortController: AbortController
): Promise<UnhealthyReason> => {
const unhealthyReason = {
input: 0,
output: 0,
other: 0,
};
try {
const response = await esClient.search(
{
index: AGENTS_INDEX,
query: {
bool: {
filter: [
{
term: {
active: 'true',
},
},
{
terms: {
last_checkin_status: ['error', 'degraded'],
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
},
},
],
},
},
size: 10000,
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
_source: ['components'],
},
{ signal: abortController.signal }
);

response.hits.hits.forEach((hit: any) => {
// counting agent as other unhealthy reason if it doesn't have a component unit unhealthy
if (!hit._source.components || hit._source.components.length === 0) {
unhealthyReason.other++;
return;
}
// considering component unhealthy if not healthy
const hasUnhealthyUnit = (type: string) => {
const unhealthyComponent = hit._source.components.find(
(comp: any) => comp.status !== 'HEALTHY'
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
);
if (!unhealthyComponent) return false;
const unhealthyUnit = unhealthyComponent.units.find(
(unit: any) => unit.type === type && unit.status !== 'HEALTHY'
);
return unhealthyUnit !== undefined;
};
const hasUnhealthyInput = hasUnhealthyUnit('input');
// counting agents in both input and output unhealthy_reason if they have unhealthy component units in both
if (hasUnhealthyInput) {
unhealthyReason.input++;
}
const hasUnhealthyOutput = hasUnhealthyUnit('output');
if (hasUnhealthyOutput) {
unhealthyReason.output++;
}
if (!hasUnhealthyInput && !hasUnhealthyOutput) {
unhealthyReason.other++;
}
});

return unhealthyReason;
} catch (error) {
if (error.statusCode === 404) {
appContextService.getLogger().debug('Index .fleet-agents does not exist yet.');
} else {
throw error;
}
return unhealthyReason;
}
};
Loading