diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.ts index ac50e60027e2d..ac9591f11a499 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator.ts @@ -31,6 +31,7 @@ import { type MigrationResult, type IndexTypesMap, } from '@kbn/core-saved-objects-base-server-internal'; +import type { State } from './state'; import { buildActiveMappings, buildTypesMappings } from './core'; import { DocumentMigrator } from './document_migrator'; import { runZeroDowntimeMigration } from './zdt'; @@ -72,6 +73,7 @@ export class KibanaMigrator implements IKibanaMigrator { private readonly waitForMigrationCompletion: boolean; private readonly nodeRoles: NodeRoles; public readonly kibanaVersion: string; + public readonly stateStatus$: BehaviorSubject; /** * Creates an instance of KibanaMigrator. @@ -109,6 +111,7 @@ export class KibanaMigrator implements IKibanaMigrator { // operation so we cache the result this.activeMappings = buildActiveMappings(this.mappingProperties); this.docLinks = docLinks; + this.stateStatus$ = new BehaviorSubject(null); } public runMigrations({ rerun = false }: { rerun?: boolean } = {}): Promise { @@ -138,6 +141,10 @@ export class KibanaMigrator implements IKibanaMigrator { return this.status$.asObservable(); } + public getStateStatus$() { + return this.stateStatus$.asObservable(); + } + private runMigrationsInternal(): Promise { const migrationAlgorithm = this.soMigrationsConfig.algorithm; if (migrationAlgorithm === 'zdt') { @@ -167,6 +174,7 @@ export class KibanaMigrator implements IKibanaMigrator { elasticsearchClient: this.client, mappingProperties: this.mappingProperties, waitForMigrationCompletion: this.waitForMigrationCompletion, + stateStatus$: this.stateStatus$, }); } } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts index 1b5a9fe99fe3a..004aab0780daf 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts @@ -7,6 +7,7 @@ */ import * as Option from 'fp-ts/lib/Option'; +import * as Rx from 'rxjs'; import { omit } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { Defer } from './kibana_migrator_utils'; @@ -273,7 +274,8 @@ export const next = ( client: ElasticsearchClient, transformRawDocs: TransformRawDocs, readyToReindex: Defer, - doneReindexing: Defer + doneReindexing: Defer, + stateStatus$?: Rx.BehaviorSubject ) => { const map = nextActionMap(client, transformRawDocs, readyToReindex, doneReindexing); return (state: State) => { @@ -281,6 +283,8 @@ export const next = ( if (state.controlState === 'DONE' || state.controlState === 'FATAL') { // Return null if we're in one of the terminating states + stateStatus$?.next(state); + stateStatus$?.complete(); return null; } else { // Otherwise return the delayed action @@ -290,6 +294,7 @@ export const next = ( const nextAction = map[state.controlState] as ( state: State ) => ReturnType; + stateStatus$?.next(state); return delay(nextAction(state)); } }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts index 88f6f57578492..f5ce3537b4176 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts @@ -8,6 +8,7 @@ import type { Logger } from '@kbn/logging'; import type { DocLinksServiceStart } from '@kbn/core-doc-links-server'; +import * as Rx from 'rxjs'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { SavedObjectsMigrationVersion } from '@kbn/core-saved-objects-common'; import type { ISavedObjectTypeRegistry } from '@kbn/core-saved-objects-server'; @@ -58,6 +59,7 @@ export interface RunResilientMigratorParams { migrationsConfig: SavedObjectsMigrationConfigType; typeRegistry: ISavedObjectTypeRegistry; docLinks: DocLinksServiceStart; + stateStatus$?: Rx.BehaviorSubject; } /** @@ -83,6 +85,7 @@ export async function runResilientMigrator({ migrationsConfig, typeRegistry, docLinks, + stateStatus$, }: RunResilientMigratorParams): Promise { const initialState = createInitialState({ kibanaVersion, @@ -103,7 +106,7 @@ export async function runResilientMigrator({ return migrationStateActionMachine({ initialState, logger, - next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing), + next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing, stateStatus$), model, abort: async (state?: State) => { // At this point, we could reject this migrator's defers and unblock other migrators diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts index c50a3c6997598..172f14f2c9b4e 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts @@ -7,6 +7,7 @@ */ import type { Logger } from '@kbn/logging'; +import { BehaviorSubject } from 'rxjs'; import type { DocLinksServiceStart } from '@kbn/core-doc-links-server'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { @@ -21,6 +22,7 @@ import type { SavedObjectsTypeMappingDefinitions, } from '@kbn/core-saved-objects-base-server-internal'; import Semver from 'semver'; +import type { State } from './state'; import type { DocumentMigrator } from './document_migrator'; import { buildActiveMappings, createIndexMap } from './core'; import { @@ -56,6 +58,8 @@ export interface RunV2MigrationOpts { mappingProperties: SavedObjectsTypeMappingDefinitions; /** Tells whether this instance should actively participate in the migration or not */ waitForMigrationCompletion: boolean; + /** Subject that emits the current active step in the migrator state machine */ + stateStatus$?: BehaviorSubject; } export const runV2Migration = async (options: RunV2MigrationOpts): Promise => { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts index 407966a7ba6e8..b3c986fc59a7f 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/create_context.ts @@ -27,6 +27,7 @@ export const createContext = ({ typeRegistry, serializer, nodeRoles, + stateStatus$, }: CreateContextOps): MigratorContext => { return { migrationConfig, @@ -44,5 +45,6 @@ export const createContext = ({ batchSize: migrationConfig.batchSize, discardCorruptObjects: Boolean(migrationConfig.discardCorruptObjects), nodeRoles, + stateStatus$, }; }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts index 35edbb464ee17..280cd83fdd023 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/context/types.ts @@ -17,7 +17,9 @@ import type { SavedObjectsMigrationConfigType, } from '@kbn/core-saved-objects-base-server-internal'; import type { DocLinks } from '@kbn/doc-links'; +import type { BehaviorSubject } from 'rxjs'; import { VersionedTransformer } from '../../document_migrator'; +import type { State } from '../state'; /** * The set of static, precomputed values and services used by the ZDT migration @@ -53,4 +55,6 @@ export interface MigratorContext { readonly discardCorruptObjects: boolean; /** The node roles of the Kibana instance */ readonly nodeRoles: NodeRoles; + /** Subject that emits the current active step in the migrator state machine */ + stateStatus$?: BehaviorSubject; } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/migrate_index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/migrate_index.ts index 33f094c765d3b..cf0e34999f44e 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/migrate_index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/migrate_index.ts @@ -17,13 +17,14 @@ import type { } from '@kbn/core-saved-objects-server'; import type { Logger } from '@kbn/logging'; import type { DocLinksServiceStart } from '@kbn/core-doc-links-server'; +import type { BehaviorSubject } from 'rxjs'; import { NodeRoles } from '@kbn/core-node-server'; import { migrationStateActionMachine } from './migration_state_action_machine'; import type { VersionedTransformer } from '../document_migrator'; import { createContext } from './context'; import { next } from './next'; import { model } from './model'; -import { createInitialState } from './state'; +import { createInitialState, State } from './state'; export interface MigrateIndexOptions { kibanaVersion: string; @@ -45,6 +46,8 @@ export interface MigrateIndexOptions { elasticsearchClient: ElasticsearchClient; /** The node roles of the Kibana instance */ readonly nodeRoles: NodeRoles; + /** Subject that emits the current active step in the migrator state machine */ + stateStatus$?: BehaviorSubject; } export const migrateIndex = async ({ diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts index 17749104d18fe..7edf3796049ff 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/next.ts @@ -193,6 +193,8 @@ export const next = (context: MigratorContext) => { if (state.controlState === 'DONE' || state.controlState === 'FATAL') { // Return null if we're in one of the terminating states + context.stateStatus$?.next(state); + context.stateStatus$?.complete(); return null; } else { // Otherwise return the delayed action @@ -202,6 +204,7 @@ export const next = (context: MigratorContext) => { const nextAction = map[state.controlState] as ( state: State ) => ReturnType; + context.stateStatus$?.next(state); return delay(nextAction(state)); } }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/run_zdt_migration.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/run_zdt_migration.ts index 8af4649711dbf..2be504d2fdbcd 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/run_zdt_migration.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/zdt/run_zdt_migration.ts @@ -10,6 +10,8 @@ import type { Logger } from '@kbn/logging'; import type { DocLinksServiceStart } from '@kbn/core-doc-links-server'; import type { NodeRoles } from '@kbn/core-node-server'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import type { BehaviorSubject } from 'rxjs'; + import type { ISavedObjectTypeRegistry, ISavedObjectsSerializer, @@ -18,6 +20,7 @@ import { type SavedObjectsMigrationConfigType, type MigrationResult, } from '@kbn/core-saved-objects-base-server-internal'; +import type { State } from './state'; import type { VersionedTransformer } from '../document_migrator'; import { buildMigratorConfigs } from './utils'; import { migrateIndex } from './migrate_index'; @@ -43,6 +46,8 @@ export interface RunZeroDowntimeMigrationOpts { elasticsearchClient: ElasticsearchClient; /** The node roles of the Kibana instance */ nodeRoles: NodeRoles; + /** Subject that emits the current active step in the migrator state machine */ + stateStatus$?: BehaviorSubject; } export const runZeroDowntimeMigration = async ( diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts index 626a89df410a8..f2eb920115845 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts @@ -55,6 +55,64 @@ describe('split .kibana index into multiple system indices', () => { await clearLog(logFilePath); }); + describe('failure cases', () => { + it.only('successfully performs the migrations even if a migrator fails', async () => { + esServer = await startElasticsearch({ + dataArchive: Path.join(__dirname, '..', 'archives', '7.7.2_xpack_100k_obj.zip'), + }); + + const updatedTypeRegistry = overrideTypeRegistry( + typeRegistry, + (type: SavedObjectsType) => { + return { + ...type, + indexPattern: RELOCATE_TYPES[type.name] ?? MAIN_SAVED_OBJECT_INDEX, + }; + } + ); + + const migratorTestKitFactory = ({ failAfterStep }: { failAfterStep?: string }) => + getKibanaMigratorTestKit({ + types: updatedTypeRegistry.getAllTypes(), + kibanaIndex: '.kibana', + logFilePath, + failAfterStep, + }); + + let firstRunFail = false; + let secondRunFail = false; + + try { + const { runMigrations } = await migratorTestKitFactory({ + failAfterStep: 'TRANSFORMED_DOCUMENTS_BULK_INDEX', + }); + await runMigrations(); + } catch (err) { + firstRunFail = true; + } + + expect(firstRunFail).toBe(true); + + try { + const { runMigrations } = await migratorTestKitFactory({}); + const results = await runMigrations(); + expect( + results + .flat() + .every((result) => result.status === 'migrated' || result.status === 'patched') + ).toEqual(true); + } catch (err) { + secondRunFail = true; + } + expect(secondRunFail).toBe(false); + }); + + afterAll(async () => { + await esServer?.stop(); + await delay(2); + }); + }); + describe('when migrating from a legacy version', () => { let migratorTestKitFactory: () => Promise; diff --git a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts index 57258aef0916e..1cdf2cbf63222 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/kibana_migrator_test_kit.ts @@ -48,6 +48,7 @@ import type { ISavedObjectsRepository } from '@kbn/core-saved-objects-api-server import { getDocLinks, getDocLinksMeta } from '@kbn/doc-links'; import type { DocLinksServiceStart } from '@kbn/core-doc-links-server'; import type { NodeRoles } from '@kbn/core-node-server'; +import { Client } from '@elastic/elasticsearch'; import { baselineDocuments, baselineTypes } from './kibana_migrator_test_kit.fixtures'; import { delay } from './test_utils'; @@ -76,6 +77,8 @@ export interface KibanaMigratorTestKitParams { types?: Array>; defaultIndexTypesMap?: IndexTypesMap; logFilePath?: string; + esClientProxy?: (client: Client) => Client; + failAfterStep?: string; } export interface KibanaMigratorTestKit { @@ -134,8 +137,12 @@ export const getKibanaMigratorTestKit = async ({ types = [], logFilePath = defaultLogFilePath, nodeRoles = defaultNodeRoles, + failAfterStep, }: KibanaMigratorTestKitParams = {}): Promise => { let hasRun = false; + let hasComplete = false; + let reachedTargetFailureStep = false; + let controlState: string | undefined; const loggingSystem = new LoggingSystem(); const loggerFactory = loggingSystem.asLoggerFactory(); @@ -145,7 +152,35 @@ export const getKibanaMigratorTestKit = async ({ const loggingConf = await firstValueFrom(configService.atPath('logging')); loggingSystem.upgrade(loggingConf); - const client = await getElasticsearchClient(configService, loggerFactory, kibanaVersion); + const proxyClient = (rawClient: Client): Client => { + return new Proxy(rawClient, { + get(target, prop, receiver) { + if (!failAfterStep || !hasRun || hasComplete) { + return Reflect.get(target, prop, receiver); + } + if (reachedTargetFailureStep) { + throw new Error('SIMULATING ES CONNECTION ERROR'); + } + + switch (prop) { + case 'child': { + return new Proxy(Reflect.get(target, prop, receiver), { + apply(_target, _thisArg, argumentsList) { + const childClient = rawClient.child(argumentsList[0]); + return proxyClient(childClient); + }, + }); + } + default: { + return Reflect.get(target, prop, receiver); + } + } + }, + }); + }; + + const rawClient = await getElasticsearchClient(configService, loggerFactory, kibanaVersion); + const client = proxyClient(rawClient); const typeRegistry = new SavedObjectTypeRegistry(); @@ -164,6 +199,16 @@ export const getKibanaMigratorTestKit = async ({ nodeRoles, }); + if (failAfterStep) { + const stateStatus = migrator.getStateStatus$(); + stateStatus.subscribe((result) => { + controlState = result?.controlState; + if (controlState === failAfterStep) { + reachedTargetFailureStep = true; + } + }); + } + const runMigrations = async () => { if (hasRun) { throw new Error('The test kit migrator can only be run once. Please instantiate it again.'); @@ -173,6 +218,7 @@ export const getKibanaMigratorTestKit = async ({ try { return await migrator.runMigrations(); } finally { + hasComplete = true; await loggingSystem.stop(); } };