-
Notifications
You must be signed in to change notification settings - Fork 8.3k
/
worker.ts
264 lines (229 loc) · 9.96 KB
/
worker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server';
import moment from 'moment';
import { SecurityPluginStart } from '../../../../security/server';
import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { Credential, CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations, queuedOpHasStarted, isQueuedOp } from './op_utils';
/** Delay, in milliseconds, between polls for in-progress reindex operations (ReindexWorker's outer loop). */
const POLL_INTERVAL = 30 * 1000;
/**
 * If no node has been able to update an operation (because none holds its credentials) for this many
 * milliseconds — four poll intervals, i.e. 2 minutes — the operation is paused.
 */
const PAUSE_WINDOW = 4 * POLL_INTERVAL;
/**
 * Sleep, in milliseconds, inserted into ReindexWorker's tight update loop when this node holds credentials for none
 * of the in-progress operations, so the loop does not spin fast enough to become a CPU bottleneck.
 */
const WORKER_PADDING_MS = 1000;
/**
 * A singleton worker that coordinates two polling loops:
 * (1) A longer loop that polls for reindex operations that are in progress. If any are found, loop (2) is started.
 * (2) A tighter loop that pushes each in-progress reindex operation through ReindexService.processNextStep. Once no
 *     in-progress operations remain (or the worker is stopped), this loop terminates.
 *
 * The worker can also be forced to start loop (2) by calling forceRefresh(). This is done when we know a new reindex
 * operation has been started.
 *
 * This worker can be run on multiple nodes without conflicts or dropped jobs. Reindex operations are locked by the
 * ReindexService and if any operation is locked longer than the ReindexService's timeout, it is assumed to have been
 * locked by a node that is no longer running (crashed or shutdown). In this case, another node may safely acquire
 * the lock for this reindex operation.
 */
export class ReindexWorker {
  private static workerSingleton?: ReindexWorker;
  /** True while loop (1) should keep rescheduling itself; set by start(), cleared by stop(). */
  private continuePolling: boolean = false;
  /** Set by stop() (and cleared by start()); makes loop (2) exit before its next iteration. */
  private stopped: boolean = false;
  /** Guards against running more than one instance of loop (2) at a time. */
  private updateOperationLoopRunning: boolean = false;
  private timeout?: NodeJS.Timeout;
  private inProgressOps: ReindexSavedObject[] = [];
  /**
   * Id of the last queued operation that processNextStep skipped once because this node had no credentials for it.
   */
  private lastCheckedQueuedOpId: string | undefined;
  private readonly reindexService: ReindexService;
  private readonly log: Logger;
  private readonly security: SecurityPluginStart;

  constructor(
    private client: SavedObjectsClientContract,
    private credentialStore: CredentialStore,
    private clusterClient: IClusterClient,
    log: Logger,
    private licensing: LicensingPluginSetup,
    security: SecurityPluginStart
  ) {
    this.log = log.get('reindex_worker');
    this.security = security;
    if (ReindexWorker.workerSingleton) {
      throw new Error(`More than one ReindexWorker cannot be created.`);
    }

    // The worker's own, unscoped service runs as the internal user; it is used to list in-progress operations and
    // to pause operations no node holds credentials for.
    const callAsInternalUser = this.clusterClient.asInternalUser;
    this.reindexService = reindexServiceFactory(
      callAsInternalUser,
      reindexActionsFactory(this.client, callAsInternalUser),
      log,
      this.licensing
    );

    ReindexWorker.workerSingleton = this;
  }

  /**
   * Begins loop (1) to begin checking for in progress reindex operations.
   */
  public start = () => {
    this.log.debug('Starting worker...');
    this.stopped = false;
    this.continuePolling = true;
    void this.pollForOperations();
  };

  /**
   * Stops the worker from processing any further reindex operations.
   */
  public stop = () => {
    this.log.debug('Stopping worker...');
    if (this.timeout) {
      clearTimeout(this.timeout);
      this.timeout = undefined;
    }
    // Signal loop (2) to exit. The loop clears updateOperationLoopRunning itself in its `finally`; resetting it here
    // while an iteration is still in flight would let refresh() start a second, concurrent loop.
    this.stopped = true;
    this.continuePolling = false;
  };

  /**
   * Should be called immediately after this server has started a new reindex operation.
   */
  public forceRefresh = () => {
    this.refresh();
  };

  /**
   * Returns whether or not the given ReindexOperation is in the worker's queue.
   */
  public includes = (reindexOp: ReindexSavedObject) =>
    this.inProgressOps.some((op) => op.id === reindexOp.id);

  /**
   * Runs an async loop until all inProgress jobs are complete or failed, or the worker is stopped.
   */
  private startUpdateOperationLoop = async (): Promise<void> => {
    this.updateOperationLoopRunning = true;

    try {
      while (!this.stopped && this.inProgressOps.length > 0) {
        this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);

        // Push each operation through the state machine and refresh.
        await Promise.all(this.inProgressOps.map(this.processNextStep));
        await this.refresh();

        if (
          this.inProgressOps.length &&
          this.inProgressOps.every((op) => !this.credentialStore.get(op))
        ) {
          // With no credentials for any remaining op, processNextStep does little or no work, so without this
          // padding the loop would spin and could cause high CPU usage.
          // TODO: This scheduler should be revisited in future.
          await new Promise((resolve) => setTimeout(resolve, WORKER_PADDING_MS));
        }
      }
    } finally {
      this.updateOperationLoopRunning = false;
    }
  };

  private pollForOperations = async () => {
    this.log.debug(`Polling for reindex operations`);

    await this.refresh();

    if (this.continuePolling) {
      this.timeout = setTimeout(this.pollForOperations, POLL_INTERVAL);
    }
  };

  /** Builds a ReindexService whose Elasticsearch calls are made with the given stored credential. */
  private getCredentialScopedReindexService = (credential: Credential) => {
    const fakeRequest: FakeRequest = { headers: credential };
    const scopedClusterClient = this.clusterClient.asScoped(fakeRequest);
    const callAsCurrentUser = scopedClusterClient.asCurrentUser;
    const actions = reindexActionsFactory(this.client, callAsCurrentUser);
    return reindexServiceFactory(callAsCurrentUser, actions, this.log, this.licensing);
  };

  /**
   * Reloads inProgressOps: every parallel operation plus, at most, the first queued operation. If that queued
   * operation has not started yet and this node holds its credentials, it is started here. Any failure is logged at
   * debug level and leaves inProgressOps empty.
   */
  private updateInProgressOps = async () => {
    try {
      const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
      const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps);

      let [firstOpInQueue] = queue;

      if (firstOpInQueue && !queuedOpHasStarted(firstOpInQueue)) {
        this.log.debug(
          `Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})`
        );
        const credential = this.credentialStore.get(firstOpInQueue);
        if (credential) {
          const service = this.getCredentialScopedReindexService(credential);
          firstOpInQueue = await service.startQueuedReindexOperation(
            firstOpInQueue.attributes.indexName
          );
          // Re-associate the credentials
          this.credentialStore.update({
            reindexOp: firstOpInQueue,
            security: this.security,
            credential,
          });
        }
      }

      this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []);
    } catch (e) {
      const message = e instanceof Error ? e.message : String(e);
      this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${message}`);
      this.inProgressOps = [];
    }
  };

  private refresh = async () => {
    await this.updateInProgressOps();

    // If there are operations in progress and we're not already updating operations, kick off the update loop.
    // The loop is intentionally not awaited, so attach a handler: processNextStep can reject (e.g. a failing
    // pauseReindexOperation is not wrapped by swallowExceptions), which would otherwise surface as an unhandled
    // promise rejection. The next poll restarts the loop.
    if (this.inProgressOps.length > 0 && !this.updateOperationLoopRunning) {
      this.startUpdateOperationLoop().catch((e) => {
        const message = e instanceof Error ? e.message : String(e);
        this.log.warn(`Reindex operation update loop stopped unexpectedly: ${message}`);
      });
    }
  };

  /**
   * Advances one reindex operation by a single step using this node's stored credentials for it. Without
   * credentials, the operation is left alone, or paused via the internal-user service once it has gone
   * un-updated for longer than PAUSE_WINDOW.
   */
  private processNextStep = async (reindexOp: ReindexSavedObject): Promise<void> => {
    const credential = this.credentialStore.get(reindexOp);

    if (!credential) {
      // If this is a queued reindex op, and we know there can only ever be one in progress at a
      // given time, there is a small chance it may have just reached the front of the queue so
      // we give it a chance to be updated by another worker with credentials by making this a
      // noop once. If it has not been updated by the next loop we will mark it paused if it
      // falls outside of PAUSE_WINDOW.
      if (isQueuedOp(reindexOp) && this.lastCheckedQueuedOpId !== reindexOp.id) {
        this.lastCheckedQueuedOpId = reindexOp.id;
        return;
      }

      // This indicates that no Kibana nodes currently have credentials to update this job.
      const pauseCutoff = moment().subtract(PAUSE_WINDOW);
      if (moment(reindexOp.updated_at).isBefore(pauseCutoff)) {
        await this.reindexService.pauseReindexOperation(reindexOp.attributes.indexName);
      }
      // Otherwise it has been updated recently; we assume another node has the necessary
      // credentials, and this becomes a noop.
      return;
    }

    // Setup a ReindexService specific to these credentials.
    const service = this.getCredentialScopedReindexService(credential);
    reindexOp = await swallowExceptions(service.processNextStep, this.log)(reindexOp);

    // Update credential store with most recent state.
    this.credentialStore.update({ reindexOp, security: this.security, credential });
  };
}
/**
 * Wraps a reindex step so that any exception it throws is logged and swallowed instead of propagating. This prevents
 * any errors from stopping the worker from continuing to process more jobs.
 *
 * @param func - the step to run (the worker passes a credential-scoped ReindexService's processNextStep).
 * @param log - logger used to report the swallowed error.
 * @returns a function resolving to func's result, or to the unchanged input reindexOp if func threw.
 */
const swallowExceptions =
  (func: (reindexOp: ReindexSavedObject) => Promise<ReindexSavedObject>, log: Logger) =>
  async (reindexOp: ReindexSavedObject): Promise<ReindexSavedObject> => {
    try {
      return await func(reindexOp);
    } catch (e) {
      if (reindexOp.attributes.locked) {
        // The op carries a lock, so the failure is only reported at debug level.
        log.debug(`Skipping reindexOp with unexpired lock: ${reindexOp.id}`);
      } else {
        // String(e) rather than e.toString(): a thrown null/undefined would make e.toString() itself throw
        // from inside this catch block, defeating the purpose of this wrapper.
        log.warn(`Error when trying to process reindexOp (${reindexOp.id}): ${String(e)}`);
      }

      return reindexOp;
    }
  };