Skip to content

Commit

Permalink
[Upgrade Assistant] Server-side batch reindexing (#58598) (#59532)
Browse files Browse the repository at this point in the history
* Added server side logic for handling batch reindex

* Remove literal string interpolation from translation

* Refactor return value of batch endpoint

"sucesses" does not communicate accurately what has happened.
"started" more closely reflects what has happened.

* First iteration of batch queues

* Single queue

Changed the batchqueues implementation to only using a single queue
 - since there is only one ES that it is interacting with.

Before continuing with this work, just making sure that these pre-
cautions are necessary!

* Clean up old batch queue implementation

* Slight refactor

* Revert batch queues implementation

* Introduction of QueueSettings

Queue settings can be set on a reindex operation and set a
timemstamp value on the reindex operation for the scheduler
to use down the line for ordering operations and running them
in series

* Updated worker logic to handle items in queue in series

* Refactor /batch endpoint response to "enqueued" not "started"

* Fixed jest tests

* Refactor worker refresh operations for readability

Created a new file op_utils where logic repsonsible for sorting
and ordering reindex operation saved objects is.

* Add batch API integration test

Also assert that reindexing is happening in the expected order

* Added a new endpoint: GET batch/queue

This allows users of the API to see what the current queue state
is for visibility. Using the queue endpoint int he API integration
tests for batch too.

* Reset the queuedAt timestamp on resume

If a reindexOperation is being resumed and put in a queue we
also need to reset the queuedAt timestamp to respect the new
batch queue ordering.

* Fix jest test

Added 'undefined' as the second optional param to
resumeIndexOperation call.

Co-authored-by: Elastic Machine <[email protected]>
# Conflicts:
#	x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts
#	x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts
#	x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts
#	x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts
#	x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts
#	x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
jloleysens and elasticmachine authored Mar 7, 2020
1 parent 769d08a commit f7595fd
Show file tree
Hide file tree
Showing 15 changed files with 548 additions and 82 deletions.
22 changes: 22 additions & 0 deletions x-pack/plugins/upgrade_assistant/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ export enum ReindexStatus {
}

export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation';

export interface QueueSettings extends SavedObjectAttributes {
queuedAt: number;
}

export interface ReindexOptions extends SavedObjectAttributes {
/**
* Set this key to configure a reindex operation as part of a
* batch to be run in series.
*/
queueSettings?: QueueSettings;
}

export interface ReindexOperation extends SavedObjectAttributes {
indexName: string;
newIndexName: string;
Expand All @@ -40,6 +53,15 @@ export interface ReindexOperation extends SavedObjectAttributes {

// This field is only used for the singleton IndexConsumerType documents.
runningReindexCount: number | null;

/**
* Options for the reindexing strategy.
*
* @remark
* Marked as optional for backwards compatibility. We should still
* be able to handle older ReindexOperation objects.
*/
reindexOptions?: ReindexOptions;
}

export type ReindexSavedObject = SavedObject<ReindexOperation>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ export const esVersionCheck = async (
}
};

export const versionCheckHandlerWrapper = (handler: RequestHandler<any, any, any>) => async (
export const versionCheckHandlerWrapper = <P, Q, B>(handler: RequestHandler<P, Q, B>) => async (
ctx: RequestHandlerContext,
request: KibanaRequest,
request: KibanaRequest<P, Q, B>,
response: KibanaResponseFactory
) => {
const errorResponse = await esVersionCheck(ctx, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ReindexAlreadyInProgress,
MultipleReindexJobsFound,
CannotReindexSystemIndexInCurrent,
ReindexCannotBeCancelled,
} from './error_symbols';

export class ReindexError extends Error {
Expand All @@ -34,4 +35,5 @@ export const error = {
reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress),
reindexSystemIndex: createErrorFactory(CannotReindexSystemIndexInCurrent),
multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound),
reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled),
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export const ReindexTaskFailed = Symbol('ReindexTaskFailed');
export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted');
export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress');
export const CannotReindexSystemIndexInCurrent = Symbol('CannotReindexSystemIndexInCurrent');
export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled');

export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound');
56 changes: 56 additions & 0 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { flow } from 'fp-ts/lib/function';
import { ReindexSavedObject } from '../../../common/types';

export interface SortedReindexSavedObjects {
/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* have no inherent order and are considered to be processed in parallel.
*/
parallel: ReindexSavedObject[];

/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* are consistently ordered (see {@link orderQueuedReindexOperations}) and should be
* processed in order.
*/
queue: ReindexSavedObject[];
}

const sortReindexOperations = (ops: ReindexSavedObject[]): SortedReindexSavedObjects => {
const parallel: ReindexSavedObject[] = [];
const queue: ReindexSavedObject[] = [];
for (const op of ops) {
if (op.attributes.reindexOptions?.queueSettings) {
queue.push(op);
} else {
parallel.push(op);
}
}

return {
parallel,
queue,
};
};
const orderQueuedReindexOperations = ({
parallel,
queue,
}: SortedReindexSavedObjects): SortedReindexSavedObjects => ({
parallel,
// Sort asc
queue: queue.sort(
(a, b) =>
a.attributes.reindexOptions!.queueSettings!.queuedAt -
b.attributes.reindexOptions!.queueSettings!.queuedAt
),
});

export const sortAndOrderReindexOperations = flow(
sortReindexOperations,
orderQueuedReindexOperations
);
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
IndexGroup,
REINDEX_OP_TYPE,
ReindexOperation,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand All @@ -34,8 +35,9 @@ export interface ReindexActions {
/**
* Creates a new reindexOp, does not perform any pre-flight checks.
* @param indexName
* @param opts Options for the reindex operation
*/
createReindexOp(indexName: string): Promise<ReindexSavedObject>;
createReindexOp(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Deletes a reindexOp.
Expand Down Expand Up @@ -156,7 +158,7 @@ export const reindexActionsFactory = (

// ----- Public interface
return {
async createReindexOp(indexName: string) {
async createReindexOp(indexName: string, opts?: ReindexOptions) {
return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
indexName,
newIndexName: generateNewIndexName(indexName),
Expand All @@ -167,6 +169,7 @@ export const reindexActionsFactory = (
reindexTaskPercComplete: null,
errorMessage: null,
runningReindexCount: null,
reindexOptions: opts,
});
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ describe('reindexService', () => {

await service.createReindexOperation('myIndex');

expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex');
expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex', undefined);
});

it('fails if index does not exist', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { first } from 'rxjs/operators';

import {
IndexGroup,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand Down Expand Up @@ -54,8 +55,9 @@ export interface ReindexService {
/**
* Creates a new reindex operation for a given index.
* @param indexName
* @param opts
*/
createReindexOperation(indexName: string): Promise<ReindexSavedObject>;
createReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Retrieves all reindex operations that have the given status.
Expand Down Expand Up @@ -86,8 +88,9 @@ export interface ReindexService {
/**
* Resumes the paused reindex operation for a given index.
* @param indexName
* @param opts As with {@link createReindexOperation} we support this setting.
*/
resumeReindexOperation(indexName: string): Promise<ReindexSavedObject>;
resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Cancel an in-progress reindex operation for a given index. Only allowed when the
Expand Down Expand Up @@ -537,7 +540,7 @@ export const reindexServiceFactory = (
}
},

async createReindexOperation(indexName: string) {
async createReindexOperation(indexName: string, opts?: ReindexOptions) {
if (isSystemIndex(indexName)) {
throw error.reindexSystemIndex(
`Reindexing system indices are not yet supported within this major version. Upgrade to the latest ${CURRENT_MAJOR_VERSION}.x minor version.`
Expand Down Expand Up @@ -565,7 +568,7 @@ export const reindexServiceFactory = (
}
}

return actions.createReindexOp(indexName);
return actions.createReindexOp(indexName, opts);
},

async findReindexOperation(indexName: string) {
Expand Down Expand Up @@ -653,7 +656,7 @@ export const reindexServiceFactory = (
});
},

async resumeReindexOperation(indexName: string) {
async resumeReindexOperation(indexName: string, opts?: ReindexOptions) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
Expand All @@ -668,27 +671,32 @@ export const reindexServiceFactory = (
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}

return actions.updateReindexOp(op, { status: ReindexStatus.inProgress });
return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: opts,
});
});
},

async cancelReindexing(indexName: string) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
throw new Error(`No reindex operation found for index ${indexName}`);
throw error.indexNotFound(`No reindex operation found for index ${indexName}`);
} else if (reindexOp.attributes.status !== ReindexStatus.inProgress) {
throw new Error(`Reindex operation is not in progress`);
throw error.reindexCannotBeCancelled(`Reindex operation is not in progress`);
} else if (reindexOp.attributes.lastCompletedStep !== ReindexStep.reindexStarted) {
throw new Error(`Reindex operation is not current waiting for reindex task to complete`);
throw error.reindexCannotBeCancelled(
`Reindex operation is not currently waiting for reindex task to complete`
);
}

const resp = await callAsUser('tasks.cancel', {
taskId: reindexOp.attributes.reindexTaskId,
});

if (resp.node_failures && resp.node_failures.length > 0) {
throw new Error(`Could not cancel reindex.`);
throw error.reindexCannotBeCancelled(`Could not cancel reindex.`);
}

return reindexOp;
Expand Down
38 changes: 27 additions & 11 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
*/
import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server';
import moment from 'moment';

import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations } from './op_utils';

const POLL_INTERVAL = 30000;
// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
Expand Down Expand Up @@ -107,15 +107,17 @@ export class ReindexWorker {
private startUpdateOperationLoop = async () => {
this.updateOperationLoopRunning = true;

while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);
try {
while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);

// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
}
} finally {
this.updateOperationLoopRunning = false;
}

this.updateOperationLoopRunning = false;
};

private pollForOperations = async () => {
Expand All @@ -128,14 +130,28 @@ export class ReindexWorker {
}
};

private refresh = async () => {
private updateInProgressOps = async () => {
try {
this.inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps);

const [firstOpInQueue] = queue;

if (firstOpInQueue) {
this.log.debug(
`Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})`
);
}

this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []);
} catch (e) {
this.log.debug(`Could not fetch reindex operations from Elasticsearch`);
this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${e.message}`);
this.inProgressOps = [];
}
};

private refresh = async () => {
await this.updateInProgressOps();
// If there are operations in progress and we're not already updating operations, kick off the update loop
if (!this.updateOperationLoopRunning) {
this.startUpdateOperationLoop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices';
Loading

0 comments on commit f7595fd

Please sign in to comment.