-
Notifications
You must be signed in to change notification settings - Fork 8.3k
/
Copy pathbrowser_shipper.ts
152 lines (135 loc) · 4.84 KB
/
browser_shipper.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { BehaviorSubject, interval, Subject, bufferWhen, concatMap, filter, skipWhile } from 'rxjs';
import type {
AnalyticsClientInitContext,
Event,
EventContext,
IShipper,
TelemetryCounter,
} from '@kbn/analytics-client';
import { ElasticV3ShipperOptions, ErrorWithCode } from '@kbn/analytics-shippers-elastic-v3-common';
import {
buildHeaders,
buildUrl,
createTelemetryCounterHelper,
eventsToNDJSON,
} from '@kbn/analytics-shippers-elastic-v3-common';
/**
* Elastic V3 shipper to use in the browser.
*/
export class ElasticV3BrowserShipper implements IShipper {
  /** Shipper's unique name */
  public static shipperName = 'elastic_v3_browser';

  /** Observable to emit the stats of the processed events. */
  public readonly telemetryCounter$ = new Subject<TelemetryCounter>();

  /** Helper that emits to {@link telemetryCounter$} on behalf of this shipper. */
  private readonly reportTelemetryCounters = createTelemetryCounterHelper(
    this.telemetryCounter$,
    ElasticV3BrowserShipper.shipperName
  );

  /** Endpoint the batches are POSTed to. Built once in the constructor. */
  private readonly url: string;

  /** Every reported event goes through this subject before being batched and sent. */
  private readonly internalQueue$ = new Subject<Event>();

  /** Latest opt-in choice. `undefined` until {@link optIn} is called for the first time. */
  private readonly isOptedIn$ = new BehaviorSubject<boolean | undefined>(undefined);

  /** Values held in memory to build the request headers. Updated via {@link extendContext}. */
  private clusterUuid: string = 'UNKNOWN';
  private licenseId: string | undefined;

  /**
   * Creates a new instance of the {@link ElasticV3BrowserShipper}.
   * @param options {@link ElasticV3ShipperOptions}
   * @param initContext {@link AnalyticsClientInitContext}
   */
  constructor(
    private readonly options: ElasticV3ShipperOptions,
    private readonly initContext: AnalyticsClientInitContext
  ) {
    this.setUpInternalQueueSubscriber();
    this.url = buildUrl({
      sendTo: options.sendTo ?? initContext.sendTo,
      channelName: options.channelName,
    });
  }

  /**
   * Uses the `cluster_uuid` and `license_id` from the context to hold them in memory for the generation of the headers
   * used later on in the HTTP request. Falsy values are ignored, so a previously stored value is never cleared.
   * @param newContext The full new context to set {@link EventContext}
   */
  public extendContext(newContext: EventContext) {
    if (newContext.cluster_uuid) {
      this.clusterUuid = newContext.cluster_uuid;
    }
    if (newContext.license_id) {
      this.licenseId = newContext.license_id;
    }
  }

  /**
   * Records the opt-in choice. While it is `false`, every batch released by the internal queue is discarded
   * instead of being sent.
   * @param isOptedIn `true` for resume sending events. `false` to stop.
   */
  public optIn(isOptedIn: boolean) {
    this.isOptedIn$.next(isOptedIn);
  }

  /**
   * Enqueues the events to be sent in a batched approach.
   * @param events batched events {@link Event}
   */
  public reportEvents(events: Event[]) {
    events.forEach((event) => {
      this.internalQueue$.next(event);
    });
  }

  /**
   * Shuts down the shipper.
   * Triggers a flush of the internal queue to attempt to send any events held in the queue.
   */
  public shutdown() {
    this.internalQueue$.complete(); // NOTE: When completing the observable, the buffer logic does not wait and releases any buffered events.
  }

  /**
   * Wires the internal queue to the HTTP sender: events are grouped into batches, batches released while
   * opted out (or empty ones) are dropped, and the remaining ones are sent one at a time, in order.
   */
  private setUpInternalQueueSubscriber() {
    this.internalQueue$
      .pipe(
        // Buffer events for 1 second. While the opt-in choice is still unknown the interval ticks are
        // skipped, so the buffer keeps growing until `optIn` is called.
        bufferWhen(() => interval(1000).pipe(skipWhile(() => this.isOptedIn$.value === undefined))),
        // Discard the batch if we are opted out at the time it is released.
        // This must be `filter`, not `skipWhile`: `skipWhile` stops evaluating its predicate after the
        // first batch that passes, so an opt-out that follows an opt-in would never be honored.
        // An `undefined` choice is still let through, as before.
        filter(() => this.isOptedIn$.value !== false),
        // Skip empty buffers
        filter((events) => events.length > 0),
        // Send events sequentially: `concatMap` waits for the previous request to settle.
        concatMap(async (events) => this.sendEvents(events))
      )
      .subscribe();
  }

  /**
   * Sends one batch and reports the outcome through the telemetry counters.
   * It never rejects: any failure is converted into a counter carrying the error and its `code`.
   * @param events The batch to send.
   */
  private async sendEvents(events: Event[]) {
    try {
      const code = await this.makeRequest(events);
      this.reportTelemetryCounters(events, { code });
    } catch (error) {
      this.reportTelemetryCounters(events, { code: error.code, error });
    }
  }

  /**
   * POSTs the batch as NDJSON to the shipper's URL.
   * @param events The batch to send.
   * @returns The HTTP status code, as a string, when the response is OK.
   * @throws ErrorWithCode when the response is not OK. The code is the HTTP status.
   */
  private async makeRequest(events: Event[]): Promise<string> {
    const response = await fetch(this.url, {
      method: 'POST',
      body: eventsToNDJSON(events),
      headers: buildHeaders(this.clusterUuid, this.options.version, this.licenseId),
      // NOTE(review): `query` is not a standard `RequestInit` member, so a spec-compliant `fetch` is
      // expected to ignore it. Confirm whether the debug flag should be appended to the URL instead.
      ...(this.options.debug && { query: { debug: true } }),
      // Allow the request to outlive the page in case the tab is closed
      keepalive: true,
    });

    const status = `${response.status}`;

    // A response body can only be consumed once. Read it a single time (and only when it is needed)
    // so the debug log and the error message can both use it. Reading it twice would reject with a
    // TypeError and hide the real HTTP status from the caller.
    let responseText: string | undefined;
    if (this.options.debug || !response.ok) {
      responseText = await response.text();
    }

    if (this.options.debug) {
      this.initContext.logger.debug(
        `[${ElasticV3BrowserShipper.shipperName}]: ${status} - ${responseText}`
      );
    }

    if (!response.ok) {
      throw new ErrorWithCode(`${status} - ${responseText}`, status);
    }

    return status;
  }
}