Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Migrations] Update all aliases with a single updateAliases() when relocating SO documents #158940

Merged
merged 9 commits into from
Jun 5, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { pipe } from 'fp-ts/lib/pipeable';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AcknowledgeResponse } from '.';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
Expand Down Expand Up @@ -46,6 +45,9 @@ export interface CreateIndexParams {
aliases?: string[];
timeout?: string;
}

/**
 * Success values that `createIndex` can resolve with:
 * - 'create_index_succeeded': the create index request completed without error.
 * - 'index_already_exists': the request failed with a `resource_already_exists_exception`,
 *   which is treated as a success rather than as an error.
 */
export type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists';

/**
* Creates an index with the given mappings
*
Expand All @@ -64,11 +66,11 @@ export const createIndex = ({
timeout = DEFAULT_TIMEOUT,
}: CreateIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
'create_index_succeeded'
CreateIndexSuccessResponse
> => {
const createIndexTask: TaskEither.TaskEither<
RetryableEsClientError | ClusterShardLimitExceeded,
AcknowledgeResponse
CreateIndexSuccessResponse
> = () => {
const aliasesObject = aliasArrayToRecord(aliases);

Expand Down Expand Up @@ -103,31 +105,12 @@ export const createIndex = ({
},
},
})
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
* updated on all nodes with the newly created index, but it
* probably will be created sometime soon.
* - shards_acknowledged=false, we timed out before all shards were
* started
* - acknowledged=true, shards_acknowledged=true, index creation complete
*/
return Either.right({
acknowledged: Boolean(res.acknowledged),
shardsAcknowledged: res.shards_acknowledged,
});
.then(() => {
return Either.right('create_index_succeeded' as const);
})
.catch((error) => {
if (error?.body?.error?.type === 'resource_already_exists_exception') {
/**
* If the target index already exists it means a previous create
* operation had already been started. However, we can't be sure
* that all shards were started so return shardsAcknowledged: false
*/
return Either.right({
acknowledged: true,
shardsAcknowledged: false,
});
return Either.right('index_already_exists' as const);
} else if (isClusterShardLimitExceeded(error?.body?.error)) {
return Either.left({
type: 'cluster_shard_limit_exceeded' as const,
Expand All @@ -143,11 +126,12 @@ export const createIndex = ({
createIndexTask,
TaskEither.chain<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
AcknowledgeResponse,
'create_index_succeeded'
CreateIndexSuccessResponse,
CreateIndexSuccessResponse
>((res) => {
// Systematically wait until the target index has a 'green' status meaning
// the primary (and on multi node clusters) the replica has been started
// When the index status is 'green' we know that all shards were started
// see https://github.com/elastic/kibana/issues/157968
return pipe(
waitForIndexStatus({
Expand All @@ -156,10 +140,7 @@ export const createIndex = ({
timeout: DEFAULT_TIMEOUT,
status: 'green',
}),
TaskEither.map(() => {
/** When the index status is 'green' we know that all shards were started */
return 'create_index_succeeded';
})
TaskEither.map(() => res)
);
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ export {

import type { UnknownDocsFound } from './check_for_unknown_docs';
import type { IncompatibleClusterRoutingAllocation } from './initialize_action';
import { ClusterShardLimitExceeded } from './create_index';
import type { ClusterShardLimitExceeded } from './create_index';
import type { SynchronizationFailed } from './synchronize_migrators';

export type {
CheckForUnknownDocsParams,
Expand Down Expand Up @@ -174,6 +175,7 @@ export interface ActionErrorTypeMap {
index_not_yellow_timeout: IndexNotYellowTimeout;
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
es_response_too_large: EsResponseTooLargeError;
synchronization_failed: SynchronizationFailed;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,36 @@
* Side Public License, v 1.
*/
import { synchronizeMigrators } from './synchronize_migrators';
import { type Defer, defer } from '../kibana_migrator_utils';
import { type WaitGroup, waitGroup as createWaitGroup } from '../kibana_migrator_utils';

describe('synchronizeMigrators', () => {
let defers: Array<Defer<void>>;
let allDefersPromise: Promise<any>;
let migratorsDefers: Array<Defer<void>>;
let waitGroups: Array<WaitGroup<void>>;
let allWaitGroupsPromise: Promise<any>;
let migratorsWaitGroups: Array<WaitGroup<void>>;

beforeEach(() => {
jest.clearAllMocks();

defers = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(defer);
allDefersPromise = Promise.all(defers.map(({ promise }) => promise));
waitGroups = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(createWaitGroup);
allWaitGroupsPromise = Promise.all(waitGroups.map(({ promise }) => promise));

migratorsDefers = defers.map(({ resolve, reject }) => ({
migratorsWaitGroups = waitGroups.map(({ resolve, reject }) => ({
resolve: jest.fn(resolve),
reject: jest.fn(reject),
promise: allDefersPromise,
promise: allWaitGroupsPromise,
}));
});

describe('when all migrators reach the synchronization point with a correct state', () => {
it('unblocks all migrators and resolves Right', async () => {
const tasks = migratorsDefers.map((migratorDefer) => synchronizeMigrators(migratorDefer));
const tasks = migratorsWaitGroups.map((waitGroup) => synchronizeMigrators({ waitGroup }));

const res = await Promise.all(tasks.map((task) => task()));

migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.resolve).toHaveBeenCalledTimes(1)
);
migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.reject).not.toHaveBeenCalled()
migratorsWaitGroups.forEach((waitGroup) =>
expect(waitGroup.resolve).toHaveBeenCalledTimes(1)
);
migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled());

expect(res).toEqual([
{ _tag: 'Right', right: 'synchronized_successfully' },
Expand All @@ -48,13 +46,11 @@ describe('synchronizeMigrators', () => {

it('migrators are not unblocked until the last one reaches the synchronization point', async () => {
let resolved: number = 0;
migratorsDefers.forEach((migratorDefer) => migratorDefer.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
migratorsWaitGroups.forEach((waitGroup) => waitGroup.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we simulate that only kibana_task_manager and kibana migrators get to the sync point
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));
// we don't await for them, or we would be locked forever
Promise.all(tasks.map((task) => task()));

Expand All @@ -65,7 +61,7 @@ describe('synchronizeMigrators', () => {
expect(resolved).toEqual(0);

// finally, the last migrator gets to the synchronization point
await synchronizeMigrators(casesDefer)();
await synchronizeMigrators({ waitGroup: casesDefer })();
expect(resolved).toEqual(3);
});
});
Expand All @@ -75,31 +71,29 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we first make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');

// the other migrators then try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

expect(Promise.all(tasks.map((task) => task()))).resolves.toEqual([
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
Expand All @@ -116,15 +110,13 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// some migrators try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

// we then make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');
Expand All @@ -133,14 +125,14 @@ describe('synchronizeMigrators', () => {
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,33 @@

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { Defer } from '../kibana_migrator_utils';
import type { WaitGroup } from '../kibana_migrator_utils';

export interface SyncFailed {
type: 'sync_failed';
/** @internal */
export interface SynchronizationFailed {
type: 'synchronization_failed';
error: Error;
}

export function synchronizeMigrators(
defer: Defer<void>
): TaskEither.TaskEither<SyncFailed, 'synchronized_successfully'> {
/**
 * Parameters accepted by `synchronizeMigrators`.
 *
 * @internal
 */
export interface SynchronizeMigratorsParams<T, U> {
/** Wait group to synchronize on: it is resolved with `payload`, then its `promise` is awaited. */
waitGroup: WaitGroup<T>;
/** Optional hook applied to the value that `waitGroup.promise` resolves with; its result is what the task returns on success. */
thenHook?: (res: any) => Either.Right<U>;
/** Optional value handed to `waitGroup.resolve()`. */
payload?: T;
}

export function synchronizeMigrators<T, U>({
waitGroup,
payload,
thenHook = () =>
Either.right(
'synchronized_successfully' as const
) as Either.Right<'synchronized_successfully'> as unknown as Either.Right<U>,
}: SynchronizeMigratorsParams<T, U>): TaskEither.TaskEither<SynchronizationFailed, U> {
return () => {
defer.resolve();
return defer.promise
.then(() => Either.right('synchronized_successfully' as const))
.catch((error) => Either.left({ type: 'sync_failed' as const, error }));
waitGroup.resolve(payload);
return waitGroup.promise
.then((res) => (thenHook ? thenHook(res) : res))
.catch((error) => Either.left({ type: 'synchronization_failed' as const, error }));
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export interface UpdateAliasesParams {
aliasActions: AliasAction[];
timeout?: string;
}

/**
 * Return type of `updateAliases`: a TaskEither whose Left is one of the listed index/alias errors
 * or a retryable ES client error, and whose Right is the literal 'update_aliases_succeeded'.
 *
 * @internal
 */
export type UpdateAliasesReturnType = TaskEither.TaskEither<
IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError,
'update_aliases_succeeded'
>;

/**
* Calls the Update index alias API `_alias` with the provided alias actions.
*/
Expand All @@ -45,11 +52,9 @@ export const updateAliases =
client,
aliasActions,
timeout = DEFAULT_TIMEOUT,
}: UpdateAliasesParams): TaskEither.TaskEither<
IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError,
'update_aliases_succeeded'
> =>
}: UpdateAliasesParams): UpdateAliasesReturnType =>
() => {
if (!aliasActions || !aliasActions.length) throw Error('updating NO aliases!');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, that was a debug statement, removing!

return client.indices
.updateAliases({
actions: aliasActions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@ import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import { loggerMock } from '@kbn/logging-mocks';
import {
calculateTypeStatuses,
createMultiPromiseDefer,
createWaitGroupMap,
getCurrentIndexTypesMap,
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
} from './kibana_migrator_utils';
import { INDEX_MAP_BEFORE_SPLIT } from './kibana_migrator_utils.fixtures';

describe('createMultiPromiseDefer', () => {
describe('createWaitGroupMap', () => {
it('creates defer objects with the same Promise', () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
expect(Object.keys(defers)).toHaveLength(2);
expect(defers['.kibana'].promise).toEqual(defers['.kibana_cases'].promise);
expect(defers['.kibana'].resolve).not.toEqual(defers['.kibana_cases'].resolve);
expect(defers['.kibana'].reject).not.toEqual(defers['.kibana_cases'].reject);
});

it('the common Promise resolves when all defers resolve', async () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
let resolved = 0;
Object.values(defers).forEach((defer) => defer.promise.then(() => ++resolved));
defers['.kibana'].resolve();
Expand Down
Loading