From 8d39068bc06976bae464873f438c75011c9836dd Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Tue, 24 Mar 2020 18:13:55 +0100 Subject: [PATCH] [Upgrade Assistant] Fix edge case where reindex op can falsely be seen as stale (#60770) (#61082) * Fix edge case where reindex op is can falsely be seen as stale This is for multiple Kibana workers, to ensure that an item just coming off the queue is seen as "new" we set a "startedAt" field which will update the reindex op and give it the full timeout window. * Update tests to use new api too Co-authored-by: Elastic Machine # Conflicts: # x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts # x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts # x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts # x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts --- .../plugins/upgrade_assistant/common/types.ts | 20 +++++++ .../server/lib/reindexing/error.ts | 2 + .../server/lib/reindexing/error_symbols.ts | 1 + .../server/lib/reindexing/op_utils.ts | 3 + .../server/lib/reindexing/reindex_service.ts | 59 +++++++++++++++---- .../server/lib/reindexing/worker.ts | 31 ++++++++-- .../routes/reindex_indices/reindex_handler.ts | 12 +--- .../reindex_indices/reindex_indices.test.ts | 2 +- 8 files changed, 103 insertions(+), 27 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/common/types.ts b/x-pack/plugins/upgrade_assistant/common/types.ts index a91ae3a9e1af0..41e2314fc2a5d 100644 --- a/x-pack/plugins/upgrade_assistant/common/types.ts +++ b/x-pack/plugins/upgrade_assistant/common/types.ts @@ -30,7 +30,27 @@ export enum ReindexStatus { export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation'; export interface QueueSettings extends SavedObjectAttributes { + /** + * A Unix timestamp of when the reindex operation was enqueued. + * + * @remark + * This is used by the reindexing scheduler to determine execution + * order. + */ queuedAt: number; + + /** + * A Unix timestamp of when the reindex operation was started. + * + * @remark + * Updating this field is useful for _also_ updating the saved object "updated_at" field + * which is used to determine stale or abandoned reindex operations. + * + * For now this is used by the reindex worker scheduler to determine whether we have + * A queue item at the start of the queue. + * + */ + startedAt?: number; } export interface ReindexOptions extends SavedObjectAttributes { diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts index f0b3b9146deeb..87b0ddedc69e1 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts @@ -14,6 +14,7 @@ import { MultipleReindexJobsFound, CannotReindexSystemIndexInCurrent, ReindexCannotBeCancelled, + ReindexIsNotInQueue, } from './error_symbols'; export class ReindexError extends Error { @@ -34,6 +35,7 @@ export const error = { reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted), reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress), reindexSystemIndex: createErrorFactory(CannotReindexSystemIndexInCurrent), + reindexIsNotInQueue: createErrorFactory(ReindexIsNotInQueue), multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound), reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled), }; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts index 0004ae8520277..f9575eacb6c7f 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts @@ -12,6 +12,7 @@ export const ReindexTaskFailed = Symbol('ReindexTaskFailed'); export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted'); export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress'); export const CannotReindexSystemIndexInCurrent = Symbol('CannotReindexSystemIndexInCurrent'); +export const ReindexIsNotInQueue = Symbol('ReindexIsNotInQueue'); export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled'); export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound'); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts index dbed7de13f010..ecba02e0d5466 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts @@ -50,6 +50,9 @@ const orderQueuedReindexOperations = ({ ), }); +export const queuedOpHasStarted = (op: ReindexSavedObject) => + Boolean(op.attributes.reindexOptions?.queueSettings?.startedAt); + export const sortAndOrderReindexOperations = flow( sortReindexOperations, orderQueuedReindexOperations diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts index bb14361f220c7..e5d054af7b1b2 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts @@ -10,7 +10,6 @@ import { LicensingPluginSetup } from '../../../../licensing/server'; import { IndexGroup, - ReindexOptions, ReindexSavedObject, ReindexStatus, ReindexStep, @@ -62,7 +61,10 @@ export interface ReindexService { * @param indexName * @param opts Additional options when creating a new reindex operation */ - createReindexOperation(indexName: string, opts?: ReindexOptions): Promise; + createReindexOperation( + indexName: string, + opts?: { enqueue?: boolean } + ): Promise; /** * Retrieves all reindex operations that have the given status. @@ -101,7 +103,21 @@ export interface ReindexService { * @param indexName * @param opts As with {@link createReindexOperation} we support this setting. */ - resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise; + resumeReindexOperation( + indexName: string, + opts?: { enqueue?: boolean } + ): Promise; + + /** + * Update the update_at field on the reindex operation + * + * @remark + * Currently also sets a startedAt field on the SavedObject, not really used + * elsewhere, but is an indication that the object has started being processed. + * + * @param indexName + */ + startQueuedReindexOperation(indexName: string): Promise; /** * Cancel an in-progress reindex operation for a given index. Only allowed when the @@ -570,7 +586,7 @@ export const reindexServiceFactory = ( } }, - async createReindexOperation(indexName: string, opts?: ReindexOptions) { + async createReindexOperation(indexName: string, opts?: { enqueue: boolean }) { if (isSystemIndex(indexName)) { throw error.reindexSystemIndex( `Reindexing system indices are not yet supported within this major version. Upgrade to the latest ${CURRENT_MAJOR_VERSION}.x minor version.` @@ -598,7 +614,10 @@ export const reindexServiceFactory = ( } } - return actions.createReindexOp(indexName, opts); + return actions.createReindexOp( + indexName, + opts?.enqueue ? { queueSettings: { queuedAt: Date.now() } } : undefined + ); }, async findReindexOperation(indexName: string) { @@ -703,7 +722,7 @@ export const reindexServiceFactory = ( }); }, - async resumeReindexOperation(indexName: string, opts?: ReindexOptions) { + async resumeReindexOperation(indexName: string, opts?: { enqueue: boolean }) { const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { @@ -717,16 +736,30 @@ export const reindexServiceFactory = ( } else if (op.attributes.status !== ReindexStatus.paused) { throw new Error(`Reindex operation must be paused in order to be resumed.`); } - - const reindexOptions: ReindexOptions | undefined = opts - ? { - ...(op.attributes.reindexOptions ?? {}), - ...opts, - } - : undefined; + const queueSettings = opts?.enqueue ? { queuedAt: Date.now() } : undefined; return actions.updateReindexOp(op, { status: ReindexStatus.inProgress, + reindexOptions: queueSettings ? { queueSettings } : undefined, + }); + }); + }, + + async startQueuedReindexOperation(indexName: string) { + const reindexOp = await this.findReindexOperation(indexName); + + if (!reindexOp) { + throw error.indexNotFound(`No reindex operation found for index ${indexName}`); + } + + if (!reindexOp.attributes.reindexOptions?.queueSettings) { + throw error.reindexIsNotInQueue(`Reindex operation ${indexName} is not in the queue.`); + } + + return actions.runWhileLocked(reindexOp, async lockedReindexOp => { + const { reindexOptions } = lockedReindexOp.attributes; + reindexOptions!.queueSettings!.startedAt = Date.now(); + return actions.updateReindexOp(lockedReindexOp, { reindexOptions, }); }); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index b0311358e8f3f..06349d069a813 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -6,11 +6,11 @@ import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server'; import moment from 'moment'; import { ReindexSavedObject, ReindexStatus } from '../../../common/types'; -import { CredentialStore } from './credential_store'; +import { Credential, CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; -import { sortAndOrderReindexOperations } from './op_utils'; +import { sortAndOrderReindexOperations, queuedOpHasStarted } from './op_utils'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. @@ -130,17 +130,40 @@ export class ReindexWorker { } }; + private getCredentialScopedReindexService = (credential: Credential) => { + const fakeRequest: FakeRequest = { headers: credential }; + const scopedClusterClient = this.clusterClient.asScoped(fakeRequest); + const callAsCurrentUser = scopedClusterClient.callAsCurrentUser.bind(scopedClusterClient); + const actions = reindexActionsFactory(this.client, callAsCurrentUser); + return reindexServiceFactory( + callAsCurrentUser, + actions, + this.log, + this.licensing, + this.apmIndexPatterns + ); + }; + private updateInProgressOps = async () => { try { const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress); const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps); - const [firstOpInQueue] = queue; + let [firstOpInQueue] = queue; - if (firstOpInQueue) { + if (firstOpInQueue && !queuedOpHasStarted(firstOpInQueue)) { this.log.debug( `Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})` ); + const credential = this.credentialStore.get(firstOpInQueue); + if (credential) { + const service = this.getCredentialScopedReindexService(credential); + firstOpInQueue = await service.startQueuedReindexOperation( + firstOpInQueue.attributes.indexName + ); + // Re-associate the credentials + this.credentialStore.set(firstOpInQueue, credential); + } } this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts index e640d03791cce..74c349d894839 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -8,7 +8,7 @@ import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana import { LicensingPluginSetup } from '../../../../licensing/server'; -import { ReindexOperation, ReindexOptions, ReindexStatus } from '../../../common/types'; +import { ReindexOperation, ReindexStatus } from '../../../common/types'; import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; import { reindexServiceFactory } from '../../lib/reindexing'; @@ -53,17 +53,11 @@ export const reindexHandler = async ({ const existingOp = await reindexService.findReindexOperation(indexName); - const opts: ReindexOptions | undefined = reindexOptions - ? { - queueSettings: reindexOptions.enqueue ? { queuedAt: Date.now() } : undefined, - } - : undefined; - // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. const reindexOp = existingOp && existingOp.attributes.status === ReindexStatus.paused - ? await reindexService.resumeReindexOperation(indexName, opts) - : await reindexService.createReindexOperation(indexName, opts); + ? await reindexService.resumeReindexOperation(indexName, reindexOptions) + : await reindexService.createReindexOperation(indexName, reindexOptions); // Add users credentials for the worker to use credentialStore.set(reindexOp, headers); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts index 30beecd8a5de1..860a6a07f6f2e 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts @@ -263,7 +263,7 @@ describe('reindex API', () => { describe('POST /api/upgrade_assistant/reindex/batch', () => { const queueSettingsArg = { - queueSettings: { queuedAt: expect.any(Number) }, + enqueue: true, }; it('creates a collection of index operations', async () => { mockReindexService.createReindexOperation