Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Upgrade Assistant] Server-side batch reindexing #58598

Merged
merged 21 commits into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6c7f5d7
Added server side logic for handling batch reindex
jloleysens Feb 26, 2020
cb0f786
Remove literal string interpolation from translation
jloleysens Feb 26, 2020
d1e503a
Merge branch 'master' into ua/batch-reindex-server-side
elasticmachine Feb 27, 2020
79aa01a
Merge branch 'master' into ua/batch-reindex-server-side
elasticmachine Feb 28, 2020
17f39e7
Merge branch 'master' of github.com:elastic/kibana into ua/batch-rein…
jloleysens Mar 2, 2020
6407b6a
Refactor return value of batch endpoint
jloleysens Mar 2, 2020
b4263fd
First iteration of batch queues
jloleysens Mar 3, 2020
8214a69
Single queue
jloleysens Mar 3, 2020
d805855
Clean up old batch queue implementation
jloleysens Mar 3, 2020
c4cd048
Slight refactor
jloleysens Mar 3, 2020
0a64e0d
Revert batch queues implementation
jloleysens Mar 4, 2020
b9c93a6
Introduction of QueueSettings
jloleysens Mar 4, 2020
23ab266
Updated worker logic to handle items in queue in series
jloleysens Mar 4, 2020
6cf9acc
Refactor /batch endpoint response to "enqueued" not "started"
jloleysens Mar 4, 2020
02ae1b2
Fixed jest tests
jloleysens Mar 4, 2020
4d57be9
Refactor worker refresh operations for readability
jloleysens Mar 5, 2020
0fd95c2
Add batch API integration test
jloleysens Mar 5, 2020
2f29404
Added a new endpoint: GET batch/queue
jloleysens Mar 5, 2020
e0f3a0e
Reset the queuedAt timestamp on resume
jloleysens Mar 5, 2020
5cd2270
Merge branch 'master' into ua/batch-reindex-server-side
elasticmachine Mar 5, 2020
63f6659
Fix jest test
jloleysens Mar 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ export const esVersionCheck = async (
}
};

export const versionCheckHandlerWrapper = (handler: RequestHandler<any, any, any>) => async (
export const versionCheckHandlerWrapper = <P, Q, B>(handler: RequestHandler<P, Q, B>) => async (
ctx: RequestHandlerContext,
request: KibanaRequest,
request: KibanaRequest<P, Q, B>,
response: KibanaResponseFactory
) => {
const errorResponse = await esVersionCheck(ctx, response);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices';
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana/server';

import { LicensingPluginSetup } from '../../../../licensing/server';

import { ReindexStatus } from '../../../common/types';

import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing';
import { CredentialStore } from '../../lib/reindexing/credential_store';

export const SYMBOL_FORBIDDEN = Symbol('Forbidden');

interface ReindexHandlerArgs {
savedObjects: SavedObjectsClientContract;
dataClient: IScopedClusterClient;
indexName: string;
log: Logger;
licensing: LicensingPluginSetup;
headers: Record<string, any>;
credentialStore: CredentialStore;
getWorker: () => ReindexWorker;
}

export const reindexHandler = async ({
credentialStore,
dataClient,
headers,
indexName,
licensing,
log,
savedObjects,
getWorker,
}: ReindexHandlerArgs) => {
const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient);
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser);
const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing);

if (!(await reindexService.hasRequiredPrivileges(indexName))) {
throw SYMBOL_FORBIDDEN;
}

const existingOp = await reindexService.findReindexOperation(indexName);

// If the reindexOp already exists and it's paused, resume it. Otherwise create a new one.
const reindexOp =
existingOp && existingOp.attributes.status === ReindexStatus.paused
? await reindexService.resumeReindexOperation(indexName)
: await reindexService.createReindexOperation(indexName);

// Add users credentials for the worker to use
credentialStore.set(reindexOp, headers);

// Kick the worker on this node to immediately pickup the new reindex operation.
getWorker().forceRefresh();

return reindexOp.attributes;
};
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
*/

import { kibanaResponseFactory } from 'src/core/server';
import { licensingMock } from '../../../licensing/server/mocks';
import { createMockRouter, MockRouter, routeHandlerContextMock } from './__mocks__/routes.mock';
import { createRequestMock } from './__mocks__/request.mock';
import { licensingMock } from '../../../../licensing/server/mocks';
import { createMockRouter, MockRouter, routeHandlerContextMock } from '../__mocks__/routes.mock';
import { createRequestMock } from '../__mocks__/request.mock';

const mockReindexService = {
hasRequiredPrivileges: jest.fn(),
Expand All @@ -21,18 +21,23 @@ const mockReindexService = {
cancelReindexing: jest.fn(),
};

jest.mock('../lib/es_version_precheck', () => ({
jest.mock('../../lib/es_version_precheck', () => ({
versionCheckHandlerWrapper: (a: any) => a,
}));

jest.mock('../lib/reindexing', () => {
jest.mock('../../lib/reindexing', () => {
return {
reindexServiceFactory: () => mockReindexService,
};
});

import { IndexGroup, ReindexSavedObject, ReindexStatus, ReindexWarning } from '../../common/types';
import { credentialStoreFactory } from '../lib/reindexing/credential_store';
import {
IndexGroup,
ReindexSavedObject,
ReindexStatus,
ReindexWarning,
} from '../../../common/types';
import { credentialStoreFactory } from '../../lib/reindexing/credential_store';
import { registerReindexIndicesRoutes } from './reindex_indices';

/**
Expand Down Expand Up @@ -76,7 +81,7 @@ describe('reindex API', () => {
});

afterEach(() => {
jest.resetAllMocks();
jest.clearAllMocks();
});

describe('GET /api/upgrade_assistant/reindex/{indexName}', () => {
Expand Down Expand Up @@ -255,6 +260,88 @@ describe('reindex API', () => {
});
});

describe('POST /api/upgrade_assistant/reindex/batch', () => {
it('creates a collection of index operations', async () => {
mockReindexService.createReindexOperation
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex1' },
})
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex2' },
})
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex3' },
});

const resp = await routeDependencies.router.getHandler({
method: 'post',
pathPattern: '/api/upgrade_assistant/reindex/batch',
})(
routeHandlerContextMock,
createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }),
kibanaResponseFactory
);

// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(1, 'theIndex1');
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(2, 'theIndex2');
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(3, 'theIndex3');

// It returned the right results
expect(resp.status).toEqual(200);
const data = resp.payload;
expect(data).toEqual({
errors: [],
successes: [
{ indexName: 'theIndex1' },
{ indexName: 'theIndex2' },
{ indexName: 'theIndex3' },
],
});
});

it('gracefully handles partial successes', async () => {
mockReindexService.createReindexOperation
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex1' },
})
.mockRejectedValueOnce(new Error('oops!'));

mockReindexService.hasRequiredPrivileges
.mockResolvedValueOnce(true)
.mockResolvedValueOnce(false)
.mockResolvedValueOnce(true);

const resp = await routeDependencies.router.getHandler({
method: 'post',
pathPattern: '/api/upgrade_assistant/reindex/batch',
})(
routeHandlerContextMock,
createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }),
kibanaResponseFactory
);

// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenCalledTimes(2);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(1, 'theIndex1');
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(2, 'theIndex3');

// It returned the right results
expect(resp.status).toEqual(200);
const data = resp.payload;
expect(data).toEqual({
errors: [
{
indexName: 'theIndex2',
message: 'You do not have adequate privileges to reindex "theIndex2".',
},
{ indexName: 'theIndex3', message: 'oops!' },
],
successes: [{ indexName: 'theIndex1' }],
});
});
});

describe('POST /api/upgrade_assistant/reindex/{indexName}/cancel', () => {
it('returns a 501', async () => {
mockReindexService.cancelReindexing.mockResolvedValueOnce({});
Expand Down
Loading