diff --git a/src/core/server/integration_tests/saved_objects/migrations/elasticsearch_client_wrapper.ts b/src/core/server/integration_tests/saved_objects/migrations/elasticsearch_client_wrapper.ts new file mode 100644 index 0000000000000..66975bbba4409 --- /dev/null +++ b/src/core/server/integration_tests/saved_objects/migrations/elasticsearch_client_wrapper.ts @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import type { Client } from '@elastic/elasticsearch'; + +export type ElasticsearchClientWrapperFactory = (client: Client) => Client; + +interface GetElasticsearchClientWrapperFactoryParams { + failOn: (methodName: string, methodArguments: any[]) => boolean; + errorDelaySeconds?: number; +} + +export const getElasticsearchClientWrapperFactory = ({ + failOn, + errorDelaySeconds, +}: GetElasticsearchClientWrapperFactoryParams): ElasticsearchClientWrapperFactory => { + const interceptClientMethod = (methodName: string, method: any): any => { + return new Proxy(method, { + apply: (applyTarget, thisArg, methodArguments) => { + if (failOn(methodName, methodArguments)) { + return new Promise((_, reject) => + setTimeout( + () => reject(`Error: esClient.${methodName}() failed unexpectedly`), + (errorDelaySeconds || 0) * 1000 + ) + ); + } + return Reflect.apply(applyTarget, thisArg, methodArguments); + }, + }); + }; + + const interceptClientApi = (apiName: string, api: any): any => + new Proxy(api, { + get(target, prop) { + return typeof target[prop] === 'function' + ? interceptClientMethod(`${apiName}.${prop.toString()}`, target[prop]) + : target[prop]; + }, + }); + + const wrapClient = (client: Client): any => + new Proxy(client, { + get(target, prop, receiver) { + switch (prop) { + // intercept top level esClient methods + case 'bulk': + case 'deleteByQuery': + case 'info': + case 'search': + case 'updateByQuery': + const clientMethod = Reflect.get(target, prop, receiver); + return interceptClientMethod(prop, clientMethod); + // intercept esClient APIs + case 'cluster': + case 'indices': + case 'tasks': + const clientApi = Reflect.get(target, prop, receiver); + return interceptClientApi(prop, clientApi); + // proxy child Clients too + case 'child': + return new Proxy(target[prop], { + apply(applyTarget, thisArg, argArray) { + const childClient = Reflect.apply(applyTarget, thisArg, argArray); + return wrapClient(childClient); + }, + }); + // do NOT intercept the rest of properties and symbols + default: + return Reflect.get(target, prop, receiver); + } + }, + }); + + return wrapClient; +}; diff --git a/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts index 636e70cd9dd9e..25dc5a46a6793 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group5/dot_kibana_split.test.ts @@ -29,6 +29,7 @@ import { } from '../kibana_migrator_test_kit'; import { delay, parseLogFile } from '../test_utils'; import '../jest_matchers'; +import { getElasticsearchClientWrapperFactory } from '../elasticsearch_client_wrapper'; // define a type => index distribution const RELOCATE_TYPES: Record = { @@ -58,6 +59,210 @@ describe('split .kibana index into multiple system indices', () => { await clearLog(logFilePathSecondRun); }); + describe('failure cases', () => { + const getFailingKibanaMigratorTestKit = async ({ + logFilePath, + failOn, + delaySeconds, + }: { + logFilePath: string; + failOn: (methodName: string, methodArgs: any[]) => boolean; + delaySeconds?: number; + }) => { + const clientWrapperFactory = getElasticsearchClientWrapperFactory({ + failOn, + errorDelaySeconds: delaySeconds, + }); + + return await getKibanaMigratorTestKit({ + types: typeRegistry.getAllTypes(), + kibanaIndex: MAIN_SAVED_OBJECT_INDEX, + defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP, + logFilePath, + clientWrapperFactory, + }); + }; + + beforeEach(async () => { + esServer = await startElasticsearch({ + dataArchive: Path.join(__dirname, '..', 'archives', '7.7.2_xpack_100k_obj.zip'), + }); + }); + + describe('when the .kibana_task_manager migrator fails on the TRANSFORMED_DOCUMENTS_BULK_INDEX state, after the other ones have finished', () => { + it('is capable of completing the .kibana_task_manager migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.bulk({ index: '.kibana_task_manager_1' }) which supposedly causes + // the .kibana_task_manager migrator to fail on the TRANSFORMED_DOCUMENTS_BULK_INDEX state + return methodName === 'bulk' && methodArgs[0].index === '.kibana_task_manager_1'; + }, + delaySeconds: 90, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana_task_manager] index. Error: esClient.bulk() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana migrator fails on the REINDEX_SOURCE_TO_TEMP_INDEX_BULK state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.bulk({ index: '.kibana_8.11.0_reindex_temp_alias' }) which supposedly causes + // the .kibana migrator to fail on the REINDEX_SOURCE_TO_TEMP_INDEX_BULK + return ( + methodName === 'bulk' && + methodArgs[0].index === `.kibana_${currentVersion}_reindex_temp_alias` + ); + }, + delaySeconds: 10, // give the .kibana_task_manager migrator enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.bulk() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana migrator fails on the CLONE_TEMP_TO_TARGET state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.indices.clone({ index: '.kibana_8.11.0_reindex_temp', target: ... }) which supposedly causes + // the .kibana migrator to fail on the CLONE_TEMP_TO_TARGET + return ( + methodName === 'indices.clone' && + methodArgs[0].index === `.kibana_${currentVersion}_reindex_temp` && + methodArgs[0].target === `.kibana_${currentVersion}_001` + ); + }, + delaySeconds: 15, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.indices.clone() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana migrator fails on the UPDATE_TARGET_MAPPINGS_PROPERTIES state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.updateByQuery({ index: '.kibana_8.11.0_001' }) which supposedly causes + // the .kibana migrator to fail on the UPDATE_TARGET_MAPPINGS_PROPERTIES (pickup mappings' changes) + return ( + methodName === 'updateByQuery' && + methodArgs[0].index === `.kibana_${currentVersion}_001` + ); + }, + delaySeconds: 10, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.updateByQuery() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana_analytics migrator fails on the CLONE_TEMP_TO_TARGET state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.indices.clone({ index: '.kibana_8.11.0_reindex_temp', target: ... }) which supposedly causes + // the .kibana migrator to fail on the CLONE_TEMP_TO_TARGET + return ( + methodName === 'indices.clone' && + methodArgs[0].index === `.kibana_analytics_${currentVersion}_reindex_temp` && + methodArgs[0].target === `.kibana_analytics_${currentVersion}_001` + ); + }, + delaySeconds: 15, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana_analytics] index. Error: esClient.indices.clone() failed unexpectedly' + ); + } + }); + }); + + describe('when the .kibana_analytics migrator fails on the UPDATE_TARGET_MAPPINGS_PROPERTIES state', () => { + it('is capable of successfully performing the split migration in subsequent restart', async () => { + const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({ + logFilePath: logFilePathFirstRun, + failOn: (methodName, methodArgs) => { + // fail on esClient.updateByQuery({ index: '.kibana_8.11.0_001' }) which supposedly causes + // the .kibana migrator to fail on the UPDATE_TARGET_MAPPINGS_PROPERTIES (pickup mappings' changes) + return ( + methodName === 'updateByQuery' && + methodArgs[0].index === `.kibana_analytics_${currentVersion}_001` + ); + }, + delaySeconds: 10, // give the other migrators enough time to finish before failing + }); + + try { + await firstRun(); + throw new Error('First run should have thrown an error but it did not'); + } catch (error) { + expect(error.message).toEqual( + 'Unable to complete saved object migrations for the [.kibana_analytics] index. Error: esClient.updateByQuery() failed unexpectedly' + ); + } + }); + }); + + afterEach(async () => { + const { runMigrations: secondRun } = await getKibanaMigratorTestKit({ + types: typeRegistry.getAllTypes(), + logFilePath: logFilePathSecondRun, + kibanaIndex: MAIN_SAVED_OBJECT_INDEX, + defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP, + }); + const results = await secondRun(); + expect( + results + .flat() + .every((result) => result.status === 'migrated' || result.status === 'patched') + ).toEqual(true); + + await esServer?.stop(); + await delay(2); + }); + }); + describe('when migrating from a legacy version', () => { let migratorTestKitFactory: (logFilePath: string) => Promise; diff --git a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts index a911fcdbdead5..1f6e9a7a58c77 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts @@ -50,6 +50,7 @@ import type { DocLinksServiceStart } from '@kbn/core-doc-links-server'; import type { NodeRoles } from '@kbn/core-node-server'; import { baselineDocuments, baselineTypes } from './kibana_migrator_test_kit.fixtures'; import { delay } from './test_utils'; +import type { ElasticsearchClientWrapperFactory } from './elasticsearch_client_wrapper'; export const defaultLogFilePath = Path.join(__dirname, 'kibana_migrator_test_kit.log'); @@ -76,6 +77,7 @@ export interface KibanaMigratorTestKitParams { types?: Array>; defaultIndexTypesMap?: IndexTypesMap; logFilePath?: string; + clientWrapperFactory?: ElasticsearchClientWrapperFactory; } export interface KibanaMigratorTestKit { @@ -134,6 +136,7 @@ export const getKibanaMigratorTestKit = async ({ types = [], logFilePath = defaultLogFilePath, nodeRoles = defaultNodeRoles, + clientWrapperFactory, }: KibanaMigratorTestKitParams = {}): Promise => { let hasRun = false; const loggingSystem = new LoggingSystem(); @@ -145,7 +148,8 @@ export const getKibanaMigratorTestKit = async ({ const loggingConf = await firstValueFrom(configService.atPath('logging')); loggingSystem.upgrade(loggingConf); - const client = await getElasticsearchClient(configService, loggerFactory, kibanaVersion); + const rawClient = await getElasticsearchClient(configService, loggerFactory, kibanaVersion); + const client = clientWrapperFactory ? clientWrapperFactory(rawClient) : rawClient; const typeRegistry = new SavedObjectTypeRegistry();