Skip to content

Commit

Permalink
[Migrations] Ensure individual migrator failures do not break consist…
Browse files Browse the repository at this point in the history
…ency (elastic#166924)

## Summary

Tackles elastic#158818

The goal of the PR is to introduce failures in single migrators at some
of the crucial steps of the migration, testing that consistency is
maintained, and that subsequent migration attempts can successfully
complete the upgrade.

This is done by _proxying_ the `Client` class, which uses the
elasticsearch-js library underneath to perform all calls to ES.
Inspired on elastic#158995.
  • Loading branch information
gsoldevila authored Sep 25, 2023
1 parent d8b8090 commit e4a088c
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import type { Client } from '@elastic/elasticsearch';

export type ElasticsearchClientWrapperFactory = (client: Client) => Client;

interface GetElasticsearchClientWrapperFactoryParams {
failOn: (methodName: string, methodArguments: any[]) => boolean;
errorDelaySeconds?: number;
}

export const getElasticsearchClientWrapperFactory = ({
failOn,
errorDelaySeconds,
}: GetElasticsearchClientWrapperFactoryParams): ElasticsearchClientWrapperFactory => {
const interceptClientMethod = (methodName: string, method: any): any => {
return new Proxy(method, {
apply: (applyTarget, thisArg, methodArguments) => {
if (failOn(methodName, methodArguments)) {
return new Promise((_, reject) =>
setTimeout(
() => reject(`Error: esClient.${methodName}() failed unexpectedly`),
(errorDelaySeconds || 0) * 1000
)
);
}
return Reflect.apply(applyTarget, thisArg, methodArguments);
},
});
};

const interceptClientApi = (apiName: string, api: any): any =>
new Proxy(api, {
get(target, prop) {
return typeof target[prop] === 'function'
? interceptClientMethod(`${apiName}.${prop.toString()}`, target[prop])
: target[prop];
},
});

const wrapClient = (client: Client): any =>
new Proxy(client, {
get(target, prop, receiver) {
switch (prop) {
// intercept top level esClient methods
case 'bulk':
case 'deleteByQuery':
case 'info':
case 'search':
case 'updateByQuery':
const clientMethod = Reflect.get(target, prop, receiver);
return interceptClientMethod(prop, clientMethod);
// intercept esClient APIs
case 'cluster':
case 'indices':
case 'tasks':
const clientApi = Reflect.get(target, prop, receiver);
return interceptClientApi(prop, clientApi);
// proxy child Clients too
case 'child':
return new Proxy(target[prop], {
apply(applyTarget, thisArg, argArray) {
const childClient = Reflect.apply(applyTarget, thisArg, argArray);
return wrapClient(childClient);
},
});
// do NOT intercept the rest of properties and symbols
default:
return Reflect.get(target, prop, receiver);
}
},
});

return wrapClient;
};
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from '../kibana_migrator_test_kit';
import { delay, parseLogFile } from '../test_utils';
import '../jest_matchers';
import { getElasticsearchClientWrapperFactory } from '../elasticsearch_client_wrapper';

// define a type => index distribution
const RELOCATE_TYPES: Record<string, string> = {
Expand Down Expand Up @@ -58,6 +59,210 @@ describe('split .kibana index into multiple system indices', () => {
await clearLog(logFilePathSecondRun);
});

describe('failure cases', () => {
const getFailingKibanaMigratorTestKit = async ({
logFilePath,
failOn,
delaySeconds,
}: {
logFilePath: string;
failOn: (methodName: string, methodArgs: any[]) => boolean;
delaySeconds?: number;
}) => {
const clientWrapperFactory = getElasticsearchClientWrapperFactory({
failOn,
errorDelaySeconds: delaySeconds,
});

return await getKibanaMigratorTestKit({
types: typeRegistry.getAllTypes(),
kibanaIndex: MAIN_SAVED_OBJECT_INDEX,
defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP,
logFilePath,
clientWrapperFactory,
});
};

beforeEach(async () => {
esServer = await startElasticsearch({
dataArchive: Path.join(__dirname, '..', 'archives', '7.7.2_xpack_100k_obj.zip'),
});
});

describe('when the .kibana_task_manager migrator fails on the TRANSFORMED_DOCUMENTS_BULK_INDEX state, after the other ones have finished', () => {
it('is capable of completing the .kibana_task_manager migration in subsequent restart', async () => {
const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({
logFilePath: logFilePathFirstRun,
failOn: (methodName, methodArgs) => {
// fail on esClient.bulk({ index: '.kibana_task_manager_1' }) which supposedly causes
// the .kibana_task_manager migrator to fail on the TRANSFORMED_DOCUMENTS_BULK_INDEX state
return methodName === 'bulk' && methodArgs[0].index === '.kibana_task_manager_1';
},
delaySeconds: 90, // give the other migrators enough time to finish before failing
});

try {
await firstRun();
throw new Error('First run should have thrown an error but it did not');
} catch (error) {
expect(error.message).toEqual(
'Unable to complete saved object migrations for the [.kibana_task_manager] index. Error: esClient.bulk() failed unexpectedly'
);
}
});
});

describe('when the .kibana migrator fails on the REINDEX_SOURCE_TO_TEMP_INDEX_BULK state', () => {
it('is capable of successfully performing the split migration in subsequent restart', async () => {
const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({
logFilePath: logFilePathFirstRun,
failOn: (methodName, methodArgs) => {
// fail on esClient.bulk({ index: '.kibana_8.11.0_reindex_temp_alias' }) which supposedly causes
// the .kibana migrator to fail on the REINDEX_SOURCE_TO_TEMP_INDEX_BULK
return (
methodName === 'bulk' &&
methodArgs[0].index === `.kibana_${currentVersion}_reindex_temp_alias`
);
},
delaySeconds: 10, // give the .kibana_task_manager migrator enough time to finish before failing
});

try {
await firstRun();
throw new Error('First run should have thrown an error but it did not');
} catch (error) {
expect(error.message).toEqual(
'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.bulk() failed unexpectedly'
);
}
});
});

describe('when the .kibana migrator fails on the CLONE_TEMP_TO_TARGET state', () => {
it('is capable of successfully performing the split migration in subsequent restart', async () => {
const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({
logFilePath: logFilePathFirstRun,
failOn: (methodName, methodArgs) => {
// fail on esClient.indices.clone({ index: '.kibana_8.11.0_reindex_temp', target: ... }) which supposedly causes
// the .kibana migrator to fail on the CLONE_TEMP_TO_TARGET
return (
methodName === 'indices.clone' &&
methodArgs[0].index === `.kibana_${currentVersion}_reindex_temp` &&
methodArgs[0].target === `.kibana_${currentVersion}_001`
);
},
delaySeconds: 15, // give the other migrators enough time to finish before failing
});

try {
await firstRun();
throw new Error('First run should have thrown an error but it did not');
} catch (error) {
expect(error.message).toEqual(
'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.indices.clone() failed unexpectedly'
);
}
});
});

describe('when the .kibana migrator fails on the UPDATE_TARGET_MAPPINGS_PROPERTIES state', () => {
it('is capable of successfully performing the split migration in subsequent restart', async () => {
const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({
logFilePath: logFilePathFirstRun,
failOn: (methodName, methodArgs) => {
// fail on esClient.updateByQuery({ index: '.kibana_8.11.0_001' }) which supposedly causes
// the .kibana migrator to fail on the UPDATE_TARGET_MAPPINGS_PROPERTIES (pickup mappings' changes)
return (
methodName === 'updateByQuery' &&
methodArgs[0].index === `.kibana_${currentVersion}_001`
);
},
delaySeconds: 10, // give the other migrators enough time to finish before failing
});

try {
await firstRun();
throw new Error('First run should have thrown an error but it did not');
} catch (error) {
expect(error.message).toEqual(
'Unable to complete saved object migrations for the [.kibana] index. Error: esClient.updateByQuery() failed unexpectedly'
);
}
});
});

describe('when the .kibana_analytics migrator fails on the CLONE_TEMP_TO_TARGET state', () => {
it('is capable of successfully performing the split migration in subsequent restart', async () => {
const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({
logFilePath: logFilePathFirstRun,
failOn: (methodName, methodArgs) => {
// fail on esClient.indices.clone({ index: '.kibana_8.11.0_reindex_temp', target: ... }) which supposedly causes
// the .kibana migrator to fail on the CLONE_TEMP_TO_TARGET
return (
methodName === 'indices.clone' &&
methodArgs[0].index === `.kibana_analytics_${currentVersion}_reindex_temp` &&
methodArgs[0].target === `.kibana_analytics_${currentVersion}_001`
);
},
delaySeconds: 15, // give the other migrators enough time to finish before failing
});

try {
await firstRun();
throw new Error('First run should have thrown an error but it did not');
} catch (error) {
expect(error.message).toEqual(
'Unable to complete saved object migrations for the [.kibana_analytics] index. Error: esClient.indices.clone() failed unexpectedly'
);
}
});
});

describe('when the .kibana_analytics migrator fails on the UPDATE_TARGET_MAPPINGS_PROPERTIES state', () => {
it('is capable of successfully performing the split migration in subsequent restart', async () => {
const { runMigrations: firstRun } = await getFailingKibanaMigratorTestKit({
logFilePath: logFilePathFirstRun,
failOn: (methodName, methodArgs) => {
// fail on esClient.updateByQuery({ index: '.kibana_8.11.0_001' }) which supposedly causes
// the .kibana migrator to fail on the UPDATE_TARGET_MAPPINGS_PROPERTIES (pickup mappings' changes)
return (
methodName === 'updateByQuery' &&
methodArgs[0].index === `.kibana_analytics_${currentVersion}_001`
);
},
delaySeconds: 10, // give the other migrators enough time to finish before failing
});

try {
await firstRun();
throw new Error('First run should have thrown an error but it did not');
} catch (error) {
expect(error.message).toEqual(
'Unable to complete saved object migrations for the [.kibana_analytics] index. Error: esClient.updateByQuery() failed unexpectedly'
);
}
});
});

afterEach(async () => {
const { runMigrations: secondRun } = await getKibanaMigratorTestKit({
types: typeRegistry.getAllTypes(),
logFilePath: logFilePathSecondRun,
kibanaIndex: MAIN_SAVED_OBJECT_INDEX,
defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP,
});
const results = await secondRun();
expect(
results
.flat()
.every((result) => result.status === 'migrated' || result.status === 'patched')
).toEqual(true);

await esServer?.stop();
await delay(2);
});
});

describe('when migrating from a legacy version', () => {
let migratorTestKitFactory: (logFilePath: string) => Promise<KibanaMigratorTestKit>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { NodeRoles } from '@kbn/core-node-server';
import { baselineDocuments, baselineTypes } from './kibana_migrator_test_kit.fixtures';
import { delay } from './test_utils';
import type { ElasticsearchClientWrapperFactory } from './elasticsearch_client_wrapper';

export const defaultLogFilePath = Path.join(__dirname, 'kibana_migrator_test_kit.log');

Expand All @@ -76,6 +77,7 @@ export interface KibanaMigratorTestKitParams {
types?: Array<SavedObjectsType<any>>;
defaultIndexTypesMap?: IndexTypesMap;
logFilePath?: string;
clientWrapperFactory?: ElasticsearchClientWrapperFactory;
}

export interface KibanaMigratorTestKit {
Expand Down Expand Up @@ -134,6 +136,7 @@ export const getKibanaMigratorTestKit = async ({
types = [],
logFilePath = defaultLogFilePath,
nodeRoles = defaultNodeRoles,
clientWrapperFactory,
}: KibanaMigratorTestKitParams = {}): Promise<KibanaMigratorTestKit> => {
let hasRun = false;
const loggingSystem = new LoggingSystem();
Expand All @@ -145,7 +148,8 @@ export const getKibanaMigratorTestKit = async ({
const loggingConf = await firstValueFrom(configService.atPath<LoggingConfigType>('logging'));
loggingSystem.upgrade(loggingConf);

const client = await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
const rawClient = await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
const client = clientWrapperFactory ? clientWrapperFactory(rawClient) : rawClient;

const typeRegistry = new SavedObjectTypeRegistry();

Expand Down

0 comments on commit e4a088c

Please sign in to comment.