From b539b1aadb28f142a2ff43893cdd155936f316b7 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Fri, 2 Jun 2023 17:07:02 +0200 Subject: [PATCH 1/8] Single updateAliases() for all migrators when relocating during upgrade --- .../src/actions/create_index.ts | 43 ++++---------- .../src/actions/synchronize_migrators.test.ts | 56 ++++++++----------- .../src/actions/synchronize_migrators.ts | 25 ++++++--- .../src/actions/update_aliases.ts | 13 +++-- .../src/kibana_migrator_utils.test.ts | 8 +-- .../src/kibana_migrator_utils.ts | 28 +++++++--- .../src/model/model.test.ts | 8 +-- .../src/model/model.ts | 19 ++++++- .../src/next.test.ts | 18 +++++- .../src/next.ts | 36 +++++++++--- .../src/run_resilient_migrator.test.ts | 7 ++- .../src/run_resilient_migrator.ts | 17 ++++-- .../src/run_v2_migration.test.ts | 32 ++++++----- .../src/run_v2_migration.ts | 26 ++++++--- .../src/state.ts | 9 +++ .../group3/dot_kibana_split.test.ts | 5 +- 16 files changed, 218 insertions(+), 132 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts index 399be22d6b678..1d492301f45bd 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts @@ -12,7 +12,6 @@ import { pipe } from 'fp-ts/lib/pipeable'; import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal'; -import type { AcknowledgeResponse } from '.'; import { catchRetryableEsClientErrors, type RetryableEsClientError, @@ -46,6 +45,9 @@ export interface CreateIndexParams { aliases?: string[]; timeout?: string; } + +export type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists'; + /** * Creates an index with the given mappings * @@ -64,11 +66,11 @@ export const createIndex = ({ timeout = DEFAULT_TIMEOUT, }: CreateIndexParams): TaskEither.TaskEither< RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded, - 'create_index_succeeded' + CreateIndexSuccessResponse > => { const createIndexTask: TaskEither.TaskEither< RetryableEsClientError | ClusterShardLimitExceeded, - AcknowledgeResponse + CreateIndexSuccessResponse > = () => { const aliasesObject = aliasArrayToRecord(aliases); @@ -103,31 +105,12 @@ export const createIndex = ({ }, }, }) - .then((res) => { - /** - * - acknowledged=false, we timed out before the cluster state was - * updated on all nodes with the newly created index, but it - * probably will be created sometime soon. - * - shards_acknowledged=false, we timed out before all shards were - * started - * - acknowledged=true, shards_acknowledged=true, index creation complete - */ - return Either.right({ - acknowledged: Boolean(res.acknowledged), - shardsAcknowledged: res.shards_acknowledged, - }); + .then(() => { + return Either.right('create_index_succeeded' as const); }) .catch((error) => { if (error?.body?.error?.type === 'resource_already_exists_exception') { - /** - * If the target index already exists it means a previous create - * operation had already been started. However, we can't be sure - * that all shards were started so return shardsAcknowledged: false - */ - return Either.right({ - acknowledged: true, - shardsAcknowledged: false, - }); + return Either.right('index_already_exists' as const); } else if (isClusterShardLimitExceeded(error?.body?.error)) { return Either.left({ type: 'cluster_shard_limit_exceeded' as const, @@ -143,11 +126,12 @@ export const createIndex = ({ createIndexTask, TaskEither.chain< RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded, - AcknowledgeResponse, - 'create_index_succeeded' + CreateIndexSuccessResponse, + CreateIndexSuccessResponse >((res) => { // Systematicaly wait until the target index has a 'green' status meaning // the primary (and on multi node clusters) the replica has been started + // When the index status is 'green' we know that all shards were started // see https://github.com/elastic/kibana/issues/157968 return pipe( waitForIndexStatus({ @@ -156,10 +140,7 @@ export const createIndex = ({ timeout: DEFAULT_TIMEOUT, status: 'green', }), - TaskEither.map(() => { - /** When the index status is 'green' we know that all shards were started */ - return 'create_index_succeeded'; - }) + TaskEither.map(() => res) ); }) ); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts index a5a8e9c25f929..d90cdbc72583b 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts @@ -6,38 +6,36 @@ * Side Public License, v 1. */ import { synchronizeMigrators } from './synchronize_migrators'; -import { type Defer, defer } from '../kibana_migrator_utils'; +import { type WaitGroup, waitGroup as createWaitGroup } from '../kibana_migrator_utils'; describe('synchronizeMigrators', () => { - let defers: Array>; - let allDefersPromise: Promise; - let migratorsDefers: Array>; + let waitGroups: Array>; + let allWaitGroupsPromise: Promise; + let migratorsWaitGroups: Array>; beforeEach(() => { jest.clearAllMocks(); - defers = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(defer); - allDefersPromise = Promise.all(defers.map(({ promise }) => promise)); + waitGroups = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(createWaitGroup); + allWaitGroupsPromise = Promise.all(waitGroups.map(({ promise }) => promise)); - migratorsDefers = defers.map(({ resolve, reject }) => ({ + migratorsWaitGroups = waitGroups.map(({ resolve, reject }) => ({ resolve: jest.fn(resolve), reject: jest.fn(reject), - promise: allDefersPromise, + promise: allWaitGroupsPromise, })); }); describe('when all migrators reach the synchronization point with a correct state', () => { it('unblocks all migrators and resolves Right', async () => { - const tasks = migratorsDefers.map((migratorDefer) => synchronizeMigrators(migratorDefer)); + const tasks = migratorsWaitGroups.map((waitGroup) => synchronizeMigrators({ waitGroup })); const res = await Promise.all(tasks.map((task) => task())); - migratorsDefers.forEach((migratorDefer) => - expect(migratorDefer.resolve).toHaveBeenCalledTimes(1) - ); - migratorsDefers.forEach((migratorDefer) => - expect(migratorDefer.reject).not.toHaveBeenCalled() + migratorsWaitGroups.forEach((waitGroup) => + expect(waitGroup.resolve).toHaveBeenCalledTimes(1) ); + migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled()); expect(res).toEqual([ { _tag: 'Right', right: 'synchronized_successfully' }, @@ -48,13 +46,11 @@ describe('synchronizeMigrators', () => { it('migrators are not unblocked until the last one reaches the synchronization point', async () => { let resolved: number = 0; - migratorsDefers.forEach((migratorDefer) => migratorDefer.promise.then(() => ++resolved)); - const [casesDefer, ...otherMigratorsDefers] = migratorsDefers; + migratorsWaitGroups.forEach((waitGroup) => waitGroup.promise.then(() => ++resolved)); + const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups; // we simulate that only kibana_task_manager and kibana migrators get to the sync point - const tasks = otherMigratorsDefers.map((migratorDefer) => - synchronizeMigrators(migratorDefer) - ); + const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup })); // we don't await for them, or we would be locked forever Promise.all(tasks.map((task) => task())); @@ -65,7 +61,7 @@ describe('synchronizeMigrators', () => { expect(resolved).toEqual(0); // finally, the last migrator gets to the synchronization point - await synchronizeMigrators(casesDefer)(); + await synchronizeMigrators({ waitGroup: casesDefer })(); expect(resolved).toEqual(3); }); }); @@ -75,18 +71,16 @@ describe('synchronizeMigrators', () => { it('synchronizedMigrators resolves Left for the rest of migrators', async () => { let resolved: number = 0; let errors: number = 0; - migratorsDefers.forEach((migratorDefer) => - migratorDefer.promise.then(() => ++resolved).catch(() => ++errors) + migratorsWaitGroups.forEach((waitGroup) => + waitGroup.promise.then(() => ++resolved).catch(() => ++errors) ); - const [casesDefer, ...otherMigratorsDefers] = migratorsDefers; + const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups; // we first make one random migrator fail and not reach the sync point casesDefer.reject('Oops. The cases migrator failed unexpectedly.'); // the other migrators then try to synchronize - const tasks = otherMigratorsDefers.map((migratorDefer) => - synchronizeMigrators(migratorDefer) - ); + const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup })); expect(Promise.all(tasks.map((task) => task()))).resolves.toEqual([ { @@ -116,15 +110,13 @@ describe('synchronizeMigrators', () => { it('synchronizedMigrators resolves Left for the rest of migrators', async () => { let resolved: number = 0; let errors: number = 0; - migratorsDefers.forEach((migratorDefer) => - migratorDefer.promise.then(() => ++resolved).catch(() => ++errors) + migratorsWaitGroups.forEach((waitGroup) => + waitGroup.promise.then(() => ++resolved).catch(() => ++errors) ); - const [casesDefer, ...otherMigratorsDefers] = migratorsDefers; + const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups; // some migrators try to synchronize - const tasks = otherMigratorsDefers.map((migratorDefer) => - synchronizeMigrators(migratorDefer) - ); + const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup })); // we then make one random migrator fail and not reach the sync point casesDefer.reject('Oops. The cases migrator failed unexpectedly.'); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts index 26763f1f51ae2..34389cf1057e6 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts @@ -8,20 +8,31 @@ import * as Either from 'fp-ts/lib/Either'; import * as TaskEither from 'fp-ts/lib/TaskEither'; -import type { Defer } from '../kibana_migrator_utils'; +import type { WaitGroup } from '../kibana_migrator_utils'; export interface SyncFailed { type: 'sync_failed'; error: Error; } -export function synchronizeMigrators( - defer: Defer -): TaskEither.TaskEither { +export interface SynchronizeMigratorsParams { + waitGroup: WaitGroup; + thenHook?: (res: any) => Either.Right; + payload?: T; +} + +export function synchronizeMigrators({ + waitGroup, + payload, + thenHook = () => + Either.right( + 'synchronized_successfully' as const + ) as Either.Right<'synchronized_successfully'> as unknown as Either.Right, +}: SynchronizeMigratorsParams): TaskEither.TaskEither { return () => { - defer.resolve(); - return defer.promise - .then(() => Either.right('synchronized_successfully' as const)) + waitGroup.resolve(payload); + return waitGroup.promise + .then((res) => (thenHook ? thenHook(res) : res)) .catch((error) => Either.left({ type: 'sync_failed' as const, error })); }; } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts index eeaadbb86306f..c3fa320437804 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts @@ -37,6 +37,13 @@ export interface UpdateAliasesParams { aliasActions: AliasAction[]; timeout?: string; } + +/** @internal */ +export type UpdateAliasesReturnType = TaskEither.TaskEither< + IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError, + 'update_aliases_succeeded' +>; + /** * Calls the Update index alias API `_alias` with the provided alias actions. */ @@ -45,11 +52,9 @@ export const updateAliases = client, aliasActions, timeout = DEFAULT_TIMEOUT, - }: UpdateAliasesParams): TaskEither.TaskEither< - IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError, - 'update_aliases_succeeded' - > => + }: UpdateAliasesParams): UpdateAliasesReturnType => () => { + if (!aliasActions || !aliasActions.length) throw Error('updating NO aliases!'); return client.indices .updateAliases({ actions: aliasActions, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts index 02eb3dcdaadc1..6921e7392fa1f 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts @@ -17,16 +17,16 @@ import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server'; import { loggerMock } from '@kbn/logging-mocks'; import { calculateTypeStatuses, - createMultiPromiseDefer, + createWaitGroupMap, getCurrentIndexTypesMap, getIndicesInvolvedInRelocation, indexMapToIndexTypesMap, } from './kibana_migrator_utils'; import { INDEX_MAP_BEFORE_SPLIT } from './kibana_migrator_utils.fixtures'; -describe('createMultiPromiseDefer', () => { +describe('createWaitGroupMap', () => { it('creates defer objects with the same Promise', () => { - const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']); + const defers = createWaitGroupMap(['.kibana', '.kibana_cases']); expect(Object.keys(defers)).toHaveLength(2); expect(defers['.kibana'].promise).toEqual(defers['.kibana_cases'].promise); expect(defers['.kibana'].resolve).not.toEqual(defers['.kibana_cases'].resolve); @@ -34,7 +34,7 @@ describe('createMultiPromiseDefer', () => { }); it('the common Promise resolves when all defers resolve', async () => { - const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']); + const defers = createWaitGroupMap(['.kibana', '.kibana_cases']); let resolved = 0; Object.values(defers).forEach((defer) => defer.promise.then(() => ++resolved)); defers['.kibana'].resolve(); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts index 36d17cd1159e9..04bb151f818e3 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts @@ -15,8 +15,8 @@ import { TypeStatus, type TypeStatusDetails } from './kibana_migrator_constants' // even though this utility class is present in @kbn/kibana-utils-plugin, we can't easily import it from Core // aka. one does not simply reuse code -export class Defer { - public resolve!: (data: T) => void; +class Defer { + public resolve!: (data?: T) => void; public reject!: (error: any) => void; public promise: Promise = new Promise((resolve, reject) => { (this as any).resolve = resolve; @@ -24,12 +24,26 @@ export class Defer { }); } -export const defer = () => new Defer(); +export type WaitGroup = Defer; -export function createMultiPromiseDefer(indices: string[]): Record> { - const defers: Array> = indices.map(defer); - const all = Promise.all(defers.map(({ promise }) => promise)); - return indices.reduce>>((acc, indexName, i) => { +export function waitGroup(): WaitGroup { + return new Defer(); +} + +export function createWaitGroupMap( + keys: string[], + thenHook: (res: T[]) => U = (res) => res as unknown as U +): Record> { + if (!keys?.length) { + return {}; + } + + const defers: Array> = keys.map(() => waitGroup()); + + // every member of the WaitGroup will wait for all members to resolve + const all = Promise.all(defers.map(({ promise }) => promise)).then(thenHook); + + return keys.reduce>>((acc, indexName, i) => { const { resolve, reject } = defers[i]; acc[indexName] = { resolve, reject, promise: all }; return acc; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index 6390edd9b04cb..76b998042910f 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -2970,10 +2970,10 @@ describe('migrations v2 model', () => { sourceIndex: Option.none as Option.None, targetIndex: '.kibana_7.11.0_001', }; - test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY', () => { + test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS', () => { const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded'); const newState = model(createNewTargetState, res); - expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY'); + expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -2987,7 +2987,7 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toEqual(1); expect(newState.retryDelay).toEqual(2000); }); - test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY resets the retry count and delay', () => { + test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS resets the retry count and delay', () => { const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded'); const testState = { ...createNewTargetState, @@ -2996,7 +2996,7 @@ describe('migrations v2 model', () => { }; const newState = model(testState, res); - expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY'); + expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index 915fe58f6e448..8172f3b9e7a8c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -1458,7 +1458,9 @@ export const model = (currentState: State, resW: ResponseType): // index. return { ...stateP, - controlState: 'MARK_VERSION_INDEX_READY', + controlState: stateP.mustRelocateDocuments + ? 'MARK_VERSION_INDEX_READY_SYNC' + : 'MARK_VERSION_INDEX_READY', versionIndexReadyActions: stateP.versionIndexReadyActions, }; } else { @@ -1474,6 +1476,16 @@ export const model = (currentState: State, resW: ResponseType): } else if (stateP.controlState === 'CREATE_NEW_TARGET') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { + if (res.right === 'index_already_exists') { + // We were supposed to be on a "fresh deployment" state (we did not find any aliases) + // but the target index already exists. Assume it can be from a previous upgrade attempt that: + // - managed to clone ..._reindex_temp into target + // - but did NOT finish the process (aka did not get to update the index aliases) + return { + ...stateP, + controlState: 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT', + }; + } return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY', @@ -1503,7 +1515,10 @@ export const model = (currentState: State, resW: ResponseType): // left responses to handle here. throwBadResponse(stateP, res); } - } else if (stateP.controlState === 'MARK_VERSION_INDEX_READY') { + } else if ( + stateP.controlState === 'MARK_VERSION_INDEX_READY' || + stateP.controlState === 'MARK_VERSION_INDEX_READY_SYNC' + ) { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { return { ...stateP, controlState: 'DONE' }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts index f7cabfb6e42db..c57cf1f7f0705 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts @@ -7,7 +7,7 @@ */ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import { defer } from './kibana_migrator_utils'; +import { waitGroup } from './kibana_migrator_utils'; import { next } from './next'; import type { State } from './state'; @@ -15,12 +15,24 @@ describe('migrations v2 next', () => { it.todo('when state.retryDelay > 0 delays execution of the next action'); it('DONE returns null', () => { const state = { controlState: 'DONE' } as State; - const action = next({} as ElasticsearchClient, (() => {}) as any, defer(), defer())(state); + const action = next( + {} as ElasticsearchClient, + (() => {}) as any, + waitGroup(), + waitGroup(), + waitGroup() + )(state); expect(action).toEqual(null); }); it('FATAL returns null', () => { const state = { controlState: 'FATAL', reason: '' } as State; - const action = next({} as ElasticsearchClient, (() => {}) as any, defer(), defer())(state); + const action = next( + {} as ElasticsearchClient, + (() => {}) as any, + waitGroup(), + waitGroup(), + waitGroup() + )(state); expect(action).toEqual(null); }); }); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts index 1b5a9fe99fe3a..86b6e7bb67179 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts @@ -9,7 +9,7 @@ import * as Option from 'fp-ts/lib/Option'; import { omit } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import type { Defer } from './kibana_migrator_utils'; +import type { WaitGroup } from './kibana_migrator_utils'; import type { AllActionStates, CalculateExcludeFiltersState, @@ -72,8 +72,9 @@ export type ResponseType = Awaited< export const nextActionMap = ( client: ElasticsearchClient, transformRawDocs: TransformRawDocs, - readyToReindex: Defer, - doneReindexing: Defer + readyToReindex: WaitGroup, + doneReindexing: WaitGroup, + updateRelocationAliases: WaitGroup ) => { return { INIT: (state: InitState) => @@ -141,7 +142,10 @@ export const nextActionMap = ( indexName: state.tempIndex, mappings: state.tempIndexMappings, }), - READY_TO_REINDEX_SYNC: () => Actions.synchronizeMigrators(readyToReindex), + READY_TO_REINDEX_SYNC: () => + Actions.synchronizeMigrators({ + waitGroup: readyToReindex, + }), REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) => Actions.openPit({ client, index: state.sourceIndex.value }), REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) => @@ -174,7 +178,10 @@ export const nextActionMap = ( */ refresh: false, }), - DONE_REINDEXING_SYNC: () => Actions.synchronizeMigrators(doneReindexing), + DONE_REINDEXING_SYNC: () => + Actions.synchronizeMigrators({ + waitGroup: doneReindexing, + }), SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) => Actions.setWriteBlock({ client, index: state.tempIndex }), CLONE_TEMP_TO_TARGET: (state: CloneTempToTarget) => @@ -242,6 +249,12 @@ export const nextActionMap = ( }), MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) => Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }), + MARK_VERSION_INDEX_READY_SYNC: (state: MarkVersionIndexReady) => + Actions.synchronizeMigrators({ + waitGroup: updateRelocationAliases, + payload: state.versionIndexReadyActions.value, + thenHook: (res) => res, + }), MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) => Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }), LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) => @@ -272,10 +285,17 @@ export const nextActionMap = ( export const next = ( client: ElasticsearchClient, transformRawDocs: TransformRawDocs, - readyToReindex: Defer, - doneReindexing: Defer + readyToReindex: WaitGroup, + doneReindexing: WaitGroup, + updateRelocationAliases: WaitGroup ) => { - const map = nextActionMap(client, transformRawDocs, readyToReindex, doneReindexing); + const map = nextActionMap( + client, + transformRawDocs, + readyToReindex, + doneReindexing, + updateRelocationAliases + ); return (state: State) => { const delay = createDelayFn(state); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts index 280b8fd08f6cf..56172384dcdcd 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts @@ -13,7 +13,7 @@ import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-m import { loggingSystemMock } from '@kbn/core-logging-server-mocks'; import type { MigrationResult } from '@kbn/core-saved-objects-base-server-internal'; import { createInitialState } from './initial_state'; -import { Defer } from './kibana_migrator_utils'; +import { waitGroup } from './kibana_migrator_utils'; import { migrationStateActionMachine } from './migrations_state_action_machine'; import { next } from './next'; import { runResilientMigrator, type RunResilientMigratorParams } from './run_resilient_migrator'; @@ -128,8 +128,9 @@ const mockOptions = (): RunResilientMigratorParams => { }, }, }, - readyToReindex: new Defer(), - doneReindexing: new Defer(), + readyToReindex: waitGroup(), + doneReindexing: waitGroup(), + updateRelocationAliases: waitGroup(), logger, transformRawDocs: jest.fn(), preMigrationScript: "ctx._id = ctx._source.type + ':' + ctx._id", diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts index 88f6f57578492..90bf2d2454fda 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts @@ -17,7 +17,7 @@ import type { MigrationResult, IndexTypesMap, } from '@kbn/core-saved-objects-base-server-internal'; -import type { Defer } from './kibana_migrator_utils'; +import type { WaitGroup } from './kibana_migrator_utils'; import type { TransformRawDocs } from './types'; import { next } from './next'; import { model } from './model'; @@ -25,6 +25,7 @@ import { createInitialState } from './initial_state'; import { migrationStateActionMachine } from './migrations_state_action_machine'; import { cleanup } from './migrations_state_machine_cleanup'; import type { State } from './state'; +import type { AliasAction } from './actions'; /** * To avoid the Elasticsearch-js client aborting our requests before we @@ -48,8 +49,9 @@ export interface RunResilientMigratorParams { indexTypesMap: IndexTypesMap; targetMappings: IndexMapping; preMigrationScript?: string; - readyToReindex: Defer; - doneReindexing: Defer; + readyToReindex: WaitGroup; + doneReindexing: WaitGroup; + updateRelocationAliases: WaitGroup; logger: Logger; transformRawDocs: TransformRawDocs; coreMigrationVersionPerType: SavedObjectsMigrationVersion; @@ -76,6 +78,7 @@ export async function runResilientMigrator({ preMigrationScript, readyToReindex, doneReindexing, + updateRelocationAliases, transformRawDocs, coreMigrationVersionPerType, migrationVersionPerType, @@ -103,7 +106,13 @@ export async function runResilientMigrator({ return migrationStateActionMachine({ initialState, logger, - next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing), + next: next( + migrationClient, + transformRawDocs, + readyToReindex, + doneReindexing, + updateRelocationAliases + ), model, abort: async (state?: State) => { // At this point, we could reject this migrator's defers and unblock other migrators diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts index 22d62307aacf8..a785ff46823e7 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts @@ -22,8 +22,8 @@ import { buildTypesMappings, createIndexMap } from './core'; import { getIndicesInvolvedInRelocation, indexMapToIndexTypesMap, - createMultiPromiseDefer, - Defer, + createWaitGroupMap, + waitGroup, } from './kibana_migrator_utils'; import { runResilientMigrator } from './run_resilient_migrator'; import { indexTypesMapMock, savedObjectTypeRegistryMock } from './run_resilient_migrator.fixtures'; @@ -41,7 +41,7 @@ jest.mock('./kibana_migrator_utils', () => { return { ...actual, indexMapToIndexTypesMap: jest.fn(actual.indexMapToIndexTypesMap), - createMultiPromiseDefer: jest.fn(actual.createMultiPromiseDefer), + createWaitGroupMap: jest.fn(actual.createWaitGroupMap), getIndicesInvolvedInRelocation: jest.fn(() => Promise.resolve(['.my_index', '.other_index'])), }; }); @@ -79,9 +79,7 @@ const mockCreateIndexMap = createIndexMap as jest.MockedFunction; -const mockCreateMultiPromiseDefer = createMultiPromiseDefer as jest.MockedFunction< - typeof createMultiPromiseDefer ->; +const mockCreateWaitGroupMap = createWaitGroupMap as jest.MockedFunction; const mockGetIndicesInvolvedInRelocation = getIndicesInvolvedInRelocation as jest.MockedFunction< typeof getIndicesInvolvedInRelocation >; @@ -93,7 +91,7 @@ describe('runV2Migration', () => { beforeEach(() => { mockCreateIndexMap.mockClear(); mockIndexMapToIndexTypesMap.mockClear(); - mockCreateMultiPromiseDefer.mockClear(); + mockCreateWaitGroupMap.mockClear(); mockGetIndicesInvolvedInRelocation.mockClear(); mockRunResilientMigrator.mockClear(); }); @@ -143,9 +141,14 @@ describe('runV2Migration', () => { const options = mockOptions(); options.documentMigrator.prepareMigrations(); await runV2Migration(options); - expect(createMultiPromiseDefer).toBeCalledTimes(2); - expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']); - expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']); + expect(mockCreateWaitGroupMap).toBeCalledTimes(3); + expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']); + expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']); + expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith( + 3, + ['.my_index', '.other_index'], + expect.any(Function) // we expect to receive a method to update all aliases in this hook + ); }); it('calls runResilientMigrator for each migrator it must spawn', async () => { @@ -168,6 +171,7 @@ describe('runV2Migration', () => { mustRelocateDocuments: true, readyToReindex: expect.any(Object), doneReindexing: expect.any(Object), + updateRelocationAliases: expect.any(Object), }) ); expect(runResilientMigrator).toHaveBeenNthCalledWith( @@ -178,6 +182,7 @@ describe('runV2Migration', () => { mustRelocateDocuments: true, readyToReindex: expect.any(Object), doneReindexing: expect.any(Object), + updateRelocationAliases: expect.any(Object), }) ); expect(runResilientMigrator).toHaveBeenNthCalledWith( @@ -188,14 +193,15 @@ describe('runV2Migration', () => { mustRelocateDocuments: false, readyToReindex: undefined, doneReindexing: undefined, + updateRelocationAliases: undefined, }) ); }); it('awaits on all runResilientMigrator promises, and resolves with the results of each of them', async () => { - const myIndexMigratorDefer = new Defer(); - const otherIndexMigratorDefer = new Defer(); - const taskIndexMigratorDefer = new Defer(); + const myIndexMigratorDefer = waitGroup(); + const otherIndexMigratorDefer = waitGroup(); + const taskIndexMigratorDefer = waitGroup(); let migrationResults: MigrationResult[] | undefined; mockRunResilientMigrator.mockReturnValueOnce(myIndexMigratorDefer.promise); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts index c50a3c6997598..29367153caf8c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts @@ -24,12 +24,14 @@ import Semver from 'semver'; import type { DocumentMigrator } from './document_migrator'; import { buildActiveMappings, createIndexMap } from './core'; import { - createMultiPromiseDefer, + createWaitGroupMap, getIndicesInvolvedInRelocation, indexMapToIndexTypesMap, } from './kibana_migrator_utils'; import { runResilientMigrator } from './run_resilient_migrator'; import { migrateRawDocsSafely } from './core/migrate_raw_docs'; +import type { AliasAction } from './actions/update_aliases'; +import { updateAliases } from './actions'; export interface RunV2MigrationOpts { /** The current Kibana version */ @@ -93,13 +95,21 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise>( + indicesWithMovingTypes, + (allAliasActions) => + updateAliases({ + client: options.elasticsearchClient, + aliasActions: allAliasActions.flat(), + })() + ); // build a list of all migrators that must be started const migratorIndices = new Set(Object.keys(indexMap)); @@ -110,10 +120,11 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise { return { migrate: (): Promise => { - const readyToReindex = readyToReindexDefers[indexName]; - const doneReindexing = doneReindexingDefers[indexName]; + const readyToReindex = readyToReindexWaitGroupMap[indexName]; + const doneReindexing = doneReindexingWaitGroupMap[indexName]; + const updateRelocationAliases = updateAliasesWaitGroupMap[indexName]; // check if this migrator's index is involved in some document redistribution - const mustRelocateDocuments = !!readyToReindex; + const mustRelocateDocuments = indicesWithMovingTypes.includes(indexName); return runResilientMigrator({ client: options.elasticsearchClient, @@ -127,6 +138,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise migrateRawDocsSafely({ serializer: options.serializer, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts index 8a6be0269947e..0f9ec48e1603e 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts @@ -453,6 +453,14 @@ export interface MarkVersionIndexReady extends PostInitState { readonly versionIndexReadyActions: Option.Some; } +export interface MarkVersionIndexReadySync extends PostInitState { + /** Single "client.indices.updateAliases" operation + * to update multiple indices' aliases simultaneously + * */ + readonly controlState: 'MARK_VERSION_INDEX_READY_SYNC'; + readonly versionIndexReadyActions: Option.Some; +} + export interface MarkVersionIndexReadyConflict extends PostInitState { /** * If the MARK_VERSION_INDEX_READY step fails another instance was @@ -535,6 +543,7 @@ export type State = Readonly< | LegacyReindexWaitForTaskState | LegacySetWriteBlockState | MarkVersionIndexReady + | MarkVersionIndexReadySync | MarkVersionIndexReadyConflict | OutdatedDocumentsRefresh | OutdatedDocumentsSearchClosePit diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts index 626a89df410a8..72fc1aad81e3e 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts @@ -376,8 +376,8 @@ describe('split .kibana index into multiple system indices', () => { `[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES -> UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK.`, `[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_META.`, `[${index}] UPDATE_TARGET_MAPPINGS_META -> CHECK_VERSION_INDEX_READY_ACTIONS.`, - `[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY.`, - `[${index}] MARK_VERSION_INDEX_READY -> DONE.`, + `[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY_SYNC.`, + `[${index}] MARK_VERSION_INDEX_READY_SYNC -> DONE.`, `[${index}] Migration completed after`, ], { ordered: true } @@ -395,7 +395,6 @@ describe('split .kibana index into multiple system indices', () => { const { runMigrations } = await migratorTestKitFactory(); await clearLog(logFilePath); await runMigrations(); - const logs = await parseLogFile(logFilePath); expect(logs).not.toContainLogEntries(['REINDEX', 'CREATE', 'UPDATE_TARGET_MAPPINGS']); }); From 4d2619fbcd538e76edd95981c7393205c71c900a Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Fri, 2 Jun 2023 17:42:23 +0200 Subject: [PATCH 2/8] Fix typeCheck and UT issues --- .../src/actions/index.ts | 4 +- .../src/actions/synchronize_migrators.test.ts | 8 ++-- .../src/actions/synchronize_migrators.ts | 10 +++-- .../src/model/model.test.ts | 4 +- .../src/model/model.ts | 45 +++++++++++++------ 5 files changed, 47 insertions(+), 24 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts index dbe61920b31b3..5af2471a7f72e 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts @@ -106,7 +106,8 @@ export { import type { UnknownDocsFound } from './check_for_unknown_docs'; import type { IncompatibleClusterRoutingAllocation } from './initialize_action'; -import { ClusterShardLimitExceeded } from './create_index'; +import type { ClusterShardLimitExceeded } from './create_index'; +import type { SynchronizationFailed } from './synchronize_migrators'; export type { CheckForUnknownDocsParams, @@ -174,6 +175,7 @@ export interface ActionErrorTypeMap { index_not_yellow_timeout: IndexNotYellowTimeout; cluster_shard_limit_exceeded: ClusterShardLimitExceeded; es_response_too_large: EsResponseTooLargeError; + synchronization_failed: SynchronizationFailed; } /** diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts index d90cdbc72583b..596e17a36b98b 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts @@ -86,14 +86,14 @@ describe('synchronizeMigrators', () => { { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, @@ -125,14 +125,14 @@ describe('synchronizeMigrators', () => { { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts index 34389cf1057e6..50090293ee1e2 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts @@ -10,11 +10,13 @@ import * as Either from 'fp-ts/lib/Either'; import * as TaskEither from 'fp-ts/lib/TaskEither'; import type { WaitGroup } from '../kibana_migrator_utils'; -export interface SyncFailed { - type: 'sync_failed'; +/** @internal */ +export interface SynchronizationFailed { + type: 'synchronization_failed'; error: Error; } +/** @internal */ export interface SynchronizeMigratorsParams { waitGroup: WaitGroup; thenHook?: (res: any) => Either.Right; @@ -28,11 +30,11 @@ export function synchronizeMigrators({ Either.right( 'synchronized_successfully' as const ) as Either.Right<'synchronized_successfully'> as unknown as Either.Right, -}: SynchronizeMigratorsParams): TaskEither.TaskEither { +}: SynchronizeMigratorsParams): TaskEither.TaskEither { return () => { waitGroup.resolve(payload); return waitGroup.promise .then((res) => (thenHook ? thenHook(res) : res)) - .catch((error) => Either.left({ type: 'sync_failed' as const, error })); + .catch((error) => Either.left({ type: 'synchronization_failed' as const, error })); }; } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index 76b998042910f..90e7ad624de93 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -1790,7 +1790,7 @@ describe('migrations v2 model', () => { test('READY_TO_REINDEX_SYNC -> FATAL if the synchronization between migrators fails', () => { const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.left({ - type: 'sync_failed', + type: 'synchronization_failed', error: new Error('Other migrators failed to reach the synchronization point'), }); const newState = model(state, res); @@ -2051,7 +2051,7 @@ describe('migrations v2 model', () => { }); test('DONE_REINDEXING_SYNC -> FATAL if the synchronization between migrators fails', () => { const res: ResponseType<'DONE_REINDEXING_SYNC'> = Either.left({ - type: 'sync_failed', + type: 'synchronization_failed', error: new Error('Other migrators failed to reach the synchronization point'), }); const newState = model(state, res); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index 8172f3b9e7a8c..a702acd2580f0 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -813,12 +813,18 @@ export const model = (currentState: State, resW: ResponseType): return { ...stateP, controlState: 'DONE_REINDEXING_SYNC' }; } } else if (Either.isLeft(res)) { - return { - ...stateP, - controlState: 'FATAL', - reason: 'An error occurred whilst waiting for other migrators to get to this step.', - throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem - }; + const left = res.left; + + if (isTypeof(left, 'synchronization_failed')) { + return { + ...stateP, + controlState: 'FATAL', + reason: 'An error occurred whilst waiting for other migrators to get to this step.', + throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem + }; + } else { + throwBadResponse(stateP, left); + } } else { return throwBadResponse(stateP, res as never); } @@ -954,12 +960,18 @@ export const model = (currentState: State, resW: ResponseType): sourceIndexMappings: Option.none, }; } else if (Either.isLeft(res)) { - return { - ...stateP, - controlState: 'FATAL', - reason: 'An error occurred whilst waiting for other migrators to get to this step.', - throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem - }; + const left = res.left; + + if (isTypeof(left, 'synchronization_failed')) { + return { + ...stateP, + controlState: 'FATAL', + reason: 'An error occurred whilst waiting for other migrators to get to this step.', + throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem + }; + } else { + throwBadResponse(stateP, left); + } } else { return throwBadResponse(stateP, res as never); } @@ -1488,7 +1500,7 @@ export const model = (currentState: State, resW: ResponseType): } return { ...stateP, - controlState: 'MARK_VERSION_INDEX_READY', + controlState: 'CHECK_VERSION_INDEX_READY_ACTIONS', }; } else if (Either.isLeft(res)) { const left = res.left; @@ -1546,6 +1558,13 @@ export const model = (currentState: State, resW: ResponseType): // cause it to occur (this error is only relevant to the LEGACY_DELETE // step). throwBadResponse(stateP, left as never); + } else if (isTypeof(left, 'synchronization_failed')) { + return { + ...stateP, + controlState: 'FATAL', + reason: 'An error occurred whilst waiting for other migrators to get to this step.', + throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem + }; } else { throwBadResponse(stateP, left); } From 3508eceadee324cdc0ba1ed9ac6893114f8a9ede Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Fri, 2 Jun 2023 18:20:30 +0200 Subject: [PATCH 3/8] Remove unneccessary return from throw statements --- .../src/model/model.ts | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index a702acd2580f0..38eccfd3bb087 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -86,7 +86,7 @@ export const model = (currentState: State, resW: ResponseType): const retryErrorMessage = `[${left.type}] Incompatible Elasticsearch cluster settings detected. Remove the persistent and transient Elasticsearch cluster setting 'cluster.routing.allocation.enable' or set it to a value of 'all' to allow migrations to proceed. Refer to ${stateP.migrationDocLinks.routingAllocationDisabled} for more information on how to resolve the issue.`; return delayRetryState(stateP, retryErrorMessage, stateP.retryAttempts); } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else if (Either.isRight(res)) { // cluster routing allocation is enabled and we can continue with the migration as normal @@ -266,7 +266,7 @@ export const model = (currentState: State, resW: ResponseType): }; } } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'WAIT_FOR_MIGRATION_COMPLETION') { const res = resW as ExcludeRetryableEsError>; @@ -314,14 +314,14 @@ export const model = (currentState: State, resW: ResponseType): // If the write block failed because the index doesn't exist, it means // another instance already completed the legacy pre-migration. Proceed // to the next step. - if (isTypeof(res.left, 'index_not_found_exception')) { + const left = res.left; + if (isTypeof(left, 'index_not_found_exception')) { return { ...stateP, controlState: 'LEGACY_CREATE_REINDEX_TARGET' }; } else { - // @ts-expect-error TS doesn't correctly narrow this type to never - return throwBadResponse(stateP, res); + throwBadResponse(stateP, left); } } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'LEGACY_CREATE_REINDEX_TARGET') { const res = resW as ExcludeRetryableEsError>; @@ -343,7 +343,7 @@ export const model = (currentState: State, resW: ResponseType): reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`, }; } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else if (Either.isRight(res)) { return { @@ -476,10 +476,10 @@ export const model = (currentState: State, resW: ResponseType): const retryErrorMessage = `${left.message} Refer to ${stateP.migrationDocLinks.repeatedTimeoutRequests} for information on how to resolve the issue.`; return delayRetryState(stateP, retryErrorMessage, stateP.retryAttempts); } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'UPDATE_SOURCE_MAPPINGS_PROPERTIES') { const res = resW as ExcludeRetryableEsError>; @@ -723,13 +723,11 @@ export const model = (currentState: State, resW: ResponseType): ...stateP, controlState: 'CALCULATE_EXCLUDE_FILTERS', }; - } else if (isTypeof(res.left, 'index_not_found_exception')) { + } else { // We don't handle the following errors as the migration algorithm // will never cause them to occur: // - index_not_found_exception - return throwBadResponse(stateP, res.left as never); - } else { - return throwBadResponse(stateP, res.left); + throwBadResponse(stateP, res.left as never); } } else if (stateP.controlState === 'CALCULATE_EXCLUDE_FILTERS') { const res = resW as ExcludeRetryableEsError>; @@ -753,7 +751,7 @@ export const model = (currentState: State, resW: ResponseType): ], }; } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'CREATE_REINDEX_TEMP') { const res = resW as ExcludeRetryableEsError>; @@ -788,7 +786,7 @@ export const model = (currentState: State, resW: ResponseType): reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`, }; } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else { // If the createIndex action receives an 'resource_already_exists_exception' @@ -826,7 +824,7 @@ export const model = (currentState: State, resW: ResponseType): throwBadResponse(stateP, left); } } else { - return throwBadResponse(stateP, res as never); + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT') { const res = resW as ExcludeRetryableEsError>; @@ -973,7 +971,7 @@ export const model = (currentState: State, resW: ResponseType): throwBadResponse(stateP, left); } } else { - return throwBadResponse(stateP, res as never); + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_TRANSFORM') { // We follow a similar control flow as for @@ -1519,7 +1517,7 @@ export const model = (currentState: State, resW: ResponseType): reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`, }; } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else { // If the createIndex action receives an 'resource_already_exists_exception' @@ -1618,6 +1616,6 @@ export const model = (currentState: State, resW: ResponseType): // The state-action machine will never call the model in the terminating states throwBadControlState(stateP as never); } else { - return throwBadControlState(stateP); + throwBadControlState(stateP); } }; From cd9f294a3632741613d7de72f2424a0b47ef2c54 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Fri, 2 Jun 2023 18:26:58 +0200 Subject: [PATCH 4/8] Rollback changes in update_aliases.ts --- .../src/actions/update_aliases.ts | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts index c3fa320437804..eeaadbb86306f 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/update_aliases.ts @@ -37,13 +37,6 @@ export interface UpdateAliasesParams { aliasActions: AliasAction[]; timeout?: string; } - -/** @internal */ -export type UpdateAliasesReturnType = TaskEither.TaskEither< - IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError, - 'update_aliases_succeeded' ->; - /** * Calls the Update index alias API `_alias` with the provided alias actions. */ @@ -52,9 +45,11 @@ export const updateAliases = client, aliasActions, timeout = DEFAULT_TIMEOUT, - }: UpdateAliasesParams): UpdateAliasesReturnType => + }: UpdateAliasesParams): TaskEither.TaskEither< + IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError, + 'update_aliases_succeeded' + > => () => { - if (!aliasActions || !aliasActions.length) throw Error('updating NO aliases!'); return client.indices .updateAliases({ actions: aliasActions, From 13e9828f610b6104903499c486dd5095cabfc4bc Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Fri, 2 Jun 2023 22:27:41 +0200 Subject: [PATCH 5/8] Fix outdated integration test --- .../saved_objects/migrations/group3/actions/actions.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index 38d4075f9516c..96af4333a3b0c 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -1871,7 +1871,7 @@ describe('migration actions', () => { expect(res).toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": "create_index_succeeded", + "right": "index_already_exists", } `); }); From fa263eed61bf3ef0abafaadcb5e430edcff85323 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Sat, 3 Jun 2023 10:02:28 +0200 Subject: [PATCH 6/8] Fix integration test --- .../group3/incompatible_cluster_routing_allocation.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts index 5ae1040072059..b723ca1b62608 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts @@ -161,7 +161,7 @@ describe('incompatible_cluster_routing_allocation', () => { .map((str) => JSON5.parse(str)) as LogRecord[]; expect( - records.find((rec) => rec.message.includes('MARK_VERSION_INDEX_READY -> DONE')) + records.find((rec) => rec.message.includes('MARK_VERSION_INDEX_READY_SYNC -> DONE')) ).toBeDefined(); }, { retryAttempts: 100, retryDelayMs: 500 } From 3c505883ff2ebdafe837c8a2ebe2e4568060da32 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Sun, 4 Jun 2023 10:04:23 +0200 Subject: [PATCH 7/8] Handle "other indices not found" conflict --- .../src/model/helpers.ts | 5 +- .../src/model/model.ts | 3 +- .../group3/dot_kibana_split.test.ts | 56 +++++++++++++++---- 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts index 9763eda1c5b15..30e6d3d53e664 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts @@ -19,6 +19,9 @@ import type { AliasAction, FetchIndexResponse } from '../actions'; import type { BulkIndexOperationTuple } from './create_batches'; import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state'; +/** @internal */ +export const REINDEX_TEMP_SUFFIX = '_reindex_temp'; + /** @internal */ export type Aliases = Partial>; @@ -309,7 +312,7 @@ export function getMigrationType({ * @returns A temporary index name to reindex documents */ export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string => - `${indexPrefix}_${kibanaVersion}_reindex_temp`; + `${indexPrefix}_${kibanaVersion}${REINDEX_TEMP_SUFFIX}`; /** Increase batchSize by 20% until a maximum of maxBatchSize */ export const increaseBatchSize = ( diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index 38eccfd3bb087..e4f38c2a32a3c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -46,6 +46,7 @@ import { increaseBatchSize, hasLaterVersionAlias, aliasVersion, + REINDEX_TEMP_SUFFIX, } from './helpers'; import { buildTempIndexMap, createBatches } from './create_batches'; import type { MigrationLog } from '../types'; @@ -1541,7 +1542,7 @@ export const model = (currentState: State, resW: ResponseType): // migration from the same source. return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' }; } else if (isTypeof(left, 'index_not_found_exception')) { - if (left.index === stateP.tempIndex) { + if (left.index.endsWith(REINDEX_TEMP_SUFFIX)) { // another instance has already completed the migration and deleted // the temporary index return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' }; diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts index 72fc1aad81e3e..a7377ec2050d1 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts @@ -12,6 +12,7 @@ import { type ISavedObjectTypeRegistry, type SavedObjectsType, MAIN_SAVED_OBJECT_INDEX, + ALL_SAVED_OBJECT_INDICES, } from '@kbn/core-saved-objects-server'; import { DEFAULT_INDEX_TYPES_MAP } from '@kbn/core-saved-objects-base-server-internal'; import { @@ -406,6 +407,7 @@ describe('split .kibana index into multiple system indices', () => { }); // FLAKY: https://github.com/elastic/kibana/issues/157510 + // This test takes too long. Can be manually executed to verify the correct behavior. describe.skip('when multiple Kibana migrators run in parallel', () => { it('correctly migrates 7.7.2_xpack_100k_obj.zip archive', async () => { esServer = await startElasticsearch({ @@ -414,15 +416,29 @@ describe('split .kibana index into multiple system indices', () => { const esClient = await getEsClient(); const breakdownBefore = await getAggregatedTypesCountAllIndices(esClient); - expect(breakdownBefore).toMatchSnapshot('before migration'); + expect(breakdownBefore).toEqual({ + '.kibana': { + 'apm-telemetry': 1, + config: 1, + dashboard: 52994, + 'index-pattern': 1, + search: 1, + space: 1, + 'ui-metric': 5, + visualization: 53004, + }, + '.kibana_task_manager': { + task: 5, + }, + }); for (let i = 0; i < PARALLEL_MIGRATORS; ++i) { await clearLog(Path.join(__dirname, `dot_kibana_split_instance_${i}.log`)); } const testKits = await Promise.all( - new Array(PARALLEL_MIGRATORS) - .fill({ + new Array(PARALLEL_MIGRATORS).fill(true).map((_, index) => + getKibanaMigratorTestKit({ settings: { migrations: { discardUnknownObjects: currentVersion, @@ -431,13 +447,10 @@ describe('split .kibana index into multiple system indices', () => { }, kibanaIndex: MAIN_SAVED_OBJECT_INDEX, types: typeRegistry.getAllTypes(), + defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP, + logFilePath: Path.join(__dirname, `dot_kibana_split_instance_${index}.log`), }) - .map((config, index) => - getKibanaMigratorTestKit({ - ...config, - logFilePath: Path.join(__dirname, `dot_kibana_split_instance_${index}.log`), - }) - ) + ) ); const results = await Promise.all(testKits.map((testKit) => testKit.runMigrations())); @@ -447,9 +460,30 @@ describe('split .kibana index into multiple system indices', () => { .every((result) => result.status === 'migrated' || result.status === 'patched') ).toEqual(true); + await esClient.indices.refresh({ index: ALL_SAVED_OBJECT_INDICES }); + const breakdownAfter = await getAggregatedTypesCountAllIndices(esClient); - expect(breakdownAfter).toMatchSnapshot('after migration'); - }); + expect(breakdownAfter).toEqual({ + '.kibana': { + 'apm-telemetry': 1, + config: 1, + space: 1, + 'ui-metric': 5, + }, + '.kibana_alerting_cases': {}, + '.kibana_analytics': { + dashboard: 52994, + 'index-pattern': 1, + search: 1, + visualization: 53004, + }, + '.kibana_ingest': {}, + '.kibana_security_solution': {}, + '.kibana_task_manager': { + task: 5, + }, + }); + }, 1200000); afterEach(async () => { await esServer?.stop(); From 23b4e39ddeb1191b0772022e93410e41ad079789 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Sun, 4 Jun 2023 17:38:31 +0200 Subject: [PATCH 8/8] Address PR feedback --- .../src/kibana_migrator_utils.ts | 8 ++++---- .../src/run_v2_migration.ts | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts index 04bb151f818e3..a646f6e36081c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts @@ -101,7 +101,7 @@ export async function getIndicesInvolvedInRelocation({ defaultIndexTypesMap: IndexTypesMap; logger: Logger; }): Promise { - const indicesWithMovingTypesSet = new Set(); + const indicesWithRelocatingTypesSet = new Set(); const currentIndexTypesMap = await getCurrentIndexTypesMap({ client, @@ -120,11 +120,11 @@ export async function getIndicesInvolvedInRelocation({ Object.values(typeIndexDistribution) .filter(({ status }) => status === TypeStatus.Moved) .forEach(({ currentIndex, targetIndex }) => { - indicesWithMovingTypesSet.add(currentIndex!); - indicesWithMovingTypesSet.add(targetIndex!); + indicesWithRelocatingTypesSet.add(currentIndex!); + indicesWithRelocatingTypesSet.add(targetIndex!); }); - return Array.from(indicesWithMovingTypesSet); + return Array.from(indicesWithRelocatingTypesSet); } export function indexMapToIndexTypesMap(indexMap: IndexMap): IndexTypesMap { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts index 29367153caf8c..49088ebc147b8 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts @@ -76,7 +76,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise>( - indicesWithMovingTypes, + indicesWithRelocatingTypes, (allAliasActions) => updateAliases({ client: options.elasticsearchClient, @@ -113,9 +113,9 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise migratorIndices.add(index)); + indicesWithRelocatingTypes.forEach((index) => migratorIndices.add(index)); const migrators = Array.from(migratorIndices).map((indexName, i) => { return { @@ -124,7 +124,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise