Skip to content

Commit

Permalink
[Upgrade Assistant] Fix edge case where reindex op can falsely be see…
Browse files Browse the repository at this point in the history
…n as stale (#60770) (#61082)

* Fix edge case where reindex op is can falsely be seen as stale

This is for multiple Kibana workers, to ensure that an item just
coming off the queue is seen as "new" we set a "startedAt" field
which will update the reindex op and give it the full timeout
window.

* Update tests to use new api too

Co-authored-by: Elastic Machine <[email protected]>
# Conflicts:
#	x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts
#	x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts
#	x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts
#	x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts
  • Loading branch information
jloleysens authored Mar 24, 2020
1 parent 37ce346 commit 8d39068
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 27 deletions.
20 changes: 20 additions & 0 deletions x-pack/plugins/upgrade_assistant/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,27 @@ export enum ReindexStatus {
export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation';

export interface QueueSettings extends SavedObjectAttributes {
/**
* A Unix timestamp of when the reindex operation was enqueued.
*
* @remark
* This is used by the reindexing scheduler to determine execution
* order.
*/
queuedAt: number;

/**
* A Unix timestamp of when the reindex operation was started.
*
* @remark
* Updating this field is useful for _also_ updating the saved object "updated_at" field
* which is used to determine stale or abandoned reindex operations.
*
* For now this is used by the reindex worker scheduler to determine whether we have
* A queue item at the start of the queue.
*
*/
startedAt?: number;
}

export interface ReindexOptions extends SavedObjectAttributes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
MultipleReindexJobsFound,
CannotReindexSystemIndexInCurrent,
ReindexCannotBeCancelled,
ReindexIsNotInQueue,
} from './error_symbols';

export class ReindexError extends Error {
Expand All @@ -34,6 +35,7 @@ export const error = {
reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted),
reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress),
reindexSystemIndex: createErrorFactory(CannotReindexSystemIndexInCurrent),
reindexIsNotInQueue: createErrorFactory(ReindexIsNotInQueue),
multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound),
reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled),
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const ReindexTaskFailed = Symbol('ReindexTaskFailed');
export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted');
export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress');
export const CannotReindexSystemIndexInCurrent = Symbol('CannotReindexSystemIndexInCurrent');
export const ReindexIsNotInQueue = Symbol('ReindexIsNotInQueue');
export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled');

export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound');
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const orderQueuedReindexOperations = ({
),
});

export const queuedOpHasStarted = (op: ReindexSavedObject) =>
Boolean(op.attributes.reindexOptions?.queueSettings?.startedAt);

