Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Upgrade Assistant] Server-side batch reindexing #58598

Merged
merged 21 commits into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6c7f5d7
Added server side logic for handling batch reindex
jloleysens Feb 26, 2020
cb0f786
Remove literal string interpolation from translation
jloleysens Feb 26, 2020
d1e503a
Merge branch 'master' into ua/batch-reindex-server-side
elasticmachine Feb 27, 2020
79aa01a
Merge branch 'master' into ua/batch-reindex-server-side
elasticmachine Feb 28, 2020
17f39e7
Merge branch 'master' of github.com:elastic/kibana into ua/batch-rein…
jloleysens Mar 2, 2020
6407b6a
Refactor return value of batch endpoint
jloleysens Mar 2, 2020
b4263fd
First iteration of batch queues
jloleysens Mar 3, 2020
8214a69
Single queue
jloleysens Mar 3, 2020
d805855
Clean up old batch queue implementation
jloleysens Mar 3, 2020
c4cd048
Slight refactor
jloleysens Mar 3, 2020
0a64e0d
Revert batch queues implementation
jloleysens Mar 4, 2020
b9c93a6
Introduction of QueueSettings
jloleysens Mar 4, 2020
23ab266
Updated worker logic to handle items in queue in series
jloleysens Mar 4, 2020
6cf9acc
Refactor /batch endpoint response to "enqueued" not "started"
jloleysens Mar 4, 2020
02ae1b2
Fixed jest tests
jloleysens Mar 4, 2020
4d57be9
Refactor worker refresh operations for readability
jloleysens Mar 5, 2020
0fd95c2
Add batch API integration test
jloleysens Mar 5, 2020
2f29404
Added a new endpoint: GET batch/queue
jloleysens Mar 5, 2020
e0f3a0e
Reset the queuedAt timestamp on resume
jloleysens Mar 5, 2020
5cd2270
Merge branch 'master' into ua/batch-reindex-server-side
elasticmachine Mar 5, 2020
63f6659
Fix jest test
jloleysens Mar 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions x-pack/plugins/upgrade_assistant/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ export enum ReindexStatus {
}

export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation';

export interface QueueSettings extends SavedObjectAttributes {
queuedAt: number;
}

export interface ReindexOptions extends SavedObjectAttributes {
/**
* Set this key to configure a reindex operation as part of a
* batch to be run in series.
*/
queueSettings?: QueueSettings;
}

export interface ReindexOperation extends SavedObjectAttributes {
indexName: string;
newIndexName: string;
Expand All @@ -40,6 +53,15 @@ export interface ReindexOperation extends SavedObjectAttributes {

// This field is only used for the singleton IndexConsumerType documents.
runningReindexCount: number | null;

/**
* Options for the reindexing strategy.
*
* @remark
* Marked as optional for backwards compatibility. We should still
* be able to handle older ReindexOperation objects.
*/
reindexOptions?: ReindexOptions;
}

export type ReindexSavedObject = SavedObject<ReindexOperation>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ export const esVersionCheck = async (
}
};

export const versionCheckHandlerWrapper = (handler: RequestHandler<any, any, any>) => async (
export const versionCheckHandlerWrapper = <P, Q, B>(handler: RequestHandler<P, Q, B>) => async (
ctx: RequestHandlerContext,
request: KibanaRequest,
request: KibanaRequest<P, Q, B>,
response: KibanaResponseFactory
) => {
const errorResponse = await esVersionCheck(ctx, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ReindexTaskFailed,
ReindexAlreadyInProgress,
MultipleReindexJobsFound,
ReindexCannotBeCancelled,
} from './error_symbols';

export class ReindexError extends Error {
Expand All @@ -32,4 +33,5 @@ export const error = {
reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted),
reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress),
multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound),
reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled),
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ export const CannotCreateIndex = Symbol('CannotCreateIndex');
export const ReindexTaskFailed = Symbol('ReindexTaskFailed');
export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted');
export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress');
export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled');

export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound');
56 changes: 56 additions & 0 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { flow } from 'fp-ts/lib/function';
import { ReindexSavedObject } from '../../../common/types';

export interface SortedReindexSavedObjects {
/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* have no inherent order and are considered to be processed in parallel.
*/
parallel: ReindexSavedObject[];

/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* are consistently ordered (see {@link orderQueuedReindexOperations}) and should be
* processed in order.
*/
queue: ReindexSavedObject[];
}

const sortReindexOperations = (ops: ReindexSavedObject[]): SortedReindexSavedObjects => {
const parallel: ReindexSavedObject[] = [];
const queue: ReindexSavedObject[] = [];
for (const op of ops) {
if (op.attributes.reindexOptions?.queueSettings) {
queue.push(op);
} else {
parallel.push(op);
}
}

return {
parallel,
queue,
};
};
const orderQueuedReindexOperations = ({
parallel,
queue,
}: SortedReindexSavedObjects): SortedReindexSavedObjects => ({
parallel,
// Sort asc
queue: queue.sort(
(a, b) =>
a.attributes.reindexOptions!.queueSettings!.queuedAt -
b.attributes.reindexOptions!.queueSettings!.queuedAt
),
});

export const sortAndOrderReindexOperations = flow(
sortReindexOperations,
orderQueuedReindexOperations
);
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
IndexGroup,
REINDEX_OP_TYPE,
ReindexOperation,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand All @@ -34,8 +35,9 @@ export interface ReindexActions {
/**
* Creates a new reindexOp, does not perform any pre-flight checks.
* @param indexName
* @param opts Options for the reindex operation
*/
createReindexOp(indexName: string): Promise<ReindexSavedObject>;
createReindexOp(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Deletes a reindexOp.
Expand Down Expand Up @@ -150,7 +152,7 @@ export const reindexActionsFactory = (

// ----- Public interface
return {
async createReindexOp(indexName: string) {
async createReindexOp(indexName: string, opts?: ReindexOptions) {
return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
indexName,
newIndexName: generateNewIndexName(indexName),
Expand All @@ -161,6 +163,7 @@ export const reindexActionsFactory = (
reindexTaskPercComplete: null,
errorMessage: null,
runningReindexCount: null,
reindexOptions: opts,
});
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ describe('reindexService', () => {

await service.createReindexOperation('myIndex');

expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex');
expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex', undefined);
});

it('fails if index does not exist', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { first } from 'rxjs/operators';

import {
IndexGroup,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand Down Expand Up @@ -51,8 +52,9 @@ export interface ReindexService {
/**
* Creates a new reindex operation for a given index.
* @param indexName
* @param opts
*/
createReindexOperation(indexName: string): Promise<ReindexSavedObject>;
createReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Retrieves all reindex operations that have the given status.
Expand Down Expand Up @@ -83,8 +85,9 @@ export interface ReindexService {
/**
* Resumes the paused reindex operation for a given index.
* @param indexName
* @param opts As with {@link createReindexOperation} we support this setting.
*/
resumeReindexOperation(indexName: string): Promise<ReindexSavedObject>;
resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Cancel an in-progress reindex operation for a given index. Only allowed when the
Expand Down Expand Up @@ -517,7 +520,7 @@ export const reindexServiceFactory = (
}
},

async createReindexOperation(indexName: string) {
async createReindexOperation(indexName: string, opts?: ReindexOptions) {
const indexExists = await callAsUser('indices.exists', { index: indexName });
if (!indexExists) {
throw error.indexNotFound(`Index ${indexName} does not exist in this cluster.`);
Expand All @@ -539,7 +542,7 @@ export const reindexServiceFactory = (
}
}

return actions.createReindexOp(indexName);
return actions.createReindexOp(indexName, opts);
},

async findReindexOperation(indexName: string) {
Expand Down Expand Up @@ -627,7 +630,7 @@ export const reindexServiceFactory = (
});
},

async resumeReindexOperation(indexName: string) {
async resumeReindexOperation(indexName: string, opts?: ReindexOptions) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
Expand All @@ -642,27 +645,32 @@ export const reindexServiceFactory = (
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}

return actions.updateReindexOp(op, { status: ReindexStatus.inProgress });
return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: opts,
});
});
},

async cancelReindexing(indexName: string) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
throw new Error(`No reindex operation found for index ${indexName}`);
throw error.indexNotFound(`No reindex operation found for index ${indexName}`);
} else if (reindexOp.attributes.status !== ReindexStatus.inProgress) {
throw new Error(`Reindex operation is not in progress`);
throw error.reindexCannotBeCancelled(`Reindex operation is not in progress`);
} else if (reindexOp.attributes.lastCompletedStep !== ReindexStep.reindexStarted) {
throw new Error(`Reindex operation is not current waiting for reindex task to complete`);
throw error.reindexCannotBeCancelled(
`Reindex operation is not currently waiting for reindex task to complete`
);
}

const resp = await callAsUser('tasks.cancel', {
taskId: reindexOp.attributes.reindexTaskId,
});

if (resp.node_failures && resp.node_failures.length > 0) {
throw new Error(`Could not cancel reindex.`);
throw error.reindexCannotBeCancelled(`Could not cancel reindex.`);
}

return reindexOp;
Expand Down
38 changes: 27 additions & 11 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
*/
import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server';
import moment from 'moment';

import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations } from './op_utils';

const POLL_INTERVAL = 30000;
// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
Expand Down Expand Up @@ -105,15 +105,17 @@ export class ReindexWorker {
private startUpdateOperationLoop = async () => {
this.updateOperationLoopRunning = true;

while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);
try {
while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);

// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
}
} finally {
this.updateOperationLoopRunning = false;
}

this.updateOperationLoopRunning = false;
};

private pollForOperations = async () => {
Expand All @@ -126,14 +128,28 @@ export class ReindexWorker {
}
};

private refresh = async () => {
private updateInProgressOps = async () => {
try {
this.inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps);

const [firstOpInQueue] = queue;

if (firstOpInQueue) {
this.log.debug(
`Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})`
);
}

this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []);
} catch (e) {
this.log.debug(`Could not fetch reindex operations from Elasticsearch`);
this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${e.message}`);
this.inProgressOps = [];
}
};

private refresh = async () => {
await this.updateInProgressOps();
// If there are operations in progress and we're not already updating operations, kick off the update loop
if (!this.updateOperationLoopRunning) {
this.startUpdateOperationLoop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices';
Loading