Skip to content

Commit

Permalink
Refactor reindex routes into separate single and batch reindex files.…
Browse files Browse the repository at this point in the history
… Apply version precheck to batch routes. (elastic#113822)
  • Loading branch information
cjcenizal authored and sabarasaba committed Oct 20, 2021
1 parent 15cd95d commit a408298
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { registerAppRoutes } from './app';
import { registerCloudBackupStatusRoutes } from './cloud_backup_status';
import { registerESDeprecationRoutes } from './es_deprecations';
import { registerDeprecationLoggingRoutes } from './deprecation_logging';
import { registerReindexIndicesRoutes } from './reindex_indices';
import { registerReindexIndicesRoutes, registerBatchReindexIndicesRoutes } from './reindex_indices';
import { registerTelemetryRoutes } from './telemetry';
import { registerUpdateSettingsRoute } from './update_index_settings';
import { registerMlSnapshotRoutes } from './ml_snapshots';
Expand All @@ -24,6 +24,7 @@ export function registerRoutes(dependencies: RouteDependencies, getWorker: () =>
registerESDeprecationRoutes(dependencies);
registerDeprecationLoggingRoutes(dependencies);
registerReindexIndicesRoutes(dependencies, getWorker);
registerBatchReindexIndicesRoutes(dependencies, getWorker);
registerTelemetryRoutes(dependencies);
registerUpdateSettingsRoute(dependencies);
registerMlSnapshotRoutes(dependencies);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { kibanaResponseFactory } from 'src/core/server';
import { loggingSystemMock } from 'src/core/server/mocks';
import { licensingMock } from '../../../../licensing/server/mocks';
import { securityMock } from '../../../../security/server/mocks';
import { createMockRouter, MockRouter, routeHandlerContextMock } from '../__mocks__/routes.mock';
import { createRequestMock } from '../__mocks__/request.mock';
import { handleEsError } from '../../shared_imports';

const mockReindexService = {
hasRequiredPrivileges: jest.fn(),
detectReindexWarnings: jest.fn(),
getIndexGroup: jest.fn(),
createReindexOperation: jest.fn(),
findAllInProgressOperations: jest.fn(),
findReindexOperation: jest.fn(),
processNextStep: jest.fn(),
resumeReindexOperation: jest.fn(),
cancelReindexing: jest.fn(),
};
jest.mock('../../lib/es_version_precheck', () => ({
versionCheckHandlerWrapper: (a: any) => a,
}));

jest.mock('../../lib/reindexing', () => {
return {
reindexServiceFactory: () => mockReindexService,
};
});

import { credentialStoreFactory } from '../../lib/reindexing/credential_store';
import { registerBatchReindexIndicesRoutes } from './batch_reindex_indices';

const logMock = loggingSystemMock.create().get();

/**
* Since these route callbacks are so thin, these serve simply as integration tests
* to ensure they're wired up to the lib functions correctly. Business logic is tested
* more thoroughly in the es_migration_apis test.
*/
describe('reindex API', () => {
let routeDependencies: any;
let mockRouter: MockRouter;

const credentialStore = credentialStoreFactory(logMock);
const worker = {
includes: jest.fn(),
forceRefresh: jest.fn(),
} as any;

beforeEach(() => {
mockRouter = createMockRouter();
routeDependencies = {
credentialStore,
router: mockRouter,
licensing: licensingMock.createSetup(),
lib: { handleEsError },
getSecurityPlugin: () => securityMock.createStart(),
};
registerBatchReindexIndicesRoutes(routeDependencies, () => worker);

mockReindexService.hasRequiredPrivileges.mockResolvedValue(true);
mockReindexService.detectReindexWarnings.mockReset();
mockReindexService.getIndexGroup.mockReset();
mockReindexService.createReindexOperation.mockReset();
mockReindexService.findAllInProgressOperations.mockReset();
mockReindexService.findReindexOperation.mockReset();
mockReindexService.processNextStep.mockReset();
mockReindexService.resumeReindexOperation.mockReset();
mockReindexService.cancelReindexing.mockReset();
worker.includes.mockReset();
worker.forceRefresh.mockReset();

// Reset the credentialMap
credentialStore.clear();
});

afterEach(() => {
jest.clearAllMocks();
});

describe('POST /api/upgrade_assistant/reindex/batch', () => {
const queueSettingsArg = {
enqueue: true,
};
it('creates a collection of index operations', async () => {
mockReindexService.createReindexOperation
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex1' },
})
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex2' },
})
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex3' },
});

const resp = await routeDependencies.router.getHandler({
method: 'post',
pathPattern: '/api/upgrade_assistant/reindex/batch',
})(
routeHandlerContextMock,
createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }),
kibanaResponseFactory
);

// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
1,
'theIndex1',
queueSettingsArg
);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
2,
'theIndex2',
queueSettingsArg
);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
3,
'theIndex3',
queueSettingsArg
);

// It returned the right results
expect(resp.status).toEqual(200);
const data = resp.payload;
expect(data).toEqual({
errors: [],
enqueued: [
{ indexName: 'theIndex1' },
{ indexName: 'theIndex2' },
{ indexName: 'theIndex3' },
],
});
});