export const sortAndOrderReindexOperations = flow(
sortReindexOperations,
orderQueuedReindexOperations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { LicensingPluginSetup } from '../../../../licensing/server';

import {
IndexGroup,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand Down Expand Up @@ -62,7 +61,10 @@ export interface ReindexService {
* @param indexName
* @param opts Additional options when creating a new reindex operation
*/
createReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;
createReindexOperation(
indexName: string,
opts?: { enqueue?: boolean }
): Promise<ReindexSavedObject>;

/**
* Retrieves all reindex operations that have the given status.
Expand Down Expand Up @@ -101,7 +103,21 @@ export interface ReindexService {
* @param indexName
* @param opts As with {@link createReindexOperation} we support this setting.
*/
resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;
resumeReindexOperation(
indexName: string,
opts?: { enqueue?: boolean }
): Promise<ReindexSavedObject>;

/**
* Update the update_at field on the reindex operation
*
* @remark
* Currently also sets a startedAt field on the SavedObject, not really used
* elsewhere, but is an indication that the object has started being processed.
*
* @param indexName
*/
startQueuedReindexOperation(indexName: string): Promise<ReindexSavedObject>;

/**
* Cancel an in-progress reindex operation for a given index. Only allowed when the
Expand Down Expand Up @@ -570,7 +586,7 @@ export const reindexServiceFactory = (
}
},

async createReindexOperation(indexName: string, opts?: ReindexOptions) {
async createReindexOperation(indexName: string, opts?: { enqueue: boolean }) {
if (isSystemIndex(indexName)) {
throw error.reindexSystemIndex(
`Reindexing system indices are not yet supported within this major version. Upgrade to the latest ${CURRENT_MAJOR_VERSION}.x minor version.`
Expand Down Expand Up @@ -598,7 +614,10 @@ export const reindexServiceFactory = (
}
}

return actions.createReindexOp(indexName, opts);
return actions.createReindexOp(
indexName,
opts?.enqueue ? { queueSettings: { queuedAt: Date.now() } } : undefined
);
},

async findReindexOperation(indexName: string) {
Expand Down Expand Up @@ -703,7 +722,7 @@ export const reindexServiceFactory = (
});
},

async resumeReindexOperation(indexName: string, opts?: ReindexOptions) {
async resumeReindexOperation(indexName: string, opts?: { enqueue: boolean }) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
Expand All @@ -717,16 +736,30 @@ export const reindexServiceFactory = (
} else if (op.attributes.status !== ReindexStatus.paused) {
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}

const reindexOptions: ReindexOptions | undefined = opts
? {
...(op.attributes.reindexOptions ?? {}),
...opts,
}
: undefined;
const queueSettings = opts?.enqueue ? { queuedAt: Date.now() } : undefined;

return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: queueSettings ? { queueSettings } : undefined,
});
});
},

async startQueuedReindexOperation(indexName: string) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
throw error.indexNotFound(`No reindex operation found for index ${indexName}`);
}

if (!reindexOp.attributes.reindexOptions?.queueSettings) {
throw error.reindexIsNotInQueue(`Reindex operation ${indexName} is not in the queue.`);
}

return actions.runWhileLocked(reindexOp, async lockedReindexOp => {
const { reindexOptions } = lockedReindexOp.attributes;
reindexOptions!.queueSettings!.startedAt = Date.now();
return actions.updateReindexOp(lockedReindexOp, {
reindexOptions,
});
});
Expand Down
31 changes: 27 additions & 4 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server';
import moment from 'moment';
import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { CredentialStore } from './credential_store';
import { Credential, CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations } from './op_utils';
import { sortAndOrderReindexOperations, queuedOpHasStarted } from './op_utils';

const POLL_INTERVAL = 30000;
// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
Expand Down Expand Up @@ -130,17 +130,40 @@ export class ReindexWorker {
}
};

private getCredentialScopedReindexService = (credential: Credential) => {
const fakeRequest: FakeRequest = { headers: credential };
const scopedClusterClient = this.clusterClient.asScoped(fakeRequest);
const callAsCurrentUser = scopedClusterClient.callAsCurrentUser.bind(scopedClusterClient);
const actions = reindexActionsFactory(this.client, callAsCurrentUser);
return reindexServiceFactory(
callAsCurrentUser,
actions,
this.log,
this.licensing,
this.apmIndexPatterns
);
};

private updateInProgressOps = async () => {
try {
const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps);

const [firstOpInQueue] = queue;
let [firstOpInQueue] = queue;

if (firstOpInQueue) {
if (firstOpInQueue && !queuedOpHasStarted(firstOpInQueue)) {
this.log.debug(
`Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})`
);
const credential = this.credentialStore.get(firstOpInQueue);
if (credential) {
const service = this.getCredentialScopedReindexService(credential);
firstOpInQueue = await service.startQueuedReindexOperation(
firstOpInQueue.attributes.indexName
);
// Re-associate the credentials
this.credentialStore.set(firstOpInQueue, credential);
}
}

this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana

import { LicensingPluginSetup } from '../../../../licensing/server';

import { ReindexOperation, ReindexOptions, ReindexStatus } from '../../../common/types';
import { ReindexOperation, ReindexStatus } from '../../../common/types';

import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
import { reindexServiceFactory } from '../../lib/reindexing';
Expand Down Expand Up @@ -53,17 +53,11 @@ export const reindexHandler = async ({

const existingOp = await reindexService.findReindexOperation(indexName);

const opts: ReindexOptions | undefined = reindexOptions
? {
queueSettings: reindexOptions.enqueue ? { queuedAt: Date.now() } : undefined,
}
: undefined;

// If the reindexOp already exists and it's paused, resume it. Otherwise create a new one.
const reindexOp =
existingOp && existingOp.attributes.status === ReindexStatus.paused
? await reindexService.resumeReindexOperation(indexName, opts)
: await reindexService.createReindexOperation(indexName, opts);
? await reindexService.resumeReindexOperation(indexName, reindexOptions)
: await reindexService.createReindexOperation(indexName, reindexOptions);

// Add users credentials for the worker to use
credentialStore.set(reindexOp, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ describe('reindex API', () => {

describe('POST /api/upgrade_assistant/reindex/batch', () => {
const queueSettingsArg = {
queueSettings: { queuedAt: expect.any(Number) },
enqueue: true,
};
it('creates a collection of index operations', async () => {
mockReindexService.createReindexOperation
Expand Down

0 comments on commit 8d39068

Please sign in to comment.