diff --git a/x-pack/plugins/apm/common/__snapshots__/apm_telemetry.test.ts.snap b/x-pack/plugins/apm/common/__snapshots__/apm_telemetry.test.ts.snap index 1d8cfa28aea75..708758f2c6e58 100644 --- a/x-pack/plugins/apm/common/__snapshots__/apm_telemetry.test.ts.snap +++ b/x-pack/plugins/apm/common/__snapshots__/apm_telemetry.test.ts.snap @@ -527,6 +527,70 @@ exports[`APM telemetry helpers getApmTelemetry generates a JSON object with the } } }, + "aggregated_transactions": { + "properties": { + "current_implementation": { + "properties": { + "expected_metric_document_count": { + "type": "long" + }, + "transaction_count": { + "type": "long" + } + } + }, + "no_observer_name": { + "properties": { + "expected_metric_document_count": { + "type": "long" + }, + "transaction_count": { + "type": "long" + } + } + }, + "no_rum": { + "properties": { + "expected_metric_document_count": { + "type": "long" + }, + "transaction_count": { + "type": "long" + } + } + }, + "no_rum_no_observer_name": { + "properties": { + "expected_metric_document_count": { + "type": "long" + }, + "transaction_count": { + "type": "long" + } + } + }, + "only_rum": { + "properties": { + "expected_metric_document_count": { + "type": "long" + }, + "transaction_count": { + "type": "long" + } + } + }, + "only_rum_no_observer_name": { + "properties": { + "expected_metric_document_count": { + "type": "long" + }, + "transaction_count": { + "type": "long" + } + } + } + } + }, "cloud": { "properties": { "availability_zone": { @@ -647,12 +711,14 @@ exports[`APM telemetry helpers getApmTelemetry generates a JSON object with the "client": { "properties": { "geo": { - "properites": { + "properties": { "country_iso_code": { - "rum": { - "properties": { - "1d": { - "type": "long" + "properties": { + "rum": { + "properties": { + "1d": { + "type": "long" + } } } } @@ -831,6 +897,17 @@ exports[`APM telemetry helpers getApmTelemetry generates a JSON object with the }, "tasks": { "properties": { + "aggregated_transactions": { + 
"properties": { + "took": { + "properties": { + "ms": { + "type": "long" + } + } + } + } + }, "agent_configuration": { "properties": { "took": { diff --git a/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap b/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap index f7f2836745384..7c42fb6f12a54 100644 --- a/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap +++ b/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap @@ -70,6 +70,8 @@ exports[`Error METRIC_SYSTEM_TOTAL_MEMORY 1`] = `undefined`; exports[`Error OBSERVER_LISTENING 1`] = `undefined`; +exports[`Error OBSERVER_NAME 1`] = `"an observer"`; + exports[`Error OBSERVER_VERSION_MAJOR 1`] = `8`; exports[`Error PARENT_ID 1`] = `"parentId"`; @@ -216,6 +218,8 @@ exports[`Span METRIC_SYSTEM_TOTAL_MEMORY 1`] = `undefined`; exports[`Span OBSERVER_LISTENING 1`] = `undefined`; +exports[`Span OBSERVER_NAME 1`] = `"an observer"`; + exports[`Span OBSERVER_VERSION_MAJOR 1`] = `8`; exports[`Span PARENT_ID 1`] = `"parentId"`; @@ -362,6 +366,8 @@ exports[`Transaction METRIC_SYSTEM_TOTAL_MEMORY 1`] = `undefined`; exports[`Transaction OBSERVER_LISTENING 1`] = `undefined`; +exports[`Transaction OBSERVER_NAME 1`] = `"an observer"`; + exports[`Transaction OBSERVER_VERSION_MAJOR 1`] = `8`; exports[`Transaction PARENT_ID 1`] = `"parentId"`; diff --git a/x-pack/plugins/apm/common/apm_telemetry.test.ts b/x-pack/plugins/apm/common/apm_telemetry.test.ts index 035c546a5b49a..38f8e17d07485 100644 --- a/x-pack/plugins/apm/common/apm_telemetry.test.ts +++ b/x-pack/plugins/apm/common/apm_telemetry.test.ts @@ -4,7 +4,10 @@ * you may not use this file except in compliance with the Elastic License. */ -import { getApmTelemetryMapping } from './apm_telemetry'; +import { + getApmTelemetryMapping, + mergeApmTelemetryMapping, +} from './apm_telemetry'; // Add this snapshot serializer for this test. 
The default snapshot serializer // prints "Object" next to objects in the JSON output, but we want to be able to @@ -43,4 +46,43 @@ describe('APM telemetry helpers', () => { }).toMatchSnapshot(); }); }); + + describe('mergeApmTelemetryMapping', () => { + describe('with an invalid mapping', () => { + it('throws an error', () => { + expect(() => mergeApmTelemetryMapping({})).toThrowError(); + }); + }); + + describe('with a valid mapping', () => { + it('merges the mapping', () => { + // This is "valid" in the sense that it has all of the deep fields + // needed to merge. It's not a valid mapping object. + const validTelemetryMapping = { + mappings: { + properties: { + stack_stats: { + properties: { + kibana: { + properties: { + plugins: { + properties: { + apm: getApmTelemetryMapping(), + }, + }, + }, + }, + }, + }, + }, + }, + }; + + expect( + mergeApmTelemetryMapping(validTelemetryMapping)?.mappings.properties + .stack_stats.properties.kibana.properties.plugins.properties.apm + ).toEqual(getApmTelemetryMapping()); + }); + }); + }); }); diff --git a/x-pack/plugins/apm/common/apm_telemetry.ts b/x-pack/plugins/apm/common/apm_telemetry.ts index 5fb6414674d1c..318b956cd3b3e 100644 --- a/x-pack/plugins/apm/common/apm_telemetry.ts +++ b/x-pack/plugins/apm/common/apm_telemetry.ts @@ -3,6 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License.
*/ +import { produce } from 'immer'; import { AGENT_NAMES } from './agent_name'; /** @@ -73,6 +74,13 @@ export function getApmTelemetryMapping() { }, }; + const aggregatedTransactionsProperties = { + properties: { + expected_metric_document_count: long, + transaction_count: long, + }, + }; + return { properties: { agents: { @@ -90,6 +98,16 @@ export function getApmTelemetryMapping() { {} ), }, + aggregated_transactions: { + properties: { + current_implementation: aggregatedTransactionsProperties, + no_observer_name: aggregatedTransactionsProperties, + no_rum: aggregatedTransactionsProperties, + no_rum_no_observer_name: aggregatedTransactionsProperties, + only_rum: aggregatedTransactionsProperties, + only_rum_no_observer_name: aggregatedTransactionsProperties, + }, + }, cloud: { properties: { availability_zone: keyword, @@ -117,8 +135,8 @@ export function getApmTelemetryMapping() { client: { properties: { geo: { - properites: { - country_iso_code: { rum: oneDayProperties }, + properties: { + country_iso_code: { properties: { rum: oneDayProperties } }, }, }, }, @@ -204,6 +222,7 @@ export function getApmTelemetryMapping() { }, tasks: { properties: { + aggregated_transactions: tookProperties, agent_configuration: tookProperties, agents: tookProperties, cardinality: tookProperties, @@ -230,3 +249,16 @@ export function getApmTelemetryMapping() { }, }; } + +/** + * Merge a telemetry mapping object (from https://github.com/elastic/telemetry/blob/master/config/templates/xpack-phone-home.json) + * with the output from `getApmTelemetryMapping`. 
+ */ +export function mergeApmTelemetryMapping( + xpackPhoneHomeMapping: Record +) { + return produce(xpackPhoneHomeMapping, (draft: Record) => { + draft.mappings.properties.stack_stats.properties.kibana.properties.plugins.properties.apm = getApmTelemetryMapping(); + return draft; + }); +} diff --git a/x-pack/plugins/apm/common/elasticsearch_fieldnames.test.ts b/x-pack/plugins/apm/common/elasticsearch_fieldnames.test.ts index a9eb95cf37d4d..2b9b2c85e0bb5 100644 --- a/x-pack/plugins/apm/common/elasticsearch_fieldnames.test.ts +++ b/x-pack/plugins/apm/common/elasticsearch_fieldnames.test.ts @@ -16,6 +16,7 @@ describe('Transaction', () => { '@timestamp': new Date().toString(), '@metadata': 'whatever', observer: { + name: 'an observer', version: 'whatever', version_major: 8, }, @@ -72,6 +73,7 @@ describe('Span', () => { '@timestamp': new Date().toString(), '@metadata': 'whatever', observer: { + name: 'an observer', version: 'whatever', version_major: 8, }, @@ -124,6 +126,7 @@ describe('Error', () => { const errorDoc: AllowUnknownProperties = { '@metadata': 'whatever', observer: { + name: 'an observer', version: 'whatever', version_major: 8, }, diff --git a/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts b/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts index d8d3827909b07..610a32e8e9b99 100644 --- a/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts +++ b/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts @@ -31,6 +31,7 @@ export const USER_AGENT_NAME = 'user_agent.name'; export const DESTINATION_ADDRESS = 'destination.address'; +export const OBSERVER_NAME = 'observer.name'; export const OBSERVER_VERSION_MAJOR = 'observer.version_major'; export const OBSERVER_LISTENING = 'observer.listening'; export const PROCESSOR_EVENT = 'processor.event'; diff --git a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts index ea2b57c01acff..eafd0f04b9d10 
100644 --- a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts +++ b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts @@ -4,8 +4,9 @@ * you may not use this file except in compliance with the Elastic License. */ -import { tasks } from './tasks'; +import { AGENT_NAME } from '../../../../common/elasticsearch_fieldnames'; import { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices'; +import { tasks } from './tasks'; describe('data telemetry collection tasks', () => { const indices = { @@ -15,6 +16,109 @@ describe('data telemetry collection tasks', () => { 'apm_oss.transactionIndices': 'apm-8.0.0-transaction', } as ApmIndicesConfig; + describe('aggregated_transactions', () => { + const task = tasks.find((t) => t.name === 'aggregated_transactions'); + + it('returns aggregated transaction counts', async () => { + // This mock implementation returns different values based on the parameters, + // which should simulate all the queries that are done. For most of them we'll + // simulate the number of buckets by using the length of the key, but for a + // couple we'll simulate being paginated by returning an after_key. + const search = jest.fn().mockImplementation((params) => { + const isRumResult = + params.body.query.bool.filter && + params.body.query.bool.filter.some( + (filter: any) => + filter.terms && filter.terms[AGENT_NAME]?.includes('rum-js') + ); + const isNonRumResult = + params.body.query.bool.filter && + params.body.query.bool.filter.some( + (filter: any) => + filter.terms && !filter.terms[AGENT_NAME]?.includes('rum-js') + ); + const isPagedResult = + !!params.body.aggs?.current_implementation?.composite.after || + !!params.body.aggs?.no_observer_name?.composite.after; + const isTotalResult = 'track_total_hits' in params.body; + const key = Object.keys(params.body.aggs ?? 
[])[0]; + + if (isRumResult) { + if (isTotalResult) { + return Promise.resolve({ hits: { total: { value: 3000 } } }); + } + } + + if (isNonRumResult) { + if (isTotalResult) { + return Promise.resolve({ hits: { total: { value: 2000 } } }); + } + } + + if (isPagedResult && key) { + return Promise.resolve({ + hits: { total: { value: key.length } }, + aggregations: { [key]: { buckets: [{}] } }, + }); + } + + if (isTotalResult) { + return Promise.resolve({ hits: { total: { value: 1000 } } }); + } + + if ( + key === 'current_implementation' || + (key === 'no_observer_name' && !isPagedResult) + ) { + return Promise.resolve({ + hits: { total: { value: key.length } }, + aggregations: { + [key]: { after_key: {}, buckets: key.split('').map((_) => ({})) }, + }, + }); + } + + if (key) { + return Promise.resolve({ + hits: { total: { value: key.length } }, + aggregations: { + [key]: { buckets: key.split('').map((_) => ({})) }, + }, + }); + } + }); + + expect(await task?.executor({ indices, search } as any)).toEqual({ + aggregated_transactions: { + current_implementation: { + expected_metric_document_count: 23, + transaction_count: 1000, + }, + no_observer_name: { + expected_metric_document_count: 17, + transaction_count: 1000, + }, + no_rum: { + expected_metric_document_count: 6, + transaction_count: 2000, + }, + no_rum_no_observer_name: { + expected_metric_document_count: 23, + transaction_count: 2000, + }, + only_rum: { + expected_metric_document_count: 8, + transaction_count: 3000, + }, + only_rum_no_observer_name: { + expected_metric_document_count: 25, + transaction_count: 3000, + }, + }, + }); + }); + }); + describe('cloud', () => { const task = tasks.find((t) => t.name === 'cloud'); diff --git a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts index 2ecb5a935893f..840f47b043418 100644 --- a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts +++ 
b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts @@ -5,7 +5,7 @@ */ import { flatten, merge, sortBy, sum } from 'lodash'; import { TelemetryTask } from '.'; -import { AGENT_NAMES } from '../../../../common/agent_name'; +import { AGENT_NAMES, RUM_AGENTS } from '../../../../common/agent_name'; import { AGENT_NAME, AGENT_VERSION, @@ -13,9 +13,14 @@ import { CLOUD_AVAILABILITY_ZONE, CLOUD_PROVIDER, CLOUD_REGION, + CONTAINER_ID, ERROR_GROUP_ID, + HOST_NAME, + OBSERVER_NAME, PARENT_ID, + POD_NAME, PROCESSOR_EVENT, + SERVICE_ENVIRONMENT, SERVICE_FRAMEWORK_NAME, SERVICE_FRAMEWORK_VERSION, SERVICE_LANGUAGE_NAME, @@ -23,9 +28,14 @@ import { SERVICE_NAME, SERVICE_RUNTIME_NAME, SERVICE_RUNTIME_VERSION, + SERVICE_VERSION, TRANSACTION_NAME, + TRANSACTION_RESULT, + TRANSACTION_TYPE, + USER_AGENT_NAME, USER_AGENT_ORIGINAL, } from '../../../../common/elasticsearch_fieldnames'; +import { ESFilter } from '../../../../typings/elasticsearch'; import { APMError } from '../../../../typings/es_schemas/ui/apm_error'; import { AgentName } from '../../../../typings/es_schemas/ui/fields/agent'; import { Span } from '../../../../typings/es_schemas/ui/span'; @@ -39,6 +49,166 @@ const range1d = { range: { '@timestamp': { gte: 'now-1d' } } }; const timeout = '5m'; export const tasks: TelemetryTask[] = [ + { + name: 'aggregated_transactions', + // Record the number of metric documents we can expect in different scenarios. We simulate this by requesting data for 1m, + // adding a composite aggregation on a number of fields and counting the number of buckets. The resulting count is an + // approximation of the amount of metric documents that will be created. We record both the expected metric document count plus + // the transaction count for that time range. 
+ executor: async ({ indices, search }) => { + async function getBucketCountFromPaginatedQuery( + key: string, + filter: ESFilter[], + count: number = 0, + after?: any + ) { + const params = { + index: [indices['apm_oss.transactionIndices']], + body: { + size: 0, + timeout, + query: { bool: { filter } }, + aggs: { + [key]: { + composite: { + ...(after ? { after } : {}), + size: 10000, + sources: fieldMap[key].map((field) => ({ + [field]: { terms: { field, missing_bucket: true } }, + })), + }, + }, + }, + }, + }; + const result = await search(params); + let nextAfter: any; + + if (result.aggregations) { + nextAfter = result.aggregations[key].after_key; + count += result.aggregations[key].buckets.length; + } + + if (nextAfter) { + count = await getBucketCountFromPaginatedQuery( + key, + filter, + count, + nextAfter + ); + } + + return count; + } + + async function totalSearch(filter: ESFilter[]) { + const result = await search({ + index: [indices['apm_oss.transactionIndices']], + body: { + size: 0, + timeout, + query: { bool: { filter } }, + track_total_hits: true, + }, + }); + + return result.hits.total.value; + } + + const nonRumAgentNames = AGENT_NAMES.filter( + (name) => !RUM_AGENTS.includes(name) + ); + + const filter: ESFilter[] = [ + { term: { [PROCESSOR_EVENT]: 'transaction' } }, + { range: { '@timestamp': { gte: 'now-1m' } } }, + ]; + const noRumFilter = [ + ...filter, + { terms: { [AGENT_NAME]: nonRumAgentNames } }, + ]; + const rumFilter = [...filter, { terms: { [AGENT_NAME]: RUM_AGENTS } }]; + + const baseFields = [ + TRANSACTION_NAME, + TRANSACTION_RESULT, + TRANSACTION_TYPE, + AGENT_NAME, + SERVICE_ENVIRONMENT, + SERVICE_VERSION, + HOST_NAME, + CONTAINER_ID, + POD_NAME, + ]; + + const fieldMap: Record = { + current_implementation: [OBSERVER_NAME, ...baseFields, USER_AGENT_NAME], + no_observer_name: [...baseFields, USER_AGENT_NAME], + no_rum: [OBSERVER_NAME, ...baseFields], + no_rum_no_observer_name: baseFields, + only_rum: [OBSERVER_NAME, ...baseFields, 
USER_AGENT_NAME], + only_rum_no_observer_name: [...baseFields, USER_AGENT_NAME], + }; + + // It would be more performant to do these in parallel, but we have different filters and keys and it's easier to + // understand if we make the code slower and longer + const countMap: Record = { + current_implementation: await getBucketCountFromPaginatedQuery( + 'current_implementation', + filter + ), + no_observer_name: await getBucketCountFromPaginatedQuery( + 'no_observer_name', + filter + ), + no_rum: await getBucketCountFromPaginatedQuery('no_rum', noRumFilter), + no_rum_no_observer_name: await getBucketCountFromPaginatedQuery( + 'no_rum_no_observer_name', + noRumFilter + ), + only_rum: await getBucketCountFromPaginatedQuery('only_rum', rumFilter), + only_rum_no_observer_name: await getBucketCountFromPaginatedQuery( + 'only_rum_no_observer_name', + rumFilter + ), + }; + + const [allCount, noRumCount, rumCount] = await Promise.all([ + totalSearch(filter), + totalSearch(noRumFilter), + totalSearch(rumFilter), + ]); + + return { + aggregated_transactions: { + current_implementation: { + transaction_count: allCount, + expected_metric_document_count: countMap.current_implementation, + }, + no_observer_name: { + transaction_count: allCount, + expected_metric_document_count: countMap.no_observer_name, + }, + no_rum: { + transaction_count: noRumCount, + expected_metric_document_count: countMap.no_rum, + }, + no_rum_no_observer_name: { + transaction_count: noRumCount, + expected_metric_document_count: countMap.no_rum_no_observer_name, + }, + only_rum: { + transaction_count: rumCount, + expected_metric_document_count: countMap.only_rum, + }, + only_rum_no_observer_name: { + transaction_count: rumCount, + expected_metric_document_count: countMap.only_rum_no_observer_name, + }, + }, + }; + }, + }, { name: 'cloud', executor: async ({ indices, search }) => { @@ -742,10 +912,7 @@ export const tasks: TelemetryTask[] = [ timeout, query: { bool: { - filter: [ - range1d, - { terms: { 
[AGENT_NAME]: ['rum-js', 'js-base'] } }, - ], + filter: [range1d, { terms: { [AGENT_NAME]: RUM_AGENTS } }], }, }, aggs: { diff --git a/x-pack/plugins/apm/server/lib/apm_telemetry/types.ts b/x-pack/plugins/apm/server/lib/apm_telemetry/types.ts index 4c376aac52f5b..82e4d1e395ed3 100644 --- a/x-pack/plugins/apm/server/lib/apm_telemetry/types.ts +++ b/x-pack/plugins/apm/server/lib/apm_telemetry/types.ts @@ -15,6 +15,11 @@ export interface TimeframeMap { export type TimeframeMap1d = Pick; export type TimeframeMapAll = Pick; +export interface AggregatedTransactionsCounts { + expected_metric_document_count: number; + transaction_count: number; +} + export type APMDataTelemetry = DeepPartial<{ has_any_services: boolean; services_per_agent: Record; @@ -25,6 +30,14 @@ export type APMDataTelemetry = DeepPartial<{ patch: number; }; }; + aggregated_transactions: { + current_implementation: AggregatedTransactionsCounts; + no_observer_name: AggregatedTransactionsCounts; + no_rum: AggregatedTransactionsCounts; + no_rum_no_observer_name: AggregatedTransactionsCounts; + only_rum: AggregatedTransactionsCounts; + only_rum_no_observer_name: AggregatedTransactionsCounts; + }; cloud: { availability_zone: string[]; provider: string[]; @@ -108,6 +121,7 @@ export type APMDataTelemetry = DeepPartial<{ }; }; tasks: Record< + | 'aggregated_transactions' | 'cloud' | 'processor_events' | 'agent_configuration'