Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test migrations success even on temporary ES failures #158995

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
type MigrationResult,
type IndexTypesMap,
} from '@kbn/core-saved-objects-base-server-internal';
import type { State } from './state';
import { buildActiveMappings, buildTypesMappings } from './core';
import { DocumentMigrator } from './document_migrator';
import { runZeroDowntimeMigration } from './zdt';
Expand Down Expand Up @@ -72,6 +73,7 @@ export class KibanaMigrator implements IKibanaMigrator {
private readonly waitForMigrationCompletion: boolean;
private readonly nodeRoles: NodeRoles;
public readonly kibanaVersion: string;
public readonly stateStatus$: BehaviorSubject<State | null>;

/**
* Creates an instance of KibanaMigrator.
Expand Down Expand Up @@ -109,6 +111,7 @@ export class KibanaMigrator implements IKibanaMigrator {
// operation so we cache the result
this.activeMappings = buildActiveMappings(this.mappingProperties);
this.docLinks = docLinks;
this.stateStatus$ = new BehaviorSubject<State | null>(null);
}

public runMigrations({ rerun = false }: { rerun?: boolean } = {}): Promise<MigrationResult[]> {
Expand Down Expand Up @@ -138,6 +141,10 @@ export class KibanaMigrator implements IKibanaMigrator {
return this.status$.asObservable();
}

public getStateStatus$() {
return this.stateStatus$.asObservable();
}

private runMigrationsInternal(): Promise<MigrationResult[]> {
const migrationAlgorithm = this.soMigrationsConfig.algorithm;
if (migrationAlgorithm === 'zdt') {
Expand Down Expand Up @@ -167,6 +174,7 @@ export class KibanaMigrator implements IKibanaMigrator {
elasticsearchClient: this.client,
mappingProperties: this.mappingProperties,
waitForMigrationCompletion: this.waitForMigrationCompletion,
stateStatus$: this.stateStatus$,
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import * as Option from 'fp-ts/lib/Option';
import * as Rx from 'rxjs';
import { omit } from 'lodash';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { Defer } from './kibana_migrator_utils';
Expand Down Expand Up @@ -273,14 +274,17 @@ export const next = (
client: ElasticsearchClient,
transformRawDocs: TransformRawDocs,
readyToReindex: Defer<void>,
doneReindexing: Defer<void>
doneReindexing: Defer<void>,
stateStatus$?: Rx.BehaviorSubject<State | null>
) => {
const map = nextActionMap(client, transformRawDocs, readyToReindex, doneReindexing);
return (state: State) => {
const delay = createDelayFn(state);

if (state.controlState === 'DONE' || state.controlState === 'FATAL') {
// Return null if we're in one of the terminating states
stateStatus$?.next(state);
stateStatus$?.complete();
return null;
} else {
// Otherwise return the delayed action
Expand All @@ -290,6 +294,7 @@ export const next = (
const nextAction = map[state.controlState] as (
state: State
) => ReturnType<typeof map[AllActionStates]>;
stateStatus$?.next(state);
return delay(nextAction(state));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import * as Rx from 'rxjs';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsMigrationVersion } from '@kbn/core-saved-objects-common';
import type { ISavedObjectTypeRegistry } from '@kbn/core-saved-objects-server';
Expand Down Expand Up @@ -58,6 +59,7 @@ export interface RunResilientMigratorParams {
migrationsConfig: SavedObjectsMigrationConfigType;
typeRegistry: ISavedObjectTypeRegistry;
docLinks: DocLinksServiceStart;
stateStatus$?: Rx.BehaviorSubject<State | null>;
}

/**
Expand All @@ -83,6 +85,7 @@ export async function runResilientMigrator({
migrationsConfig,
typeRegistry,
docLinks,
stateStatus$,
}: RunResilientMigratorParams): Promise<MigrationResult> {
const initialState = createInitialState({
kibanaVersion,
Expand All @@ -103,7 +106,7 @@ export async function runResilientMigrator({
return migrationStateActionMachine({
initialState,
logger,
next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing),
next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing, stateStatus$),
model,
abort: async (state?: State) => {
// At this point, we could reject this migrator's defers and unblock other migrators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import type { Logger } from '@kbn/logging';
import { BehaviorSubject } from 'rxjs';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type {
Expand All @@ -21,6 +22,7 @@ import type {
SavedObjectsTypeMappingDefinitions,
} from '@kbn/core-saved-objects-base-server-internal';
import Semver from 'semver';
import type { State } from './state';
import type { DocumentMigrator } from './document_migrator';
import { buildActiveMappings, createIndexMap } from './core';
import {
Expand Down Expand Up @@ -56,6 +58,8 @@ export interface RunV2MigrationOpts {
mappingProperties: SavedObjectsTypeMappingDefinitions;
/** Tells whether this instance should actively participate in the migration or not */
waitForMigrationCompletion: boolean;
/** Subject that emits the current active step in the migrator state machine */
stateStatus$?: BehaviorSubject<State | null>;
}

