Handle index not found during bulk index action (#108544)
Co-authored-by: Kibana Machine <[email protected]>
rudolf and kibanamachine authored Aug 16, 2021
1 parent c6c24e4 commit b96648c
Showing 5 changed files with 129 additions and 23 deletions.
69 changes: 65 additions & 4 deletions src/core/server/saved_objects/migrationsv2/README.md
@@ -1,26 +1,71 @@
 - [Introduction](#introduction)
 - [Algorithm steps](#algorithm-steps)
   - [INIT](#init)
+    - [Next action](#next-action)
+    - [New control state](#new-control-state)
   - [CREATE_NEW_TARGET](#create_new_target)
+    - [Next action](#next-action-1)
+    - [New control state](#new-control-state-1)
   - [LEGACY_SET_WRITE_BLOCK](#legacy_set_write_block)
+    - [Next action](#next-action-2)
+    - [New control state](#new-control-state-2)
   - [LEGACY_CREATE_REINDEX_TARGET](#legacy_create_reindex_target)
+    - [Next action](#next-action-3)
+    - [New control state](#new-control-state-3)
   - [LEGACY_REINDEX](#legacy_reindex)
+    - [Next action](#next-action-4)
+    - [New control state](#new-control-state-4)
   - [LEGACY_REINDEX_WAIT_FOR_TASK](#legacy_reindex_wait_for_task)
+    - [Next action](#next-action-5)
+    - [New control state](#new-control-state-5)
   - [LEGACY_DELETE](#legacy_delete)
+    - [Next action](#next-action-6)
+    - [New control state](#new-control-state-6)
   - [WAIT_FOR_YELLOW_SOURCE](#wait_for_yellow_source)
+    - [Next action](#next-action-7)
+    - [New control state](#new-control-state-7)
   - [SET_SOURCE_WRITE_BLOCK](#set_source_write_block)
+    - [Next action](#next-action-8)
+    - [New control state](#new-control-state-8)
   - [CREATE_REINDEX_TEMP](#create_reindex_temp)
+    - [Next action](#next-action-9)
+    - [New control state](#new-control-state-9)
   - [REINDEX_SOURCE_TO_TEMP_OPEN_PIT](#reindex_source_to_temp_open_pit)
+    - [Next action](#next-action-10)
+    - [New control state](#new-control-state-10)
   - [REINDEX_SOURCE_TO_TEMP_READ](#reindex_source_to_temp_read)
+    - [Next action](#next-action-11)
+    - [New control state](#new-control-state-11)
   - [REINDEX_SOURCE_TO_TEMP_INDEX](#reindex_source_to_temp_index)
+    - [Next action](#next-action-12)
+    - [New control state](#new-control-state-12)
+  - [REINDEX_SOURCE_TO_TEMP_INDEX_BULK](#reindex_source_to_temp_index_bulk)
+    - [Next action](#next-action-13)
+    - [New control state](#new-control-state-13)
   - [REINDEX_SOURCE_TO_TEMP_CLOSE_PIT](#reindex_source_to_temp_close_pit)
+    - [Next action](#next-action-14)
+    - [New control state](#new-control-state-14)
   - [SET_TEMP_WRITE_BLOCK](#set_temp_write_block)
+    - [Next action](#next-action-15)
+    - [New control state](#new-control-state-15)
   - [CLONE_TEMP_TO_TARGET](#clone_temp_to_target)
+    - [Next action](#next-action-16)
+    - [New control state](#new-control-state-16)
   - [OUTDATED_DOCUMENTS_SEARCH](#outdated_documents_search)
+    - [Next action](#next-action-17)
+    - [New control state](#new-control-state-17)
   - [OUTDATED_DOCUMENTS_TRANSFORM](#outdated_documents_transform)
+    - [Next action](#next-action-18)
+    - [New control state](#new-control-state-18)
   - [UPDATE_TARGET_MAPPINGS](#update_target_mappings)
+    - [Next action](#next-action-19)
+    - [New control state](#new-control-state-19)
   - [UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK](#update_target_mappings_wait_for_task)
+    - [Next action](#next-action-20)
+    - [New control state](#new-control-state-20)
   - [MARK_VERSION_INDEX_READY_CONFLICT](#mark_version_index_ready_conflict)
+    - [Next action](#next-action-21)
+    - [New control state](#new-control-state-21)
 - [Manual QA Test Plan](#manual-qa-test-plan)
   - [1. Legacy pre-migration](#1-legacy-pre-migration)
   - [2. Plugins enabled/disabled](#2-plugins-enableddisabled)
@@ -245,15 +290,31 @@ Read the next batch of outdated documents from the source index by using search

 ## REINDEX_SOURCE_TO_TEMP_INDEX
 ### Next action
-`transformRawDocs` + `bulkIndexTransformedDocuments`
-
-1. Transform the current batch of documents
-2. Use the bulk API create action to write a batch of up-to-date documents. The create action ensures that there will be only one write per reindexed document even if multiple Kibana instances are performing this step. Ignore any create errors because of documents that already exist in the temporary index. Use `refresh=false` to speed up the create actions, the `UPDATE_TARGET_MAPPINGS` step will ensure that the index is refreshed before we start serving traffic.
+`transformRawDocs`
+
+Transform the current batch of documents

 In order to support sharing saved objects to multiple spaces in 8.0, the
 transforms will also regenerate document `_id`'s. To ensure that this step
 remains idempotent, the new `_id` is deterministically generated using UUIDv5
 ensuring that each Kibana instance generates the same new `_id` for the same document.
+### New control state
+`REINDEX_SOURCE_TO_TEMP_INDEX_BULK`
+## REINDEX_SOURCE_TO_TEMP_INDEX_BULK
+### Next action
+`bulkIndexTransformedDocuments`
+
+Use the bulk API create action to write a batch of up-to-date documents. The
+create action ensures that there will be only one write per reindexed document
+even if multiple Kibana instances are performing this step. Use
+`refresh=false` to speed up the create actions, the `UPDATE_TARGET_MAPPINGS`
+step will ensure that the index is refreshed before we start serving traffic.
+
+The following errors are ignored because it means another instance already
+completed this step:
+- documents already exist in the temp index
+- temp index has a write block
+- temp index is not found
 ### New control state
 `REINDEX_SOURCE_TO_TEMP_READ`
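
Reviewer note: the README text in this hunk relies on UUIDv5 being deterministic, so that every Kibana instance derives the same new `_id` for the same document. The sketch below illustrates that property only. It is not the code from this commit: `uuidv5` is a hand-rolled implementation of the RFC 4122 name-based scheme on top of Node's `crypto` module, and the `regenerateId` helper, its `namespace:type:id` name format, and the choice of the DNS namespace are assumptions made for illustration.

```typescript
import { createHash } from 'node:crypto';

// RFC 4122 DNS namespace UUID. Using it here is an assumption for the sketch.
const DNS_NAMESPACE = '6ba7b810-9dad-11d1-80b4-00c04fd430c8';

/** Name-based UUID (version 5): SHA-1 over namespace bytes followed by the name. */
function uuidv5(name: string, namespaceUuid: string): string {
  const namespaceBytes = Buffer.from(namespaceUuid.replace(/-/g, ''), 'hex');
  const hash = createHash('sha1').update(namespaceBytes).update(name, 'utf8').digest();

  // Keep the first 16 bytes of the digest, then stamp version and variant bits.
  const bytes = Buffer.from(hash.subarray(0, 16));
  bytes[6] = (bytes[6] & 0x0f) | 0x50; // version 5
  bytes[8] = (bytes[8] & 0x3f) | 0x80; // RFC 4122 variant

  const hex = bytes.toString('hex');
  return [
    hex.slice(0, 8),
    hex.slice(8, 12),
    hex.slice(12, 16),
    hex.slice(16, 20),
    hex.slice(20),
  ].join('-');
}

// Hypothetical helper: the name format is illustrative, not taken from the commit.
function regenerateId(namespace: string, type: string, id: string): string {
  return uuidv5(`${namespace}:${type}:${id}`, DNS_NAMESPACE);
}

// Two "instances" transforming the same document arrive at the same _id.
const fromInstanceA = regenerateId('my-space', 'dashboard', 'abc123');
const fromInstanceB = regenerateId('my-space', 'dashboard', 'abc123');
console.log(fromInstanceA === fromInstanceB);
```

Because the `_id` depends only on the inputs, a bulk `create` for a document that another instance already wrote fails with a conflict rather than producing a duplicate, which is what makes the "documents already exist" error safe to ignore.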

@@ -15,9 +15,13 @@ import {
   catchRetryableEsClientErrors,
   RetryableEsClientError,
 } from './catch_retryable_es_client_errors';
-import { isWriteBlockException } from './es_errors';
+import { isWriteBlockException, isIndexNotFoundException } from './es_errors';
 import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';
-import type { TargetIndexHadWriteBlock, RequestEntityTooLargeException } from './index';
+import type {
+  TargetIndexHadWriteBlock,
+  RequestEntityTooLargeException,
+  IndexNotFound,
+} from './index';

 /** @internal */
 export interface BulkOverwriteTransformedDocumentsParams {
@@ -37,7 +41,10 @@ export const bulkOverwriteTransformedDocuments = ({
   transformedDocs,
   refresh = false,
 }: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
-  RetryableEsClientError | TargetIndexHadWriteBlock | RequestEntityTooLargeException,
+  | RetryableEsClientError
+  | TargetIndexHadWriteBlock
+  | IndexNotFound
+  | RequestEntityTooLargeException,
   'bulk_index_succeeded'
 > => () => {
   return client
@@ -87,6 +94,12 @@ export const bulkOverwriteTransformedDocuments = ({
             type: 'target_index_had_write_block' as const,
           });
         }
+        if (errors.every(isIndexNotFoundException)) {
+          return Either.left({
+            type: 'index_not_found_exception' as const,
+            index,
+          });
+        }
         throw new Error(JSON.stringify(errors));
       }
     })
@@ -21,3 +21,7 @@ export const isWriteBlockException = ({ type, reason }: EsErrorCause): boolean =
 export const isIncompatibleMappingException = ({ type }: EsErrorCause): boolean => {
   return type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception';
 };
+
+export const isIndexNotFoundException = ({ type }: EsErrorCause): boolean => {
+  return type === 'index_not_found_exception';
+};
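
Reviewer note: the new predicate is combined with `errors.every(...)` in the bulk action, so a typed `Either.left` is returned only when all item errors share the same cause. The following is a minimal, self-contained sketch of that classification pattern, not the code in this commit. `classifyBulkErrors` and the `BulkFailure` type are hypothetical names, plain return values stand in for `Either`, and the simplified `isWriteBlockException` (matching on `cluster_block_exception` alone) is an assumption, since the real predicate also takes `reason` into account.

```typescript
// Simplified stand-in for the Elasticsearch error cause shape.
interface EsErrorCause {
  type: string;
  reason?: string;
}

// Hypothetical result type standing in for the Either.left payloads.
type BulkFailure =
  | { type: 'target_index_had_write_block' }
  | { type: 'index_not_found_exception'; index: string };

const isIndexNotFoundException = ({ type }: EsErrorCause): boolean =>
  type === 'index_not_found_exception';

// Assumption: the real predicate is stricter and also checks `reason`.
const isWriteBlockException = ({ type }: EsErrorCause): boolean =>
  type === 'cluster_block_exception';

/**
 * Returns a typed failure when every item error has the same known cause,
 * null when there are no errors, and throws for anything else.
 */
function classifyBulkErrors(index: string, errors: EsErrorCause[]): BulkFailure | null {
  // `every` is vacuously true for an empty array, so handle that case first.
  if (errors.length === 0) return null;

  if (errors.every(isWriteBlockException)) {
    return { type: 'target_index_had_write_block' };
  }
  if (errors.every(isIndexNotFoundException)) {
    return { type: 'index_not_found_exception', index };
  }
  // Mixed or unknown causes are not safe to ignore.
  throw new Error(JSON.stringify(errors));
}

const result = classifyBulkErrors('the_temp_index', [
  { type: 'index_not_found_exception' },
  { type: 'index_not_found_exception' },
]);
console.log(result);
```

Requiring `every` rather than `some` keeps the handling conservative: a batch that mixes an expected cause with an unexpected one still surfaces as an error instead of being silently treated as "another instance already finished".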
40 changes: 30 additions & 10 deletions src/core/server/saved_objects/migrationsv2/model/model.test.ts
@@ -1154,6 +1154,16 @@ describe('migrations v2 model', () => {
       expect(newState.retryCount).toEqual(0);
       expect(newState.retryDelay).toEqual(0);
     });
+    test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if response is left index_not_found_exception', () => {
+      const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({
+        type: 'index_not_found_exception',
+        index: 'the_temp_index',
+      });
+      const newState = model(reindexSourceToTempIndexBulkState, res);
+      expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT');
+      expect(newState.retryCount).toEqual(0);
+      expect(newState.retryDelay).toEqual(0);
+    });
     test('REINDEX_SOURCE_TO_TEMP_INDEX_BULK -> FATAL if action returns left request_entity_too_large_exception', () => {
       const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX_BULK'> = Either.left({
         type: 'request_entity_too_large_exception',
@@ -1529,18 +1539,28 @@ describe('migrations v2 model', () => {
       hasTransformedDocs: false,
       progress: createInitialProgress(),
     };
-    test('TRANSFORMED_DOCUMENTS_BULK_INDEX should throw a throwBadResponse error if action failed', () => {
+
+    test('TRANSFORMED_DOCUMENTS_BULK_INDEX throws if action returns left index_not_found_exception', () => {
       const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({
-        type: 'retryable_es_client_error',
-        message: 'random documents bulk index error',
+        type: 'index_not_found_exception',
+        index: 'the_target_index',
       });
-      const newState = model(
-        transformedDocumentsBulkIndexState,
-        res
-      ) as TransformedDocumentsBulkIndex;
-      expect(newState.controlState).toEqual('TRANSFORMED_DOCUMENTS_BULK_INDEX');
-      expect(newState.retryCount).toEqual(1);
-      expect(newState.retryDelay).toEqual(2000);
+      expect(() =>
+        model(transformedDocumentsBulkIndexState, res)
+      ).toThrowErrorMatchingInlineSnapshot(
+        `"TRANSFORMED_DOCUMENTS_BULK_INDEX received unexpected action response: {\\"type\\":\\"index_not_found_exception\\",\\"index\\":\\"the_target_index\\"}"`
+      );
     });
+
+    test('TRANSFORMED_DOCUMENTS_BULK_INDEX throws if action returns left target_index_had_write_block', () => {
+      const res: ResponseType<'TRANSFORMED_DOCUMENTS_BULK_INDEX'> = Either.left({
+        type: 'target_index_had_write_block',
+      });
+      expect(() =>
+        model(transformedDocumentsBulkIndexState, res)
+      ).toThrowErrorMatchingInlineSnapshot(
+        `"TRANSFORMED_DOCUMENTS_BULK_INDEX received unexpected action response: {\\"type\\":\\"target_index_had_write_block\\"}"`
+      );
+    });

     test('TRANSFORMED_DOCUMENTS_BULK_INDEX -> FATAL if action returns left request_entity_too_large_exception', () => {
20 changes: 14 additions & 6 deletions src/core/server/saved_objects/migrationsv2/model/model.ts
@@ -533,9 +533,13 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
         transformErrors: [],
       };
     } else {
-      if (isLeftTypeof(res.left, 'target_index_had_write_block')) {
-        // the temp index has a write block, meaning that another instance already finished and moved forward.
-        // close the PIT search and carry on with the happy path.
+      if (
+        isLeftTypeof(res.left, 'target_index_had_write_block') ||
+        isLeftTypeof(res.left, 'index_not_found_exception')
+      ) {
+        // When the temp index has a write block or has been deleted another
+        // instance already completed this step. Close the PIT search and carry
+        // on with the happy path.
         return {
           ...stateP,
           controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT',
@@ -721,9 +725,13 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
         controlState: 'FATAL',
         reason: `While indexing a batch of saved objects, Elasticsearch returned a 413 Request Entity Too Large exception. Try to use smaller batches by changing the Kibana 'migrations.batchSize' configuration option and restarting Kibana.`,
       };
-    } else if (isLeftTypeof(res.left, 'target_index_had_write_block')) {
-      // we fail on this error since the target index will only have a write
-      // block if a newer version of Kibana started an upgrade
+    } else if (
+      isLeftTypeof(res.left, 'target_index_had_write_block') ||
+      isLeftTypeof(res.left, 'index_not_found_exception')
+    ) {
+      // we fail on these errors since the target index will never get
+      // deleted and should only have a write block if a newer version of
+      // Kibana started an upgrade
       throwBadResponse(stateP, res.left as never);
     } else {
       throwBadResponse(stateP, res.left);
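
Reviewer note: the two `model.ts` hunks treat the same pair of left responses differently depending on the control state. For the temp index they mean another instance is already done, while for the target index they are unexpected and cause a throw. The sketch below is a deliberately reduced illustration of that split, assuming a hypothetical `nextControlState` function and a two-member `Step` type; the real `model` carries full state objects and handles many more responses.

```typescript
// Reduced stand-ins for the left responses handled in the hunks above.
type BulkLeft =
  | { type: 'target_index_had_write_block' }
  | { type: 'index_not_found_exception'; index: string };

// Only the two control states touched by this commit are modelled here.
type Step = 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK' | 'TRANSFORMED_DOCUMENTS_BULK_INDEX';

// Hypothetical reducer: maps a step and a left response to the next control state.
function nextControlState(step: Step, left: BulkLeft): string {
  if (step === 'REINDEX_SOURCE_TO_TEMP_INDEX_BULK') {
    // Temp index has a write block or is gone: another instance completed
    // this step, so close the PIT search and continue on the happy path.
    return 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT';
  }
  // Target index: neither response should occur, so fail loudly.
  throw new Error(`${step} received unexpected action response: ${JSON.stringify(left)}`);
}

console.log(
  nextControlState('REINDEX_SOURCE_TO_TEMP_INDEX_BULK', {
    type: 'index_not_found_exception',
    index: 'the_temp_index',
  })
);
```

The error message format mirrors the inline snapshots in `model.test.ts`, which makes it easy to compare this sketch against the expectations added in the test hunks.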