From c2b17696879171858a028bdf4ddfcac6faaf11d9 Mon Sep 17 00:00:00 2001 From: Mikhail Shustov Date: Mon, 12 Apr 2021 20:38:47 +0200 Subject: [PATCH] Migration v2 waits for yellow cluster (#96788) * migrator waits for source index to be yellow otherwise the next request to Elasticsearch can fail * unskip integration tests that failed due to a red cluster * log how much the every step lasts * use Date.now instead of performance.now migration cannot finish in ms * update tests * clean log file before running tests * fix wrong type * add an integration test for waitForIndexStatusYellow --- .../migrationsv2/actions/index.ts | 4 +- .../integration_tests/actions.test.ts | 46 ++++ .../integration_tests/migration.test.ts | 23 +- .../migration_7.7.2_xpack_100k.test.ts | 18 +- .../migrations_state_action_machine.test.ts | 13 +- .../migrations_state_action_machine.ts | 23 +- .../saved_objects/migrationsv2/model.test.ts | 228 +++++++----------- .../saved_objects/migrationsv2/model.ts | 41 ++-- .../server/saved_objects/migrationsv2/next.ts | 5 +- .../saved_objects/migrationsv2/types.ts | 8 + 10 files changed, 225 insertions(+), 184 deletions(-) diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.ts b/src/core/server/saved_objects/migrationsv2/actions/index.ts index d759c0c9be20e..9d6afbd3b0d87 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.ts @@ -185,10 +185,10 @@ export const removeWriteBlock = ( * yellow at any point in the future. So ultimately data-redundancy is up to * users to maintain. */ -const waitForIndexStatusYellow = ( +export const waitForIndexStatusYellow = ( client: ElasticsearchClient, index: string, - timeout: string + timeout = DEFAULT_TIMEOUT ): TaskEither.TaskEither => () => { return client.cluster .health({ index, wait_for_status: 'yellow', timeout }) diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts index 3ed3ace416990..21c05d22b0581 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts @@ -30,6 +30,7 @@ import { UpdateAndPickupMappingsResponse, verifyReindex, removeWriteBlock, + waitForIndexStatusYellow, } from '../actions'; import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; @@ -207,6 +208,51 @@ describe('migration actions', () => { }); }); + describe('waitForIndexStatusYellow', () => { + afterAll(async () => { + await client.indices.delete({ index: 'red_then_yellow_index' }); + }); + it('resolves right after waiting for an index status to be yellow if the index already existed', async () => { + // Create a red index + await client.indices.create( + { + index: 'red_then_yellow_index', + timeout: '5s', + body: { + mappings: { properties: {} }, + settings: { + // Allocate 1 replica so that this index stays yellow + number_of_replicas: '1', + // Disable all shard allocation so that the index status is red + index: { routing: { allocation: { enable: 'none' } } }, + }, + }, + }, + { maxRetries: 0 /** handle retry ourselves for now */ } + ); + + // Start tracking the index status + const indexStatusPromise = waitForIndexStatusYellow(client, 'red_then_yellow_index')(); + + const redStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' }); + expect(redStatusResponse.body.status).toBe('red'); + + client.indices.putSettings({ + index: 'red_then_yellow_index', + body: { + // Enable all shard allocation so that the index status turns yellow + index: { routing: { allocation: { enable: 'all' } } }, + }, + }); + + await indexStatusPromise; + // Assert that the promise didn't resolve before the index became yellow + + const yellowStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' }); + expect(yellowStatusResponse.body.status).toBe('yellow'); + }); + }); + describe('cloneIndex', () => { afterAll(async () => { try { diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts index 4d41a147bc0ef..1f8c3a535a902 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts @@ -6,7 +6,9 @@ * Side Public License, v 1. */ -import { join } from 'path'; +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; import Semver from 'semver'; import { REPO_ROOT } from '@kbn/dev-utils'; import { Env } from '@kbn/config'; @@ -19,8 +21,15 @@ import { Root } from '../../../root'; const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; -// FLAKY: https://github.com/elastic/kibana/issues/91107 -describe.skip('migration v2', () => { +const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); + +const asyncUnlink = Util.promisify(Fs.unlink); +async function removeLogFile() { + // ignore errors if it doesn't exist + await asyncUnlink(logFilePath).catch(() => void 0); +} + +describe('migration v2', () => { let esServer: kbnTestServer.TestElasticsearchUtils; let root: Root; let coreStart: InternalCoreStart; @@ -47,7 +56,7 @@ describe.skip('migration v2', () => { appenders: { file: { type: 'file', - fileName: join(__dirname, 'migration_test_kibana.log'), + fileName: logFilePath, layout: { type: 'json', }, @@ -122,9 +131,10 @@ describe.skip('migration v2', () => { const migratedIndex = `.kibana_${kibanaVersion}_001`; beforeAll(async () => { + await removeLogFile(); await startServers({ oss: false, - dataArchive: join(__dirname, 'archives', '7.3.0_xpack_sample_saved_objects.zip'), + dataArchive: Path.join(__dirname, 'archives', '7.3.0_xpack_sample_saved_objects.zip'), }); }); @@ -179,9 +189,10 @@ describe.skip('migration v2', () => { const migratedIndex = `.kibana_${kibanaVersion}_001`; beforeAll(async () => { + await removeLogFile(); await startServers({ oss: true, - dataArchive: join(__dirname, 'archives', '8.0.0_oss_sample_saved_objects.zip'), + dataArchive: Path.join(__dirname, 'archives', '8.0.0_oss_sample_saved_objects.zip'), }); }); diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts index c26d4593bede1..0e51c886f7f30 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts @@ -6,7 +6,9 @@ * Side Public License, v 1. */ -import { join } from 'path'; +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; import { REPO_ROOT } from '@kbn/dev-utils'; import { Env } from '@kbn/config'; import { getEnvOptions } from '@kbn/config/target/mocks'; @@ -16,8 +18,15 @@ import { InternalCoreStart } from '../../../internal_types'; import { Root } from '../../../root'; const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; +const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); -describe.skip('migration from 7.7.2-xpack with 100k objects', () => { +const asyncUnlink = Util.promisify(Fs.unlink); +async function removeLogFile() { + // ignore errors if it doesn't exist + await asyncUnlink(logFilePath).catch(() => void 0); +} + +describe('migration from 7.7.2-xpack with 100k objects', () => { let esServer: kbnTestServer.TestElasticsearchUtils; let root: Root; let coreStart: InternalCoreStart; @@ -48,7 +57,7 @@ describe.skip('migration from 7.7.2-xpack with 100k objects', () => { appenders: { file: { type: 'file', - fileName: join(__dirname, 'migration_test_kibana.log'), + fileName: logFilePath, layout: { type: 'json', }, @@ -93,9 +102,10 @@ describe.skip('migration from 7.7.2-xpack with 100k objects', () => { const migratedIndex = `.kibana_${kibanaVersion}_001`; beforeAll(async () => { + await removeLogFile(); await startServers({ oss: false, - dataArchive: join(__dirname, 'archives', '7.7.2_xpack_100k_obj.zip'), + dataArchive: Path.join(__dirname, 'archives', '7.7.2_xpack_100k_obj.zip'), }); }); diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts index 2c2cd0032abfd..4d93abcc4018f 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts @@ -16,6 +16,11 @@ import { ResponseError } from '@elastic/elasticsearch/lib/errors'; import { elasticsearchClientMock } from '../../elasticsearch/client/mocks'; describe('migrationsStateActionMachine', () => { + beforeAll(() => { + jest + .spyOn(global.Date, 'now') + .mockImplementation(() => new Date('2021-04-12T16:00:00.000Z').valueOf()); + }); beforeEach(() => { jest.clearAllMocks(); }); @@ -112,25 +117,25 @@ describe('migrationsStateActionMachine', () => { "[.my-so-index] Log from LEGACY_REINDEX control state", ], Array [ - "[.my-so-index] INIT -> LEGACY_REINDEX", + "[.my-so-index] INIT -> LEGACY_REINDEX. took: 0ms.", ], Array [ "[.my-so-index] Log from LEGACY_DELETE control state", ], Array [ - "[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE", + "[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE. took: 0ms.", ], Array [ "[.my-so-index] Log from LEGACY_DELETE control state", ], Array [ - "[.my-so-index] LEGACY_DELETE -> LEGACY_DELETE", + "[.my-so-index] LEGACY_DELETE -> LEGACY_DELETE. took: 0ms.", ], Array [ "[.my-so-index] Log from DONE control state", ], Array [ - "[.my-so-index] LEGACY_DELETE -> DONE", + "[.my-so-index] LEGACY_DELETE -> DONE. took: 0ms.", ], ], "log": Array [], diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts index dddc66d68ad20..e35e21421ac1f 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts @@ -8,7 +8,6 @@ import { errors as EsErrors } from '@elastic/elasticsearch'; import * as Option from 'fp-ts/lib/Option'; -import { performance } from 'perf_hooks'; import { Logger, LogMeta } from '../../logging'; import { CorruptSavedObjectError } from '../migrations/core/migrate_raw_docs'; import { Model, Next, stateActionMachine } from './state_action_machine'; @@ -32,7 +31,8 @@ const logStateTransition = ( logger: Logger, logMessagePrefix: string, oldState: State, - newState: State + newState: State, + tookMs: number ) => { if (newState.logs.length > oldState.logs.length) { newState.logs @@ -40,7 +40,9 @@ const logStateTransition = ( .forEach((log) => logger[log.level](logMessagePrefix + log.message)); } - logger.info(logMessagePrefix + `${oldState.controlState} -> ${newState.controlState}`); + logger.info( + logMessagePrefix + `${oldState.controlState} -> ${newState.controlState}. took: ${tookMs}ms.` + ); }; const logActionResponse = ( @@ -85,11 +87,12 @@ export async function migrationStateActionMachine({ model: Model; }) { const executionLog: ExecutionLog = []; - const starteTime = performance.now(); + const startTime = Date.now(); // Since saved object index names usually start with a `.` and can be // configured by users to include several `.`'s we can't use a logger tag to // indicate which messages come from which index upgrade. const logMessagePrefix = `[${initialState.indexPrefix}] `; + let prevTimestamp = startTime; try { const finalState = await stateActionMachine( initialState, @@ -116,12 +119,20 @@ export async function migrationStateActionMachine({ controlState: newState.controlState, prevControlState: state.controlState, }); - logStateTransition(logger, logMessagePrefix, state, redactedNewState as State); + const now = Date.now(); + logStateTransition( + logger, + logMessagePrefix, + state, + redactedNewState as State, + now - prevTimestamp + ); + prevTimestamp = now; return newState; } ); - const elapsedMs = performance.now() - starteTime; + const elapsedMs = Date.now() - startTime; if (finalState.controlState === 'DONE') { logger.info(logMessagePrefix + `Migration completed after ${Math.round(elapsedMs)}ms`); if (finalState.sourceIndex != null && Option.isSome(finalState.sourceIndex)) { diff --git a/src/core/server/saved_objects/migrationsv2/model.test.ts b/src/core/server/saved_objects/migrationsv2/model.test.ts index 4fd9b7cbb3df4..8aad62f13b8fe 100644 --- a/src/core/server/saved_objects/migrationsv2/model.test.ts +++ b/src/core/server/saved_objects/migrationsv2/model.test.ts @@ -8,7 +8,7 @@ import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; -import { +import type { FatalState, State, LegacySetWriteBlockState, @@ -30,6 +30,7 @@ import { CreateNewTargetState, CloneTempToSource, SetTempWriteBlock, + WaitForYellowSourceState, } from './types'; import { SavedObjectsRawDoc } from '..'; import { AliasAction, RetryableEsClientError } from './actions'; @@ -265,7 +266,7 @@ describe('migrations v2 model', () => { `"The .kibana alias is pointing to a newer version of Kibana: v7.12.0"` ); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when .kibana points to an index with an invalid version', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when .kibana points to an index with an invalid version', () => { // If users tamper with our index version naming scheme we can no // longer accurately detect a newer version. Older Kibana versions // will have indices like `.kibana_10` and users might choose an @@ -290,39 +291,13 @@ describe('migrations v2 model', () => { }); const newState = model(initState, res) as FatalState; - expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + expect(newState.controlState).toEqual('WAIT_FOR_YELLOW_SOURCE'); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('.kibana_7.invalid.0_001'), - targetIndex: '.kibana_7.11.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_7.invalid.0_001', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a v2 migrations index (>= 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v2 migrations index (>= 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ '.kibana_7.11.0_001': { aliases: { '.kibana': {}, '.kibana_7.11.0': {} }, @@ -348,39 +323,13 @@ describe('migrations v2 model', () => { ); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('.kibana_7.11.0_001'), - targetIndex: '.kibana_7.12.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_7.11.0_001', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a v1 migrations index (>= 6.5 < 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v1 migrations index (>= 6.5 < 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ '.kibana_3': { aliases: { @@ -393,35 +342,9 @@ describe('migrations v2 model', () => { const newState = model(initState, res); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('.kibana_3'), - targetIndex: '.kibana_7.11.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_3', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -468,7 +391,7 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a custom kibana.index name (>= 6.5 < 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a custom kibana.index name (>= 6.5 < 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ 'my-saved-objects_3': { aliases: { @@ -490,39 +413,13 @@ describe('migrations v2 model', () => { ); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('my-saved-objects_3'), - targetIndex: 'my-saved-objects_7.11.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: 'my-saved-objects_3', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a custom kibana.index v2 migrations index (>= 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a custom kibana.index v2 migrations index (>= 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ 'my-saved-objects_7.11.0': { aliases: { @@ -545,35 +442,9 @@ describe('migrations v2 model', () => { ); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('my-saved-objects_7.11.0'), - targetIndex: 'my-saved-objects_7.12.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: 'my-saved-objects_7.11.0', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -761,6 +632,69 @@ describe('migrations v2 model', () => { expect(newState.retryDelay).toEqual(0); }); }); + + describe('WAIT_FOR_YELLOW_SOURCE', () => { + const mappingsWithUnknownType = { + properties: { + disabled_saved_object_type: { + properties: { + value: { type: 'keyword' }, + }, + }, + }, + _meta: { + migrationMappingPropertyHashes: { + disabled_saved_object_type: '7997cf5a56cc02bdc9c93361bde732b0', + }, + }, + }; + + const waitForYellowSourceState: WaitForYellowSourceState = { + ...baseState, + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_3', + sourceIndexMappings: mappingsWithUnknownType, + }; + + test('WAIT_FOR_YELLOW_SOURCE -> SET_SOURCE_WRITE_BLOCK if action succeeds', () => { + const res: ResponseType<'WAIT_FOR_YELLOW_SOURCE'> = Either.right({}); + const newState = model(waitForYellowSourceState, res); + expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + + expect(newState).toMatchObject({ + controlState: 'SET_SOURCE_WRITE_BLOCK', + sourceIndex: Option.some('.kibana_3'), + targetIndex: '.kibana_7.11.0_001', + }); + + // This snapshot asserts that we disable the unknown saved object + // type. Because it's mappings are disabled, we also don't copy the + // `_meta.migrationMappingPropertyHashes` for the disabled type. + expect(newState.targetIndexMappings).toMatchInlineSnapshot(` + Object { + "_meta": Object { + "migrationMappingPropertyHashes": Object { + "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", + }, + }, + "properties": Object { + "disabled_saved_object_type": Object { + "dynamic": false, + "properties": Object {}, + }, + "new_saved_object_type": Object { + "properties": Object { + "value": Object { + "type": "text", + }, + }, + }, + }, + } + `); + }); + }); + describe('SET_SOURCE_WRITE_BLOCK', () => { const setWriteBlockState: SetSourceWriteBlockState = { ...baseState, diff --git a/src/core/server/saved_objects/migrationsv2/model.ts b/src/core/server/saved_objects/migrationsv2/model.ts index 2353452a6a51b..ee78692a7044f 100644 --- a/src/core/server/saved_objects/migrationsv2/model.ts +++ b/src/core/server/saved_objects/migrationsv2/model.ts @@ -222,22 +222,11 @@ export const model = (currentState: State, resW: ResponseType): ) { // The source index is the index the `.kibana` alias points to const source = aliases[stateP.currentAlias]; - const target = stateP.versionIndex; return { ...stateP, - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some(source) as Option.Some, - targetIndex: target, - targetIndexMappings: disableUnknownTypeMappingFields( - stateP.targetIndexMappings, - indices[source].mappings - ), - versionIndexReadyActions: Option.some([ - { remove: { index: source, alias: stateP.currentAlias, must_exist: true } }, - { add: { index: target, alias: stateP.currentAlias } }, - { add: { index: target, alias: stateP.versionAlias } }, - { remove_index: { index: stateP.tempIndex } }, - ]), + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: source, + sourceIndexMappings: indices[source].mappings, }; } else if (indices[stateP.legacyIndex] != null) { // Migrate from a legacy index @@ -432,6 +421,30 @@ export const model = (currentState: State, resW: ResponseType): } else { throwBadResponse(stateP, res); } + } else if (stateP.controlState === 'WAIT_FOR_YELLOW_SOURCE') { + const res = resW as ExcludeRetryableEsError>; + if (Either.isRight(res)) { + const source = stateP.sourceIndex; + const target = stateP.versionIndex; + return { + ...stateP, + controlState: 'SET_SOURCE_WRITE_BLOCK', + sourceIndex: Option.some(source) as Option.Some, + targetIndex: target, + targetIndexMappings: disableUnknownTypeMappingFields( + stateP.targetIndexMappings, + stateP.sourceIndexMappings + ), + versionIndexReadyActions: Option.some([ + { remove: { index: source, alias: stateP.currentAlias, must_exist: true } }, + { add: { index: target, alias: stateP.currentAlias } }, + { add: { index: target, alias: stateP.versionAlias } }, + { remove_index: { index: stateP.tempIndex } }, + ]), + }; + } else { + return throwBadResponse(stateP, res); + } } else if (stateP.controlState === 'SET_SOURCE_WRITE_BLOCK') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { diff --git a/src/core/server/saved_objects/migrationsv2/next.ts b/src/core/server/saved_objects/migrationsv2/next.ts index 67b2004a4b31a..5cbda741a0ce5 100644 --- a/src/core/server/saved_objects/migrationsv2/next.ts +++ b/src/core/server/saved_objects/migrationsv2/next.ts @@ -10,7 +10,7 @@ import * as TaskEither from 'fp-ts/lib/TaskEither'; import * as Option from 'fp-ts/lib/Option'; import { UnwrapPromise } from '@kbn/utility-types'; import { pipe } from 'fp-ts/lib/pipeable'; -import { +import type { AllActionStates, ReindexSourceToTempState, MarkVersionIndexReady, @@ -32,6 +32,7 @@ import { CreateNewTargetState, CloneTempToSource, SetTempWriteBlock, + WaitForYellowSourceState, } from './types'; import * as Actions from './actions'; import { ElasticsearchClient } from '../../elasticsearch'; @@ -54,6 +55,8 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra return { INIT: (state: InitState) => Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]), + WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) => + Actions.waitForIndexStatusYellow(client, state.sourceIndex), SET_SOURCE_WRITE_BLOCK: (state: SetSourceWriteBlockState) => Actions.setWriteBlock(client, state.sourceIndex.value), CREATE_NEW_TARGET: (state: CreateNewTargetState) => diff --git a/src/core/server/saved_objects/migrationsv2/types.ts b/src/core/server/saved_objects/migrationsv2/types.ts index cc4aa18171843..e9b351c0152fc 100644 --- a/src/core/server/saved_objects/migrationsv2/types.ts +++ b/src/core/server/saved_objects/migrationsv2/types.ts @@ -128,6 +128,13 @@ export type FatalState = BaseState & { readonly reason: string; }; +export interface WaitForYellowSourceState extends BaseState { + /** Wait for the source index to be yellow before requesting it. */ + readonly controlState: 'WAIT_FOR_YELLOW_SOURCE'; + readonly sourceIndex: string; + readonly sourceIndexMappings: IndexMapping; +} + export type SetSourceWriteBlockState = PostInitState & { /** Set a write block on the source index to prevent any further writes */ readonly controlState: 'SET_SOURCE_WRITE_BLOCK'; @@ -290,6 +297,7 @@ export type State = | FatalState | InitState | DoneState + | WaitForYellowSourceState | SetSourceWriteBlockState | CreateNewTargetState | CreateReindexTempState