From e4a088c7eb09fdb603ec165b77759d919c048cab Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Mon, 25 Sep 2023 13:24:42 +0200 Subject: [PATCH] [Migrations] Ensure individual migrator failures do not break consistency (#166924) ## Summary Tackles https://github.com/elastic/kibana/issues/158818 The goal of the PR is to introduce failures in single migrators at some of the crucial steps of the migration, testing that consistency is maintained, and that subsequent migration attempts can successfully complete the upgrade. This is done by _proxying_ the `Client` class, which uses the elasticsearch-js library underneath to perform all calls to ES. Inspired on https://github.com/elastic/kibana/pull/158995. --- .../elasticsearch_client_wrapper.ts | 81 +++++++ .../group5/dot_kibana_split.test.ts | 205 ++++++++++++++++++ .../migrations/kibana_migrator_test_kit.ts | 6 +- 3 files changed, 291 insertions(+), 1 deletion(-) create mode 100644 src/core/server/integration_tests/saved_objects/migrations/elasticsearch_client_wrapper.ts diff --git a/src/core/server/integration_tests/saved_objects/migrations/elasticsearch_client_wrapper.ts b/src/core/server/integration_tests/saved_objects/migrations/elasticsearch_client_wrapper.ts new file mode 100644 index 0000000000000..66975bbba4409 --- /dev/null +++ b/src/core/server/integration_tests/saved_objects/migrations/elasticsearch_client_wrapper.ts @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import type { Client } from '@elastic/elasticsearch'; + +export type ElasticsearchClientWrapperFactory = (client: Client) => Client; + +interface GetElasticsearchClientWrapperFactoryParams { + failOn: (methodName: string, methodArguments: any[]) => boolean; + errorDelaySeconds?: number; +} + +export const getElasticsearchClientWrapperFactory = ({ + failOn, + errorDelaySeconds, +}: GetElasticsearchClientWrapperFactoryParams): ElasticsearchClientWrapperFactory => { + const interceptClientMethod = (methodName: string, method: any): any => { + return new Proxy(method, { + apply: (applyTarget, thisArg, methodArguments) => { + if (failOn(methodName, methodArguments)) { + return new Promise((_, reject) => + setTimeout( + () => reject(`Error: esClient.${methodName}() failed unexpectedly`), + (errorDelaySeconds || 0) * 1000 + ) + ); + } + return Reflect.apply(applyTarget, thisArg, methodArguments); + }, + }); + }; + + const interceptClientApi = (apiName: string, api: any): any => + new Proxy(api, { + get(target, prop) { + return typeof target[prop] === 'function' + ? interceptClientMethod(`${apiName}.${prop.toString()}`, target[prop]) + : target[prop]; + }, + }); + + const wrapClient = (client: Client): any => + new Proxy(client, { + get(target, prop, receiver) { + switch (prop) { + // intercept top level esClient methods + case 'bulk': + case 'deleteByQuery': + case 'info': + case 'search': + case 'updateByQuery': + const clientMethod = Reflect.get(target, prop, receiver); + return interceptClientMethod(prop, clientMethod); + // intercept esClient APIs + case 'cluster': + case 'indices': + case 'tasks': + const clientApi = Reflect.get(target, prop, receiver); + return interceptClientApi(prop, clientApi); + // proxy child Clients too + case 'child': + return new Proxy(target[prop], { + apply(applyTarget, thisArg, argArray) { + const childClient = Reflect.apply(applyTarget, thisArg, argArray); + return wrapClient(childClient); + }, + }); + // do NOT intercept the rest of properties and symbols + default: + return Reflect.get(target, prop, receiver); + } + }, + }); + + return wrapClient; +}; diff --git a/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts index 636e70cd9dd9e..25dc5a46a6793 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts @@ -29,6 +29,7 @@ import { } from '../kibana_migrator_test_kit'; import { delay, parseLogFile } from '../test_utils'; import '../jest_matchers'; +import { getElasticsearchClientWrapperFactory } from '../elasticsearch_client_wrapper'; // define a type => index distribution const RELOCATE_TYPES: Record = { @@ -58,6 +59,210 @@ describe('split .kibana index into multiple system indices', () => { await clearLog(logFilePathSecondRun); }); + describe('failure cases', () => { + const getFailingKibanaMigratorTestKit = async ({ + logFilePath, + failOn, + delaySeconds, + }: { + logFilePath: string; + failOn: (methodName: string, methodArgs: any[]) => boolean; + delaySeconds?: number; + }) => { + const clientWrapperFactory = getElasticsearchClientWrapperFactory({ + failOn, + errorDelaySeconds: delaySeconds, + }); + + return await getKibanaMigratorTestKit({ + types: typeRegistry.getAllTypes(), + kibanaIndex: MAIN_SAVED_OBJECT_INDEX, + defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP, + logFilePath, + clientWrapperFactory, + }); + }; + + beforeEach(async () => { + esServer = await startElasticsearch({ + dataArchive: Path.join(__dirname, '..', 'archives', '7.7.2_xpack_100k_obj.zip'), + }); + }); + + describe('when the .kibana_task_manager migrator fails on the TRANSFORMED_DOCUMENTS_BULK_INDEX state, after the other ones have finished', () => { + it('is capable of completing the .kibana_task_manager migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.bulk({ index: '.kibana_task_manager_1' }) which supposedly causes + // the .kibana_task_manager migrator to fail on the TRANSFORMED_DOCUMENTS_BULK_INDEX state + return methodName === 'bulk' && methodArgs[0].index === '.kibana_task_manager_1'; + }, + delaySeconds: 90, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana_task_manager] index. Error: esClient.bulk() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana migrator fails on the REINDEX_SOURCE_TO_TEMP_INDEX_BULK state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.bulk({ index: '.kibana_8.11.0_reindex_temp_alias' }) which supposedly causes + // the .kibana migrator to fail on the REINDEX_SOURCE_TO_TEMP_INDEX_BULK + return ( + methodName === 'bulk' && + methodArgs[0].index === `.kibana_${currentVersion}_reindex_temp_alias` + ); + }, + delaySeconds: 10, // give the .kibana_task_manager migrator enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.bulk() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana migrator fails on the CLONE_TEMP_TO_TARGET state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.indices.clone({ index: '.kibana_8.11.0_reindex_temp', target: ... }) which supposedly causes + // the .kibana migrator to fail on the CLONE_TEMP_TO_TARGET + return ( + methodName === 'indices.clone' && + methodArgs[0].index === `.kibana_${currentVersion}_reindex_temp` && + methodArgs[0].target === `.kibana_${currentVersion}_001` + ); + }, + delaySeconds: 15, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.indices.clone() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana migrator fails on the UPDATE_TARGET_MAPPINGS_PROPERTIES state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.updateByQuery({ index: '.kibana_8.11.0_001' }) which supposedly causes + // the .kibana migrator to fail on the UPDATE_TARGET_MAPPINGS_PROPERTIES (pickup mappings' changes) + return ( + methodName === 'updateByQuery' && + methodArgs[0].index === `.kibana_${currentVersion}_001` + ); + }, + delaySeconds: 10, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.updateByQuery() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana_analytics migrator fails on the CLONE_TEMP_TO_TARGET state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.indices.clone({ index: '.kibana_8.11.0_reindex_temp', target: ... }) which supposedly causes + // the .kibana migrator to fail on the CLONE_TEMP_TO_TARGET + return ( + methodName === 'indices.clone' && + methodArgs[0].index === `.kibana_analytics_${currentVersion}_reindex_temp` && + methodArgs[0].target === `.kibana_analytics_${currentVersion}_001` + ); + }, + delaySeconds: 15, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana_analytics] index. Error: esClient.indices.clone() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana_analytics migrator fails on the UPDATE_TARGET_MAPPINGS_PROPERTIES state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.updateByQuery({ index: '.kibana_8.11.0_001' }) which supposedly causes + // the .kibana migrator to fail on the UPDATE_TARGET_MAPPINGS_PROPERTIES (pickup mappings' changes) + return ( + methodName === 'updateByQuery' && + methodArgs[0].index === `.kibana_analytics_${currentVersion}_001` + ); + }, + delaySeconds: 10, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana_analytics] index. Error: esClient.updateByQuery() failed unexpectedly' + ); + } + }); + }); + + afterEach(async () => { + const { runMigrations: secondRun } = await getKibanaMigratorTestKit({ + types: typeRegistry.getAllTypes(), + logFilePath: logFilePathSecondRun, + kibanaIndex: MAIN_SAVED_OBJECT_INDEX, + defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP, + }); + const results = await secondRun(); + expect( + results + .flat() + .every((result) => result.status === 'migrated' || result.status === 'patched') + ).toEqual(true); + + await esServer?.stop(); + await delay(2); + }); + }); + describe('when migrating from a legacy version', () => { let migratorTestKitFactory: (logFilePath: string) => Promise; diff --git a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts index a911fcdbdead5..1f6e9a7a58c77 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts @@ -50,6 +50,7 @@ import type { DocLinksServiceStart } from '@kbn/core-doc-links-server'; import type { NodeRoles } from '@kbn/core-node-server'; import { baselineDocuments, baselineTypes } from './kibana_migrator_test_kit.fixtures'; import { delay } from './test_utils'; +import type { ElasticsearchClientWrapperFactory } from './elasticsearch_client_wrapper'; export const defaultLogFilePath = Path.join(__dirname, 'kibana_migrator_test_kit.log'); @@ -76,6 +77,7 @@ export interface KibanaMigratorTestKitParams { types?: Array>; defaultIndexTypesMap?: IndexTypesMap; logFilePath?: string; + clientWrapperFactory?: ElasticsearchClientWrapperFactory; } export interface KibanaMigratorTestKit { @@ -134,6 +136,7 @@ export const getKibanaMigratorTestKit = async ({ types = [], logFilePath = defaultLogFilePath, nodeRoles = defaultNodeRoles, + clientWrapperFactory, }: KibanaMigratorTestKitParams = {}): Promise => { let hasRun = false; const loggingSystem = new LoggingSystem(); @@ -145,7 +148,8 @@ export const getKibanaMigratorTestKit = async ({ const loggingConf = await firstValueFrom(configService.atPath('logging')); loggingSystem.upgrade(loggingConf); - const client = await getElasticsearchClient(configService, loggerFactory, kibanaVersion); + const rawClient = await getElasticsearchClient(configService, loggerFactory, kibanaVersion); + const client = clientWrapperFactory ? clientWrapperFactory(rawClient) : rawClient; const typeRegistry = new SavedObjectTypeRegistry();