export const runV2Migration = async (options: RunV2MigrationOpts): Promise<MigrationResult[]> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export const createContext = ({
typeRegistry,
serializer,
nodeRoles,
stateStatus$,
}: CreateContextOps): MigratorContext => {
return {
migrationConfig,
Expand All @@ -44,5 +45,6 @@ export const createContext = ({
batchSize: migrationConfig.batchSize,
discardCorruptObjects: Boolean(migrationConfig.discardCorruptObjects),
nodeRoles,
stateStatus$,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import type {
SavedObjectsMigrationConfigType,
} from '@kbn/core-saved-objects-base-server-internal';
import type { DocLinks } from '@kbn/doc-links';
import type { BehaviorSubject } from 'rxjs';
import { VersionedTransformer } from '../../document_migrator';
import type { State } from '../state';

/**
* The set of static, precomputed values and services used by the ZDT migration
Expand Down Expand Up @@ -53,4 +55,6 @@ export interface MigratorContext {
readonly discardCorruptObjects: boolean;
/** The node roles of the Kibana instance */
readonly nodeRoles: NodeRoles;
/** Subject that emits the current active step in the migrator state machine */
stateStatus$?: BehaviorSubject<State | null>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import type {
} from '@kbn/core-saved-objects-server';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { BehaviorSubject } from 'rxjs';
import { NodeRoles } from '@kbn/core-node-server';
import { migrationStateActionMachine } from './migration_state_action_machine';
import type { VersionedTransformer } from '../document_migrator';
import { createContext } from './context';
import { next } from './next';
import { model } from './model';
import { createInitialState } from './state';
import { createInitialState, State } from './state';

export interface MigrateIndexOptions {
kibanaVersion: string;
Expand All @@ -45,6 +46,8 @@ export interface MigrateIndexOptions {
elasticsearchClient: ElasticsearchClient;
/** The node roles of the Kibana instance */
readonly nodeRoles: NodeRoles;
/** Subject that emits the current active step in the migrator state machine */
stateStatus$?: BehaviorSubject<State | null>;
}

export const migrateIndex = async ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ export const next = (context: MigratorContext) => {

if (state.controlState === 'DONE' || state.controlState === 'FATAL') {
// Return null if we're in one of the terminating states
context.stateStatus$?.next(state);
context.stateStatus$?.complete();
return null;
} else {
// Otherwise return the delayed action
Expand All @@ -202,6 +204,7 @@ export const next = (context: MigratorContext) => {
const nextAction = map[state.controlState] as (
state: State
) => ReturnType<typeof map[AllActionStates]>;
context.stateStatus$?.next(state);
return delay(nextAction(state));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { NodeRoles } from '@kbn/core-node-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { BehaviorSubject } from 'rxjs';

import type {
ISavedObjectTypeRegistry,
ISavedObjectsSerializer,
Expand All @@ -18,6 +20,7 @@ import {
type SavedObjectsMigrationConfigType,
type MigrationResult,
} from '@kbn/core-saved-objects-base-server-internal';
import type { State } from './state';
import type { VersionedTransformer } from '../document_migrator';
import { buildMigratorConfigs } from './utils';
import { migrateIndex } from './migrate_index';
Expand All @@ -43,6 +46,8 @@ export interface RunZeroDowntimeMigrationOpts {
elasticsearchClient: ElasticsearchClient;
/** The node roles of the Kibana instance */
nodeRoles: NodeRoles;
/** Subject that emits the current active step in the migrator state machine */
stateStatus$?: BehaviorSubject<State | null>;
}

export const runZeroDowntimeMigration = async (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,64 @@ describe('split .kibana index into multiple system indices', () => {
await clearLog(logFilePath);
});

describe('failure cases', () => {
it.only('successfully performs the migrations even if a migrator fails', async () => {
esServer = await startElasticsearch({
dataArchive: Path.join(__dirname, '..', 'archives', '7.7.2_xpack_100k_obj.zip'),
});

const updatedTypeRegistry = overrideTypeRegistry(
typeRegistry,
(type: SavedObjectsType<any>) => {
return {
...type,
indexPattern: RELOCATE_TYPES[type.name] ?? MAIN_SAVED_OBJECT_INDEX,
};
}
);

const migratorTestKitFactory = ({ failAfterStep }: { failAfterStep?: string }) =>
getKibanaMigratorTestKit({
types: updatedTypeRegistry.getAllTypes(),
kibanaIndex: '.kibana',
logFilePath,
failAfterStep,
});

let firstRunFail = false;
let secondRunFail = false;

try {
const { runMigrations } = await migratorTestKitFactory({
failAfterStep: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
});
await runMigrations();
} catch (err) {
firstRunFail = true;
}

expect(firstRunFail).toBe(true);

try {
const { runMigrations } = await migratorTestKitFactory({});
const results = await runMigrations();
expect(
results
.flat()
.every((result) => result.status === 'migrated' || result.status === 'patched')
).toEqual(true);
} catch (err) {
secondRunFail = true;
}
expect(secondRunFail).toBe(false);
});

afterAll(async () => {
await esServer?.stop();
await delay(2);
});
});

describe('when migrating from a legacy version', () => {
let migratorTestKitFactory: () => Promise<KibanaMigratorTestKit>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import type { ISavedObjectsRepository } from '@kbn/core-saved-objects-api-server
import { getDocLinks, getDocLinksMeta } from '@kbn/doc-links';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { NodeRoles } from '@kbn/core-node-server';
import { Client } from '@elastic/elasticsearch';
import { baselineDocuments, baselineTypes } from './kibana_migrator_test_kit.fixtures';
import { delay } from './test_utils';

Expand Down Expand Up @@ -76,6 +77,8 @@ export interface KibanaMigratorTestKitParams {
types?: Array<SavedObjectsType<any>>;
defaultIndexTypesMap?: IndexTypesMap;
logFilePath?: string;
esClientProxy?: (client: Client) => Client;
failAfterStep?: string;
}

export interface KibanaMigratorTestKit {
Expand Down Expand Up @@ -134,8 +137,12 @@ export const getKibanaMigratorTestKit = async ({
types = [],
logFilePath = defaultLogFilePath,
nodeRoles = defaultNodeRoles,
failAfterStep,
}: KibanaMigratorTestKitParams = {}): Promise<KibanaMigratorTestKit> => {
let hasRun = false;
let hasComplete = false;
let reachedTargetFailureStep = false;
let controlState: string | undefined;
const loggingSystem = new LoggingSystem();
const loggerFactory = loggingSystem.asLoggerFactory();

Expand All @@ -145,7 +152,35 @@ export const getKibanaMigratorTestKit = async ({
const loggingConf = await firstValueFrom(configService.atPath<LoggingConfigType>('logging'));
loggingSystem.upgrade(loggingConf);

const client = await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
const proxyClient = (rawClient: Client): Client => {
return new Proxy(rawClient, {
get(target, prop, receiver) {
if (!failAfterStep || !hasRun || hasComplete) {
return Reflect.get(target, prop, receiver);
}
if (reachedTargetFailureStep) {
throw new Error('SIMULATING ES CONNECTION ERROR');
}

switch (prop) {
case 'child': {
return new Proxy(Reflect.get(target, prop, receiver), {
apply(_target, _thisArg, argumentsList) {
const childClient = rawClient.child(argumentsList[0]);
return proxyClient(childClient);
},
});
}
default: {
return Reflect.get(target, prop, receiver);
}
}
},
});
};

const rawClient = await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
const client = proxyClient(rawClient);

const typeRegistry = new SavedObjectTypeRegistry();

Expand All @@ -164,6 +199,16 @@ export const getKibanaMigratorTestKit = async ({
nodeRoles,
});

if (failAfterStep) {
const stateStatus = migrator.getStateStatus$();
stateStatus.subscribe((result) => {
controlState = result?.controlState;
if (controlState === failAfterStep) {
reachedTargetFailureStep = true;
}
});
}

const runMigrations = async () => {
if (hasRun) {
throw new Error('The test kit migrator can only be run once. Please instantiate it again.');
Expand All @@ -173,6 +218,7 @@ export const getKibanaMigratorTestKit = async ({
try {
return await migrator.runMigrations();
} finally {
hasComplete = true;
await loggingSystem.stop();
}
};
Expand Down