From f078844ef158a8d03bde91d82b8c79b9a17adf1e Mon Sep 17 00:00:00 2001 From: Mikhail Shustov Date: Tue, 13 Apr 2021 21:54:30 +0200 Subject: [PATCH] [7.12] Migration v2 waits for yellow cluster (#96788) (#96888) * Migration v2 waits for yellow cluster (#96788) * migrator waits for source index to be yellow otherwise the next request to Elasticsearch can fail * unskip integration tests that failed due to a red cluster * log how much the every step lasts * use Date.now instead of performance.now migration cannot finish in ms * update tests * clean log file before running tests * fix wrong type * add an integration test for waitForIndexStatusYellow # Conflicts: # src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts * remove tests using so from v8 * skip as on master :( --- .../migrationsv2/actions/index.ts | 4 +- .../integration_tests/actions.test.ts | 46 ++++ .../integration_tests/migration.test.ts | 21 +- .../migration_7.7.2_xpack_100k.test.ts | 16 +- .../migrations_state_action_machine.test.ts | 13 +- .../migrations_state_action_machine.ts | 23 +- .../saved_objects/migrationsv2/model.test.ts | 228 +++++++----------- .../saved_objects/migrationsv2/model.ts | 41 ++-- .../server/saved_objects/migrationsv2/next.ts | 5 +- .../saved_objects/migrationsv2/types.ts | 8 + 10 files changed, 224 insertions(+), 181 deletions(-) diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.ts b/src/core/server/saved_objects/migrationsv2/actions/index.ts index 204d2b064d9b0..2748d3be74b2f 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.ts @@ -180,10 +180,10 @@ export const removeWriteBlock = ( * yellow at any point in the future. So ultimately data-redundancy is up to * users to maintain. */ -const waitForIndexStatusYellow = ( +export const waitForIndexStatusYellow = ( client: ElasticsearchClient, index: string, - timeout: string + timeout = DEFAULT_TIMEOUT ): TaskEither.TaskEither => () => { return client.cluster .health({ index, wait_for_status: 'yellow', timeout }) diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts index 97da887d4deb3..38f33d97d09c4 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts @@ -30,6 +30,7 @@ import { UpdateAndPickupMappingsResponse, verifyReindex, removeWriteBlock, + waitForIndexStatusYellow, } from '../actions'; import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; @@ -207,6 +208,51 @@ describe('migration actions', () => { }); }); + describe('waitForIndexStatusYellow', () => { + afterAll(async () => { + await client.indices.delete({ index: 'red_then_yellow_index' }); + }); + it('resolves right after waiting for an index status to be yellow if the index already existed', async () => { + // Create a red index + await client.indices.create( + { + index: 'red_then_yellow_index', + timeout: '5s', + body: { + mappings: { properties: {} }, + settings: { + // Allocate 1 replica so that this index stays yellow + number_of_replicas: '1', + // Disable all shard allocation so that the index status is red + index: { routing: { allocation: { enable: 'none' } } }, + }, + }, + }, + { maxRetries: 0 /** handle retry ourselves for now */ } + ); + + // Start tracking the index status + const indexStatusPromise = waitForIndexStatusYellow(client, 'red_then_yellow_index')(); + + const redStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' }); + expect(redStatusResponse.body.status).toBe('red'); + + client.indices.putSettings({ + index: 'red_then_yellow_index', + body: { + // Enable all shard allocation so that the index status turns yellow + index: { routing: { allocation: { enable: 'all' } } }, + }, + }); + + await indexStatusPromise; + // Assert that the promise didn't resolve before the index became yellow + + const yellowStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' }); + expect(yellowStatusResponse.body.status).toBe('yellow'); + }); + }); + describe('cloneIndex', () => { afterAll(async () => { try { diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts index 638e0e7afbc23..7e4930b48576b 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts @@ -6,7 +6,9 @@ * Side Public License, v 1. */ -import { join } from 'path'; +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; import Semver from 'semver'; import { REPO_ROOT } from '@kbn/dev-utils'; import { Env } from '@kbn/config'; @@ -19,6 +21,14 @@ import { Root } from '../../../root'; const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; +const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); + +const asyncUnlink = Util.promisify(Fs.unlink); +async function removeLogFile() { + // ignore errors if it doesn't exist + await asyncUnlink(logFilePath).catch(() => void 0); +} + describe('migration v2', () => { let esServer: kbnTestServer.TestElasticsearchUtils; let root: Root; @@ -46,7 +56,7 @@ describe('migration v2', () => { appenders: { file: { type: 'file', - fileName: join(__dirname, 'migration_test_kibana.log'), + fileName: logFilePath, layout: { type: 'json', }, @@ -121,9 +131,10 @@ describe('migration v2', () => { const migratedIndex = `.kibana_${kibanaVersion}_001`; beforeAll(async () => { + await removeLogFile(); await startServers({ oss: false, - dataArchive: join(__dirname, 'archives', '7.3.0_xpack_sample_saved_objects.zip'), + dataArchive: Path.join(__dirname, 'archives', '7.3.0_xpack_sample_saved_objects.zip'), }); }); @@ -162,7 +173,9 @@ describe('migration v2', () => { const expectedVersions = getExpectedVersionPerType(); const res = await esClient.search({ index: migratedIndex, - sort: ['_doc'], + body: { + sort: ['_doc'], + }, size: 10000, }); const allDocuments = res.body.hits.hits as SavedObjectsRawDoc[]; diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts index c26d4593bede1..229bbb895cfe0 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/migration_7.7.2_xpack_100k.test.ts @@ -6,7 +6,9 @@ * Side Public License, v 1. */ -import { join } from 'path'; +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; import { REPO_ROOT } from '@kbn/dev-utils'; import { Env } from '@kbn/config'; import { getEnvOptions } from '@kbn/config/target/mocks'; @@ -16,6 +18,13 @@ import { InternalCoreStart } from '../../../internal_types'; import { Root } from '../../../root'; const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version; +const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); + +const asyncUnlink = Util.promisify(Fs.unlink); +async function removeLogFile() { + // ignore errors if it doesn't exist + await asyncUnlink(logFilePath).catch(() => void 0); +} describe.skip('migration from 7.7.2-xpack with 100k objects', () => { let esServer: kbnTestServer.TestElasticsearchUtils; @@ -48,7 +57,7 @@ describe.skip('migration from 7.7.2-xpack with 100k objects', () => { appenders: { file: { type: 'file', - fileName: join(__dirname, 'migration_test_kibana.log'), + fileName: logFilePath, layout: { type: 'json', }, @@ -93,9 +102,10 @@ describe.skip('migration from 7.7.2-xpack with 100k objects', () => { const migratedIndex = `.kibana_${kibanaVersion}_001`; beforeAll(async () => { + await removeLogFile(); await startServers({ oss: false, - dataArchive: join(__dirname, 'archives', '7.7.2_xpack_100k_obj.zip'), + dataArchive: Path.join(__dirname, 'archives', '7.7.2_xpack_100k_obj.zip'), }); }); diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts index d7c777c0dfdfa..cf73c135ad88e 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts @@ -16,6 +16,11 @@ import { ResponseError } from '@elastic/elasticsearch/lib/errors'; import { elasticsearchClientMock } from '../../elasticsearch/client/mocks'; describe('migrationsStateActionMachine', () => { + beforeAll(() => { + jest + .spyOn(global.Date, 'now') + .mockImplementation(() => new Date('2021-04-12T16:00:00.000Z').valueOf()); + }); beforeEach(() => { jest.clearAllMocks(); }); @@ -112,25 +117,25 @@ describe('migrationsStateActionMachine', () => { "[.my-so-index] Log from LEGACY_REINDEX control state", ], Array [ - "[.my-so-index] INIT -> LEGACY_REINDEX", + "[.my-so-index] INIT -> LEGACY_REINDEX. took: 0ms.", ], Array [ "[.my-so-index] Log from LEGACY_DELETE control state", ], Array [ - "[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE", + "[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE. took: 0ms.", ], Array [ "[.my-so-index] Log from LEGACY_DELETE control state", ], Array [ - "[.my-so-index] LEGACY_DELETE -> LEGACY_DELETE", + "[.my-so-index] LEGACY_DELETE -> LEGACY_DELETE. took: 0ms.", ], Array [ "[.my-so-index] Log from DONE control state", ], Array [ - "[.my-so-index] LEGACY_DELETE -> DONE", + "[.my-so-index] LEGACY_DELETE -> DONE. took: 0ms.", ], ], "log": Array [], diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts index dddc66d68ad20..e35e21421ac1f 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts @@ -8,7 +8,6 @@ import { errors as EsErrors } from '@elastic/elasticsearch'; import * as Option from 'fp-ts/lib/Option'; -import { performance } from 'perf_hooks'; import { Logger, LogMeta } from '../../logging'; import { CorruptSavedObjectError } from '../migrations/core/migrate_raw_docs'; import { Model, Next, stateActionMachine } from './state_action_machine'; @@ -32,7 +31,8 @@ const logStateTransition = ( logger: Logger, logMessagePrefix: string, oldState: State, - newState: State + newState: State, + tookMs: number ) => { if (newState.logs.length > oldState.logs.length) { newState.logs @@ -40,7 +40,9 @@ const logStateTransition = ( .forEach((log) => logger[log.level](logMessagePrefix + log.message)); } - logger.info(logMessagePrefix + `${oldState.controlState} -> ${newState.controlState}`); + logger.info( + logMessagePrefix + `${oldState.controlState} -> ${newState.controlState}. took: ${tookMs}ms.` + ); }; const logActionResponse = ( @@ -85,11 +87,12 @@ export async function migrationStateActionMachine({ model: Model; }) { const executionLog: ExecutionLog = []; - const starteTime = performance.now(); + const startTime = Date.now(); // Since saved object index names usually start with a `.` and can be // configured by users to include several `.`'s we can't use a logger tag to // indicate which messages come from which index upgrade. const logMessagePrefix = `[${initialState.indexPrefix}] `; + let prevTimestamp = startTime; try { const finalState = await stateActionMachine( initialState, @@ -116,12 +119,20 @@ export async function migrationStateActionMachine({ controlState: newState.controlState, prevControlState: state.controlState, }); - logStateTransition(logger, logMessagePrefix, state, redactedNewState as State); + const now = Date.now(); + logStateTransition( + logger, + logMessagePrefix, + state, + redactedNewState as State, + now - prevTimestamp + ); + prevTimestamp = now; return newState; } ); - const elapsedMs = performance.now() - starteTime; + const elapsedMs = Date.now() - startTime; if (finalState.controlState === 'DONE') { logger.info(logMessagePrefix + `Migration completed after ${Math.round(elapsedMs)}ms`); if (finalState.sourceIndex != null && Option.isSome(finalState.sourceIndex)) { diff --git a/src/core/server/saved_objects/migrationsv2/model.test.ts b/src/core/server/saved_objects/migrationsv2/model.test.ts index 083247e15eb05..1d1134f75b143 100644 --- a/src/core/server/saved_objects/migrationsv2/model.test.ts +++ b/src/core/server/saved_objects/migrationsv2/model.test.ts @@ -8,7 +8,7 @@ import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; -import { +import type { FatalState, State, LegacySetWriteBlockState, @@ -30,6 +30,7 @@ import { CreateNewTargetState, CloneTempToSource, SetTempWriteBlock, + WaitForYellowSourceState, } from './types'; import { SavedObjectsRawDoc } from '..'; import { AliasAction, RetryableEsClientError } from './actions'; @@ -274,7 +275,7 @@ describe('migrations v2 model', () => { `"The .kibana alias is pointing to a newer version of Kibana: v7.12.0"` ); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when .kibana points to an index with an invalid version', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when .kibana points to an index with an invalid version', () => { // If users tamper with our index version naming scheme we can no // longer accurately detect a newer version. Older Kibana versions // will have indices like `.kibana_10` and users might choose an @@ -299,39 +300,13 @@ describe('migrations v2 model', () => { }); const newState = model(initState, res) as FatalState; - expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + expect(newState.controlState).toEqual('WAIT_FOR_YELLOW_SOURCE'); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('.kibana_7.invalid.0_001'), - targetIndex: '.kibana_7.11.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_7.invalid.0_001', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a v2 migrations index (>= 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v2 migrations index (>= 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ '.kibana_7.11.0_001': { aliases: { '.kibana': {}, '.kibana_7.11.0': {} }, @@ -357,39 +332,13 @@ describe('migrations v2 model', () => { ); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('.kibana_7.11.0_001'), - targetIndex: '.kibana_7.12.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_7.11.0_001', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a v1 migrations index (>= 6.5 < 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v1 migrations index (>= 6.5 < 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ '.kibana_3': { aliases: { @@ -402,35 +351,9 @@ describe('migrations v2 model', () => { const newState = model(initState, res); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('.kibana_3'), - targetIndex: '.kibana_7.11.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_3', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -477,7 +400,7 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a custom kibana.index name (>= 6.5 < 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a custom kibana.index name (>= 6.5 < 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ 'my-saved-objects_3': { aliases: { @@ -499,39 +422,13 @@ describe('migrations v2 model', () => { ); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('my-saved-objects_3'), - targetIndex: 'my-saved-objects_7.11.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: 'my-saved-objects_3', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a custom kibana.index v2 migrations index (>= 7.11.0)', () => { + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a custom kibana.index v2 migrations index (>= 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ 'my-saved-objects_7.11.0': { aliases: { @@ -554,35 +451,9 @@ describe('migrations v2 model', () => { ); expect(newState).toMatchObject({ - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some('my-saved-objects_7.11.0'), - targetIndex: 'my-saved-objects_7.12.0_001', + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: 'my-saved-objects_7.11.0', }); - // This snapshot asserts that we disable the unknown saved object - // type. Because it's mappings are disabled, we also don't copy the - // `_meta.migrationMappingPropertyHashes` for the disabled type. - expect(newState.targetIndexMappings).toMatchInlineSnapshot(` - Object { - "_meta": Object { - "migrationMappingPropertyHashes": Object { - "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", - }, - }, - "properties": Object { - "disabled_saved_object_type": Object { - "dynamic": false, - "properties": Object {}, - }, - "new_saved_object_type": Object { - "properties": Object { - "value": Object { - "type": "text", - }, - }, - }, - }, - } - `); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -770,6 +641,69 @@ describe('migrations v2 model', () => { expect(newState.retryDelay).toEqual(0); }); }); + + describe('WAIT_FOR_YELLOW_SOURCE', () => { + const mappingsWithUnknownType = { + properties: { + disabled_saved_object_type: { + properties: { + value: { type: 'keyword' }, + }, + }, + }, + _meta: { + migrationMappingPropertyHashes: { + disabled_saved_object_type: '7997cf5a56cc02bdc9c93361bde732b0', + }, + }, + }; + + const waitForYellowSourceState: WaitForYellowSourceState = { + ...baseState, + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: '.kibana_3', + sourceIndexMappings: mappingsWithUnknownType, + }; + + test('WAIT_FOR_YELLOW_SOURCE -> SET_SOURCE_WRITE_BLOCK if action succeeds', () => { + const res: ResponseType<'WAIT_FOR_YELLOW_SOURCE'> = Either.right({}); + const newState = model(waitForYellowSourceState, res); + expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + + expect(newState).toMatchObject({ + controlState: 'SET_SOURCE_WRITE_BLOCK', + sourceIndex: Option.some('.kibana_3'), + targetIndex: '.kibana_7.11.0_001', + }); + + // This snapshot asserts that we disable the unknown saved object + // type. Because it's mappings are disabled, we also don't copy the + // `_meta.migrationMappingPropertyHashes` for the disabled type. + expect(newState.targetIndexMappings).toMatchInlineSnapshot(` + Object { + "_meta": Object { + "migrationMappingPropertyHashes": Object { + "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", + }, + }, + "properties": Object { + "disabled_saved_object_type": Object { + "dynamic": false, + "properties": Object {}, + }, + "new_saved_object_type": Object { + "properties": Object { + "value": Object { + "type": "text", + }, + }, + }, + }, + } + `); + }); + }); + describe('SET_SOURCE_WRITE_BLOCK', () => { const setWriteBlockState: SetSourceWriteBlockState = { ...baseState, diff --git a/src/core/server/saved_objects/migrationsv2/model.ts b/src/core/server/saved_objects/migrationsv2/model.ts index 1b341330b1b76..ec6f56a469028 100644 --- a/src/core/server/saved_objects/migrationsv2/model.ts +++ b/src/core/server/saved_objects/migrationsv2/model.ts @@ -224,22 +224,11 @@ export const model = (currentState: State, resW: ResponseType): ) { // The source index is the index the `.kibana` alias points to const source = aliases[stateP.currentAlias]; - const target = stateP.versionIndex; return { ...stateP, - controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some(source) as Option.Some, - targetIndex: target, - targetIndexMappings: disableUnknownTypeMappingFields( - stateP.targetIndexMappings, - indices[source].mappings - ), - versionIndexReadyActions: Option.some([ - { remove: { index: source, alias: stateP.currentAlias, must_exist: true } }, - { add: { index: target, alias: stateP.currentAlias } }, - { add: { index: target, alias: stateP.versionAlias } }, - { remove_index: { index: stateP.tempIndex } }, - ]), + controlState: 'WAIT_FOR_YELLOW_SOURCE', + sourceIndex: source, + sourceIndexMappings: indices[source].mappings, }; } else if (indices[stateP.legacyIndex] != null) { // Migrate from a legacy index @@ -434,6 +423,30 @@ export const model = (currentState: State, resW: ResponseType): } else { throwBadResponse(stateP, res); } + } else if (stateP.controlState === 'WAIT_FOR_YELLOW_SOURCE') { + const res = resW as ExcludeRetryableEsError>; + if (Either.isRight(res)) { + const source = stateP.sourceIndex; + const target = stateP.versionIndex; + return { + ...stateP, + controlState: 'SET_SOURCE_WRITE_BLOCK', + sourceIndex: Option.some(source) as Option.Some, + targetIndex: target, + targetIndexMappings: disableUnknownTypeMappingFields( + stateP.targetIndexMappings, + stateP.sourceIndexMappings + ), + versionIndexReadyActions: Option.some([ + { remove: { index: source, alias: stateP.currentAlias, must_exist: true } }, + { add: { index: target, alias: stateP.currentAlias } }, + { add: { index: target, alias: stateP.versionAlias } }, + { remove_index: { index: stateP.tempIndex } }, + ]), + }; + } else { + return throwBadResponse(stateP, res); + } } else if (stateP.controlState === 'SET_SOURCE_WRITE_BLOCK') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { diff --git a/src/core/server/saved_objects/migrationsv2/next.ts b/src/core/server/saved_objects/migrationsv2/next.ts index d80e208c40790..8d7390aedad1b 100644 --- a/src/core/server/saved_objects/migrationsv2/next.ts +++ b/src/core/server/saved_objects/migrationsv2/next.ts @@ -10,7 +10,7 @@ import * as TaskEither from 'fp-ts/lib/TaskEither'; import * as Option from 'fp-ts/lib/Option'; import { UnwrapPromise } from '@kbn/utility-types'; import { pipe } from 'fp-ts/lib/pipeable'; -import { +import type { AllActionStates, ReindexSourceToTempState, MarkVersionIndexReady, @@ -32,6 +32,7 @@ import { CreateNewTargetState, CloneTempToSource, SetTempWriteBlock, + WaitForYellowSourceState, } from './types'; import * as Actions from './actions'; import { ElasticsearchClient } from '../../elasticsearch'; @@ -54,6 +55,8 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra return { INIT: (state: InitState) => Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]), + WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) => + Actions.waitForIndexStatusYellow(client, state.sourceIndex), SET_SOURCE_WRITE_BLOCK: (state: SetSourceWriteBlockState) => Actions.setWriteBlock(client, state.sourceIndex.value), CREATE_NEW_TARGET: (state: CreateNewTargetState) => diff --git a/src/core/server/saved_objects/migrationsv2/types.ts b/src/core/server/saved_objects/migrationsv2/types.ts index 6f506916aeb21..2e409a6263a5a 100644 --- a/src/core/server/saved_objects/migrationsv2/types.ts +++ b/src/core/server/saved_objects/migrationsv2/types.ts @@ -113,6 +113,13 @@ export type FatalState = BaseState & { readonly reason: string; }; +export interface WaitForYellowSourceState extends BaseState { + /** Wait for the source index to be yellow before requesting it. */ + readonly controlState: 'WAIT_FOR_YELLOW_SOURCE'; + readonly sourceIndex: string; + readonly sourceIndexMappings: IndexMapping; +} + export type SetSourceWriteBlockState = PostInitState & { /** Set a write block on the source index to prevent any further writes */ readonly controlState: 'SET_SOURCE_WRITE_BLOCK'; @@ -275,6 +282,7 @@ export type State = | FatalState | InitState | DoneState + | WaitForYellowSourceState | SetSourceWriteBlockState | CreateNewTargetState | CreateReindexTempState