Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] using point in time for agent status query to avoid discrepancy #135816

Merged
merged 5 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export const TableRowActions: React.FunctionComponent<{
onClick={(event) => {
onAddRemoveTagsClick((event.target as Element).closest('button')!);
}}
disabled={!agent.active}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small change to disable Add / remove tags action on inactive agents, like the other actions are disabled
image

key="addRemoveTags"
>
<FormattedMessage
Expand Down
121 changes: 25 additions & 96 deletions x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,104 +92,19 @@ export async function getAgents(esClient: ElasticsearchClient, options: GetAgent
return agents;
}

export async function getAgentsByKueryPit(
export async function openPointInTime(
esClient: ElasticsearchClient,
options: ListWithKuery & {
showInactive: boolean;
pitId: string;
searchAfter?: SortResults;
}
): Promise<{
agents: Agent[];
total: number;
page: number;
perPage: number;
}> {
const {
page = 1,
perPage = 20,
sortField = 'enrolled_at',
sortOrder = 'desc',
kuery,
showInactive = false,
showUpgradeable,
searchAfter,
} = options;
const { pitId } = options;
const filters = [];

if (kuery && kuery !== '') {
filters.push(kuery);
}

if (showInactive === false) {
filters.push(ACTIVE_AGENT_CONDITION);
}

const kueryNode = _joinFilters(filters);
const body = kueryNode ? { query: toElasticsearchQuery(kueryNode) } : {};

const queryAgents = async (from: number, size: number) => {
return esClient.search<FleetServerAgent, {}>({
from,
size,
track_total_hits: true,
rest_total_hits_as_int: true,
body: {
...body,
sort: [{ [sortField]: { order: sortOrder } }],
},
pit: {
id: pitId,
keep_alive: '1m',
},
...(searchAfter ? { search_after: searchAfter, from: 0 } : {}),
});
};

const res = await queryAgents((page - 1) * perPage, perPage);

let agents = res.hits.hits.map(searchHitToAgent);
let total = res.hits.total as number;

// filtering for a range on the version string will not work,
// nor does filtering on a flattened field (local_metadata), so filter here
if (showUpgradeable) {
// query all agents, then filter upgradeable, and return the requested page and correct total
// if there are more than SO_SEARCH_LIMIT agents, the logic falls back to same as before
if (total < SO_SEARCH_LIMIT) {
const response = await queryAgents(0, SO_SEARCH_LIMIT);
agents = response.hits.hits
.map(searchHitToAgent)
.filter((agent) => isAgentUpgradeable(agent, appContextService.getKibanaVersion()));
total = agents.length;
const start = (page - 1) * perPage;
agents = agents.slice(start, start + perPage);
} else {
agents = agents.filter((agent) =>
isAgentUpgradeable(agent, appContextService.getKibanaVersion())
);
}
}

return {
agents,
total,
page,
perPage,
};
}

export async function openAgentsPointInTime(esClient: ElasticsearchClient): Promise<string> {
index: string = AGENTS_INDEX
): Promise<string> {
const pitKeepAlive = '10m';
const pitRes = await esClient.openPointInTime({
index: AGENTS_INDEX,
index,
keep_alive: pitKeepAlive,
});
return pitRes.id;
}

export async function closeAgentsPointInTime(esClient: ElasticsearchClient, pitId: string) {
export async function closePointInTime(esClient: ElasticsearchClient, pitId: string) {
try {
await esClient.closePointInTime({ id: pitId });
} catch (error) {
Expand All @@ -205,6 +120,8 @@ export async function getAgentsByKuery(
showInactive: boolean;
sortField?: string;
sortOrder?: 'asc' | 'desc';
pitId?: string;
searchAfter?: SortResults;
}
): Promise<{
agents: Agent[];
Expand All @@ -220,6 +137,8 @@ export async function getAgentsByKuery(
kuery,
showInactive = false,
showUpgradeable,
searchAfter,
pitId,
} = options;
const filters = [];

Expand All @@ -240,16 +159,26 @@ export async function getAgentsByKuery(
: [];
const queryAgents = async (from: number, size: number) =>
esClient.search<FleetServerAgent, {}>({
index: AGENTS_INDEX,
from,
size,
track_total_hits: true,
rest_total_hits_as_int: true,
ignore_unavailable: true,
body: {
...body,
sort: [{ [sortField]: { order: sortOrder } }, ...secondarySort],
},
...(pitId
? {
pit: {
id: pitId,
keep_alive: '1m',
},
}
: {
index: AGENTS_INDEX,
ignore_unavailable: true,
}),
...(pitId && searchAfter ? { search_after: searchAfter, from: 0 } : {}),
});
const res = await queryAgents((page - 1) * perPage, perPage);

Expand Down Expand Up @@ -295,11 +224,11 @@ export async function processAgentsInBatches(
includeSuccess: boolean
) => Promise<{ items: BulkActionResult[] }>
): Promise<{ items: BulkActionResult[] }> {
const pitId = await openAgentsPointInTime(esClient);
const pitId = await openPointInTime(esClient);

const perPage = options.batchSize ?? SO_SEARCH_LIMIT;

const res = await getAgentsByKueryPit(esClient, {
const res = await getAgentsByKuery(esClient, {
...options,
page: 1,
perPage,
Expand All @@ -315,7 +244,7 @@ export async function processAgentsInBatches(

while (allAgentsProcessed < res.total) {
const lastAgent = currentAgents[currentAgents.length - 1];
const nextPage = await getAgentsByKueryPit(esClient, {
const nextPage = await getAgentsByKuery(esClient, {
...options,
page: 1,
perPage,
Expand All @@ -328,7 +257,7 @@ export async function processAgentsInBatches(
allAgentsProcessed += currentAgents.length;
}

await closeAgentsPointInTime(esClient, pitId);
await closePointInTime(esClient, pitId);

return results;
}
Expand Down
31 changes: 28 additions & 3 deletions x-pack/plugins/fleet/server/services/agents/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ import type { AgentStatus } from '../../types';
import { AgentStatusKueryHelper } from '../../../common/services';
import { FleetUnauthorizedError } from '../../errors';

import { getAgentById, getAgentsByKuery, removeSOAttributes } from './crud';
import { appContextService } from '../app_context';

import {
closePointInTime,
getAgentById,
getAgentsByKuery,
openPointInTime,
removeSOAttributes,
} from './crud';

const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*,synthetics-*-*';
const MAX_AGENT_DATA_PREVIEW_SIZE = 20;
Expand Down Expand Up @@ -55,6 +63,18 @@ export async function getAgentStatusForAgentPolicy(
agentPolicyId?: string,
filterKuery?: string
) {
let pitId: string | undefined;
try {
pitId = await openPointInTime(esClient);
} catch (error) {
if (error.statusCode === 404) {
appContextService
.getLogger()
.debug('Index .fleet-agents does not exist yet, skipping point in time.');
} else {
throw error;
}
}
const [all, allActive, online, error, offline, updating] = await pMap(
[
undefined, // All agents, including inactive
Expand All @@ -69,11 +89,11 @@ export async function getAgentStatusForAgentPolicy(
showInactive: index === 0,
perPage: 0,
page: 1,
pitId,
kuery: joinKuerys(
...[
kuery,
filterKuery,
`${AGENTS_PREFIX}.attributes.active:true`,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding active filter here is incorrect, as getAgentsByKuery adds it based on showInactive:

if (showInactive === false) {
filters.push(ACTIVE_AGENT_CONDITION);
}

agentPolicyId ? `${AGENTS_PREFIX}.policy_id:"${agentPolicyId}"` : undefined,
]
),
Expand All @@ -83,7 +103,11 @@ export async function getAgentStatusForAgentPolicy(
}
);

return {
if (pitId) {
await closePointInTime(esClient, pitId);
}

const result = {
total: allActive.total,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that total doesn't include inactive, is it intentional? It can be a little confusing, example:

{
  "results": {
    "total": 981,
    "inactive": 20,
    "online": 981,
    "error": 0,
    "offline": 0,
    "updating": 0,
    "other": 20,
    "events": 0
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inactive agents are the ones that have been unenrolled, so they're generally excluded from the status as they're not relevant most of the time. We should probably have two fields, one for the real total and one for the total active, but I think we should only do this in a non-breaking way. Maybe total (all active, deprecated) and all (all active and inactive) and active (all active)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this would be cleaner.
Do you happen to know if there is any usage of other status? It might be good to deprecate that as well, as it seems confusing. It is not mentioned in the UI/docs either.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it is confusing. Total seems to be total active, which you don't assume from the name total, my vote would be to fix this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised an issue to deprecate total status: #135980

inactive: all.total - allActive.total,
online: online.total,
Expand All @@ -94,6 +118,7 @@ export async function getAgentStatusForAgentPolicy(
/* @deprecated Agent events do not exists anymore */
events: 0,
};
return result;
}
export async function getIncomingDataByAgentsId(
esClient: ElasticsearchClient,
Expand Down
22 changes: 20 additions & 2 deletions x-pack/test/fleet_api_integration/apis/agents/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ export default function ({ getService }: FtrProviderContext) {
},
},
});
// 1 agent inactive
await es.create({
id: 'agent5',
refresh: 'wait_for',
index: AGENTS_INDEX,
body: {
doc: {
active: false,
access_api_key_id: 'api-key-4',
policy_id: 'policy1',
type: 'PERMANENT',
local_metadata: { host: { hostname: 'host5' } },
user_provided_metadata: {},
enrolled_at: '2022-06-21T12:17:25Z',
last_checkin: '2022-06-27T12:29:29Z',
},
},
});
});
after(async () => {
await esArchiver.unload('x-pack/test/functional/es_archives/fleet/agents');
Expand All @@ -78,8 +96,8 @@ export default function ({ getService }: FtrProviderContext) {
error: 0,
offline: 1,
updating: 1,
other: 1,
inactive: 0,
other: 2,
inactive: 1,
},
});
});
Expand Down