diff --git a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collector_runner.ts b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collector_runner.ts index eb60a49e0240d..f563d18a9f4e3 100644 --- a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collector_runner.ts +++ b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collector_runner.ts @@ -5,10 +5,15 @@ * 2.0. */ +import apm from 'elastic-apm-node'; + import { ImplicitCollectionOptions } from '.'; -import { Collector } from './collectors'; +import { Collector, QUERY_MAX_SIZE } from './collectors'; import { Asset } from '../../../common/types_api'; +const TRANSACTION_TYPE = 'asset_manager-implicit_collection'; +const transactionName = (collectorName: string) => `asset_manager-collector_${collectorName}`; + export class CollectorRunner { private collectors: Array<{ name: string; collector: Collector }> = []; @@ -19,15 +24,19 @@ export class CollectorRunner { } async run() { - const collectorOptions = { - client: this.options.inputClient, - from: Date.now() - this.options.intervalMs, - }; + const now = Date.now(); for (let i = 0; i < this.collectors.length; i++) { const { name, collector } = this.collectors[i]; this.options.logger.info(`Collector '${name}' started`); + const transaction = apm.startTransaction(transactionName(name), TRANSACTION_TYPE); + const collectorOptions = { + from: now - this.options.intervalMs, + client: this.options.inputClient, + transaction, + }; + const assets = await collector(collectorOptions) .then((collectedAssets) => { this.options.logger.info(`Collector '${name}' found ${collectedAssets.length} assets`); @@ -38,6 +47,12 @@ export class CollectorRunner { return []; }); + transaction?.addLabels({ + assets_count: assets.length, + interval_ms: this.options.intervalMs, + page_size: QUERY_MAX_SIZE, + }); + if (assets.length) { const bulkBody = assets.flatMap((asset: Asset) => { return [{ create: { _index: `assets-${asset['asset.kind']}-default` } }, asset]; @@ -58,6 +73,8 @@ export class CollectorRunner { ); }); } + + transaction?.end(); } } } diff --git a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/containers.ts b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/containers.ts index 845e81d5c5640..e833c4be87f16 100644 --- a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/containers.ts +++ b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/containers.ts @@ -7,12 +7,13 @@ import { APM_INDICES, LOGS_INDICES, METRICS_INDICES } from '../../../constants'; import { Asset } from '../../../../common/types_api'; -import { CollectorOptions } from '.'; +import { CollectorOptions, QUERY_MAX_SIZE } from '.'; +import { withSpan } from './helpers'; -export async function collectContainers({ client, from }: CollectorOptions) { +export async function collectContainers({ client, from, transaction }: CollectorOptions) { const dsl = { index: [APM_INDICES, LOGS_INDICES, METRICS_INDICES], - size: 1000, + size: QUERY_MAX_SIZE, collapse: { field: 'container.id', }, @@ -48,30 +49,32 @@ export async function collectContainers({ client, from }: CollectorOptions) { const esResponse = await client.search(dsl); - const containers = esResponse.hits.hits.reduce((acc: Asset[], hit: any) => { - const { fields = {} } = hit; - const containerId = fields['container.id']; - const podUid = fields['kubernetes.pod.uid']; - const nodeName = fields['kubernetes.node.name']; + const containers = withSpan({ transaction, name: 'processing_response' }, () => { + return esResponse.hits.hits.reduce((acc: Asset[], hit: any) => { + const { fields = {} } = hit; + const containerId = fields['container.id']; + const podUid = fields['kubernetes.pod.uid']; + const nodeName = fields['kubernetes.node.name']; - const parentEan = podUid ? `pod:${podUid}` : `host:${fields['host.hostname']}`; + const parentEan = podUid ? `pod:${podUid}` : `host:${fields['host.hostname']}`; - const container: Asset = { - '@timestamp': new Date().toISOString(), - 'asset.kind': 'container', - 'asset.id': containerId, - 'asset.ean': `container:${containerId}`, - 'asset.parents': [parentEan], - }; + const container: Asset = { + '@timestamp': new Date().toISOString(), + 'asset.kind': 'container', + 'asset.id': containerId, + 'asset.ean': `container:${containerId}`, + 'asset.parents': [parentEan], + }; - if (nodeName) { - container['asset.references'] = [`host:${nodeName}`]; - } + if (nodeName) { + container['asset.references'] = [`host:${nodeName}`]; + } - acc.push(container); + acc.push(container); - return acc; - }, []); + return acc; + }, []); + }); return containers; } diff --git a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/helpers.ts b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/helpers.ts new file mode 100644 index 0000000000000..52617729cba4e --- /dev/null +++ b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/helpers.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Transaction } from 'elastic-apm-node'; + +type CollectorSpan = 'processing_response'; + +interface SpanOptions { + name: CollectorSpan; + transaction: Transaction | null; +} + +export function withSpan(options: SpanOptions, fn: () => T) { + const span = options.transaction?.startSpan(options.name); + const result = fn(); + span?.end(); + return result; +} diff --git a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/hosts.ts b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/hosts.ts index f5409495c3fad..0225947f7f498 100644 --- a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/hosts.ts +++ b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/hosts.ts @@ -7,12 +7,17 @@ import { APM_INDICES, METRICS_INDICES, LOGS_INDICES } from '../../../constants'; import { Asset } from '../../../../common/types_api'; -import { CollectorOptions } from '.'; +import { CollectorOptions, QUERY_MAX_SIZE } from '.'; +import { withSpan } from './helpers'; -export async function collectHosts({ client, from }: CollectorOptions): Promise { +export async function collectHosts({ + client, + from, + transaction, +}: CollectorOptions): Promise { const dsl = { index: [APM_INDICES, LOGS_INDICES, METRICS_INDICES], - size: 1000, + size: QUERY_MAX_SIZE, collapse: { field: 'host.hostname' }, sort: [{ _score: 'desc' }, { '@timestamp': 'desc' }], _source: false, @@ -47,50 +52,52 @@ export async function collectHosts({ client, from }: CollectorOptions): Promise< const esResponse = await client.search(dsl); - const hosts = esResponse.hits.hits.reduce((acc: Asset[], hit: any) => { - const { fields = {} } = hit; - const hostName = fields['host.hostname']; - const k8sNode = fields['kubernetes.node.name']; - const k8sPod = fields['kubernetes.pod.uid']; + const hosts = withSpan({ transaction, name: 'processing_response' }, () => { + return esResponse.hits.hits.reduce((acc: Asset[], hit: any) => { + const { fields = {} } = hit; + const hostName = fields['host.hostname']; + const k8sNode = fields['kubernetes.node.name']; + const k8sPod = fields['kubernetes.pod.uid']; - const hostEan = `host:${k8sNode || hostName}`; + const hostEan = `host:${k8sNode || hostName}`; - const host: Asset = { - '@timestamp': new Date().toISOString(), - 'asset.kind': 'host', - 'asset.id': k8sNode || hostName, - 'asset.name': k8sNode || hostName, - 'asset.ean': hostEan, - }; + const host: Asset = { + '@timestamp': new Date().toISOString(), + 'asset.kind': 'host', + 'asset.id': k8sNode || hostName, + 'asset.name': k8sNode || hostName, + 'asset.ean': hostEan, + }; - if (fields['cloud.provider']) { - host['cloud.provider'] = fields['cloud.provider']; - } + if (fields['cloud.provider']) { + host['cloud.provider'] = fields['cloud.provider']; + } - if (fields['cloud.instance.id']) { - host['cloud.instance.id'] = fields['cloud.instance.id']; - } + if (fields['cloud.instance.id']) { + host['cloud.instance.id'] = fields['cloud.instance.id']; + } - if (fields['cloud.service.name']) { - host['cloud.service.name'] = fields['cloud.service.name']; - } + if (fields['cloud.service.name']) { + host['cloud.service.name'] = fields['cloud.service.name']; + } - if (fields['cloud.region']) { - host['cloud.region'] = fields['cloud.region']; - } + if (fields['cloud.region']) { + host['cloud.region'] = fields['cloud.region']; + } - if (fields['orchestrator.cluster.name']) { - host['orchestrator.cluster.name'] = fields['orchestrator.cluster.name']; - } + if (fields['orchestrator.cluster.name']) { + host['orchestrator.cluster.name'] = fields['orchestrator.cluster.name']; + } - if (k8sPod) { - host['asset.children'] = [`pod:${k8sPod}`]; - } + if (k8sPod) { + host['asset.children'] = [`pod:${k8sPod}`]; + } - acc.push(host); + acc.push(host); - return acc; - }, []); + return acc; + }, []); + }); return hosts; } diff --git a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/index.ts b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/index.ts index 79b53597a9a40..72c973b95ae7e 100644 --- a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/index.ts +++ b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/index.ts @@ -4,12 +4,17 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ + +import { Transaction } from 'elastic-apm-node'; import { ElasticsearchClient } from '@kbn/core/server'; import { Asset } from '../../../../common/types_api'; +export const QUERY_MAX_SIZE = 1000; + export interface CollectorOptions { client: ElasticsearchClient; from: number; + transaction: Transaction | null; } export type Collector = (opts: CollectorOptions) => Promise; diff --git a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/pods.ts b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/pods.ts index 4bb983acfca6e..bc5d3d4155184 100644 --- a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/pods.ts +++ b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/pods.ts @@ -7,12 +7,13 @@ import { APM_INDICES, LOGS_INDICES, METRICS_INDICES } from '../../../constants'; import { Asset } from '../../../../common/types_api'; -import { CollectorOptions } from '.'; +import { CollectorOptions, QUERY_MAX_SIZE } from '.'; +import { withSpan } from './helpers'; -export async function collectPods({ client, from }: CollectorOptions) { +export async function collectPods({ client, from, transaction }: CollectorOptions) { const dsl = { index: [APM_INDICES, LOGS_INDICES, METRICS_INDICES], - size: 1000, + size: QUERY_MAX_SIZE, collapse: { field: 'kubernetes.pod.uid', }, @@ -46,32 +47,34 @@ export async function collectPods({ client, from }: CollectorOptions) { const esResponse = await client.search(dsl); - const pods = esResponse.hits.hits.reduce((acc: Asset[], hit: any) => { - const { fields = {} } = hit; - const podUid = fields['kubernetes.pod.uid']; - const nodeName = fields['kubernetes.node.name']; - const clusterName = fields['orchestrator.cluster.name']; + const pods = withSpan({ transaction, name: 'processing_response' }, () => { + return esResponse.hits.hits.reduce((acc: Asset[], hit: any) => { + const { fields = {} } = hit; + const podUid = fields['kubernetes.pod.uid']; + const nodeName = fields['kubernetes.node.name']; + const clusterName = fields['orchestrator.cluster.name']; - const pod: Asset = { - '@timestamp': new Date().toISOString(), - 'asset.kind': 'pod', - 'asset.id': podUid, - 'asset.ean': `pod:${podUid}`, - 'asset.parents': [`host:${nodeName}`], - }; + const pod: Asset = { + '@timestamp': new Date().toISOString(), + 'asset.kind': 'pod', + 'asset.id': podUid, + 'asset.ean': `pod:${podUid}`, + 'asset.parents': [`host:${nodeName}`], + }; - if (fields['cloud.provider']) { - pod['cloud.provider'] = fields['cloud.provider']; - } + if (fields['cloud.provider']) { + pod['cloud.provider'] = fields['cloud.provider']; + } - if (clusterName) { - pod['orchestrator.cluster.name'] = clusterName; - } + if (clusterName) { + pod['orchestrator.cluster.name'] = clusterName; + } - acc.push(pod); + acc.push(pod); - return acc; - }, []); + return acc; + }, []); + }); return pods; } diff --git a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/services.ts b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/services.ts index e1fd0c7a1a042..9057d8e2d8152 100644 --- a/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/services.ts +++ b/x-pack/plugins/asset_manager/server/lib/implicit_collection/collectors/services.ts @@ -7,11 +7,16 @@ import { APM_INDICES } from '../../../constants'; import { Asset } from '../../../../common/types_api'; -import { CollectorOptions } from '.'; +import { CollectorOptions, QUERY_MAX_SIZE } from '.'; +import { withSpan } from './helpers'; const MISSING_KEY = '__unknown__'; -export async function collectServices({ client, from }: CollectorOptions): Promise { +export async function collectServices({ + client, + from, + transaction, +}: CollectorOptions): Promise { const dsl = { index: APM_INDICES, size: 0, @@ -44,7 +49,7 @@ export async function collectServices({ client, from }: CollectorOptions): Promi aggs: { service_environment: { multi_terms: { - size: 100, + size: QUERY_MAX_SIZE, terms: [ { field: 'service.name', @@ -58,7 +63,7 @@ export async function collectServices({ client, from }: CollectorOptions): Promi aggs: { container_host: { multi_terms: { - size: 100, + size: QUERY_MAX_SIZE, terms: [ { field: 'container.id', missing: MISSING_KEY }, { field: 'host.hostname', missing: MISSING_KEY }, @@ -71,40 +76,43 @@ export async function collectServices({ client, from }: CollectorOptions): Promi }; const esResponse = await client.search(dsl); - const serviceEnvironment = esResponse.aggregations?.service_environment as { buckets: any[] }; - const services = (serviceEnvironment?.buckets ?? []).reduce((acc: Asset[], hit: any) => { - const [serviceName, environment] = hit.key; - const containerHosts = hit.container_host.buckets; + const services = withSpan({ transaction, name: 'processing_response' }, () => { + const serviceEnvironment = esResponse.aggregations?.service_environment as { buckets: any[] }; - const service: Asset = { - '@timestamp': new Date().toISOString(), - 'asset.kind': 'service', - 'asset.id': serviceName, - 'asset.ean': `service:${serviceName}`, - 'asset.references': [], - 'asset.parents': [], - }; + return (serviceEnvironment?.buckets ?? []).reduce((acc: Asset[], hit: any) => { + const [serviceName, environment] = hit.key; + const containerHosts = hit.container_host.buckets; - if (environment !== MISSING_KEY) { - service['service.environment'] = environment; - } + const service: Asset = { + '@timestamp': new Date().toISOString(), + 'asset.kind': 'service', + 'asset.id': serviceName, + 'asset.ean': `service:${serviceName}`, + 'asset.references': [], + 'asset.parents': [], + }; - containerHosts.forEach((nestedHit: any) => { - const [containerId, hostname] = nestedHit.key; - if (containerId !== MISSING_KEY) { - (service['asset.parents'] as string[]).push(`container:${containerId}`); + if (environment !== MISSING_KEY) { + service['service.environment'] = environment; } - if (hostname !== MISSING_KEY) { - (service['asset.references'] as string[]).push(`host:${hostname}`); - } - }); + containerHosts.forEach((nestedHit: any) => { + const [containerId, hostname] = nestedHit.key; + if (containerId !== MISSING_KEY) { + (service['asset.parents'] as string[]).push(`container:${containerId}`); + } + + if (hostname !== MISSING_KEY) { + (service['asset.references'] as string[]).push(`host:${hostname}`); + } + }); - acc.push(service); + acc.push(service); - return acc; - }, []); + return acc; + }, []); + }); return services; }