From a5f5d8682e892d6dfee33821defd414d408e1020 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Fern=C3=A1ndez=20Haro?= Date: Wed, 16 Nov 2022 14:48:11 +0100 Subject: [PATCH] [EBT] Add `flush` method and call it during `stop` (#144925) Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Resolves https://github.com/elastic/kibana/issues/140521 --- packages/analytics/client/README.md | 9 ++++ .../analytics_client/analytics_client.test.ts | 8 +-- .../src/analytics_client/analytics_client.ts | 15 +++++- .../client/src/analytics_client/mocks.ts | 1 + .../client/src/analytics_client/types.ts | 8 ++- .../analytics/client/src/shippers/mocks.ts | 1 + .../analytics/client/src/shippers/types.ts | 4 ++ .../browser/src/browser_shipper.test.ts | 52 +++++++++++++++++++ .../elastic_v3/browser/src/browser_shipper.ts | 51 +++++++++++++++--- .../server/src/server_shipper.test.ts | 41 +++++++++++++++ .../elastic_v3/server/src/server_shipper.ts | 44 ++++++++++++++-- .../fullstory/src/fullstory_shipper.ts | 6 +++ .../gainsight/src/gainsight_shipper.ts | 6 +++ .../src/analytics_service.test.mocks.ts | 1 + .../src/analytics_service.ts | 7 ++- .../analytics/core-analytics-browser/index.ts | 6 ++- .../core-analytics-browser/src/types.ts | 18 ++++++- .../src/analytics_service.ts | 4 +- .../core-analytics-server/src/contracts.ts | 4 +- .../core-root-server-internal/src/server.ts | 2 +- .../public/custom_shipper.ts | 1 + .../server/custom_shipper.ts | 1 + .../public/custom_shipper.ts | 3 ++ .../analytics_plugin_a/public/plugin.ts | 2 + .../analytics_plugin_a/public/types.ts | 1 + .../server/custom_shipper.ts | 3 ++ .../tests/analytics_from_the_browser.ts | 8 ++- 27 files changed, 278 insertions(+), 29 deletions(-) diff --git a/packages/analytics/client/README.md b/packages/analytics/client/README.md index f5cc0d5efa889..e51795faa6a03 100644 --- a/packages/analytics/client/README.md +++ b/packages/analytics/client/README.md @@ -133,6 +133,15 @@ analytics.optIn({ }) ``` +### Explicit flush of the events + +If, at any given point (usually testing or during shutdowns) we need to make sure that all the pending events +in the queue are sent. The `flush` API returns a promise that will resolve as soon as all events in the queue are sent. + +```typescript +await analytics.flush() +``` + ### Shipping events In order to report the event to an analytics tool, we need to register the shippers our application wants to use. To register a shipper use the API `registerShipper`: diff --git a/packages/analytics/client/src/analytics_client/analytics_client.test.ts b/packages/analytics/client/src/analytics_client/analytics_client.test.ts index 601f94aa1e243..efe32683ee468 100644 --- a/packages/analytics/client/src/analytics_client/analytics_client.test.ts +++ b/packages/analytics/client/src/analytics_client/analytics_client.test.ts @@ -30,8 +30,8 @@ describe('AnalyticsClient', () => { }); }); - afterEach(() => { - analyticsClient.shutdown(); + afterEach(async () => { + await analyticsClient.shutdown(); jest.useRealTimers(); }); @@ -381,7 +381,7 @@ describe('AnalyticsClient', () => { test( 'Handles errors in the shipper', - fakeSchedulers((advance) => { + fakeSchedulers(async (advance) => { const optInMock = jest.fn().mockImplementation(() => { throw new Error('Something went terribly wrong'); }); @@ -404,7 +404,7 @@ describe('AnalyticsClient', () => { `Shipper "${MockedShipper.shipperName}" failed to extend the context`, expect.any(Error) ); - expect(() => analyticsClient.shutdown()).not.toThrow(); + await expect(analyticsClient.shutdown()).resolves.toBeUndefined(); expect(shutdownMock).toHaveBeenCalled(); }) ); diff --git a/packages/analytics/client/src/analytics_client/analytics_client.ts b/packages/analytics/client/src/analytics_client/analytics_client.ts index 57741f098c6ac..9e0c559cbdc55 100644 --- a/packages/analytics/client/src/analytics_client/analytics_client.ts +++ b/packages/analytics/client/src/analytics_client/analytics_client.ts @@ -238,7 +238,20 @@ export class AnalyticsClient implements IAnalyticsClient { this.shipperRegistered$.next(); }; - public shutdown = () => { + public flush = async () => { + await Promise.all( + [...this.shippersRegistry.allShippers.entries()].map(async ([shipperName, shipper]) => { + try { + await shipper.flush(); + } catch (err) { + this.initContext.logger.warn(`Failed to flush shipper "${shipperName}"`, err); + } + }) + ); + }; + + public shutdown = async () => { + await this.flush(); this.shippersRegistry.allShippers.forEach((shipper, shipperName) => { try { shipper.shutdown(); diff --git a/packages/analytics/client/src/analytics_client/mocks.ts b/packages/analytics/client/src/analytics_client/mocks.ts index 221ca0ff3872f..d09bfa67dee82 100644 --- a/packages/analytics/client/src/analytics_client/mocks.ts +++ b/packages/analytics/client/src/analytics_client/mocks.ts @@ -18,6 +18,7 @@ function createMockedAnalyticsClient(): jest.Mocked { removeContextProvider: jest.fn(), registerShipper: jest.fn(), telemetryCounter$: new Subject(), + flush: jest.fn(), shutdown: jest.fn(), }; } diff --git a/packages/analytics/client/src/analytics_client/types.ts b/packages/analytics/client/src/analytics_client/types.ts index 9a25f821b70a3..5726bf0046687 100644 --- a/packages/analytics/client/src/analytics_client/types.ts +++ b/packages/analytics/client/src/analytics_client/types.ts @@ -216,7 +216,11 @@ export interface IAnalyticsClient { */ readonly telemetryCounter$: Observable; /** - * Stops the client. + * Forces all shippers to send all their enqueued events and fulfills the returned promise. */ - shutdown: () => void; + flush: () => Promise; + /** + * Stops the client. Flushing any pending events in the process. + */ + shutdown: () => Promise; } diff --git a/packages/analytics/client/src/shippers/mocks.ts b/packages/analytics/client/src/shippers/mocks.ts index 4660ae9d27e72..fccdd4788f7d9 100644 --- a/packages/analytics/client/src/shippers/mocks.ts +++ b/packages/analytics/client/src/shippers/mocks.ts @@ -23,6 +23,7 @@ class MockedShipper implements IShipper { public reportEvents = jest.fn(); public extendContext = jest.fn(); public telemetryCounter$ = new Subject(); + public flush = jest.fn(); public shutdown = jest.fn(); } diff --git a/packages/analytics/client/src/shippers/types.ts b/packages/analytics/client/src/shippers/types.ts index 67fe2c54bd77e..c1e2ab8a81153 100644 --- a/packages/analytics/client/src/shippers/types.ts +++ b/packages/analytics/client/src/shippers/types.ts @@ -32,6 +32,10 @@ export interface IShipper { * Observable to emit the stats of the processed events. */ telemetryCounter$?: Observable; + /** + * Sends all the enqueued events and fulfills the returned promise. + */ + flush: () => Promise; /** * Shutdown the shipper. */ diff --git a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts index 47728a99a511a..e82ff3c45b1fa 100644 --- a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts +++ b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.test.ts @@ -161,6 +161,58 @@ describe('ElasticV3BrowserShipper', () => { }) ); + test( + 'calls to flush forces the client to send all the pending events', + fakeSchedulers(async (advance) => { + shipper.optIn(true); + shipper.reportEvents(events); + const counter = firstValueFrom(shipper.telemetryCounter$); + const promise = shipper.flush(); + advance(0); // bufferWhen requires some sort of fake scheduling to advance (but we are not advancing 1s) + await promise; + expect(fetchMock).toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { + body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n', + headers: { + 'content-type': 'application/x-ndjson', + 'x-elastic-cluster-id': 'UNKNOWN', + 'x-elastic-stack-version': '1.2.3', + }, + keepalive: true, + method: 'POST', + query: { debug: true }, + } + ); + await expect(counter).resolves.toMatchInlineSnapshot(` + Object { + "code": "200", + "count": 1, + "event_type": "test-event-type", + "source": "elastic_v3_browser", + "type": "succeeded", + } + `); + }) + ); + + test('calls to flush resolve immediately if there is nothing to send', async () => { + shipper.optIn(true); + await shipper.flush(); + expect(fetchMock).toHaveBeenCalledTimes(0); + }); + + test('calling flush multiple times does not keep hanging', async () => { + await expect(shipper.flush()).resolves.toBe(undefined); + await expect(shipper.flush()).resolves.toBe(undefined); + await Promise.all([shipper.flush(), shipper.flush()]); + }); + + test('calling flush after shutdown does not keep hanging', async () => { + shipper.shutdown(); + await expect(shipper.flush()).resolves.toBe(undefined); + }); + test('calls to reportEvents call `fetch` when shutting down if optIn value is set to true', async () => { shipper.reportEvents(events); shipper.optIn(true); diff --git a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts index 53f19910eab75..185ce37072be0 100644 --- a/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts +++ b/packages/analytics/shippers/elastic_v3/browser/src/browser_shipper.ts @@ -6,7 +6,17 @@ * Side Public License, v 1. */ -import { BehaviorSubject, interval, Subject, bufferWhen, concatMap, filter, skipWhile } from 'rxjs'; +import { + BehaviorSubject, + interval, + Subject, + bufferWhen, + concatMap, + skipWhile, + firstValueFrom, + map, + merge, +} from 'rxjs'; import type { AnalyticsClientInitContext, Event, @@ -39,6 +49,8 @@ export class ElasticV3BrowserShipper implements IShipper { private readonly url: string; private readonly internalQueue$ = new Subject(); + private readonly flush$ = new Subject(); + private readonly queueFlushed$ = new Subject(); private readonly isOptedIn$ = new BehaviorSubject(undefined); private clusterUuid: string = 'UNKNOWN'; @@ -92,25 +104,48 @@ export class ElasticV3BrowserShipper implements IShipper { }); } + /** + * Triggers a flush of the internal queue to attempt to send any events held in the queue + * and resolves the returned promise once the queue is emptied. + */ + public async flush() { + if (this.flush$.isStopped) { + // If called after shutdown, return straight away + return; + } + + const promise = firstValueFrom(this.queueFlushed$); + this.flush$.next(); + await promise; + } + /** * Shuts down the shipper. * Triggers a flush of the internal queue to attempt to send any events held in the queue. */ public shutdown() { this.internalQueue$.complete(); // NOTE: When completing the observable, the buffer logic does not wait and releases any buffered events. + this.flush$.complete(); } private setUpInternalQueueSubscriber() { this.internalQueue$ .pipe( // Buffer events for 1 second or until we have an optIn value - bufferWhen(() => interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))), - // Discard any events if we are not opted in - skipWhile(() => this.isOptedIn$.value === false), - // Skip empty buffers - filter((events) => events.length > 0), - // Send events - concatMap(async (events) => this.sendEvents(events)) + bufferWhen(() => + merge( + this.flush$, + interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined)) + ) + ), + // Send events (one batch at a time) + concatMap(async (events) => { + // Only send if opted-in and there's anything to send + if (this.isOptedIn$.value === true && events.length > 0) { + await this.sendEvents(events); + } + }), + map(() => this.queueFlushed$.next()) ) .subscribe(); } diff --git a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts index 091be6cb96e83..b9002892ec53e 100644 --- a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts +++ b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts @@ -580,4 +580,45 @@ describe('ElasticV3ServerShipper', () => { ); }); }); + + describe('flush method', () => { + test('resolves straight away if it should not send anything', async () => { + await expect(shipper.flush()).resolves.toBe(undefined); + }); + + test('resolves when all the ongoing requests are complete', async () => { + shipper.optIn(true); + shipper.reportEvents(events); + expect(fetchMock).toHaveBeenCalledTimes(0); + fetchMock.mockImplementation(async () => { + // eslint-disable-next-line dot-notation + expect(shipper['inFlightRequests$'].value).toBe(1); + }); + await expect(shipper.flush()).resolves.toBe(undefined); + expect(fetchMock).toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { + body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n', + headers: { + 'content-type': 'application/x-ndjson', + 'x-elastic-cluster-id': 'UNKNOWN', + 'x-elastic-stack-version': '1.2.3', + }, + method: 'POST', + query: { debug: true }, + } + ); + }); + + test('calling flush multiple times does not keep hanging', async () => { + await expect(shipper.flush()).resolves.toBe(undefined); + await expect(shipper.flush()).resolves.toBe(undefined); + await Promise.all([shipper.flush(), shipper.flush()]); + }); + + test('calling flush after shutdown does not keep hanging', async () => { + shipper.shutdown(); + await expect(shipper.flush()).resolves.toBe(undefined); + }); + }); }); diff --git a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts index 34ebe134adcf7..cb6e689dd893f 100644 --- a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts +++ b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts @@ -22,6 +22,8 @@ import { BehaviorSubject, exhaustMap, mergeMap, + skip, + firstValueFrom, } from 'rxjs'; import type { AnalyticsClientInitContext, @@ -63,6 +65,8 @@ export class ElasticV3ServerShipper implements IShipper { private readonly internalQueue: Event[] = []; private readonly shutdown$ = new ReplaySubject(1); + private readonly flush$ = new Subject(); + private readonly inFlightRequests$ = new BehaviorSubject(0); private readonly isOptedIn$ = new BehaviorSubject(undefined); private readonly url: string; @@ -152,12 +156,33 @@ export class ElasticV3ServerShipper implements IShipper { this.internalQueue.push(...events); } + /** + * Triggers a flush of the internal queue to attempt to send any events held in the queue + * and resolves the returned promise once the queue is emptied. + */ + public async flush() { + if (this.flush$.isStopped) { + // If called after shutdown, return straight away + return; + } + + const promise = firstValueFrom( + this.inFlightRequests$.pipe( + skip(1), // Skipping the first value because BehaviourSubjects always emit the current value on subscribe. + filter((count) => count === 0) // Wait until all the inflight requests are completed. + ) + ); + this.flush$.next(); + await promise; + } + /** * Shuts down the shipper. * Triggers a flush of the internal queue to attempt to send any events held in the queue. */ public shutdown() { this.shutdown$.next(); + this.flush$.complete(); this.shutdown$.complete(); this.isOptedIn$.complete(); } @@ -226,17 +251,26 @@ export class ElasticV3ServerShipper implements IShipper { takeUntil(this.shutdown$), map(() => ({ shouldFlush: false })) ), + // Whenever a `flush` request comes in + this.flush$.pipe(map(() => ({ shouldFlush: true }))), // Attempt to send one last time on shutdown, flushing the queue this.shutdown$.pipe(map(() => ({ shouldFlush: true }))) ) .pipe( // Only move ahead if it's opted-in and online, and there are some events in the queue - filter( - () => + filter(() => { + const shouldSendAnything = this.isOptedIn$.value === true && this.firstTimeOffline === null && - this.internalQueue.length > 0 - ), + this.internalQueue.length > 0; + + // If it should not send anything, re-emit the inflight request observable just in case it's already 0 + if (!shouldSendAnything) { + this.inFlightRequests$.next(this.inFlightRequests$.value); + } + + return shouldSendAnything; + }), // Send the events: // 1. Set lastBatchSent and retrieve the events to send (clearing the queue) in a synchronous operation to avoid race conditions. @@ -298,6 +332,7 @@ export class ElasticV3ServerShipper implements IShipper { private async sendEvents(events: Event[]) { this.initContext.logger.debug(`Reporting ${events.length} events...`); + this.inFlightRequests$.next(this.inFlightRequests$.value + 1); try { const code = await this.makeRequest(events); this.reportTelemetryCounters(events, { code }); @@ -308,6 +343,7 @@ export class ElasticV3ServerShipper implements IShipper { this.reportTelemetryCounters(events, { code: error.code, error }); this.firstTimeOffline = undefined; } + this.inFlightRequests$.next(Math.max(0, this.inFlightRequests$.value - 1)); } private async makeRequest(events: Event[]): Promise { diff --git a/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts b/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts index 0bf00e91c7d0e..e60686937884a 100644 --- a/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts +++ b/packages/analytics/shippers/fullstory/src/fullstory_shipper.ts @@ -135,6 +135,12 @@ export class FullStoryShipper implements IShipper { }); } + /** + * Flushes all internal queues of the shipper. + * It doesn't really do anything inside because this shipper doesn't hold any internal queues. + */ + public async flush() {} + /** * Shuts down the shipper. * It doesn't really do anything inside because this shipper doesn't hold any internal queues. diff --git a/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts b/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts index a12f373fcd388..157cfaee22f0c 100644 --- a/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts +++ b/packages/analytics/shippers/gainsight/src/gainsight_shipper.ts @@ -93,6 +93,12 @@ export class GainsightShipper implements IShipper { }); } + /** + * Flushes all internal queues of the shipper. + * It doesn't really do anything inside because this shipper doesn't hold any internal queues. + */ + public async flush() {} + /** * Shuts down the shipper. * It doesn't really do anything inside because this shipper doesn't hold any internal queues. diff --git a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts index 3d98cf4392926..7f32ea7ed41a6 100644 --- a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts +++ b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.test.mocks.ts @@ -17,6 +17,7 @@ export const analyticsClientMock: jest.Mocked = { removeContextProvider: jest.fn(), registerShipper: jest.fn(), telemetryCounter$: new Subject(), + flush: jest.fn(), shutdown: jest.fn(), }; diff --git a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts index 0dcd49bd69fcc..60656e9dfd1cb 100644 --- a/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts +++ b/packages/core/analytics/core-analytics-browser-internal/src/analytics_service.ts @@ -45,6 +45,9 @@ export class AnalyticsService { this.registerBrowserInfoAnalyticsContext(); this.subscriptionsHandler.add(trackClicks(this.analyticsClient, core.env.mode.dev)); this.subscriptionsHandler.add(trackViewportSize(this.analyticsClient)); + + // Register a flush method in the browser so CI can explicitly call it before closing the browser. + window.__kbnAnalytics = { flush: () => this.analyticsClient.flush() }; } public setup({ injectedMetadata }: AnalyticsServiceSetupDeps): AnalyticsServiceSetup { @@ -69,9 +72,9 @@ export class AnalyticsService { }; } - public stop() { + public async stop() { this.subscriptionsHandler.unsubscribe(); - this.analyticsClient.shutdown(); + await this.analyticsClient.shutdown(); } /** diff --git a/packages/core/analytics/core-analytics-browser/index.ts b/packages/core/analytics/core-analytics-browser/index.ts index 2484f37bbf563..331f1695d9f20 100644 --- a/packages/core/analytics/core-analytics-browser/index.ts +++ b/packages/core/analytics/core-analytics-browser/index.ts @@ -6,4 +6,8 @@ * Side Public License, v 1. */ -export type { AnalyticsServiceSetup, AnalyticsServiceStart } from './src/types'; +export type { + AnalyticsServiceSetup, + AnalyticsServiceStart, + KbnAnalyticsWindowApi, +} from './src/types'; diff --git a/packages/core/analytics/core-analytics-browser/src/types.ts b/packages/core/analytics/core-analytics-browser/src/types.ts index e18a5faba8fbc..dbc35043613cd 100644 --- a/packages/core/analytics/core-analytics-browser/src/types.ts +++ b/packages/core/analytics/core-analytics-browser/src/types.ts @@ -13,7 +13,7 @@ import type { AnalyticsClient } from '@kbn/analytics-client'; * {@link AnalyticsClient} * @public */ -export type AnalyticsServiceSetup = Omit; +export type AnalyticsServiceSetup = Omit; /** * Exposes the public APIs of the AnalyticsClient during the start phase @@ -24,3 +24,19 @@ export type AnalyticsServiceStart = Pick< AnalyticsClient, 'optIn' | 'reportEvent' | 'telemetryCounter$' >; + +/** + * API exposed through `window.__kbnAnalytics` + */ +export interface KbnAnalyticsWindowApi { + /** + * Returns a promise that resolves when all the events in the queue have been sent. + */ + flush: AnalyticsClient['flush']; +} + +declare global { + interface Window { + __kbnAnalytics: KbnAnalyticsWindowApi; + } +} diff --git a/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts b/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts index 46b0726660e4c..141f5b9970c0b 100644 --- a/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts +++ b/packages/core/analytics/core-analytics-server-internal/src/analytics_service.ts @@ -65,8 +65,8 @@ export class AnalyticsService { }; } - public stop() { - this.analyticsClient.shutdown(); + public async stop() { + await this.analyticsClient.shutdown(); } /** diff --git a/packages/core/analytics/core-analytics-server/src/contracts.ts b/packages/core/analytics/core-analytics-server/src/contracts.ts index 1b297b197374d..4879b988b1752 100644 --- a/packages/core/analytics/core-analytics-server/src/contracts.ts +++ b/packages/core/analytics/core-analytics-server/src/contracts.ts @@ -13,14 +13,14 @@ import type { AnalyticsClient } from '@kbn/analytics-client'; * {@link AnalyticsClient} * @public */ -export type AnalyticsServicePreboot = Omit; +export type AnalyticsServicePreboot = Omit; /** * Exposes the public APIs of the AnalyticsClient during the setup phase. * {@link AnalyticsClient} * @public */ -export type AnalyticsServiceSetup = Omit; +export type AnalyticsServiceSetup = Omit; /** * Exposes the public APIs of the AnalyticsClient during the start phase diff --git a/packages/core/root/core-root-server-internal/src/server.ts b/packages/core/root/core-root-server-internal/src/server.ts index d4ae597c953ff..4cdfabddf2697 100644 --- a/packages/core/root/core-root-server-internal/src/server.ts +++ b/packages/core/root/core-root-server-internal/src/server.ts @@ -430,7 +430,7 @@ export class Server { public async stop() { this.log.debug('stopping server'); - this.analytics.stop(); + await this.analytics.stop(); await this.http.stop(); // HTTP server has to stop before savedObjects and ES clients are closed to be able to gracefully attempt to resolve any pending requests await this.plugins.stop(); await this.savedObjects.stop(); diff --git a/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts index 97bf37749c256..df876da5e9cce 100644 --- a/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_ftr_helpers/public/custom_shipper.ts @@ -27,5 +27,6 @@ export class CustomShipper implements IShipper { }); } optIn(isOptedIn: boolean) {} + async flush() {} shutdown() {} } diff --git a/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts index c76f30c94572e..c1ed593673f81 100644 --- a/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_ftr_helpers/server/custom_shipper.ts @@ -27,5 +27,6 @@ export class CustomShipper implements IShipper { }); } optIn(isOptedIn: boolean) {} + async flush() {} shutdown() {} } diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts index bc96b9aba6aff..1bdc8b7e343fc 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/public/custom_shipper.ts @@ -39,5 +39,8 @@ export class CustomShipper implements IShipper { extendContext(newContext: EventContext) { this.actions$.next({ action: 'extendContext', meta: newContext }); } + async flush() { + this.actions$.next({ action: 'flush', meta: {} }); + } shutdown() {} } diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts index 963b829f6af86..a64847e086265 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/public/plugin.ts @@ -74,6 +74,8 @@ export class AnalyticsPluginA implements Plugin { setOptIn(optIn: boolean) { analytics.optIn({ global: { enabled: optIn } }); }, + getFlushAction: async () => + firstValueFrom(this.actions$.pipe(filter(({ action }) => action === 'flush'))), }; registerContextProvider({ diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts index 197113f5368b6..42464622f131a 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/public/types.ts @@ -13,6 +13,7 @@ declare global { interface Window { __analyticsPluginA__: { getActionsUntilReportTestPluginLifecycleEvent: () => Promise; + getFlushAction: () => Promise; stats: TelemetryCounter[]; setOptIn: (optIn: boolean) => void; }; diff --git a/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts b/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts index 3b91cf4b51d1b..4c7836a811cf2 100644 --- a/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts +++ b/test/analytics/fixtures/plugins/analytics_plugin_a/server/custom_shipper.ts @@ -39,5 +39,8 @@ export class CustomShipper implements IShipper { extendContext(newContext: EventContext) { this.actions$.next({ action: 'extendContext', meta: newContext }); } + async flush() { + this.actions$.next({ action: 'flush', meta: {} }); + } shutdown() {} } diff --git a/test/analytics/tests/analytics_from_the_browser.ts b/test/analytics/tests/analytics_from_the_browser.ts index d38bbc42701ea..9636e1854b0c1 100644 --- a/test/analytics/tests/analytics_from_the_browser.ts +++ b/test/analytics/tests/analytics_from_the_browser.ts @@ -9,8 +9,8 @@ import expect from '@kbn/expect'; import type { Event, TelemetryCounter } from '@kbn/core/server'; import type { Action } from '@kbn/analytics-plugin-a-plugin/public/custom_shipper'; -import type { FtrProviderContext } from '../services'; import '@kbn/analytics-plugin-a-plugin/public/types'; +import type { FtrProviderContext } from '../services'; export default function ({ getService, getPageObjects }: FtrProviderContext) { const { common } = getPageObjects(['common']); @@ -170,6 +170,12 @@ export default function ({ getService, getPageObjects }: FtrProviderContext) { expect(event.context.user_agent).to.be.a('string'); }); + it('should call flush when using the window-exposed flush method', async () => { + await browser.execute(() => window.__kbnAnalytics.flush()); + const action = await browser.execute(() => window.__analyticsPluginA__.getFlushAction()); + expect(action).to.eql({ action: 'flush', meta: {} }); + }); + describe('Test helpers capabilities', () => { it('should return the count of the events', async () => { const eventCount = await ebtUIHelper.getEventCount({