it('gracefully handles partial successes', async () => {
mockReindexService.createReindexOperation
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex1' },
})
.mockRejectedValueOnce(new Error('oops!'));

mockReindexService.hasRequiredPrivileges
.mockResolvedValueOnce(true)
.mockResolvedValueOnce(false)
.mockResolvedValueOnce(true);

const resp = await routeDependencies.router.getHandler({
method: 'post',
pathPattern: '/api/upgrade_assistant/reindex/batch',
})(
routeHandlerContextMock,
createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }),
kibanaResponseFactory
);

// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenCalledTimes(2);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
1,
'theIndex1',
queueSettingsArg
);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
2,
'theIndex3',
queueSettingsArg
);

// It returned the right results
expect(resp.status).toEqual(200);
const data = resp.payload;
expect(data).toEqual({
errors: [
{
indexName: 'theIndex2',
message: 'You do not have adequate privileges to reindex "theIndex2".',
},
{ indexName: 'theIndex3', message: 'oops!' },
],
enqueued: [{ indexName: 'theIndex1' }],
});
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { schema } from '@kbn/config-schema';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';

import { API_BASE_PATH } from '../../../common/constants';
import { ReindexStatus } from '../../../common/types';
import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck';
import { ReindexWorker } from '../../lib/reindexing';
import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
import { sortAndOrderReindexOperations } from '../../lib/reindexing/op_utils';
import { RouteDependencies } from '../../types';
import { mapAnyErrorToKibanaHttpResponse } from './map_any_error_to_kibana_http_response';
import { reindexHandler } from './reindex_handler';
import { GetBatchQueueResponse, PostBatchResponse } from './types';

export function registerBatchReindexIndicesRoutes(
{
credentialStore,
router,
licensing,
log,
getSecurityPlugin,
lib: { handleEsError },
}: RouteDependencies,
getWorker: () => ReindexWorker
) {
const BASE_PATH = `${API_BASE_PATH}/reindex`;

// Get the current batch queue
router.get(
{
path: `${BASE_PATH}/batch/queue`,
validate: {},
},
versionCheckHandlerWrapper(
async (
{
core: {
elasticsearch: { client: esClient },
savedObjects,
},
},
request,
response
) => {
const { client } = savedObjects;
const callAsCurrentUser = esClient.asCurrentUser;
const reindexActions = reindexActionsFactory(client, callAsCurrentUser);
try {
const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress);
const { queue } = sortAndOrderReindexOperations(inProgressOps);
const result: GetBatchQueueResponse = {
queue: queue.map((savedObject) => savedObject.attributes),
};
return response.ok({
body: result,
});
} catch (error) {
if (error instanceof ResponseError) {
return handleEsError({ error, response });
}
return mapAnyErrorToKibanaHttpResponse(error);
}
}
)
);

// Add indices for reindexing to the worker's batch
router.post(
{
path: `${BASE_PATH}/batch`,
validate: {
body: schema.object({
indexNames: schema.arrayOf(schema.string()),
}),
},
},
versionCheckHandlerWrapper(
async (
{
core: {
savedObjects: { client: savedObjectsClient },
elasticsearch: { client: esClient },
},
},
request,
response
) => {
const { indexNames } = request.body;
const results: PostBatchResponse = {
enqueued: [],
errors: [],
};
for (const indexName of indexNames) {
try {
const result = await reindexHandler({
savedObjects: savedObjectsClient,
dataClient: esClient,
indexName,
log,
licensing,
request,
credentialStore,
reindexOptions: {
enqueue: true,
},
security: getSecurityPlugin(),
});
results.enqueued.push(result);
} catch (e) {
results.errors.push({
indexName,
message: e.message,
});
}
}

if (results.errors.length < indexNames.length) {
// Kick the worker on this node to immediately pickup the batch.
getWorker().forceRefresh();
}

return response.ok({ body: results });
}
)
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
ElasticsearchServiceStart,
Logger,
SavedObjectsClient,
} from '../../../../../../src/core/server';

import { LicensingPluginSetup } from '../../../../licensing/server';
import { SecurityPluginStart } from '../../../../security/server';
import { ReindexWorker } from '../../lib/reindexing';
import { CredentialStore } from '../../lib/reindexing/credential_store';

interface CreateReindexWorker {
logger: Logger;
elasticsearchService: ElasticsearchServiceStart;
credentialStore: CredentialStore;
savedObjects: SavedObjectsClient;
licensing: LicensingPluginSetup;
security: SecurityPluginStart;
}

export function createReindexWorker({
logger,
elasticsearchService,
credentialStore,
savedObjects,
licensing,
security,
}: CreateReindexWorker) {
const esClient = elasticsearchService.client;
return new ReindexWorker(savedObjects, credentialStore, esClient, logger, licensing, security);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@
* 2.0.
*/

export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices';
export { createReindexWorker } from './create_reindex_worker';
export { registerReindexIndicesRoutes } from './reindex_indices';
export { registerBatchReindexIndicesRoutes } from './batch_reindex_indices';
Loading

0 comments on commit a408298

Please sign in to comment.