From 85546e5fad8837946574f75f28a615f199371790 Mon Sep 17 00:00:00 2001 From: Or Ouziel Date: Wed, 16 Nov 2022 15:29:46 +0200 Subject: [PATCH 1/4] [Cloud Posture] change empty state link to add-integration screen (#145342) --- .../common/navigation/use_navigate_to_cis_integration.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/cloud_security_posture/public/common/navigation/use_navigate_to_cis_integration.ts b/x-pack/plugins/cloud_security_posture/public/common/navigation/use_navigate_to_cis_integration.ts index 76d85983369dd..a19be2f8fd629 100644 --- a/x-pack/plugins/cloud_security_posture/public/common/navigation/use_navigate_to_cis_integration.ts +++ b/x-pack/plugins/cloud_security_posture/public/common/navigation/use_navigate_to_cis_integration.ts @@ -6,6 +6,7 @@ */ import { pagePathGetters, pkgKeyFromPackageInfo } from '@kbn/fleet-plugin/public'; +import { CLOUD_SECURITY_POSTURE_PACKAGE_NAME } from '../../../common/constants'; import { useCisKubernetesIntegration } from '../api/use_cis_kubernetes_integration'; import { useKibana } from '../hooks/use_kibana'; @@ -16,7 +17,8 @@ export const useCISIntegrationLink = (): string | undefined => { if (!cisIntegration.isSuccess) return; const path = pagePathGetters - .integration_details_overview({ + .add_integration_to_policy({ + integration: CLOUD_SECURITY_POSTURE_PACKAGE_NAME, pkgkey: pkgKeyFromPackageInfo({ name: cisIntegration.data.item.name, version: cisIntegration.data.item.version, From 86efac3632c20590e8a8a9d4a1196ee8de3a6206 Mon Sep 17 00:00:00 2001 From: Walter Rafelsberger Date: Wed, 16 Nov 2022 14:33:40 +0100 Subject: [PATCH 2/4] [ML] Explain Log Rate Spikes: Use seed for random sampler agg. (#145088) - Moves sample probability calculation to util `getSampleProbability()` - Adds a seed to random sampler aggs to get deterministic results. --- x-pack/packages/ml/agg_utils/index.ts | 2 ++ .../build_random_sampler_aggregation.test.ts | 1 + .../src/build_random_sampler_aggregation.ts | 3 ++ x-pack/packages/ml/agg_utils/src/constants.ts | 10 ++++++ .../src/get_sample_probability.test.ts | 35 +++++++++++++++++++ .../agg_utils/src/get_sample_probability.ts | 18 ++++++++++ .../queries/fetch_change_point_p_values.ts | 3 +- .../routes/queries/fetch_frequent_items.ts | 3 +- .../server/routes/queries/fetch_index_info.ts | 10 ++---- 9 files changed, 75 insertions(+), 10 deletions(-) create mode 100644 x-pack/packages/ml/agg_utils/src/constants.ts create mode 100644 x-pack/packages/ml/agg_utils/src/get_sample_probability.test.ts create mode 100644 x-pack/packages/ml/agg_utils/src/get_sample_probability.ts diff --git a/x-pack/packages/ml/agg_utils/index.ts b/x-pack/packages/ml/agg_utils/index.ts index 444a59cbf0dc4..c20a31f703ff4 100644 --- a/x-pack/packages/ml/agg_utils/index.ts +++ b/x-pack/packages/ml/agg_utils/index.ts @@ -5,9 +5,11 @@ * 2.0. */ +export { RANDOM_SAMPLER_SEED } from './src/constants'; export { buildSamplerAggregation } from './src/build_sampler_aggregation'; export { fetchAggIntervals } from './src/fetch_agg_intervals'; export { fetchHistogramsForFields } from './src/fetch_histograms_for_fields'; +export { getSampleProbability } from './src/get_sample_probability'; export { getSamplerAggregationsResponsePath } from './src/get_sampler_aggregations_response_path'; export { numberValidator } from './src/validate_number'; diff --git a/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.test.ts b/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.test.ts index 3da955e219ef0..cdc242cd4248e 100644 --- a/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.test.ts +++ b/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.test.ts @@ -19,6 +19,7 @@ describe('buildRandomSamplerAggregation', () => { sample: { random_sampler: { probability: 0.01, + seed: 3867412, }, aggs: testAggs, }, diff --git a/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.ts b/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.ts index 63c048fecf42c..c663ec8db5c54 100644 --- a/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.ts +++ b/x-pack/packages/ml/agg_utils/src/build_random_sampler_aggregation.ts @@ -7,6 +7,8 @@ import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { RANDOM_SAMPLER_SEED } from './constants'; + /** * Wraps the supplied aggregations in a random sampler aggregation. * A supplied sample probability of 1 indicates no sampling, and the aggs are returned as-is. @@ -24,6 +26,7 @@ export function buildRandomSamplerAggregation( // @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer` random_sampler: { probability: sampleProbability, + seed: RANDOM_SAMPLER_SEED, }, aggs, }, diff --git a/x-pack/packages/ml/agg_utils/src/constants.ts b/x-pack/packages/ml/agg_utils/src/constants.ts new file mode 100644 index 0000000000000..6631438a47c93 --- /dev/null +++ b/x-pack/packages/ml/agg_utils/src/constants.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +// For the technical preview of Explain Log Rate Spikes we use a hard coded seed. +// In future versions we might use a user specific seed or let the user costumise it. +export const RANDOM_SAMPLER_SEED = 3867412; diff --git a/x-pack/packages/ml/agg_utils/src/get_sample_probability.test.ts b/x-pack/packages/ml/agg_utils/src/get_sample_probability.test.ts new file mode 100644 index 0000000000000..ce6ea9e083cbb --- /dev/null +++ b/x-pack/packages/ml/agg_utils/src/get_sample_probability.test.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { getSampleProbability } from './get_sample_probability'; + +describe('getSampleProbability', () => { + test('returns sample probability of 1 for docs up to required minimum doc count', () => { + expect(getSampleProbability(0)).toEqual(1); + expect(getSampleProbability(1)).toEqual(1); + expect(getSampleProbability(10)).toEqual(1); + expect(getSampleProbability(100)).toEqual(1); + expect(getSampleProbability(1000)).toEqual(1); + expect(getSampleProbability(10000)).toEqual(1); + expect(getSampleProbability(50000)).toEqual(1); + }); + test('returns sample probability of 0.5 for docs in range 50001-100000', () => { + expect(getSampleProbability(50001)).toEqual(0.5); + expect(getSampleProbability(100000)).toEqual(0.5); + }); + test('returns sample probability based on total docs ratio', () => { + expect(getSampleProbability(100001)).toEqual(0.4999950000499995); + expect(getSampleProbability(1000000)).toEqual(0.05); + expect(getSampleProbability(1000001)).toEqual(0.04999995000005); + expect(getSampleProbability(2000000)).toEqual(0.025); + expect(getSampleProbability(5000000)).toEqual(0.01); + expect(getSampleProbability(10000000)).toEqual(0.005); + expect(getSampleProbability(100000000)).toEqual(0.0005); + expect(getSampleProbability(1000000000)).toEqual(0.00005); + expect(getSampleProbability(10000000000)).toEqual(0.000005); + }); +}); diff --git a/x-pack/packages/ml/agg_utils/src/get_sample_probability.ts b/x-pack/packages/ml/agg_utils/src/get_sample_probability.ts new file mode 100644 index 0000000000000..4f1e1be948a02 --- /dev/null +++ b/x-pack/packages/ml/agg_utils/src/get_sample_probability.ts @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +const SAMPLE_PROBABILITY_MIN_DOC_COUNT = 50000; + +export function getSampleProbability(totalDocCount: number) { + let sampleProbability = 1; + + if (totalDocCount > SAMPLE_PROBABILITY_MIN_DOC_COUNT) { + sampleProbability = Math.min(0.5, SAMPLE_PROBABILITY_MIN_DOC_COUNT / totalDocCount); + } + + return sampleProbability; +} diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts index 6400cc08ca4db..65ac7e648eec2 100644 --- a/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts +++ b/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts @@ -9,7 +9,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { ElasticsearchClient } from '@kbn/core/server'; import type { Logger } from '@kbn/logging'; -import { ChangePoint } from '@kbn/ml-agg-utils'; +import { type ChangePoint, RANDOM_SAMPLER_SEED } from '@kbn/ml-agg-utils'; import { isPopulatedObject } from '@kbn/ml-is-populated-object'; import { SPIKE_ANALYSIS_THRESHOLD } from '../../../common/constants'; import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes'; @@ -85,6 +85,7 @@ export const getChangePointRequest = ( // @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer` random_sampler: { probability: sampleProbability, + seed: RANDOM_SAMPLER_SEED, }, aggs: pValueAgg, }, diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts index da4b8bbe5e792..ff1fba16f28f2 100644 --- a/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts +++ b/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts @@ -11,7 +11,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { Logger } from '@kbn/logging'; -import type { ChangePoint, FieldValuePair } from '@kbn/ml-agg-utils'; +import { type ChangePoint, type FieldValuePair, RANDOM_SAMPLER_SEED } from '@kbn/ml-agg-utils'; import { isPopulatedObject } from '@kbn/ml-is-populated-object'; const FREQUENT_ITEMS_FIELDS_LIMIT = 15; @@ -127,6 +127,7 @@ export async function fetchFrequentItems( // @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer` random_sampler: { probability: sampleProbability, + seed: RANDOM_SAMPLER_SEED, }, aggs: frequentItemsAgg, }, diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_index_info.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_index_info.ts index f1444ef5972b2..93378911c7201 100644 --- a/x-pack/plugins/aiops/server/routes/queries/fetch_index_info.ts +++ b/x-pack/plugins/aiops/server/routes/queries/fetch_index_info.ts @@ -8,8 +8,8 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { ES_FIELD_TYPES } from '@kbn/field-types'; - import type { ElasticsearchClient } from '@kbn/core/server'; +import { getSampleProbability } from '@kbn/ml-agg-utils'; import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes'; @@ -20,7 +20,6 @@ import { getRequestBase } from './get_request_base'; // `x-pack/plugins/apm/server/routes/correlations/queries/fetch_duration_field_candidates.ts` const POPULATED_DOC_COUNT_SAMPLE_SIZE = 1000; -const SAMPLE_PROBABILITY_MIN_DOC_COUNT = 50000; const SUPPORTED_ES_FIELD_TYPES = [ ES_FIELD_TYPES.KEYWORD, @@ -96,12 +95,7 @@ export const fetchIndexInfo = async ( }); const totalDocCount = (resp.hits.total as estypes.SearchTotalHits).value; - - let sampleProbability = 1; - - if (totalDocCount > SAMPLE_PROBABILITY_MIN_DOC_COUNT) { - sampleProbability = Math.min(0.5, SAMPLE_PROBABILITY_MIN_DOC_COUNT / totalDocCount); - } + const sampleProbability = getSampleProbability(totalDocCount); return { fieldCandidates: [...finalFieldCandidates], sampleProbability, totalDocCount }; }; From f90072df0c17e7af489b83e5d18ca08e100cd199 Mon Sep 17 00:00:00 2001 From: Or Ouziel Date: Wed, 16 Nov 2022 15:34:08 +0200 Subject: [PATCH 3/4] [Cloud Posture] update styles for rules table (#145335) --- .../public/pages/rules/rules_container.tsx | 2 +- .../public/pages/rules/rules_table_header.tsx | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_container.tsx b/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_container.tsx index 84f39bb150c26..98b1655c7195d 100644 --- a/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_container.tsx +++ b/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_container.tsx @@ -95,7 +95,7 @@ export const RulesContainer = () => { return (
- + setRulesQuery((currentQuery) => ({ ...currentQuery, search: value }))} searchValue={rulesQuery.search} diff --git a/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_table_header.tsx b/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_table_header.tsx index e57404e63880e..5f6e3887647c8 100644 --- a/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_table_header.tsx +++ b/x-pack/plugins/cloud_security_posture/public/pages/rules/rules_table_header.tsx @@ -53,15 +53,16 @@ const SearchField = ({ useDebounce(() => search(localValue), SEARCH_DEBOUNCE_MS, [localValue]); return ( - + setLocalValue(e.target.value)} style={{ minWidth: 150 }} + fullWidth /> ); From a5f5d8682e892d6dfee33821defd414d408e1020 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Fern=C3=A1ndez=20Haro?= Date: Wed, 16 Nov 2022 14:48:11 +0100 Subject: [PATCH 4/4] [EBT] Add `flush` method and call it during `stop` (#144925) Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Resolves https://github.com/elastic/kibana/issues/140521 --- packages/analytics/client/README.md | 9 ++++ .../analytics_client/analytics_client.test.ts | 8 +-- .../src/analytics_client/analytics_client.ts | 15 +++++- .../client/src/analytics_client/mocks.ts | 1 + .../client/src/analytics_client/types.ts | 8 ++- .../analytics/client/src/shippers/mocks.ts | 1 + .../analytics/client/src/shippers/types.ts | 4 ++ .../browser/src/browser_shipper.test.ts | 52 +++++++++++++++++++ .../elastic_v3/browser/src/browser_shipper.ts | 51 +++++++++++++++--- .../server/src/server_shipper.test.ts | 41 +++++++++++++++ .../elastic_v3/server/src/server_shipper.ts | 44 ++++++++++++++-- .../fullstory/src/fullstory_shipper.ts | 6 +++ .../gainsight/src/gainsight_shipper.ts | 6 +++ .../src/analytics_service.test.mocks.ts | 1 + .../src/analytics_service.ts | 7 ++- .../analytics/core-analytics-browser/index.ts | 6 ++- .../core-analytics-browser/src/types.ts | 18 ++++++- .../src/analytics_service.ts | 4 +- .../core-analytics-server/src/contracts.ts | 4 +- .../core-root-server-internal/src/server.ts | 2 +- .../public/custom_shipper.ts | 1 + .../server/custom_shipper.ts | 1 + .../public/custom_shipper.ts | 3 ++ .../analytics_plugin_a/public/plugin.ts | 2 + .../analytics_plugin_a/public/types.ts | 1 + .../server/custom_shipper.ts | 3 ++ .../tests/analytics_from_the_browser.ts | 8 ++- 27 files changed, 278 insertions(+), 29 deletions(-) diff --git a/packages/analytics/client/README.md b/packages/analytics/client/README.md index f5cc0d5efa889..e51795faa6a03 100644 --- a/packages/analytics/client/README.md +++ b/packages/analytics/client/README.md @@ -133,6 +133,15 @@ analytics.optIn({ }) ``` +### Explicit flush of the events + +If, at any given point (usually testing or during shutdowns) we need to make sure that all the pending events +in the queue are sent. The `flush` API returns a promise that will resolve as soon as all events in the queue are sent. + +```typescript +await analytics.flush() +``` + ### Shipping events In order to report the event to an analytics tool, we need to register the shippers our application wants to use. To register a shipper use the API `registerShipper`: diff --git a/packages/analytics/client/src/analytics_client/analytics_client.test.ts b/packages/analytics/client/src/analytics_client/analytics_client.test.ts index 601f94aa1e243..efe32683ee468 100644 --- a/packages/analytics/client/src/analytics_client/analytics_client.test.ts +++ b/packages/analytics/client/src/analytics_client/analytics_client.test.ts @@ -30,8 +30,8 @@ describe('AnalyticsClient', () => { }); }); - afterEach(() => { - analyticsClient.shutdown(); + afterEach(async () => { + await analyticsClient.shutdown(); jest.useRealTimers(); }); @@ -381,7 +381,7 @@ describe('AnalyticsClient', () => { test( 'Handles errors in the shipper', - fakeSchedulers((advance) => { + fakeSchedulers(async (advance) => { const optInMock = jest.fn().mockImplementation(() => { throw new Error('Something went terribly wrong'); }); @@ -404,7 +404,7 @@ describe('AnalyticsClient', () => { `Shipper "${MockedShipper.shipperName}" failed to extend the context`, expect.any(Error) ); - expect(() => analyticsClient.shutdown()).not.toThrow(); + await expect(analyticsClient.shutdown()).resolves.toBeUndefined(); expect(shutdownMock).toHaveBeenCalled(); }) ); diff --git a/packages/analytics/client/src/analytics_client/analytics_client.ts b/packages/analytics/client/src/analytics_client/analytics_client.ts index 57741f098c6ac..9e0c559cbdc55 100644 --- a/packages/analytics/client/src/analytics_client/analytics_client.ts +++ b/packages/analytics/client/src/analytics_client/analytics_client.ts @@ -238,7 +238,20 @@ export class AnalyticsClient implements IAnalyticsClient { this.shipperRegistered$.next(); }; - public shutdown = () => { + public flush = async () => { + await Promise.all( + [...this.shippersRegistry.allShippers.entries()].map(async ([shipperName, shipper]) => { + try { + await shipper.flush(); + } catch (err) { + this.initContext.logger.warn(`Failed to flush shipper "${shipperName}"`, err); + } + }) + ); + }; + + public shutdown = async () => { + await this.flush(); this.shippersRegistry.allShippers.forEach((shipper, shipperName) => { try { shipper.shutdown(); diff --git a/packages/analytics/client/src/analytics_client/mocks.ts b/packages/analytics/client/src/analytics_client/mocks.ts index 221ca0ff3872f..d09bfa67dee82 100644 --- a/packages/analytics/client/src/analytics_client/mocks.ts +++ b/packages/analytics/client/src/analytics_client/mocks.ts @@ -18,6 +18,7 @@ function createMockedAnalyticsClient(): jest.Mocked { removeContextProvider: jest.fn(), registerShipper: jest.fn(), telemetryCounter$: new Subject(), + flush: jest.fn(), shutdown: jest.fn(), }; } diff --git a/packages/analytics/client/src/analytics_client/types.ts b/packages/analytics/client/src/analytics_client/types.ts index 9a25f821b70a3..5726bf0046687 100644 --- a/packages/analytics/client/src/analytics_client/types.ts +++ b/packages/analytics/client/src/analytics_client/types.ts @@ -216,7 +216,11 @@ export interface IAnalyticsClient { */ readonly telemetryCounter$: Observable; /** - * Stops the client. + * Forces all shippers to send all their enqueued events and fulfills the returned promise. */ - shutdown: () => void; + flush: () => Promise; + /** + * Stops the client. Flushing any pending events in the process. + */ + shutdown: () => Promise; } diff --git a/packages/analytics/client/src/shippers/mocks.ts b/packages/analytics/client/src/shippers/mocks.ts index 4660ae9d27e72..fccdd4788f7d9 100644 --- a/packages/analytics/client/src/shippers/mocks.ts +++ b/packages/analytics/client/src/shippers/mocks.ts @@ -23,6 +23,7 @@ class MockedShipper implements IShipper { public reportEvents = jest.fn(); public extendContext = jest.fn(); public telemetryCounter$ = new Subject(); + public flush = jest.fn(); public shutdown = jest.fn(); } diff --git a/packages/analytics/client/src/shippers/types.ts b/packages/analytics/client/src/shippers/types.ts index 67fe2c54bd77e..c1e2ab8a81153 100644 --- a/packages/analytics/client/src/shippers/types.ts +++ b/packages/analytics/client/src/shippers/types.ts @@ -32,6 +32,10 @@ export interface IShipper { * Observable to emit the stats of the processed events. */ telemetryCounter$?: Observable; + /** + * Sends all the enqueued events and fulfills the returned promise. + */ + flush: () => Promise; /** * Shutdown the shipper. */ diff --git a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts index 47728a99a511a..e82ff3c45b1fa 100644 --- a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts +++ b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts @@ -161,6 +161,58 @@ describe('ElasticV3BrowserShipper', () => { }) ); + test( + 'calls to flush forces the client to send all the pending events', + fakeSchedulers(async (advance) => { + shipper.optIn(true); + shipper.reportEvents(events); + const counter = firstValueFrom(shipper.telemetryCounter$); + const promise = shipper.flush(); + advance(0); // bufferWhen requires some sort of fake scheduling to advance (but we are not advancing 1s) + await promise; + expect(fetchMock).toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { + body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n', + headers: { + 'content-type': 'application/x-ndjson', + 'x-elastic-cluster-id': 'UNKNOWN', + 'x-elastic-stack-version': '1.2.3', + }, + keepalive: true, + method: 'POST', + query: { debug: true }, + } + ); + await expect(counter).resolves.toMatchInlineSnapshot(` + Object { + "code": "200", + "count": 1, + "event_type": "test-event-type", + "source": "elastic_v3_browser", + "type": "succeeded", + } + `); + }) + ); + + test('calls to flush resolve immediately if there is nothing to send', async () => { + shipper.optIn(true); + await shipper.flush(); + expect(fetchMock).toHaveBeenCalledTimes(0); + }); + + test('calling flush multiple times does not keep hanging', async () => { + await expect(shipper.flush()).resolves.toBe(undefined); + await expect(shipper.flush()).resolves.toBe(undefined); + await Promise.all([shipper.flush(), shipper.flush()]); + }); + + test('calling flush after shutdown does not keep hanging', async () => { + shipper.shutdown(); + await expect(shipper.flush()).resolves.toBe(undefined); + }); + test('calls to reportEvents call `fetch` when shutting down if optIn value is set to true', async () => { shipper.reportEvents(events); shipper.optIn(true); diff --git a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts index 53f19910eab75..185ce37072be0 100644 --- a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts +++ b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts @@ -6,7 +6,17 @@ * Side Public License, v 1. */ -import { BehaviorSubject, interval, Subject, bufferWhen, concatMap, filter, skipWhile } from 'rxjs'; +import { + BehaviorSubject, + interval, + Subject, + bufferWhen, + concatMap, + skipWhile, + firstValueFrom, + map, + merge, +} from 'rxjs'; import type { AnalyticsClientInitContext, Event, @@ -39,6 +49,8 @@ export class ElasticV3BrowserShipper implements IShipper { private readonly url: string; private readonly internalQueue$ = new Subject(); + private readonly flush$ = new Subject(); + private readonly queueFlushed$ = new Subject(); private readonly isOptedIn$ = new BehaviorSubject(undefined); private clusterUuid: string = 'UNKNOWN'; @@ -92,25 +104,48 @@ export class ElasticV3BrowserShipper implements IShipper { }); } + /** + * Triggers a flush of the internal queue to attempt to send any events held in the queue + * and resolves the returned promise once the queue is emptied. + */ + public async flush() { + if (this.flush$.isStopped) { + // If called after shutdown, return straight away + return; + } + + const promise = firstValueFrom(this.queueFlushed$); + this.flush$.next(); + await promise; + } + /** * Shuts down the shipper. * Triggers a flush of the internal queue to attempt to send any events held in the queue. */ public shutdown() { this.internalQueue$.complete(); // NOTE: When completing the observable, the buffer logic does not wait and releases any buffered events. + this.flush$.complete(); } private setUpInternalQueueSubscriber() { this.internalQueue$ .pipe( // Buffer events for 1 second or until we have an optIn value - bufferWhen(() => interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))), - // Discard any events if we are not opted in - skipWhile(() => this.isOptedIn$.value === false), - // Skip empty buffers - filter((events) => events.length > 0), - // Send events - concatMap(async (events) => this.sendEvents(events)) + bufferWhen(() => + merge( + this.flush$, + interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined)) + ) + ), + // Send events (one batch at a time) + concatMap(async (events) => { + // Only send if opted-in and there's anything to send + if (this.isOptedIn$.value === true && events.length > 0) { + await this.sendEvents(events); + } + }), + map(() => this.queueFlushed$.next()) ) .subscribe(); } diff --git a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts index 091be6cb96e83..b9002892ec53e 100644 --- a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts +++ b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts @@ -580,4 +580,45 @@ describe('ElasticV3ServerShipper', () => { ); }); }); + + describe('flush method', () => { + test('resolves straight away if it should not send anything', async () => { + await expect(shipper.flush()).resolves.toBe(undefined); + }); + + test('resolves when all the ongoing requests are complete', async () => { + shipper.optIn(true); + shipper.reportEvents(events); + expect(fetchMock).toHaveBeenCalledTimes(0); + fetchMock.mockImplementation(async () => { + // eslint-disable-next-line dot-notation + expect(shipper['inFlightRequests$'].value).toBe(1); + }); + await expect(shipper.flush()).resolves.toBe(undefined); + expect(fetchMock).toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { + body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n', + headers: { + 'content-type': 'application/x-ndjson', + 'x-elastic-cluster-id': 'UNKNOWN', + 'x-elastic-stack-version': '1.2.3', + }, + method: 'POST', + query: { debug: true }, + } + ); + }); + + test('calling flush multiple times does not keep hanging', async () => { + await expect(shipper.flush()).resolves.toBe(undefined); + await expect(shipper.flush()).resolves.toBe(undefined); + await Promise.all([shipper.flush(), shipper.flush()]); + }); + + test('calling flush after shutdown does not keep hanging', async () => { + shipper.shutdown(); + await expect(shipper.flush()).resolves.toBe(undefined); + }); + }); }); diff --git a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts index 34ebe134adcf7..cb6e689dd893f 100644 --- a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts +++ b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts @@ -22,6 +22,8 @@ import { BehaviorSubject, exhaustMap, mergeMap, + skip, + firstValueFrom, } from 'rxjs'; import type { AnalyticsClientInitContext, @@ -63,6 +65,8 @@ export class ElasticV3ServerShipper implements IShipper { private readonly internalQueue: Event[] = []; private readonly shutdown$ = new ReplaySubject(1); + private readonly flush$ = new Subject(); + private readonly inFlightRequests$ = new BehaviorSubject(0); private readonly isOptedIn$ = new BehaviorSubject(undefined); private readonly url: string; @@ -152,12 +156,33 @@ export class ElasticV3ServerShipper implements IShipper { this.internalQueue.push(...events); } + /** + * Triggers a flush of the internal queue to attempt to send any events held in the queue + * and resolves the returned promise once the queue is emptied. + */ + public async flush() { + if (this.flush$.isStopped) { + // If called after shutdown, return straight away + return; + } + + const promise = firstValueFrom( + this.inFlightRequests$.pipe( + skip(1), // Skipping the first value because BehaviourSubjects always emit the current value on subscribe. + filter((count) => count === 0) // Wait until all the inflight requests are completed. + ) + ); + this.flush$.next(); + await promise; + } + /** * Shuts down the shipper. * Triggers a flush of the internal queue to attempt to send any events held in the queue. */ public shutdown() { this.shutdown$.next(); + this.flush$.complete(); this.shutdown$.complete(); this.isOptedIn$.complete(); } @@ -226,17 +251,26 @@ export class ElasticV3ServerShipper implements IShipper { takeUntil(this.shutdown$), map(() => ({ shouldFlush: false })) ), + // Whenever a `flush` request comes in + this.flush$.pipe(map(() => ({ shouldFlush: true }))), // Attempt to send one last time on shutdown, flushing the queue this.shutdown$.pipe(map(() => ({ shouldFlush: true }))) ) .pipe( // Only move ahead if it's opted-in and online, and there are some events in the queue - filter( - () => + filter(() => { + const shouldSendAnything = this.isOptedIn$.value === true && this.firstTimeOffline === null && - this.internalQueue.length > 0 - ), + this.internalQueue.length > 0; + + // If it should not send anything, re-emit the inflight request observable just in case it's already 0 + if (!shouldSendAnything) { + this.inFlightRequests$.next(this.inFlightRequests$.value); + } + + return shouldSendAnything; + }), // Send the events: // 1. Set lastBatchSent and retrieve the events to send (clearing the queue) in a synchronous operation to avoid race conditions. @@ -298,6 +332,7 @@ export class ElasticV3ServerShipper implements IShipper { private async sendEvents(events: Event[]) { this.initContext.logger.debug(`Reporting ${events.length} events...`); + this.inFlightRequests$.next(this.inFlightRequests$.value + 1); try { const code = await this.makeRequest(events); this.reportTelemetryCounters(events, { code }); @@ -308,6 +343,7 @@ export class ElasticV3ServerShipper implements IShipper { this.reportTelemetryCounters(events, { code: error.code, error }); this.firstTimeOffline = undefined; } + this.inFlightRequests$.next(Math.max(0, this.inFlightRequests$.value - 1)); } private async makeRequest(events: Event[]): Promise { diff --git a/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts b/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts index 0bf00e91c7d0e..e60686937884a 100644 --- a/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts +++ b/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts @@ -135,6 +135,12 @@ export class FullStoryShipper implements IShipper { }); } + /** + * Flushes all internal queues of the shipper. + * It doesn't really do anything inside because this shipper doesn't hold any internal queues. + */ + public async flush() {} + /** * Shuts down the shipper. * It doesn't really do anything inside because this shipper doesn't hold any internal queues. diff --git a/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts b/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts index a12f373fcd388..157cfaee22f0c 100644 --- a/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts +++ b/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts @@ -93,6 +93,12 @@ export class GainsightShipper implements IShipper { }); } + /** + * Flushes all internal queues of the shipper. + * It doesn't really do anything inside because this shipper doesn't hold any internal queues. + */ + public async flush() {} + /** * Shuts down the shipper. * It doesn't really do anything inside because this shipper doesn't hold any internal queues. diff --git a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts index 3d98cf4392926..7f32ea7ed41a6 100644 --- a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts +++ b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts @@ -17,6 +17,7 @@ export const analyticsClientMock: jest.Mocked = { removeContextProvider: jest.fn(), registerShipper: jest.fn(), telemetryCounter$: new Subject(), + flush: jest.fn(), shutdown: jest.fn(), }; diff --git a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts index 0dcd49bd69fcc..60656e9dfd1cb 100644 --- a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts +++ b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts @@ -45,6 +45,9 @@ export class AnalyticsService { this.registerBrowserInfoAnalyticsContext(); this.subscriptionsHandler.add(trackClicks(this.analyticsClient, core.env.mode.dev)); this.subscriptionsHandler.add(trackViewportSize(this.analyticsClient)); + + // Register a flush method in the browser so CI can explicitly call it before closing the browser. + window.__kbnAnalytics = { flush: () => this.analyticsClient.flush() }; } public setup({ injectedMetadata }: AnalyticsServiceSetupDeps): AnalyticsServiceSetup { @@ -69,9 +72,9 @@ export class AnalyticsService { }; } - public stop() { + public async stop() { this.subscriptionsHandler.unsubscribe(); - this.analyticsClient.shutdown(); + await this.analyticsClient.shutdown(); } /** diff --git a/packages/core/analytics/core-analytics-browser/index.ts b/packages/core/analytics/core-analytics-browser/index.ts index 2484f37bbf563..331f1695d9f20 100644 --- a/packages/core/analytics/core-analytics-browser/index.ts +++ b/packages/core/analytics/core-analytics-browser/index.ts @@ -6,4 +6,8 @@ * Side Public License, v 1. */ -export type { AnalyticsServiceSetup, AnalyticsServiceStart } from './src/types'; +export type { + AnalyticsServiceSetup, + AnalyticsServiceStart, + KbnAnalyticsWindowApi, +} from './src/types'; diff --git a/packages/core/analytics/core-analytics-browser/src/types.ts b/packages/core/analytics/core-analytics-browser/src/types.ts index e18a5faba8fbc..dbc35043613cd 100644 --- a/packages/core/analytics/core-analytics-browser/src/types.ts +++ b/packages/core/analytics/core-analytics-browser/src/types.ts @@ -13,7 +13,7 @@ import type { AnalyticsClient } from '@kbn/analytics-client'; * {@link AnalyticsClient} * @public */ -export type AnalyticsServiceSetup = Omit; +export type AnalyticsServiceSetup = Omit; /** * Exposes the public APIs of the AnalyticsClient during the start phase @@ -24,3 +24,19 @@ export type AnalyticsServiceStart = Pick< AnalyticsClient, 'optIn' | 'reportEvent' | 'telemetryCounter$' >; + +/** + * API exposed through `window.__kbnAnalytics` + */ +export interface KbnAnalyticsWindowApi { + /** + * Returns a promise that resolves when all the events in the queue have been sent. + */ + flush: AnalyticsClient['flush']; +} + +declare global { + interface Window { + __kbnAnalytics: KbnAnalyticsWindowApi; + } +} diff --git a/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts b/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts index 46b0726660e4c..141f5b9970c0b 100644 --- a/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts +++ b/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts @@ -65,8 +65,8 @@ export class AnalyticsService { }; } - public stop() { - this.analyticsClient.shutdown(); + public async stop() { + await this.analyticsClient.shutdown(); } /** diff --git a/packages/core/analytics/core-analytics-server/src/contracts.ts b/packages/core/analytics/core-analytics-server/src/contracts.ts index 1b297b197374d..4879b988b1752 100644 --- a/packages/core/analytics/core-analytics-server/src/contracts.ts +++ b/packages/core/analytics/core-analytics-server/src/contracts.ts @@ -13,14 +13,14 @@ import type { AnalyticsClient } from '@kbn/analytics-client'; * {@link AnalyticsClient} * @public */ -export type AnalyticsServicePreboot = Omit; +export type AnalyticsServicePreboot = Omit; /** * Exposes the public APIs of the AnalyticsClient during the setup phase. * {@link AnalyticsClient} * @public */ -export type AnalyticsServiceSetup = Omit; +export type AnalyticsServiceSetup = Omit; /** * Exposes the public APIs of the AnalyticsClient during the start phase diff --git a/packages/core/root/core-root-server-internal/src/server.ts b/packages/core/root/core-root-server-internal/src/server.ts index d4ae597c953ff..4cdfabddf2697 100644 --- a/packages/core/root/core-root-server-internal/src/server.ts +++ b/packages/core/root/core-root-server-internal/src/server.ts @@ -430,7 +430,7 @@ export class Server { public async stop() { this.log.debug('stopping server'); - this.analytics.stop(); + await this.analytics.stop(); await this.http.stop(); // HTTP server has to stop before savedObjects and ES clients are closed to be able to gracefully attempt to resolve any pending requests await this.plugins.stop(); await this.savedObjects.stop(); diff --git a/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts index 97bf37749c256..df876da5e9cce 100644 --- a/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts @@ -27,5 +27,6 @@ export class CustomShipper implements IShipper { }); } optIn(isOptedIn: boolean) {} + async flush() {} shutdown() {} } diff --git a/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts index c76f30c94572e..c1ed593673f81 100644 --- a/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts @@ -27,5 +27,6 @@ export class CustomShipper implements IShipper { }); } optIn(isOptedIn: boolean) {} + async flush() {} shutdown() {} } diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts index bc96b9aba6aff..1bdc8b7e343fc 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts @@ -39,5 +39,8 @@ export class CustomShipper implements IShipper { extendContext(newContext: EventContext) { this.actions$.next({ action: 'extendContext', meta: newContext }); } + async flush() { + this.actions$.next({ action: 'flush', meta: {} }); + } shutdown() {} } diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts index 963b829f6af86..a64847e086265 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts @@ -74,6 +74,8 @@ export class AnalyticsPluginA implements Plugin { setOptIn(optIn: boolean) { analytics.optIn({ global: { enabled: optIn } }); }, + getFlushAction: async () => + firstValueFrom(this.actions$.pipe(filter(({ action }) => action === 'flush'))), }; registerContextProvider({ diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts index 197113f5368b6..42464622f131a 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts @@ -13,6 +13,7 @@ declare global { interface Window { __analyticsPluginA__: { getActionsUntilReportTestPluginLifecycleEvent: () => Promise; + getFlushAction: () => Promise; stats: TelemetryCounter[]; setOptIn: (optIn: boolean) => void; }; diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts index 3b91cf4b51d1b..4c7836a811cf2 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts @@ -39,5 +39,8 @@ export class CustomShipper implements IShipper { extendContext(newContext: EventContext) { this.actions$.next({ action: 'extendContext', meta: newContext }); } + async flush() { + this.actions$.next({ action: 'flush', meta: {} }); + } shutdown() {} } diff --git a/test/analytics/tests/analytics_from_the_browser.ts b/test/analytics/tests/analytics_from_the_browser.ts index d38bbc42701ea..9636e1854b0c1 100644 --- a/test/analytics/tests/analytics_from_the_browser.ts +++ b/test/analytics/tests/analytics_from_the_browser.ts @@ -9,8 +9,8 @@ import expect from '@kbn/expect'; import type { Event, TelemetryCounter } from '@kbn/core/server'; import type { Action } from '@kbn/analytics-plugin-a-plugin/public/custom_shipper'; -import type { FtrProviderContext } from '../services'; import '@kbn/analytics-plugin-a-plugin/public/types'; +import type { FtrProviderContext } from '../services'; export default function ({ getService, getPageObjects }: FtrProviderContext) { const { common } = getPageObjects(['common']); @@ -170,6 +170,12 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { expect(event.context.user_agent).to.be.a('string'); }); + it('should call flush when using the window-exposed flush method', async () => { + await browser.execute(() => window.__kbnAnalytics.flush()); + const action = await browser.execute(() => window.__analyticsPluginA__.getFlushAction()); + expect(action).to.eql({ action: 'flush', meta: {} }); + }); + describe('Test helpers capabilities', () => { it('should return the count of the events', async () => { const eventCount = await ebtUIHelper.getEventCount({