Skip to content

Commit

Permalink
[EBT] Add flush method and call it during stop (#144925)
Browse files Browse the repository at this point in the history
Co-authored-by: Kibana Machine <[email protected]>
Resolves #140521
  • Loading branch information
afharo authored Nov 16, 2022
1 parent f90072d commit a5f5d86
Show file tree
Hide file tree
Showing 27 changed files with 278 additions and 29 deletions.
9 changes: 9 additions & 0 deletions packages/analytics/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ analytics.optIn({
})
```

### Explicit flush of the events

If, at any given point (usually testing or during shutdowns) we need to make sure that all the pending events
in the queue are sent. The `flush` API returns a promise that will resolve as soon as all events in the queue are sent.

```typescript
await analytics.flush()
```

### Shipping events

In order to report the event to an analytics tool, we need to register the shippers our application wants to use. To register a shipper use the API `registerShipper`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ describe('AnalyticsClient', () => {
});
});

afterEach(() => {
analyticsClient.shutdown();
afterEach(async () => {
await analyticsClient.shutdown();
jest.useRealTimers();
});

Expand Down Expand Up @@ -381,7 +381,7 @@ describe('AnalyticsClient', () => {

test(
'Handles errors in the shipper',
fakeSchedulers((advance) => {
fakeSchedulers(async (advance) => {
const optInMock = jest.fn().mockImplementation(() => {
throw new Error('Something went terribly wrong');
});
Expand All @@ -404,7 +404,7 @@ describe('AnalyticsClient', () => {
`Shipper "${MockedShipper.shipperName}" failed to extend the context`,
expect.any(Error)
);
expect(() => analyticsClient.shutdown()).not.toThrow();
await expect(analyticsClient.shutdown()).resolves.toBeUndefined();
expect(shutdownMock).toHaveBeenCalled();
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,20 @@ export class AnalyticsClient implements IAnalyticsClient {
this.shipperRegistered$.next();
};

public shutdown = () => {
public flush = async () => {
await Promise.all(
[...this.shippersRegistry.allShippers.entries()].map(async ([shipperName, shipper]) => {
try {
await shipper.flush();
} catch (err) {
this.initContext.logger.warn(`Failed to flush shipper "${shipperName}"`, err);
}
})
);
};

public shutdown = async () => {
await this.flush();
this.shippersRegistry.allShippers.forEach((shipper, shipperName) => {
try {
shipper.shutdown();
Expand Down
1 change: 1 addition & 0 deletions packages/analytics/client/src/analytics_client/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ function createMockedAnalyticsClient(): jest.Mocked<IAnalyticsClient> {
removeContextProvider: jest.fn(),
registerShipper: jest.fn(),
telemetryCounter$: new Subject(),
flush: jest.fn(),
shutdown: jest.fn(),
};
}
Expand Down
8 changes: 6 additions & 2 deletions packages/analytics/client/src/analytics_client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ export interface IAnalyticsClient {
*/
readonly telemetryCounter$: Observable<TelemetryCounter>;
/**
* Stops the client.
* Forces all shippers to send all their enqueued events and fulfills the returned promise.
*/
shutdown: () => void;
flush: () => Promise<void>;
/**
* Stops the client. Flushing any pending events in the process.
*/
shutdown: () => Promise<void>;
}
1 change: 1 addition & 0 deletions packages/analytics/client/src/shippers/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class MockedShipper implements IShipper {
public reportEvents = jest.fn();
public extendContext = jest.fn();
public telemetryCounter$ = new Subject<TelemetryCounter>();
public flush = jest.fn();
public shutdown = jest.fn();
}

Expand Down
4 changes: 4 additions & 0 deletions packages/analytics/client/src/shippers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ export interface IShipper {
* Observable to emit the stats of the processed events.
*/
telemetryCounter$?: Observable<TelemetryCounter>;
/**
* Sends all the enqueued events and fulfills the returned promise.
*/
flush: () => Promise<void>;
/**
* Shutdown the shipper.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,58 @@ describe('ElasticV3BrowserShipper', () => {
})
);

test(
'calls to flush forces the client to send all the pending events',
fakeSchedulers(async (advance) => {
shipper.optIn(true);
shipper.reportEvents(events);
const counter = firstValueFrom(shipper.telemetryCounter$);
const promise = shipper.flush();
advance(0); // bufferWhen requires some sort of fake scheduling to advance (but we are not advancing 1s)
await promise;
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-ndjson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
keepalive: true,
method: 'POST',
query: { debug: true },
}
);
await expect(counter).resolves.toMatchInlineSnapshot(`
Object {
"code": "200",
"count": 1,
"event_type": "test-event-type",
"source": "elastic_v3_browser",
"type": "succeeded",
}
`);
})
);

test('calls to flush resolve immediately if there is nothing to send', async () => {
shipper.optIn(true);
await shipper.flush();
expect(fetchMock).toHaveBeenCalledTimes(0);
});

test('calling flush multiple times does not keep hanging', async () => {
await expect(shipper.flush()).resolves.toBe(undefined);
await expect(shipper.flush()).resolves.toBe(undefined);
await Promise.all([shipper.flush(), shipper.flush()]);
});

test('calling flush after shutdown does not keep hanging', async () => {
shipper.shutdown();
await expect(shipper.flush()).resolves.toBe(undefined);
});

test('calls to reportEvents call `fetch` when shutting down if optIn value is set to true', async () => {
shipper.reportEvents(events);
shipper.optIn(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@
* Side Public License, v 1.
*/

import { BehaviorSubject, interval, Subject, bufferWhen, concatMap, filter, skipWhile } from 'rxjs';
import {
BehaviorSubject,
interval,
Subject,
bufferWhen,
concatMap,
skipWhile,
firstValueFrom,
map,
merge,
} from 'rxjs';
import type {
AnalyticsClientInitContext,
Event,
Expand Down Expand Up @@ -39,6 +49,8 @@ export class ElasticV3BrowserShipper implements IShipper {
private readonly url: string;

private readonly internalQueue$ = new Subject<Event>();
private readonly flush$ = new Subject<void>();
private readonly queueFlushed$ = new Subject<void>();

private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);
private clusterUuid: string = 'UNKNOWN';
Expand Down Expand Up @@ -92,25 +104,48 @@ export class ElasticV3BrowserShipper implements IShipper {
});
}

/**
* Triggers a flush of the internal queue to attempt to send any events held in the queue
* and resolves the returned promise once the queue is emptied.
*/
public async flush() {
if (this.flush$.isStopped) {
// If called after shutdown, return straight away
return;
}

const promise = firstValueFrom(this.queueFlushed$);
this.flush$.next();
await promise;
}

/**
* Shuts down the shipper.
* Triggers a flush of the internal queue to attempt to send any events held in the queue.
*/
public shutdown() {
this.internalQueue$.complete(); // NOTE: When completing the observable, the buffer logic does not wait and releases any buffered events.
this.flush$.complete();
}

private setUpInternalQueueSubscriber() {
this.internalQueue$
.pipe(
// Buffer events for 1 second or until we have an optIn value
bufferWhen(() => interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))),
// Discard any events if we are not opted in
skipWhile(() => this.isOptedIn$.value === false),
// Skip empty buffers
filter((events) => events.length > 0),
// Send events
concatMap(async (events) => this.sendEvents(events))
bufferWhen(() =>
merge(
this.flush$,
interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))
)
),
// Send events (one batch at a time)
concatMap(async (events) => {
// Only send if opted-in and there's anything to send
if (this.isOptedIn$.value === true && events.length > 0) {
await this.sendEvents(events);
}
}),
map(() => this.queueFlushed$.next())
)
.subscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,4 +580,45 @@ describe('ElasticV3ServerShipper', () => {
);
});
});

describe('flush method', () => {
test('resolves straight away if it should not send anything', async () => {
await expect(shipper.flush()).resolves.toBe(undefined);
});

test('resolves when all the ongoing requests are complete', async () => {
shipper.optIn(true);
shipper.reportEvents(events);
expect(fetchMock).toHaveBeenCalledTimes(0);
fetchMock.mockImplementation(async () => {
// eslint-disable-next-line dot-notation
expect(shipper['inFlightRequests$'].value).toBe(1);
});
await expect(shipper.flush()).resolves.toBe(undefined);
expect(fetchMock).toHaveBeenCalledWith(
'https://telemetry-staging.elastic.co/v3/send/test-channel',
{
body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n',
headers: {
'content-type': 'application/x-ndjson',
'x-elastic-cluster-id': 'UNKNOWN',
'x-elastic-stack-version': '1.2.3',
},
method: 'POST',
query: { debug: true },
}
);
});

test('calling flush multiple times does not keep hanging', async () => {
await expect(shipper.flush()).resolves.toBe(undefined);
await expect(shipper.flush()).resolves.toBe(undefined);
await Promise.all([shipper.flush(), shipper.flush()]);
});

test('calling flush after shutdown does not keep hanging', async () => {
shipper.shutdown();
await expect(shipper.flush()).resolves.toBe(undefined);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import {
BehaviorSubject,
exhaustMap,
mergeMap,
skip,
firstValueFrom,
} from 'rxjs';
import type {
AnalyticsClientInitContext,
Expand Down Expand Up @@ -63,6 +65,8 @@ export class ElasticV3ServerShipper implements IShipper {

private readonly internalQueue: Event[] = [];
private readonly shutdown$ = new ReplaySubject<void>(1);
private readonly flush$ = new Subject<void>();
private readonly inFlightRequests$ = new BehaviorSubject<number>(0);
private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);

private readonly url: string;
Expand Down Expand Up @@ -152,12 +156,33 @@ export class ElasticV3ServerShipper implements IShipper {
this.internalQueue.push(...events);
}

/**
* Triggers a flush of the internal queue to attempt to send any events held in the queue
* and resolves the returned promise once the queue is emptied.
*/
public async flush() {
if (this.flush$.isStopped) {
// If called after shutdown, return straight away
return;
}

const promise = firstValueFrom(
this.inFlightRequests$.pipe(
skip(1), // Skipping the first value because BehaviourSubjects always emit the current value on subscribe.
filter((count) => count === 0) // Wait until all the inflight requests are completed.
)
);
this.flush$.next();
await promise;
}

/**
* Shuts down the shipper.
* Triggers a flush of the internal queue to attempt to send any events held in the queue.
*/
public shutdown() {
this.shutdown$.next();
this.flush$.complete();
this.shutdown$.complete();
this.isOptedIn$.complete();
}
Expand Down Expand Up @@ -226,17 +251,26 @@ export class ElasticV3ServerShipper implements IShipper {
takeUntil(this.shutdown$),
map(() => ({ shouldFlush: false }))
),
// Whenever a `flush` request comes in
this.flush$.pipe(map(() => ({ shouldFlush: true }))),
// Attempt to send one last time on shutdown, flushing the queue
this.shutdown$.pipe(map(() => ({ shouldFlush: true })))
)
.pipe(
// Only move ahead if it's opted-in and online, and there are some events in the queue
filter(
() =>
filter(() => {
const shouldSendAnything =
this.isOptedIn$.value === true &&
this.firstTimeOffline === null &&
this.internalQueue.length > 0
),
this.internalQueue.length > 0;

// If it should not send anything, re-emit the inflight request observable just in case it's already 0
if (!shouldSendAnything) {
this.inFlightRequests$.next(this.inFlightRequests$.value);
}

return shouldSendAnything;
}),

// Send the events:
// 1. Set lastBatchSent and retrieve the events to send (clearing the queue) in a synchronous operation to avoid race conditions.
Expand Down Expand Up @@ -298,6 +332,7 @@ export class ElasticV3ServerShipper implements IShipper {

private async sendEvents(events: Event[]) {
this.initContext.logger.debug(`Reporting ${events.length} events...`);
this.inFlightRequests$.next(this.inFlightRequests$.value + 1);
try {
const code = await this.makeRequest(events);
this.reportTelemetryCounters(events, { code });
Expand All @@ -308,6 +343,7 @@ export class ElasticV3ServerShipper implements IShipper {
this.reportTelemetryCounters(events, { code: error.code, error });
this.firstTimeOffline = undefined;
}
this.inFlightRequests$.next(Math.max(0, this.inFlightRequests$.value - 1));
}

private async makeRequest(events: Event[]): Promise<string> {
Expand Down
Loading

0 comments on commit a5f5d86

Please sign in to comment.