From 80de8c35e01ff4636d9132373d9679faeeb30690 Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 27 Apr 2023 23:38:09 +0200 Subject: [PATCH 1/9] metrics indices --- constants.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/constants.ts b/constants.ts index 341c898..1bd859d 100644 --- a/constants.ts +++ b/constants.ts @@ -1,8 +1,10 @@ export const LOGS_INDICES = 'logs-*,filebeat-*'; export const APM_INDICES = 'traces-*,apm*,metrics-apm*'; +export const METRICS_INDICES = 'metrics-*,metricbeat-*'; export const REMOTE_LOGS_INDICES = 'remote_cluster:logs-*,remote_cluster:filebeat-*'; export const REMOTE_APM_INDICES = 'remote_cluster:traces-*,remote_cluster:apm*,remote_cluster:metrics-apm*'; +export const REMOTE_METRICS_INDICES = 'remote_cluster:metrics-*,remote_cluster:metricbeat-*'; export function getLogsIndices() { if (process.env.ES_IS_CCS === "true") { @@ -18,4 +20,12 @@ export function getApmIndices() { } else { return APM_INDICES; } -} \ No newline at end of file +} + +export function getMetricsIndices() { + if (process.env.ES_IS_CCS === "true") { + return REMOTE_METRICS_INDICES; + } else { + return METRICS_INDICES; + } +} From e0d46818c70503f8d87164b329243196f6b501d6 Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 27 Apr 2023 23:38:19 +0200 Subject: [PATCH 2/9] data stream template --- lib/assets_index_template.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/assets_index_template.ts b/lib/assets_index_template.ts index ee898d1..362d00a 100644 --- a/lib/assets_index_template.ts +++ b/lib/assets_index_template.ts @@ -4,6 +4,7 @@ export const assetsIndexTemplateConfig: IndicesPutIndexTemplateRequest = { name: 'assets', index_patterns: ['assets*'], priority: 100, + data_stream: {}, template: { settings: {}, mappings: { From 75cbcdffb81c190d0dcb0d2e8923121adbd0f0cc Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 27 Apr 2023 23:38:41 +0200 Subject: [PATCH 3/9] remove env variables check --- lib/es_client.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/lib/es_client.ts b/lib/es_client.ts index 382eed5..b88187e 100644 --- a/lib/es_client.ts +++ b/lib/es_client.ts @@ -36,12 +36,6 @@ export async function getEsClient(options: AssetClientOptions) { return singletonClient; } - if (!process.env.ES_USERNAME || !process.env.ES_PASSWORD) { - throw new Error('Please provide username and password for Elasticsearch via ES_USERNAME and ES_PASSWORD env vars'); - } - - const tlsRejectUnauthorized = (process.env.ASSETS_READ_ES_TLS_REJECT_UNAUTHORIZED === "false") ? false : true; - singletonClient = new AssetClient(options); // await template creation? From ecb91da8727ea64b7ee58fa417b1fe643c719094 Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 27 Apr 2023 23:39:02 +0200 Subject: [PATCH 4/9] some types --- types.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/types.ts b/types.ts index 86775c7..057786c 100644 --- a/types.ts +++ b/types.ts @@ -3,11 +3,17 @@ export interface SimpleAsset { 'asset.ean': string; 'asset.type': T; 'asset.id': string; + 'asset.kind': string; 'asset.name'?: string; 'asset.parents'?: string[]; 'asset.children'?: string[]; 'asset.references'?: string[]; 'cloud.provider'?: string; + 'cloud.service.name'?: string; + 'cloud.region'?: string; + 'cloud.instance.id'?: string; 'orchestrator.cluster.name'?: string; 'service.environment'?: string; -} \ No newline at end of file +} + +export type HostType = 'host' | 'aws.ec2' | 'k8s.node' | 'gcp.gce'; \ No newline at end of file From eccec91aff5d5c99d8ad4f1583c0b2713139abbb Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 27 Apr 2023 23:40:55 +0200 Subject: [PATCH 5/9] collect hosts and write to target es --- lib/collectHostsFromMetrics.ts | 101 +++++++++++++++++++++++++++++++++ run.ts | 20 ++++--- 2 files changed, 112 insertions(+), 9 deletions(-) create mode 100644 lib/collectHostsFromMetrics.ts diff --git a/lib/collectHostsFromMetrics.ts b/lib/collectHostsFromMetrics.ts new file mode 100644 index 0000000..7c4065f --- /dev/null +++ b/lib/collectHostsFromMetrics.ts @@ -0,0 +1,101 @@ +import { Client } from "@elastic/elasticsearch"; +import { getMetricsIndices } from "../constants"; +import { SimpleAsset, HostType } from "../types"; + +interface CollectHosts { + hosts: SimpleAsset[]; +} + +export async function collectHosts({ esClient }: { esClient: Client }): Promise[]> { + const dsl = { + index: [getMetricsIndices()], + size: 1000, + collapse: { + field: 'host.hostname' + }, + sort: [ + { + "@timestamp": "desc" + } + ], + _source: false, + fields: [ + 'kubernetes.*', + 'cloud.*', + 'orchestrator.cluster.name', + 'host.hostname' + ], + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + gte: 'now-1h' + } + } + } + ], + must: [ + { + exists: { + field: 'host.hostname' + } + }, + ] + } + } + }; + + console.log(JSON.stringify(dsl)); + const esResponse = await esClient.search(dsl); + + const assets = esResponse.hits.hits.reduce((acc, hit) => { + const { fields = {} } = hit; + const hostName = fields['host.hostname']; + const hostType: HostType = getHostType(fields); + + const host: SimpleAsset = { + '@timestamp': new Date(), + 'asset.type': hostType, + 'asset.kind': 'host', + 'asset.id': hostName, + 'asset.name': hostName, + 'asset.ean': `${hostType}:${hostName}`, + }; + + if (fields['cloud.provider']) { + host['cloud.provider'] = fields['cloud.provider']; + } + + if (fields['cloud.instance.id']) { + host['cloud.instance.id'] = fields['cloud.instance.id']; + } + + if (fields['cloud.service.name']) { + host['cloud.service.name'] = fields['cloud.service.name']; + } + + if (fields['cloud.region']) { + host['cloud.region'] = fields['cloud.region']; + } + + if (fields['orchestrator.cluster.name']) { + host['orchestrator.cluster.name'] = fields['orchestrator.cluster.name']; + } + + acc.hosts.push(host); + + return acc; + }, { hosts: [] }); + + return assets.hosts; +} + +function getHostType(fields: any): HostType { + if (fields['cloud.provider'] && fields['cloud.service.name']) { + return `${fields['cloud.provider']}.${fields['cloud.service.name']}`.toLowerCase() as HostType; + } + + return 'host'; +}; diff --git a/run.ts b/run.ts index 5780ce3..c5e5f3a 100644 --- a/run.ts +++ b/run.ts @@ -1,22 +1,24 @@ import yargs from "yargs/yargs"; -import { collectServicesFromSummaries } from "./lib/collectServicesFromSummaries"; +import { collectHosts } from "./lib/collectHostsFromMetrics"; import { AssetClient, getEsClient } from "./lib/es_client"; +import { HostType, SimpleAsset } from "./types"; import config from "./config/config.json"; main(); async function etl(esClient: AssetClient) { - // Read services from summaries - const { services, fullServices } = await collectServicesFromSummaries({ esClient }); + const hosts = await collectHosts({ esClient: esClient.reader }); - // Convert services to assets - // TBA + const bulkBody = hosts.flatMap((asset: SimpleAsset) => { + return [ + { create: { _index: `assets-${asset['asset.type']}-default` } }, + asset, + ]; + }) - // Write service assets to ES using `esClient.writeBatch()` or `esClient.writer.*` - // TBA + const response = await esClient.writer.bulk({ body: bulkBody }); - console.log(JSON.stringify(services)); - console.log(JSON.stringify(fullServices)); + console.log(`wrote ${response.items.length} assets; errors ? ${response.errors}`); } async function main() { From 2a4157555ec358d56467bf2ac657f3adff6fd889 Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 27 Apr 2023 23:41:16 +0200 Subject: [PATCH 6/9] package-lock --- package-lock.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package-lock.json b/package-lock.json index 8962c8b..50e877b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,5 @@ { - "name": "elastic-assets-etl", + "name": "elastic-asset-etl-poc", "lockfileVersion": 2, "requires": true, "packages": { From e4ee25d82fe49f74c57e97ed651e106928e5db7a Mon Sep 17 00:00:00 2001 From: klacabane Date: Tue, 2 May 2023 18:23:21 +0200 Subject: [PATCH 7/9] individual asset queries --- lib/collectContainers.ts | 75 +++++++++++++ lib/collectHostsFromMetrics.ts | 49 ++++---- lib/collectPods.ts | 60 ++-------- lib/collectServices.ts | 197 ++++++++++++--------------------- run.ts | 35 ++++-- types.ts | 3 +- 6 files changed, 211 insertions(+), 208 deletions(-) create mode 100644 lib/collectContainers.ts diff --git a/lib/collectContainers.ts b/lib/collectContainers.ts new file mode 100644 index 0000000..e0f0080 --- /dev/null +++ b/lib/collectContainers.ts @@ -0,0 +1,75 @@ +import { Client } from "@elastic/elasticsearch"; +import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; +import { SimpleAsset } from "../types"; + +interface CollectContainers { + containers: SimpleAsset<'container'>[]; +} + +export async function collectContainers({ esClient }: { esClient: Client }) { + const dsl = { + index: [getLogsIndices(), getApmIndices(), getMetricsIndices()], + size: 1000, + collapse: { + field: 'container.id' + }, + sort: [ + { '_score': 'desc' }, + { '@timestamp': 'desc' } + ], + _source: false, + fields: [ + 'kubernetes.*', + 'cloud.provider', + 'orchestrator.cluster.name', + 'host.name', + 'host.hostname' + ], + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + gte: 'now-1h' + } + } + } + ], + must: [ + { exists: { field: 'kubernetes.container.id' } }, + { exists: { field: 'kubernetes.pod.uid' } }, + { exists: { field: 'host.hostname' } }, + ] + } + } + }; + + const esResponse = await esClient.search(dsl); + + // STEP TWO: Loop over collected pod documents and create a pod asset doc AND a node asset doc for each + const docs = esResponse.hits.hits.reduce((acc, hit) => { + const { fields = {} } = hit; + const containerId = fields['container.id']; + const podUid = fields['kubernetes.pod.uid']; + const nodeName = fields['kubernetes.node.name']; + + const parentEan = podUid ? `k8s.pod:${podUid}` : fields['host.hostname']; + + const container: SimpleAsset<'container'> = { + '@timestamp': new Date(), + 'asset.type': 'container', + 'asset.kind': 'container', + 'asset.id': containerId, + 'asset.ean': `container:${containerId}`, + 'asset.parents': parentEan && [parentEan], + }; + + acc.containers.push(container); + + return acc; + }, { containers: [] }); + + return docs.containers; +} + diff --git a/lib/collectHostsFromMetrics.ts b/lib/collectHostsFromMetrics.ts index 7c4065f..3c22973 100644 --- a/lib/collectHostsFromMetrics.ts +++ b/lib/collectHostsFromMetrics.ts @@ -1,5 +1,5 @@ import { Client } from "@elastic/elasticsearch"; -import { getMetricsIndices } from "../constants"; +import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; import { SimpleAsset, HostType } from "../types"; interface CollectHosts { @@ -8,22 +8,21 @@ interface CollectHosts { export async function collectHosts({ esClient }: { esClient: Client }): Promise[]> { const dsl = { - index: [getMetricsIndices()], + index: [getMetricsIndices(), getLogsIndices(), getApmIndices()], size: 1000, - collapse: { - field: 'host.hostname' - }, + collapse: { field: 'host.hostname' }, sort: [ - { - "@timestamp": "desc" - } + { "_score": "desc" }, + { "@timestamp": "desc" } ], _source: false, fields: [ - 'kubernetes.*', + '@timestamp', 'cloud.*', + 'container.*', + 'host.hostname', + 'kubernetes.*', 'orchestrator.cluster.name', - 'host.hostname' ], query: { bool: { @@ -37,31 +36,35 @@ export async function collectHosts({ esClient }: { esClient: Client }): Promise< } ], must: [ - { - exists: { - field: 'host.hostname' - } - }, + { exists: { field: 'host.hostname' } }, + ], + should: [ + { exists: { field: 'kubernetes.node.name' } }, + { exists: { field: 'kubernetes.pod.uid' } }, + { exists: { field: 'container.id' } } ] } } }; - console.log(JSON.stringify(dsl)); const esResponse = await esClient.search(dsl); + // STEP TWO: Loop over collected pod documents and create a pod asset doc AND a node asset doc for each const assets = esResponse.hits.hits.reduce((acc, hit) => { const { fields = {} } = hit; const hostName = fields['host.hostname']; - const hostType: HostType = getHostType(fields); + const k8sNode = fields['kubernetes.node.name']; + const k8sPod = fields['kubernetes.pod.uid']; + + const hostEan = `${k8sNode ? 'k8s.node:' + k8sNode : 'host:' + hostName}`; const host: SimpleAsset = { '@timestamp': new Date(), - 'asset.type': hostType, + 'asset.type': k8sNode ? 'k8s.node' : 'host', 'asset.kind': 'host', - 'asset.id': hostName, - 'asset.name': hostName, - 'asset.ean': `${hostType}:${hostName}`, + 'asset.id': k8sNode || hostName, + 'asset.name': k8sNode || hostName, + 'asset.ean': hostEan, }; if (fields['cloud.provider']) { @@ -84,6 +87,10 @@ export async function collectHosts({ esClient }: { esClient: Client }): Promise< host['orchestrator.cluster.name'] = fields['orchestrator.cluster.name']; } + if (k8sPod) { + host['asset.children'] = [`k8s.pod:${k8sPod}`]; + } + acc.hosts.push(host); return acc; diff --git a/lib/collectPods.ts b/lib/collectPods.ts index 7b9adb6..2defc4c 100644 --- a/lib/collectPods.ts +++ b/lib/collectPods.ts @@ -1,32 +1,26 @@ import { Client } from "@elastic/elasticsearch"; -import { getApmIndices, getLogsIndices } from "../constants"; +import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; import { SimpleAsset } from "../types"; interface CollectPodsAndNodes { pods: SimpleAsset<'k8s.pod'>[]; - nodes: SimpleAsset<'k8s.node'>[]; } export async function collectPods({ esClient }: { esClient: Client }) { // STEP ONE: Query pods that reference their k8s nodes const dsl = { - index: [getLogsIndices(), getApmIndices()], + index: [getLogsIndices(), getApmIndices(), getMetricsIndices()], size: 1000, collapse: { field: 'kubernetes.pod.uid' }, sort: [ - { - "@timestamp": "desc" // TODO: Switch to ASC with a hard-coded "one hour ago" value, then use "search_after" to process all results? - } + { '_score': 'desc' }, + { '@timestamp': 'desc' } ], _source: false, fields: [ - 'kubernetes.pod.uid', - 'kubneretes.pod.name', - 'kubernetes.node.id', - 'kubernetes.node.name', - 'kubernetes.namespace', + 'kubernetes.*', 'cloud.provider', 'orchestrator.cluster.name', 'host.name', @@ -44,22 +38,13 @@ export async function collectPods({ esClient }: { esClient: Client }) { } ], must: [ - { - exists: { - field: 'kubernetes.pod.uid' - } - }, - { - exists: { - field: 'kubernetes.node.name' - } - } + { exists: { field: 'kubernetes.pod.uid' } }, + { exists: { field: 'kubernetes.node.name' } } ] } } }; - console.log(JSON.stringify(dsl)); const esResponse = await esClient.search(dsl); // STEP TWO: Loop over collected pod documents and create a pod asset doc AND a node asset doc for each @@ -72,6 +57,7 @@ export async function collectPods({ esClient }: { esClient: Client }) { const pod: SimpleAsset<'k8s.pod'> = { '@timestamp': new Date(), 'asset.type': 'k8s.pod', + 'asset.kind': 'pod', 'asset.id': podUid, 'asset.ean': `k8s.pod:${podUid}`, 'asset.parents': [`k8s.node:${nodeName}`] @@ -87,32 +73,8 @@ export async function collectPods({ esClient }: { esClient: Client }) { acc.pods.push(pod); - const foundNode = acc.nodes.find((collectedNode) => collectedNode['asset.ean'] === `k8s.node:${nodeName}`); - - if (foundNode) { - if (foundNode['asset.children']) { - foundNode['asset.children'].push(`k8s.pod:${podUid}`); - } else { - foundNode['asset.children'] = [`k8s.pod:${podUid}`]; - } - } else { - const node: SimpleAsset<'k8s.node'> = { - '@timestamp': new Date(), - 'asset.type': 'k8s.node', - 'asset.id': nodeName, - 'asset.ean': `k8s.node:${nodeName}`, - 'asset.children': [`k8s.pod:${podUid}`] - }; - - if (clusterName) { - node['asset.parents'] = [`k8s.cluster:${clusterName}`]; - } - - acc.nodes.push(node); - } - return acc; - }, { pods: [], nodes: [] }); + }, { pods: [] }); - return { esResponse, docs }; -} \ No newline at end of file + return docs.pods; +} diff --git a/lib/collectServices.ts b/lib/collectServices.ts index c18a396..12d445e 100644 --- a/lib/collectServices.ts +++ b/lib/collectServices.ts @@ -1,174 +1,115 @@ import { Client } from "@elastic/elasticsearch"; -import { getApmIndices, getLogsIndices } from "../constants"; +import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; import { SimpleAsset } from "../types"; interface CollectServices { services: SimpleAsset<'service'>[]; - containers: SimpleAsset<'container'>[]; } -export async function collectServices({ esClient }: { esClient: Client }) { +const MISSING_KEY = "__unknown__"; + +/** + * service.name|service.environment + * -> containers|hostname + */ + +export async function collectServices({ esClient }: { esClient: Client }): Promise[]> { // STEP ONE: Query pods that reference their k8s nodes const dsl = { - index: [getApmIndices()], - size: 1000, - collapse: { - field: 'service.name' - }, - sort: [ + index: [getApmIndices(), getLogsIndices(), getMetricsIndices()].concat(','), + "size": 0, + "sort": [ { - "@timestamp": "desc" // TODO: Switch to ASC with a hard-coded "one hour ago" value, then use "search_after" to process all results? + "@timestamp": "desc" } ], - _source: false, - fields: [ - 'service.name', - 'service.environment', - 'container.*', - 'kubernetes.pod.uid', - 'kubneretes.pod.name', - 'kubernetes.node.id', - 'kubernetes.node.name', - 'kubernetes.namespace', - 'cloud.provider', - 'orchestrator.cluster.name', - 'host.name', - 'host.hostname' - ], - query: { - bool: { - filter: [ + "_source": false, + "query": { + "bool": { + "filter": [ { - range: { - '@timestamp': { - gte: 'now-1h' + "range": { + "@timestamp": { + "gte": "now-1h" } } } ], - must: [ + "must": [ { - exists: { - field: 'service.name' + "exists": { + "field": "service.name" } } - ], - should: [ - { - exists: { - field: 'container.id' - } - }, - { - exists: { - field: 'kubernetes.pod.uid' - } - }, - { - exists: { - field: 'host.name' + ] + } + }, + "aggs": { + "service_environment": { + "multi_terms": { + "size": 100, + "terms": [ + { + "field": "service.name" + }, + { + "field": "service.environment", + "missing": MISSING_KEY } - }, - { - exists: { - field: 'host.hostname' + ] + }, + "aggs": { + "container_host": { + "multi_terms": { + "size": 100, + "terms": [ + { "field": "container.id", "missing": MISSING_KEY }, + { "field": "host.hostname", "missing": MISSING_KEY } + ] } } - ], - minimum_should_match: 1 + } } } }; - console.log(JSON.stringify(dsl)); + const esResponse = await esClient.search(dsl); + const serviceEnvironment = esResponse.aggregations?.service_environment as { buckets: any[] }; - const docs = esResponse.hits.hits.reduce((acc, hit) => { - const { fields = {} } = hit; - const serviceName = fields['service.name']; - const serviceEnvironment = fields['service.environment']; - const containerId = fields['container.id']; - const podUid = fields['kubernetes.pod.uid']; - const nodeName = fields['kubernetes.node.name']; + const docs = serviceEnvironment.buckets.reduce((acc: any, hit: any) => { + const [serviceName, environment] = hit.key; + const containerHosts = hit.container_host.buckets; - const serviceEan = `service:${serviceName}`; - const containerEan = containerId ? `container:${containerId}` : null; - const podEan = podUid ? `k8s.pod:${podUid}` : null; - const nodeEan = nodeName ? `k8s.node:${nodeName}` : null; const service: SimpleAsset<'service'> = { '@timestamp': new Date(), 'asset.type': 'service', + 'asset.kind': 'service', 'asset.id': serviceName, - 'asset.ean': serviceEan, + 'asset.ean': `service:${serviceName}`, 'asset.references': [], - 'service.environment': serviceEnvironment // TODO: Should this be part of the service's ID/EAN? + 'asset.parents': [], }; - if (containerEan) { - service['asset.parents'] = [containerEan]; - } - - if (fields['cloud.provider']) { - service['cloud.provider'] = fields['cloud.provider']; + if (environment != MISSING_KEY) { + service['service.environment'] = environment; } - if (podEan) { - service['asset.references']?.push(podEan); - } - - if (nodeEan) { - service['asset.references']?.push(nodeEan); - } - - acc.services.push(service); - - if (!containerEan) { - return acc; - } - - const foundContainer = acc.containers.find((collectedContainer) => collectedContainer['asset.ean'] === containerEan); - - if (foundContainer) { - if (foundContainer['asset.children']) { - foundContainer['asset.children'].push(serviceEan); - } else { - foundContainer['asset.children'] = [serviceEan]; - } - - if (podEan) { - if (foundContainer['asset.parents']) { - foundContainer['asset.parents'].push(podEan); - } else { - foundContainer['asset.parents'] = [podEan]; - } + containerHosts.forEach((hit: any) => { + const [containerId, hostname] = hit.key; + if (containerId !== MISSING_KEY) { + service['asset.parents']?.push(`container:${containerId}`); } - if (nodeEan) { - foundContainer['asset.references']?.push(nodeEan); + if (hostname !== MISSING_KEY) { + service['asset.references']?.push(`host:${hostname}`); } - } else { - const container: SimpleAsset<'container'> = { - '@timestamp': new Date(), - 'asset.type': 'container', - 'asset.id': containerId, - 'asset.ean': containerEan, - 'asset.references': [], - 'asset.children': [serviceEan] - }; + }); - if (podEan) { - container['asset.parents'] = [podEan]; - } - - if (nodeEan) { - container['asset.references']?.push(nodeEan); - } - - acc.containers.push(container); - } + acc.services.push(service); return acc; - }, { services: [], containers: [] }); + }, { services: [] }); - return { esResponse, docs }; -} \ No newline at end of file + return [...docs.services]; +} diff --git a/run.ts b/run.ts index c5e5f3a..00d3aa6 100644 --- a/run.ts +++ b/run.ts @@ -1,24 +1,41 @@ import yargs from "yargs/yargs"; import { collectHosts } from "./lib/collectHostsFromMetrics"; +import { collectServices } from "./lib/collectServices"; import { AssetClient, getEsClient } from "./lib/es_client"; import { HostType, SimpleAsset } from "./types"; import config from "./config/config.json"; +import { collectPods } from "./lib/collectPods"; +import { collectContainers } from "./lib/collectContainers"; main(); +const assetToBulkOperation = (asset: SimpleAsset) => { + return [ + { create: { _index: `assets-${asset['asset.type']}-default` } }, + asset, + ]; +}; + async function etl(esClient: AssetClient) { - const hosts = await collectHosts({ esClient: esClient.reader }); + await collectHosts({ esClient: esClient.reader }) + .then(assets => assets.flatMap(assetToBulkOperation)) + .then(bulkBody => esClient.writer.bulk({ body: bulkBody })) + .then(response => console.log(`wrote ${response.items.length} hosts; errors ? ${response.errors}`)); - const bulkBody = hosts.flatMap((asset: SimpleAsset) => { - return [ - { create: { _index: `assets-${asset['asset.type']}-default` } }, - asset, - ]; - }) + await collectPods({ esClient: esClient.reader }) + .then(assets => assets.flatMap(assetToBulkOperation)) + .then(bulkBody => esClient.writer.bulk({ body: bulkBody })) + .then(response => console.log(`wrote ${response.items.length} pods; errors ? ${response.errors}`)); - const response = await esClient.writer.bulk({ body: bulkBody }); + await collectContainers({ esClient: esClient.reader }) + .then(assets => assets.flatMap(assetToBulkOperation)) + .then(bulkBody => esClient.writer.bulk({ body: bulkBody })) + .then(response => console.log(`wrote ${response.items.length} containers; errors ? ${response.errors}`)); - console.log(`wrote ${response.items.length} assets; errors ? ${response.errors}`); + await collectServices({ esClient: esClient.reader }) + .then(assets => assets.flatMap(assetToBulkOperation)) + .then(bulkBody => esClient.writer.bulk({ body: bulkBody })) + .then(response => console.log(`wrote ${response.items.length} services; errors ? ${response.errors}`)); } async function main() { diff --git a/types.ts b/types.ts index 057786c..0296e36 100644 --- a/types.ts +++ b/types.ts @@ -14,6 +14,7 @@ export interface SimpleAsset { 'cloud.instance.id'?: string; 'orchestrator.cluster.name'?: string; 'service.environment'?: string; + 'kubernetes.node.hostname'?: string; } -export type HostType = 'host' | 'aws.ec2' | 'k8s.node' | 'gcp.gce'; \ No newline at end of file +export type HostType = 'host' | 'aws.ec2' | 'k8s.node' | 'gcp.gce'; From 0a8a91e09229de875892323878165e66d92b7600 Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 4 May 2023 13:30:49 +0200 Subject: [PATCH 8/9] cleanup --- constants.ts | 2 +- lib/collectContainers.ts | 3 +-- lib/{collectHostsFromMetrics.ts => collectHosts.ts} | 9 --------- lib/collectPods.ts | 3 --- lib/collectServices.ts | 8 +------- run.ts | 6 +++--- 6 files changed, 6 insertions(+), 25 deletions(-) rename lib/{collectHostsFromMetrics.ts => collectHosts.ts} (88%) diff --git a/constants.ts b/constants.ts index 1bd859d..f8131aa 100644 --- a/constants.ts +++ b/constants.ts @@ -1,5 +1,5 @@ export const LOGS_INDICES = 'logs-*,filebeat-*'; -export const APM_INDICES = 'traces-*,apm*,metrics-apm*'; +export const APM_INDICES = 'traces-*,apm*,metrics-apm*,logs-apm*'; export const METRICS_INDICES = 'metrics-*,metricbeat-*'; export const REMOTE_LOGS_INDICES = 'remote_cluster:logs-*,remote_cluster:filebeat-*'; diff --git a/lib/collectContainers.ts b/lib/collectContainers.ts index e0f0080..8251a51 100644 --- a/lib/collectContainers.ts +++ b/lib/collectContainers.ts @@ -36,7 +36,7 @@ export async function collectContainers({ esClient }: { esClient: Client }) { } } ], - must: [ + should: [ { exists: { field: 'kubernetes.container.id' } }, { exists: { field: 'kubernetes.pod.uid' } }, { exists: { field: 'host.hostname' } }, @@ -47,7 +47,6 @@ export async function collectContainers({ esClient }: { esClient: Client }) { const esResponse = await esClient.search(dsl); - // STEP TWO: Loop over collected pod documents and create a pod asset doc AND a node asset doc for each const docs = esResponse.hits.hits.reduce((acc, hit) => { const { fields = {} } = hit; const containerId = fields['container.id']; diff --git a/lib/collectHostsFromMetrics.ts b/lib/collectHosts.ts similarity index 88% rename from lib/collectHostsFromMetrics.ts rename to lib/collectHosts.ts index 3c22973..f9e3b8d 100644 --- a/lib/collectHostsFromMetrics.ts +++ b/lib/collectHosts.ts @@ -49,7 +49,6 @@ export async function collectHosts({ esClient }: { esClient: Client }): Promise< const esResponse = await esClient.search(dsl); - // STEP TWO: Loop over collected pod documents and create a pod asset doc AND a node asset doc for each const assets = esResponse.hits.hits.reduce((acc, hit) => { const { fields = {} } = hit; const hostName = fields['host.hostname']; @@ -98,11 +97,3 @@ export async function collectHosts({ esClient }: { esClient: Client }): Promise< return assets.hosts; } - -function getHostType(fields: any): HostType { - if (fields['cloud.provider'] && fields['cloud.service.name']) { - return `${fields['cloud.provider']}.${fields['cloud.service.name']}`.toLowerCase() as HostType; - } - - return 'host'; -}; diff --git a/lib/collectPods.ts b/lib/collectPods.ts index 2defc4c..a766a63 100644 --- a/lib/collectPods.ts +++ b/lib/collectPods.ts @@ -7,7 +7,6 @@ interface CollectPodsAndNodes { } export async function collectPods({ esClient }: { esClient: Client }) { - // STEP ONE: Query pods that reference their k8s nodes const dsl = { index: [getLogsIndices(), getApmIndices(), getMetricsIndices()], size: 1000, @@ -15,7 +14,6 @@ export async function collectPods({ esClient }: { esClient: Client }) { field: 'kubernetes.pod.uid' }, sort: [ - { '_score': 'desc' }, { '@timestamp': 'desc' } ], _source: false, @@ -47,7 +45,6 @@ export async function collectPods({ esClient }: { esClient: Client }) { const esResponse = await esClient.search(dsl); - // STEP TWO: Loop over collected pod documents and create a pod asset doc AND a node asset doc for each const docs = esResponse.hits.hits.reduce((acc, hit) => { const { fields = {} } = hit; const podUid = fields['kubernetes.pod.uid']; diff --git a/lib/collectServices.ts b/lib/collectServices.ts index 12d445e..9bab9de 100644 --- a/lib/collectServices.ts +++ b/lib/collectServices.ts @@ -8,15 +8,9 @@ interface CollectServices { const MISSING_KEY = "__unknown__"; -/** - * service.name|service.environment - * -> containers|hostname - */ - export async function collectServices({ esClient }: { esClient: Client }): Promise[]> { - // STEP ONE: Query pods that reference their k8s nodes const dsl = { - index: [getApmIndices(), getLogsIndices(), getMetricsIndices()].concat(','), + index: getApmIndices(), "size": 0, "sort": [ { diff --git a/run.ts b/run.ts index 00d3aa6..38212e7 100644 --- a/run.ts +++ b/run.ts @@ -1,11 +1,11 @@ import yargs from "yargs/yargs"; -import { collectHosts } from "./lib/collectHostsFromMetrics"; +import { collectHosts } from "./lib/collectHosts"; import { collectServices } from "./lib/collectServices"; +import { collectPods } from "./lib/collectPods"; +import { collectContainers } from "./lib/collectContainers"; import { AssetClient, getEsClient } from "./lib/es_client"; import { HostType, SimpleAsset } from "./types"; import config from "./config/config.json"; -import { collectPods } from "./lib/collectPods"; -import { collectContainers } from "./lib/collectContainers"; main(); From 10530b8426b1563a3766150e405d45c235332e87 Mon Sep 17 00:00:00 2001 From: klacabane Date: Thu, 4 May 2023 13:53:13 +0200 Subject: [PATCH 9/9] remove asset.type property --- lib/collectContainers.ts | 9 ++++----- lib/collectHosts.ts | 13 ++++++------- lib/collectPods.ts | 13 ++++++------- lib/collectServices.ts | 7 +++---- run.ts | 6 +++--- types.ts | 4 +--- 6 files changed, 23 insertions(+), 29 deletions(-) diff --git a/lib/collectContainers.ts b/lib/collectContainers.ts index 8251a51..ed87bc2 100644 --- a/lib/collectContainers.ts +++ b/lib/collectContainers.ts @@ -3,7 +3,7 @@ import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; import { SimpleAsset } from "../types"; interface CollectContainers { - containers: SimpleAsset<'container'>[]; + containers: SimpleAsset[]; } export async function collectContainers({ esClient }: { esClient: Client }) { @@ -53,15 +53,14 @@ export async function collectContainers({ esClient }: { esClient: Client }) { const podUid = fields['kubernetes.pod.uid']; const nodeName = fields['kubernetes.node.name']; - const parentEan = podUid ? `k8s.pod:${podUid}` : fields['host.hostname']; + const parentEan = podUid ? `pod:${podUid}` : `host:${fields['host.hostname']}`; - const container: SimpleAsset<'container'> = { + const container: SimpleAsset = { '@timestamp': new Date(), - 'asset.type': 'container', 'asset.kind': 'container', 'asset.id': containerId, 'asset.ean': `container:${containerId}`, - 'asset.parents': parentEan && [parentEan], + 'asset.parents': [parentEan], }; acc.containers.push(container); diff --git a/lib/collectHosts.ts b/lib/collectHosts.ts index f9e3b8d..7c8fb0b 100644 --- a/lib/collectHosts.ts +++ b/lib/collectHosts.ts @@ -1,12 +1,12 @@ import { Client } from "@elastic/elasticsearch"; import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; -import { SimpleAsset, HostType } from "../types"; +import { SimpleAsset } from "../types"; interface CollectHosts { - hosts: SimpleAsset[]; + hosts: SimpleAsset[]; } -export async function collectHosts({ esClient }: { esClient: Client }): Promise[]> { +export async function collectHosts({ esClient }: { esClient: Client }): Promise { const dsl = { index: [getMetricsIndices(), getLogsIndices(), getApmIndices()], size: 1000, @@ -55,11 +55,10 @@ export async function collectHosts({ esClient }: { esClient: Client }): Promise< const k8sNode = fields['kubernetes.node.name']; const k8sPod = fields['kubernetes.pod.uid']; - const hostEan = `${k8sNode ? 'k8s.node:' + k8sNode : 'host:' + hostName}`; + const hostEan = `host:${k8sNode || hostName}`; - const host: SimpleAsset = { + const host: SimpleAsset = { '@timestamp': new Date(), - 'asset.type': k8sNode ? 'k8s.node' : 'host', 'asset.kind': 'host', 'asset.id': k8sNode || hostName, 'asset.name': k8sNode || hostName, @@ -87,7 +86,7 @@ export async function collectHosts({ esClient }: { esClient: Client }): Promise< } if (k8sPod) { - host['asset.children'] = [`k8s.pod:${k8sPod}`]; + host['asset.children'] = [`pod:${k8sPod}`]; } acc.hosts.push(host); diff --git a/lib/collectPods.ts b/lib/collectPods.ts index a766a63..dc719f2 100644 --- a/lib/collectPods.ts +++ b/lib/collectPods.ts @@ -2,8 +2,8 @@ import { Client } from "@elastic/elasticsearch"; import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; import { SimpleAsset } from "../types"; -interface CollectPodsAndNodes { - pods: SimpleAsset<'k8s.pod'>[]; +interface CollectPods { + pods: SimpleAsset[]; } export async function collectPods({ esClient }: { esClient: Client }) { @@ -45,19 +45,18 @@ export async function collectPods({ esClient }: { esClient: Client }) { const esResponse = await esClient.search(dsl); - const docs = esResponse.hits.hits.reduce((acc, hit) => { + const docs = esResponse.hits.hits.reduce((acc, hit) => { const { fields = {} } = hit; const podUid = fields['kubernetes.pod.uid']; const nodeName = fields['kubernetes.node.name']; const clusterName = fields['orchestrator.cluster.name']; - const pod: SimpleAsset<'k8s.pod'> = { + const pod: SimpleAsset = { '@timestamp': new Date(), - 'asset.type': 'k8s.pod', 'asset.kind': 'pod', 'asset.id': podUid, - 'asset.ean': `k8s.pod:${podUid}`, - 'asset.parents': [`k8s.node:${nodeName}`] + 'asset.ean': `pod:${podUid}`, + 'asset.parents': [`host:${nodeName}`] }; if (fields['cloud.provider']) { diff --git a/lib/collectServices.ts b/lib/collectServices.ts index 9bab9de..dba45a6 100644 --- a/lib/collectServices.ts +++ b/lib/collectServices.ts @@ -3,12 +3,12 @@ import { getApmIndices, getLogsIndices, getMetricsIndices } from "../constants"; import { SimpleAsset } from "../types"; interface CollectServices { - services: SimpleAsset<'service'>[]; + services: SimpleAsset[]; } const MISSING_KEY = "__unknown__"; -export async function collectServices({ esClient }: { esClient: Client }): Promise[]> { +export async function collectServices({ esClient }: { esClient: Client }): Promise { const dsl = { index: getApmIndices(), "size": 0, @@ -75,9 +75,8 @@ export async function collectServices({ esClient }: { esClient: Client }): Promi const [serviceName, environment] = hit.key; const containerHosts = hit.container_host.buckets; - const service: SimpleAsset<'service'> = { + const service: SimpleAsset = { '@timestamp': new Date(), - 'asset.type': 'service', 'asset.kind': 'service', 'asset.id': serviceName, 'asset.ean': `service:${serviceName}`, diff --git a/run.ts b/run.ts index 38212e7..037c43a 100644 --- a/run.ts +++ b/run.ts @@ -4,14 +4,14 @@ import { collectServices } from "./lib/collectServices"; import { collectPods } from "./lib/collectPods"; import { collectContainers } from "./lib/collectContainers"; import { AssetClient, getEsClient } from "./lib/es_client"; -import { HostType, SimpleAsset } from "./types"; +import { SimpleAsset } from "./types"; import config from "./config/config.json"; main(); -const assetToBulkOperation = (asset: SimpleAsset) => { +const assetToBulkOperation = (asset: SimpleAsset) => { return [ - { create: { _index: `assets-${asset['asset.type']}-default` } }, + { create: { _index: `assets-${asset['asset.kind']}-default` } }, asset, ]; }; diff --git a/types.ts b/types.ts index 0296e36..0965c15 100644 --- a/types.ts +++ b/types.ts @@ -1,7 +1,6 @@ -export interface SimpleAsset { +export interface SimpleAsset { '@timestamp': Date; 'asset.ean': string; - 'asset.type': T; 'asset.id': string; 'asset.kind': string; 'asset.name'?: string; @@ -17,4 +16,3 @@ export interface SimpleAsset { 'kubernetes.node.hostname'?: string; } -export type HostType = 'host' | 'aws.ec2' | 'k8s.node' | 'gcp.gce';