Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [Upgrade Assistant] Fix edge case where reindex op can falsely be seen as stale (#60770) #61082

Merged
merged 1 commit into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions x-pack/plugins/upgrade_assistant/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,27 @@ export enum ReindexStatus {
export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation';

/**
* Queue-related settings stored on a reindex operation that has been enqueued.
*/
export interface QueueSettings extends SavedObjectAttributes {
/**
* Timestamp of when the reindex operation was enqueued, in milliseconds since
* the Unix epoch (the value is produced with `Date.now()`).
*
* @remark
* This is used by the reindexing scheduler to determine execution
* order.
*/
queuedAt: number;

/**
* Timestamp of when the reindex operation was started, in milliseconds since
* the Unix epoch (the value is produced with `Date.now()`). Absent until the
* queued operation has been started.
*
* @remark
* Updating this field is useful for _also_ updating the saved object "updated_at" field
* which is used to determine stale or abandoned reindex operations.
*
* For now this is also used by the reindex worker scheduler to determine whether
* the item at the start of the queue has already been started.
*
*/
startedAt?: number;
}

export interface ReindexOptions extends SavedObjectAttributes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
MultipleReindexJobsFound,
CannotReindexSystemIndexInCurrent,
ReindexCannotBeCancelled,
ReindexIsNotInQueue,
} from './error_symbols';

export class ReindexError extends Error {
Expand All @@ -34,6 +35,7 @@ export const error = {
reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted),
reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress),
reindexSystemIndex: createErrorFactory(CannotReindexSystemIndexInCurrent),
reindexIsNotInQueue: createErrorFactory(ReindexIsNotInQueue),
multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound),
reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled),
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const ReindexTaskFailed = Symbol('ReindexTaskFailed');
export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted');
export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress');
export const CannotReindexSystemIndexInCurrent = Symbol('CannotReindexSystemIndexInCurrent');
export const ReindexIsNotInQueue = Symbol('ReindexIsNotInQueue');
export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled');

export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound');
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const orderQueuedReindexOperations = ({
),
});

/**
 * Reports whether a queued reindex operation has already been started, i.e. whether
 * its queue settings carry a truthy `startedAt` value.
 *
 * @param op The reindex saved object to inspect.
 * @returns `true` when `startedAt` holds a truthy value; `false` when it is missing
 * or falsy, or when the operation has no reindex options or queue settings at all.
 */
export const queuedOpHasStarted = (op: ReindexSavedObject) => {
  const startedAt = op.attributes.reindexOptions?.queueSettings?.startedAt;
  return !!startedAt;
};

