Skip to content

Commit

Permalink
[Asset Manager] implicit collection telemetry (elastic#157474)
Browse files Browse the repository at this point in the history
## Summary

Closes elastic#157377

Records the performance of implicit collection queries using APM
transactions.

### Testing
- configure implicit collection in kibana settings
```
xpack.assetManager:
  implicitCollection:
    enabled: true
    interval: 30s
    
    # elasticsearch cluster we should extract signals from
    input:
      hosts: http://input:9200
      username: ...
      password: ...
    
    # elasticsearch cluster we should write assets to
    output:
      hosts: http://output:9200
      username: ...
      password: ...
```
- (for elastic maintainers) start kibana with `ELASTIC_APM_ACTIVE=true
yarn start`
- look for `transaction.type : "asset_manager-implicit_collection"` in
dev cluster

---------

Co-authored-by: Jason Rhodes <[email protected]>
  • Loading branch information
klacabane and jasonrhodes committed Jul 19, 2023
1 parent 0b23894 commit 45079fb
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
* 2.0.
*/

import apm from 'elastic-apm-node';

import { ImplicitCollectionOptions } from '.';
import { Collector } from './collectors';
import { Collector, QUERY_MAX_SIZE } from './collectors';
import { Asset } from '../../../common/types_api';

const TRANSACTION_TYPE = 'asset_manager-implicit_collection';
const transactionName = (collectorName: string) => `asset_manager-collector_${collectorName}`;

export class CollectorRunner {
private collectors: Array<{ name: string; collector: Collector }> = [];

Expand All @@ -19,15 +24,19 @@ export class CollectorRunner {
}

async run() {
const collectorOptions = {
client: this.options.inputClient,
from: Date.now() - this.options.intervalMs,
};
const now = Date.now();

for (let i = 0; i < this.collectors.length; i++) {
const { name, collector } = this.collectors[i];
this.options.logger.info(`Collector '${name}' started`);

const transaction = apm.startTransaction(transactionName(name), TRANSACTION_TYPE);
const collectorOptions = {
from: now - this.options.intervalMs,
client: this.options.inputClient,
transaction,
};

const assets = await collector(collectorOptions)
.then((collectedAssets) => {
this.options.logger.info(`Collector '${name}' found ${collectedAssets.length} assets`);
Expand All @@ -38,6 +47,12 @@ export class CollectorRunner {
return [];
});

transaction?.addLabels({
assets_count: assets.length,
interval_ms: this.options.intervalMs,
page_size: QUERY_MAX_SIZE,
});

if (assets.length) {
const bulkBody = assets.flatMap((asset: Asset) => {
return [{ create: { _index: `assets-${asset['asset.kind']}-default` } }, asset];
Expand All @@ -58,6 +73,8 @@ export class CollectorRunner {
);
});
}

transaction?.end();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

import { APM_INDICES, LOGS_INDICES, METRICS_INDICES } from '../../../constants';
import { Asset } from '../../../../common/types_api';
import { CollectorOptions } from '.';
import { CollectorOptions, QUERY_MAX_SIZE } from '.';
import { withSpan } from './helpers';

export async function collectContainers({ client, from }: CollectorOptions) {
export async function collectContainers({ client, from, transaction }: CollectorOptions) {
const dsl = {
index: [APM_INDICES, LOGS_INDICES, METRICS_INDICES],
size: 1000,
size: QUERY_MAX_SIZE,
collapse: {
field: 'container.id',
},
Expand Down Expand Up @@ -48,30 +49,32 @@ export async function collectContainers({ client, from }: CollectorOptions) {

const esResponse = await client.search(dsl);

const containers = esResponse.hits.hits.reduce<Asset[]>((acc: Asset[], hit: any) => {
const { fields = {} } = hit;
const containerId = fields['container.id'];
const podUid = fields['kubernetes.pod.uid'];
const nodeName = fields['kubernetes.node.name'];
const containers = withSpan({ transaction, name: 'processing_response' }, () => {
return esResponse.hits.hits.reduce<Asset[]>((acc: Asset[], hit: any) => {
const { fields = {} } = hit;
const containerId = fields['container.id'];
const podUid = fields['kubernetes.pod.uid'];
const nodeName = fields['kubernetes.node.name'];

const parentEan = podUid ? `pod:${podUid}` : `host:${fields['host.hostname']}`;
const parentEan = podUid ? `pod:${podUid}` : `host:${fields['host.hostname']}`;

const container: Asset = {
'@timestamp': new Date().toISOString(),
'asset.kind': 'container',
'asset.id': containerId,
'asset.ean': `container:${containerId}`,
'asset.parents': [parentEan],
};
const container: Asset = {
'@timestamp': new Date().toISOString(),
'asset.kind': 'container',
'asset.id': containerId,
'asset.ean': `container:${containerId}`,
'asset.parents': [parentEan],
};

if (nodeName) {
container['asset.references'] = [`host:${nodeName}`];
}
if (nodeName) {
container['asset.references'] = [`host:${nodeName}`];
}

acc.push(container);
acc.push(container);

return acc;
}, []);
return acc;
}, []);
});

return containers;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Transaction } from 'elastic-apm-node';

type CollectorSpan = 'processing_response';

/** Options accepted by `withSpan`. */
interface SpanOptions {
// Name the span is started with.
name: CollectorSpan;
// Transaction the span is started on; when null, no span is started.
transaction: Transaction | null;
}

/**
 * Runs `fn` inside an APM span started on `options.transaction`.
 *
 * The span is ended as soon as `fn` returns or throws, so `fn` is expected to
 * be synchronous: work that continues after a returned promise is not covered
 * by the span.
 *
 * @param options - span name and the transaction to attach the span to; when
 *   the transaction is null, `fn` simply runs with no span.
 * @param fn - the work to measure.
 * @returns whatever `fn` returns.
 * @throws rethrows any error raised by `fn`, after the span has been ended.
 */
export function withSpan<T>(options: SpanOptions, fn: () => T): T {
  const span = options.transaction?.startSpan(options.name);
  try {
    return fn();
  } finally {
    // End the span on the error path as well, so a failing callback does not
    // leave the span open.
    span?.end();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@

import { APM_INDICES, METRICS_INDICES, LOGS_INDICES } from '../../../constants';
import { Asset } from '../../../../common/types_api';
import { CollectorOptions } from '.';
import { CollectorOptions, QUERY_MAX_SIZE } from '.';
import { withSpan } from './helpers';

export async function collectHosts({ client, from }: CollectorOptions): Promise<Asset[]> {
export async function collectHosts({
client,
from,
transaction,
}: CollectorOptions): Promise<Asset[]> {
const dsl = {
index: [APM_INDICES, LOGS_INDICES, METRICS_INDICES],
size: 1000,
size: QUERY_MAX_SIZE,
collapse: { field: 'host.hostname' },
sort: [{ _score: 'desc' }, { '@timestamp': 'desc' }],
_source: false,
Expand Down Expand Up @@ -47,50 +52,52 @@ export async function collectHosts({ client, from }: CollectorOptions): Promise<

const esResponse = await client.search(dsl);

const hosts = esResponse.hits.hits.reduce<Asset[]>((acc: Asset[], hit: any) => {
const { fields = {} } = hit;
const hostName = fields['host.hostname'];
const k8sNode = fields['kubernetes.node.name'];
const k8sPod = fields['kubernetes.pod.uid'];
const hosts = withSpan({ transaction, name: 'processing_response' }, () => {
return esResponse.hits.hits.reduce<Asset[]>((acc: Asset[], hit: any) => {
const { fields = {} } = hit;
const hostName = fields['host.hostname'];
const k8sNode = fields['kubernetes.node.name'];
const k8sPod = fields['kubernetes.pod.uid'];

const hostEan = `host:${k8sNode || hostName}`;
const hostEan = `host:${k8sNode || hostName}`;

const host: Asset = {
'@timestamp': new Date().toISOString(),
'asset.kind': 'host',
'asset.id': k8sNode || hostName,
'asset.name': k8sNode || hostName,
'asset.ean': hostEan,
};
const host: Asset = {
'@timestamp': new Date().toISOString(),
'asset.kind': 'host',
'asset.id': k8sNode || hostName,
'asset.name': k8sNode || hostName,
'asset.ean': hostEan,
};

if (fields['cloud.provider']) {
host['cloud.provider'] = fields['cloud.provider'];
}
if (fields['cloud.provider']) {
host['cloud.provider'] = fields['cloud.provider'];
}

if (fields['cloud.instance.id']) {
host['cloud.instance.id'] = fields['cloud.instance.id'];
}
if (fields['cloud.instance.id']) {
host['cloud.instance.id'] = fields['cloud.instance.id'];
}

if (fields['cloud.service.name']) {
host['cloud.service.name'] = fields['cloud.service.name'];
}
if (fields['cloud.service.name']) {
host['cloud.service.name'] = fields['cloud.service.name'];
}

if (fields['cloud.region']) {
host['cloud.region'] = fields['cloud.region'];
}
if (fields['cloud.region']) {
host['cloud.region'] = fields['cloud.region'];
}

if (fields['orchestrator.cluster.name']) {
host['orchestrator.cluster.name'] = fields['orchestrator.cluster.name'];
}
if (fields['orchestrator.cluster.name']) {
host['orchestrator.cluster.name'] = fields['orchestrator.cluster.name'];
}

if (k8sPod) {
host['asset.children'] = [`pod:${k8sPod}`];
}
if (k8sPod) {
host['asset.children'] = [`pod:${k8sPod}`];
}

acc.push(host);
acc.push(host);

return acc;
}, []);
return acc;
}, []);
});

return hosts;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Transaction } from 'elastic-apm-node';
import { ElasticsearchClient } from '@kbn/core/server';
import { Asset } from '../../../../common/types_api';

export const QUERY_MAX_SIZE = 1000;

/** Options passed to every `Collector` function. */
export interface CollectorOptions {
// Elasticsearch client; the collectors in this change call `client.search` on it.
client: ElasticsearchClient;
// Numeric timestamp; the runner sets it to `Date.now()` minus the configured interval in ms.
// NOTE(review): presumably the lower bound of the query time range — confirm against the collectors' DSL.
from: number;
// APM transaction (or null) that collectors hand to `withSpan` to start spans on.
transaction: Transaction | null;
}

export type Collector = (opts: CollectorOptions) => Promise<Asset[]>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

import { APM_INDICES, LOGS_INDICES, METRICS_INDICES } from '../../../constants';
import { Asset } from '../../../../common/types_api';
import { CollectorOptions } from '.';
import { CollectorOptions, QUERY_MAX_SIZE } from '.';
import { withSpan } from './helpers';

export async function collectPods({ client, from }: CollectorOptions) {
export async function collectPods({ client, from, transaction }: CollectorOptions) {
const dsl = {
index: [APM_INDICES, LOGS_INDICES, METRICS_INDICES],
size: 1000,
size: QUERY_MAX_SIZE,
collapse: {
field: 'kubernetes.pod.uid',
},
Expand Down Expand Up @@ -46,32 +47,34 @@ export async function collectPods({ client, from }: CollectorOptions) {

const esResponse = await client.search(dsl);

const pods = esResponse.hits.hits.reduce<Asset[]>((acc: Asset[], hit: any) => {
const { fields = {} } = hit;
const podUid = fields['kubernetes.pod.uid'];
const nodeName = fields['kubernetes.node.name'];
const clusterName = fields['orchestrator.cluster.name'];
const pods = withSpan({ transaction, name: 'processing_response' }, () => {
return esResponse.hits.hits.reduce<Asset[]>((acc: Asset[], hit: any) => {
const { fields = {} } = hit;
const podUid = fields['kubernetes.pod.uid'];
const nodeName = fields['kubernetes.node.name'];
const clusterName = fields['orchestrator.cluster.name'];

const pod: Asset = {
'@timestamp': new Date().toISOString(),
'asset.kind': 'pod',
'asset.id': podUid,
'asset.ean': `pod:${podUid}`,
'asset.parents': [`host:${nodeName}`],
};
const pod: Asset = {
'@timestamp': new Date().toISOString(),
'asset.kind': 'pod',
'asset.id': podUid,
'asset.ean': `pod:${podUid}`,
'asset.parents': [`host:${nodeName}`],
};

if (fields['cloud.provider']) {
pod['cloud.provider'] = fields['cloud.provider'];
}
if (fields['cloud.provider']) {
pod['cloud.provider'] = fields['cloud.provider'];
}

if (clusterName) {
pod['orchestrator.cluster.name'] = clusterName;
}
if (clusterName) {
pod['orchestrator.cluster.name'] = clusterName;
}

acc.push(pod);
acc.push(pod);

return acc;
}, []);
return acc;
}, []);
});

return pods;
}
Loading

0 comments on commit 45079fb

Please sign in to comment.