diff --git a/x-pack/plugins/upgrade_assistant/common/types.ts b/x-pack/plugins/upgrade_assistant/common/types.ts index a91ae3a9e1af0..41e2314fc2a5d 100644 --- a/x-pack/plugins/upgrade_assistant/common/types.ts +++ b/x-pack/plugins/upgrade_assistant/common/types.ts @@ -30,7 +30,27 @@ export enum ReindexStatus { export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation'; export interface QueueSettings extends SavedObjectAttributes { + /** + * A Unix timestamp of when the reindex operation was enqueued. + * + * @remark + * This is used by the reindexing scheduler to determine execution + * order. + */ queuedAt: number; + + /** + * A Unix timestamp of when the reindex operation was started. + * + * @remark + * Updating this field is useful for _also_ updating the saved object "updated_at" field + * which is used to determine stale or abandoned reindex operations. + * + * For now this is used by the reindex worker scheduler to determine whether we have + * A queue item at the start of the queue. + * + */ + startedAt?: number; } export interface ReindexOptions extends SavedObjectAttributes { diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts index f0b3b9146deeb..87b0ddedc69e1 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts @@ -14,6 +14,7 @@ import { MultipleReindexJobsFound, CannotReindexSystemIndexInCurrent, ReindexCannotBeCancelled, + ReindexIsNotInQueue, } from './error_symbols'; export class ReindexError extends Error { @@ -34,6 +35,7 @@ export const error = { reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted), reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress), reindexSystemIndex: createErrorFactory(CannotReindexSystemIndexInCurrent), + reindexIsNotInQueue: createErrorFactory(ReindexIsNotInQueue), multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound), reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled), }; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts index 0004ae8520277..f9575eacb6c7f 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts @@ -12,6 +12,7 @@ export const ReindexTaskFailed = Symbol('ReindexTaskFailed'); export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted'); export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress'); export const CannotReindexSystemIndexInCurrent = Symbol('CannotReindexSystemIndexInCurrent'); +export const ReindexIsNotInQueue = Symbol('ReindexIsNotInQueue'); export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled'); export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound'); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts index dbed7de13f010..ecba02e0d5466 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts @@ -50,6 +50,9 @@ const orderQueuedReindexOperations = ({ ), }); +export const queuedOpHasStarted = (op: ReindexSavedObject) => + Boolean(op.attributes.reindexOptions?.queueSettings?.startedAt); + export const sortAndOrderReindexOperations = flow( sortReindexOperations, orderQueuedReindexOperations diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts index bb14361f220c7..e5d054af7b1b2 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts @@ -10,7 +10,6 @@ import { LicensingPluginSetup } from '../../../../licensing/server'; import { IndexGroup, - ReindexOptions, ReindexSavedObject, ReindexStatus, ReindexStep, @@ -62,7 +61,10 @@ export interface ReindexService { * @param indexName * @param opts Additional options when creating a new reindex operation */ - createReindexOperation(indexName: string, opts?: ReindexOptions): Promise; + createReindexOperation( + indexName: string, + opts?: { enqueue?: boolean } + ): Promise; /** * Retrieves all reindex operations that have the given status. @@ -101,7 +103,21 @@ export interface ReindexService { * @param indexName * @param opts As with {@link createReindexOperation} we support this setting. */ - resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise; + resumeReindexOperation( + indexName: string, + opts?: { enqueue?: boolean } + ): Promise; + + /** + * Update the update_at field on the reindex operation + * + * @remark + * Currently also sets a startedAt field on the SavedObject, not really used + * elsewhere, but is an indication that the object has started being processed. + * + * @param indexName + */ + startQueuedReindexOperation(indexName: string): Promise; /** * Cancel an in-progress reindex operation for a given index. Only allowed when the @@ -570,7 +586,7 @@ export const reindexServiceFactory = ( } }, - async createReindexOperation(indexName: string, opts?: ReindexOptions) { + async createReindexOperation(indexName: string, opts?: { enqueue: boolean }) { if (isSystemIndex(indexName)) { throw error.reindexSystemIndex( `Reindexing system indices are not yet supported within this major version. Upgrade to the latest ${CURRENT_MAJOR_VERSION}.x minor version.` @@ -598,7 +614,10 @@ export const reindexServiceFactory = ( } } - return actions.createReindexOp(indexName, opts); + return actions.createReindexOp( + indexName, + opts?.enqueue ? { queueSettings: { queuedAt: Date.now() } } : undefined + ); }, async findReindexOperation(indexName: string) { @@ -703,7 +722,7 @@ export const reindexServiceFactory = ( }); }, - async resumeReindexOperation(indexName: string, opts?: ReindexOptions) { + async resumeReindexOperation(indexName: string, opts?: { enqueue: boolean }) { const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { @@ -717,16 +736,30 @@ export const reindexServiceFactory = ( } else if (op.attributes.status !== ReindexStatus.paused) { throw new Error(`Reindex operation must be paused in order to be resumed.`); } - - const reindexOptions: ReindexOptions | undefined = opts - ? { - ...(op.attributes.reindexOptions ?? {}), - ...opts, - } - : undefined; + const queueSettings = opts?.enqueue ? { queuedAt: Date.now() } : undefined; return actions.updateReindexOp(op, { status: ReindexStatus.inProgress, + reindexOptions: queueSettings ? { queueSettings } : undefined, + }); + }); + }, + + async startQueuedReindexOperation(indexName: string) { + const reindexOp = await this.findReindexOperation(indexName); + + if (!reindexOp) { + throw error.indexNotFound(`No reindex operation found for index ${indexName}`); + } + + if (!reindexOp.attributes.reindexOptions?.queueSettings) { + throw error.reindexIsNotInQueue(`Reindex operation ${indexName} is not in the queue.`); + } + + return actions.runWhileLocked(reindexOp, async lockedReindexOp => { + const { reindexOptions } = lockedReindexOp.attributes; + reindexOptions!.queueSettings!.startedAt = Date.now(); + return actions.updateReindexOp(lockedReindexOp, { reindexOptions, }); }); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index b0311358e8f3f..06349d069a813 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -6,11 +6,11 @@ import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server'; import moment from 'moment'; import { ReindexSavedObject, ReindexStatus } from '../../../common/types'; -import { CredentialStore } from './credential_store'; +import { Credential, CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; -import { sortAndOrderReindexOperations } from './op_utils'; +import { sortAndOrderReindexOperations, queuedOpHasStarted } from './op_utils'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. @@ -130,17 +130,40 @@ export class ReindexWorker { } }; + private getCredentialScopedReindexService = (credential: Credential) => { + const fakeRequest: FakeRequest = { headers: credential }; + const scopedClusterClient = this.clusterClient.asScoped(fakeRequest); + const callAsCurrentUser = scopedClusterClient.callAsCurrentUser.bind(scopedClusterClient); + const actions = reindexActionsFactory(this.client, callAsCurrentUser); + return reindexServiceFactory( + callAsCurrentUser, + actions, + this.log, + this.licensing, + this.apmIndexPatterns + ); + }; + private updateInProgressOps = async () => { try { const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress); const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps); - const [firstOpInQueue] = queue; + let [firstOpInQueue] = queue; - if (firstOpInQueue) { + if (firstOpInQueue && !queuedOpHasStarted(firstOpInQueue)) { this.log.debug( `Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})` ); + const credential = this.credentialStore.get(firstOpInQueue); + if (credential) { + const service = this.getCredentialScopedReindexService(credential); + firstOpInQueue = await service.startQueuedReindexOperation( + firstOpInQueue.attributes.indexName + ); + // Re-associate the credentials + this.credentialStore.set(firstOpInQueue, credential); + } } this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts index e640d03791cce..74c349d894839 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -8,7 +8,7 @@ import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana import { LicensingPluginSetup } from '../../../../licensing/server'; -import { ReindexOperation, ReindexOptions, ReindexStatus } from '../../../common/types'; +import { ReindexOperation, ReindexStatus } from '../../../common/types'; import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; import { reindexServiceFactory } from '../../lib/reindexing'; @@ -53,17 +53,11 @@ export const reindexHandler = async ({ const existingOp = await reindexService.findReindexOperation(indexName); - const opts: ReindexOptions | undefined = reindexOptions - ? { - queueSettings: reindexOptions.enqueue ? { queuedAt: Date.now() } : undefined, - } - : undefined; - // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. const reindexOp = existingOp && existingOp.attributes.status === ReindexStatus.paused - ? await reindexService.resumeReindexOperation(indexName, opts) - : await reindexService.createReindexOperation(indexName, opts); + ? await reindexService.resumeReindexOperation(indexName, reindexOptions) + : await reindexService.createReindexOperation(indexName, reindexOptions); // Add users credentials for the worker to use credentialStore.set(reindexOp, headers); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts index 30beecd8a5de1..860a6a07f6f2e 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts @@ -263,7 +263,7 @@ describe('reindex API', () => { describe('POST /api/upgrade_assistant/reindex/batch', () => { const queueSettingsArg = { - queueSettings: { queuedAt: expect.any(Number) }, + enqueue: true, }; it('creates a collection of index operations', async () => { mockReindexService.createReindexOperation