Skip to content

Commit

Permalink
[Upgrade Assistant] Open And Close Slight Refactor (#59890)
Browse files Browse the repository at this point in the history
* Refactor: Move checking of closed index to single point

We should rather only check if an index is currently closed the
moment before starting to reindex. We still store a flag to
indicate that we opened an index that was closed, but this
should not be set from the reindex handlers because the reindex
task may only start some time later in which case the closed
index could have been opened and our reindex job will open it
and close it again.

* Added debug log

* Added comment

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
jloleysens and elasticmachine authored Mar 12, 2020
1 parent d5c0928 commit abc14f5
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,24 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { IScopedClusterClient } from 'kibana/server';
import { APICaller } from 'kibana/server';
import { getIndexStateFromClusterState } from '../../common/get_index_state_from_cluster_state';
import { ClusterStateAPIResponse } from '../../common/types';

type StatusCheckResult = Record<string, 'open' | 'close'>;

export const esIndicesStateCheck = async (
dataClient: IScopedClusterClient,
callAsUser: APICaller,
indices: string[]
): Promise<StatusCheckResult> => {
// According to https://www.elastic.co/guide/en/elasticsearch/reference/7.6/cluster-state.html
// The response from this call is considered internal and subject to change. We have an API
// integration test for asserting that the current ES version still returns what we expect.
// This lives in x-pack/test/upgrade_assistant_integration
const clusterState: ClusterStateAPIResponse = await dataClient.callAsCurrentUser(
'cluster.state',
{
index: indices,
metric: 'metadata',
}
);
const clusterState: ClusterStateAPIResponse = await callAsUser('cluster.state', {
index: indices,
metric: 'metadata',
});

const result: StatusCheckResult = {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export async function getUpgradeAssistantStatus(
// If we have found deprecation information for index/indices check whether the index is
// open or closed.
if (indexNames.length) {
const indexStates = await esIndicesStateCheck(dataClient, indexNames);
const indexStates = await esIndicesStateCheck(
dataClient.callAsCurrentUser.bind(dataClient),
indexNames
);

indices.forEach(indexData => {
indexData.blockerForReindexing =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

jest.mock('../es_indices_state_check', () => ({ esIndicesStateCheck: jest.fn() }));
import { BehaviorSubject } from 'rxjs';
import { Logger } from 'src/core/server';
import { loggingServiceMock } from 'src/core/server/mocks';
Expand All @@ -19,6 +19,8 @@ import { CURRENT_MAJOR_VERSION, PREV_MAJOR_VERSION } from '../../../common/versi
import { licensingMock } from '../../../../licensing/server/mocks';
import { LicensingPluginSetup } from '../../../../licensing/server';

import { esIndicesStateCheck } from '../es_indices_state_check';

import {
isMlIndex,
isWatcherIndex,
Expand All @@ -43,6 +45,7 @@ describe('reindexService', () => {
Promise.reject(`Mock function ${name} was not implemented!`);

beforeEach(() => {
(esIndicesStateCheck as jest.Mock).mockResolvedValue({});
actions = {
createReindexOp: jest.fn(unimplemented('createReindexOp')),
deleteReindexOp: jest.fn(unimplemented('deleteReindexOp')),
Expand Down Expand Up @@ -844,7 +847,6 @@ describe('reindexService', () => {
attributes: {
...defaultAttributes,
lastCompletedStep: ReindexStep.newIndexCreated,
reindexOptions: { openAndClose: false },
},
} as ReindexSavedObject;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import { APICaller, Logger } from 'src/core/server';
import { first } from 'rxjs/operators';

import { LicensingPluginSetup } from '../../../../licensing/server';

import {
IndexGroup,
ReindexOptions,
Expand All @@ -15,14 +17,16 @@ import {
ReindexWarning,
} from '../../../common/types';

import { esIndicesStateCheck } from '../es_indices_state_check';

import {
generateNewIndexName,
getReindexWarnings,
sourceNameForIndex,
transformFlatSettings,
} from './index_settings';

import { ReindexActions } from './reindex_actions';
import { LicensingPluginSetup } from '../../../../licensing/server';

import { error } from './error';

Expand Down Expand Up @@ -317,7 +321,12 @@ export const reindexServiceFactory = (
const startReindexing = async (reindexOp: ReindexSavedObject) => {
const { indexName, reindexOptions } = reindexOp.attributes;

if (reindexOptions?.openAndClose === true) {
// Where possible, derive reindex options at the last moment before reindexing
// to prevent them from becoming stale as they wait in the queue.
const indicesState = await esIndicesStateCheck(callAsUser, [indexName]);
const openAndClose = indicesState[indexName] === 'close';
if (indicesState[indexName] === 'close') {
log.debug(`Detected closed index ${indexName}, opening...`);
await callAsUser('indices.open', { index: indexName });
}

Expand All @@ -334,6 +343,12 @@ export const reindexServiceFactory = (
lastCompletedStep: ReindexStep.reindexStarted,
reindexTaskId: startReindex.task,
reindexTaskPercComplete: 0,
reindexOptions: {
...(reindexOptions ?? {}),
// Indicate to downstream states whether we opened a closed index that should be
// closed again.
openAndClose,
},
});
};

Expand Down Expand Up @@ -654,9 +669,16 @@ export const reindexServiceFactory = (
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}

const reindexOptions: ReindexOptions | undefined = opts
? {
...(op.attributes.reindexOptions ?? {}),
...opts,
}
: undefined;

return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: opts ?? op.attributes.reindexOptions,
reindexOptions,
});
});
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ interface ReindexHandlerArgs {
headers: Record<string, any>;
credentialStore: CredentialStore;
reindexOptions?: {
openAndClose?: boolean;
enqueue?: boolean;
};
}
Expand Down Expand Up @@ -56,7 +55,6 @@ export const reindexHandler = async ({

const opts: ReindexOptions | undefined = reindexOptions
? {
openAndClose: reindexOptions.openAndClose,
queueSettings: reindexOptions.enqueue ? { queuedAt: Date.now() } : undefined,
}
: undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const mockReindexService = {
resumeReindexOperation: jest.fn(),
cancelReindexing: jest.fn(),
};
jest.mock('../../lib/es_indices_state_check', () => ({ esIndicesStateCheck: jest.fn() }));
jest.mock('../../lib/es_version_precheck', () => ({
versionCheckHandlerWrapper: (a: any) => a,
}));
Expand All @@ -39,7 +38,6 @@ import {
} from '../../../common/types';
import { credentialStoreFactory } from '../../lib/reindexing/credential_store';
import { registerReindexIndicesRoutes } from './reindex_indices';
import { esIndicesStateCheck } from '../../lib/es_indices_state_check';

/**
* Since these route callbacks are so thin, these serve simply as integration tests
Expand All @@ -57,7 +55,6 @@ describe('reindex API', () => {
} as any;

beforeEach(() => {
(esIndicesStateCheck as jest.Mock).mockResolvedValue({});
mockRouter = createMockRouter();
routeDependencies = {
credentialStore,
Expand Down Expand Up @@ -168,9 +165,7 @@ describe('reindex API', () => {
);

// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', {
openAndClose: false,
});
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', undefined);

// It returned the right results
expect(resp.status).toEqual(200);
Expand Down Expand Up @@ -237,10 +232,7 @@ describe('reindex API', () => {
kibanaResponseFactory
);
// It called resume correctly
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', {
openAndClose: false,
queueSettings: undefined,
});
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', undefined);
expect(mockReindexService.createReindexOperation).not.toHaveBeenCalled();

// It returned the right results
Expand Down Expand Up @@ -269,7 +261,6 @@ describe('reindex API', () => {

describe('POST /api/upgrade_assistant/reindex/batch', () => {
const queueSettingsArg = {
openAndClose: false,
queueSettings: { queuedAt: expect.any(Number) },
};
it('creates a collection of index operations', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { LicensingPluginSetup } from '../../../../licensing/server';
import { ReindexStatus } from '../../../common/types';

import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck';
import { esIndicesStateCheck } from '../../lib/es_indices_state_check';
import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing';
import { CredentialStore } from '../../lib/reindexing/credential_store';
import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
Expand Down Expand Up @@ -108,7 +107,6 @@ export function registerReindexIndicesRoutes(
response
) => {
const { indexName } = request.params;
const indexStates = await esIndicesStateCheck(dataClient, [indexName]);
try {
const result = await reindexHandler({
savedObjects: savedObjectsClient,
Expand All @@ -118,7 +116,6 @@ export function registerReindexIndicesRoutes(
licensing,
headers: request.headers,
credentialStore,
reindexOptions: { openAndClose: indexStates[indexName] === 'close' },
});

// Kick the worker on this node to immediately pickup the new reindex operation.
Expand Down Expand Up @@ -190,7 +187,6 @@ export function registerReindexIndicesRoutes(
response
) => {
const { indexNames } = request.body;
const indexStates = await esIndicesStateCheck(dataClient, indexNames);
const results: PostBatchResponse = {
enqueued: [],
errors: [],
Expand All @@ -206,7 +202,6 @@ export function registerReindexIndicesRoutes(
headers: request.headers,
credentialStore,
reindexOptions: {
openAndClose: indexStates[indexName] === 'close',
enqueue: true,
},
});
Expand Down

0 comments on commit abc14f5

Please sign in to comment.