export const sortAndOrderReindexOperations = flow(
sortReindexOperations,
orderQueuedReindexOperations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { LicensingPluginSetup } from '../../../../licensing/server';

import {
IndexGroup,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand Down Expand Up @@ -62,7 +61,10 @@ export interface ReindexService {
* @param indexName
* @param opts Additional options when creating a new reindex operation
*/
createReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;
createReindexOperation(
indexName: string,
opts?: { enqueue?: boolean }
): Promise<ReindexSavedObject>;

/**
* Retrieves all reindex operations that have the given status.
Expand Down Expand Up @@ -101,7 +103,21 @@ export interface ReindexService {
* @param indexName
* @param opts As with {@link createReindexOperation} we support this setting.
*/
resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;
resumeReindexOperation(
indexName: string,
opts?: { enqueue?: boolean }
): Promise<ReindexSavedObject>;

/**
* Update the updated_at field on the reindex operation
*
* @remark
* Currently also sets a startedAt field on the SavedObject; the reindex worker checks
* this field as an indication that the queued object has started being processed.
*
* @param indexName
*/
startQueuedReindexOperation(indexName: string): Promise<ReindexSavedObject>;

/**
* Cancel an in-progress reindex operation for a given index. Only allowed when the
Expand Down Expand Up @@ -570,7 +586,7 @@ export const reindexServiceFactory = (
}
},

async createReindexOperation(indexName: string, opts?: ReindexOptions) {
async createReindexOperation(indexName: string, opts?: { enqueue: boolean }) {
if (isSystemIndex(indexName)) {
throw error.reindexSystemIndex(
`Reindexing system indices are not yet supported within this major version. Upgrade to the latest ${CURRENT_MAJOR_VERSION}.x minor version.`
Expand Down Expand Up @@ -598,7 +614,10 @@ export const reindexServiceFactory = (
}
}

return actions.createReindexOp(indexName, opts);
return actions.createReindexOp(
indexName,
opts?.enqueue ? { queueSettings: { queuedAt: Date.now() } } : undefined
);
},

async findReindexOperation(indexName: string) {
Expand Down Expand Up @@ -703,7 +722,7 @@ export const reindexServiceFactory = (
});
},

async resumeReindexOperation(indexName: string, opts?: ReindexOptions) {
async resumeReindexOperation(indexName: string, opts?: { enqueue: boolean }) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
Expand All @@ -717,16 +736,30 @@ export const reindexServiceFactory = (
} else if (op.attributes.status !== ReindexStatus.paused) {
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}

const reindexOptions: ReindexOptions | undefined = opts
? {
...(op.attributes.reindexOptions ?? {}),
...opts,
}
: undefined;
const queueSettings = opts?.enqueue ? { queuedAt: Date.now() } : undefined;

return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: queueSettings ? { queueSettings } : undefined,
});
});
},

async startQueuedReindexOperation(indexName: string) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
throw error.indexNotFound(`No reindex operation found for index ${indexName}`);
}

if (!reindexOp.attributes.reindexOptions?.queueSettings) {
throw error.reindexIsNotInQueue(`Reindex operation ${indexName} is not in the queue.`);
}

return actions.runWhileLocked(reindexOp, async lockedReindexOp => {
const { reindexOptions } = lockedReindexOp.attributes;
reindexOptions!.queueSettings!.startedAt = Date.now();
return actions.updateReindexOp(lockedReindexOp, {
reindexOptions,
});
});
Expand Down
31 changes: 27 additions & 4 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server';
import moment from 'moment';
import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { CredentialStore } from './credential_store';
import { Credential, CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations } from './op_utils';
import { sortAndOrderReindexOperations, queuedOpHasStarted } from './op_utils';

const POLL_INTERVAL = 30000;
// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
Expand Down Expand Up @@ -130,17 +130,40 @@ export class ReindexWorker {
}
};

/**
 * Builds a reindex service whose Elasticsearch calls are made with the given credential.
 *
 * The credential is used as the headers of a fake request, the cluster client is scoped
 * to that request, and the resulting `callAsCurrentUser` function is shared by both the
 * reindex actions and the reindex service that is returned.
 *
 * @param credential Stored credential to use as the request headers.
 * @returns A reindex service created by `reindexServiceFactory` for that credential.
 */
private getCredentialScopedReindexService = (credential: Credential) => {
  const request: FakeRequest = { headers: credential };
  const scopedClient = this.clusterClient.asScoped(request);
  // Bind so the function keeps its scoped client as `this` when passed around on its own.
  const callAsCurrentUser = scopedClient.callAsCurrentUser.bind(scopedClient);

  return reindexServiceFactory(
    callAsCurrentUser,
    reindexActionsFactory(this.client, callAsCurrentUser),
    this.log,
    this.licensing,
    this.apmIndexPatterns
  );
};

private updateInProgressOps = async () => {
try {
const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps);

const [firstOpInQueue] = queue;
let [firstOpInQueue] = queue;

if (firstOpInQueue) {
if (firstOpInQueue && !queuedOpHasStarted(firstOpInQueue)) {
this.log.debug(
`Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})`
);
const credential = this.credentialStore.get(firstOpInQueue);
if (credential) {
const service = this.getCredentialScopedReindexService(credential);
firstOpInQueue = await service.startQueuedReindexOperation(
firstOpInQueue.attributes.indexName
);
// Re-associate the credentials
this.credentialStore.set(firstOpInQueue, credential);
}
}

this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana

import { LicensingPluginSetup } from '../../../../licensing/server';

import { ReindexOperation, ReindexOptions, ReindexStatus } from '../../../common/types';
import { ReindexOperation, ReindexStatus } from '../../../common/types';

import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
import { reindexServiceFactory } from '../../lib/reindexing';
Expand Down Expand Up @@ -53,17 +53,11 @@ export const reindexHandler = async ({

const existingOp = await reindexService.findReindexOperation(indexName);

const opts: ReindexOptions | undefined = reindexOptions
? {
queueSettings: reindexOptions.enqueue ? { queuedAt: Date.now() } : undefined,
}
: undefined;

// If the reindexOp already exists and it's paused, resume it. Otherwise create a new one.
const reindexOp =
existingOp && existingOp.attributes.status === ReindexStatus.paused
? await reindexService.resumeReindexOperation(indexName, opts)
: await reindexService.createReindexOperation(indexName, opts);
? await reindexService.resumeReindexOperation(indexName, reindexOptions)
: await reindexService.createReindexOperation(indexName, reindexOptions);

// Add users credentials for the worker to use
credentialStore.set(reindexOp, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ describe('reindex API', () => {

describe('POST /api/upgrade_assistant/reindex/batch', () => {
const queueSettingsArg = {
queueSettings: { queuedAt: expect.any(Number) },
enqueue: true,
};
it('creates a collection of index operations', async () => {
mockReindexService.createReindexOperation
Expand Down