Skip to content

Commit

Permalink
Single updateAliases() for all migrators when relocating during upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
gsoldevila committed Jun 2, 2023
1 parent f5d37e7 commit b539b1a
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { pipe } from 'fp-ts/lib/pipeable';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AcknowledgeResponse } from '.';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
Expand Down Expand Up @@ -46,6 +45,9 @@ export interface CreateIndexParams {
aliases?: string[];
timeout?: string;
}

/**
 * Successful outcomes of `createIndex`:
 * - 'create_index_succeeded': the create request completed without error.
 * - 'index_already_exists': the create request failed with a
 *   `resource_already_exists_exception`, which is reported as a success.
 */
export type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists';

/**
* Creates an index with the given mappings
*
Expand All @@ -64,11 +66,11 @@ export const createIndex = ({
timeout = DEFAULT_TIMEOUT,
}: CreateIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
'create_index_succeeded'
CreateIndexSuccessResponse
> => {
const createIndexTask: TaskEither.TaskEither<
RetryableEsClientError | ClusterShardLimitExceeded,
AcknowledgeResponse
CreateIndexSuccessResponse
> = () => {
const aliasesObject = aliasArrayToRecord(aliases);

Expand Down Expand Up @@ -103,31 +105,12 @@ export const createIndex = ({
},
},
})
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
* updated on all nodes with the newly created index, but it
* probably will be created sometime soon.
* - shards_acknowledged=false, we timed out before all shards were
* started
* - acknowledged=true, shards_acknowledged=true, index creation complete
*/
return Either.right({
acknowledged: Boolean(res.acknowledged),
shardsAcknowledged: res.shards_acknowledged,
});
.then(() => {
return Either.right('create_index_succeeded' as const);
})
.catch((error) => {
if (error?.body?.error?.type === 'resource_already_exists_exception') {
/**
* If the target index already exists it means a previous create
* operation had already been started. However, we can't be sure
* that all shards were started so return shardsAcknowledged: false
*/
return Either.right({
acknowledged: true,
shardsAcknowledged: false,
});
return Either.right('index_already_exists' as const);
} else if (isClusterShardLimitExceeded(error?.body?.error)) {
return Either.left({
type: 'cluster_shard_limit_exceeded' as const,
Expand All @@ -143,11 +126,12 @@ export const createIndex = ({
createIndexTask,
TaskEither.chain<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
AcknowledgeResponse,
'create_index_succeeded'
CreateIndexSuccessResponse,
CreateIndexSuccessResponse
>((res) => {
// Systematically wait until the target index has a 'green' status, meaning
// the primary (and, on multi-node clusters, the replica) has been started
// When the index status is 'green' we know that all shards were started
// see https://github.com/elastic/kibana/issues/157968
return pipe(
waitForIndexStatus({
Expand All @@ -156,10 +140,7 @@ export const createIndex = ({
timeout: DEFAULT_TIMEOUT,
status: 'green',
}),
TaskEither.map(() => {
/** When the index status is 'green' we know that all shards were started */
return 'create_index_succeeded';
})
TaskEither.map(() => res)
);
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,36 @@
* Side Public License, v 1.
*/
import { synchronizeMigrators } from './synchronize_migrators';
import { type Defer, defer } from '../kibana_migrator_utils';
import { type WaitGroup, waitGroup as createWaitGroup } from '../kibana_migrator_utils';

describe('synchronizeMigrators', () => {
let defers: Array<Defer<void>>;
let allDefersPromise: Promise<any>;
let migratorsDefers: Array<Defer<void>>;
let waitGroups: Array<WaitGroup<void>>;
let allWaitGroupsPromise: Promise<any>;
let migratorsWaitGroups: Array<WaitGroup<void>>;

beforeEach(() => {
jest.clearAllMocks();

defers = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(defer);
allDefersPromise = Promise.all(defers.map(({ promise }) => promise));
waitGroups = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(createWaitGroup);
allWaitGroupsPromise = Promise.all(waitGroups.map(({ promise }) => promise));

migratorsDefers = defers.map(({ resolve, reject }) => ({
migratorsWaitGroups = waitGroups.map(({ resolve, reject }) => ({
resolve: jest.fn(resolve),
reject: jest.fn(reject),
promise: allDefersPromise,
promise: allWaitGroupsPromise,
}));
});

describe('when all migrators reach the synchronization point with a correct state', () => {
it('unblocks all migrators and resolves Right', async () => {
const tasks = migratorsDefers.map((migratorDefer) => synchronizeMigrators(migratorDefer));
const tasks = migratorsWaitGroups.map((waitGroup) => synchronizeMigrators({ waitGroup }));

const res = await Promise.all(tasks.map((task) => task()));

migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.resolve).toHaveBeenCalledTimes(1)
);
migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.reject).not.toHaveBeenCalled()
migratorsWaitGroups.forEach((waitGroup) =>
expect(waitGroup.resolve).toHaveBeenCalledTimes(1)
);
migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled());

expect(res).toEqual([
{ _tag: 'Right', right: 'synchronized_successfully' },
Expand All @@ -48,13 +46,11 @@ describe('synchronizeMigrators', () => {

it('migrators are not unblocked until the last one reaches the synchronization point', async () => {
let resolved: number = 0;
migratorsDefers.forEach((migratorDefer) => migratorDefer.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
migratorsWaitGroups.forEach((waitGroup) => waitGroup.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we simulate that only kibana_task_manager and kibana migrators get to the sync point
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));
// we don't await them, or we would be blocked forever
Promise.all(tasks.map((task) => task()));

Expand All @@ -65,7 +61,7 @@ describe('synchronizeMigrators', () => {
expect(resolved).toEqual(0);

// finally, the last migrator gets to the synchronization point
await synchronizeMigrators(casesDefer)();
await synchronizeMigrators({ waitGroup: casesDefer })();
expect(resolved).toEqual(3);
});
});
Expand All @@ -75,18 +71,16 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we first make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');

// the other migrators then try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

expect(Promise.all(tasks.map((task) => task()))).resolves.toEqual([
{
Expand Down Expand Up @@ -116,15 +110,13 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// some migrators try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

// we then make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,31 @@

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { Defer } from '../kibana_migrator_utils';
import type { WaitGroup } from '../kibana_migrator_utils';

/**
 * Left value produced by `synchronizeMigrators` when the awaited wait group
 * promise rejects.
 */
export interface SyncFailed {
// discriminant identifying this failure
type: 'sync_failed';
// the value caught from the rejected promise
error: Error;
}

export function synchronizeMigrators(
defer: Defer<void>
): TaskEither.TaskEither<SyncFailed, 'synchronized_successfully'> {
/**
 * Parameters accepted by `synchronizeMigrators`.
 */
export interface SynchronizeMigratorsParams<T, U> {
// wait group whose `resolve` is called and whose `promise` is then awaited
waitGroup: WaitGroup<T>;
// optional transform applied to the value the wait group promise resolves with;
// when omitted, a Right('synchronized_successfully') is produced instead
thenHook?: (res: any) => Either.Right<U>;
// optional value handed to `waitGroup.resolve`
payload?: T;
}

export function synchronizeMigrators<T, U>({
waitGroup,
payload,
thenHook = () =>
Either.right(
'synchronized_successfully' as const
) as Either.Right<'synchronized_successfully'> as unknown as Either.Right<U>,
}: SynchronizeMigratorsParams<T, U>): TaskEither.TaskEither<SyncFailed, U> {
return () => {
defer.resolve();
return defer.promise
.then(() => Either.right('synchronized_successfully' as const))
waitGroup.resolve(payload);
return waitGroup.promise
.then((res) => (thenHook ? thenHook(res) : res))
.catch((error) => Either.left({ type: 'sync_failed' as const, error }));
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export interface UpdateAliasesParams {
aliasActions: AliasAction[];
timeout?: string;
}

/**
 * Return type of `updateAliases`: a TaskEither whose Right is the literal
 * 'update_aliases_succeeded' and whose Left is one of the listed failure types.
 *
 * @internal
 */
export type UpdateAliasesReturnType = TaskEither.TaskEither<
IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError,
'update_aliases_succeeded'
>;

/**
* Calls the Update index alias API `_alias` with the provided alias actions.
*/
Expand All @@ -45,11 +52,9 @@ export const updateAliases =
client,
aliasActions,
timeout = DEFAULT_TIMEOUT,
}: UpdateAliasesParams): TaskEither.TaskEither<
IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError,
'update_aliases_succeeded'
> =>
}: UpdateAliasesParams): UpdateAliasesReturnType =>
() => {
if (!aliasActions || !aliasActions.length) throw Error('updating NO aliases!');
return client.indices
.updateAliases({
actions: aliasActions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@ import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import { loggerMock } from '@kbn/logging-mocks';
import {
calculateTypeStatuses,
createMultiPromiseDefer,
createWaitGroupMap,
getCurrentIndexTypesMap,
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
} from './kibana_migrator_utils';
import { INDEX_MAP_BEFORE_SPLIT } from './kibana_migrator_utils.fixtures';

describe('createMultiPromiseDefer', () => {
describe('createWaitGroupMap', () => {
it('creates defer objects with the same Promise', () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
expect(Object.keys(defers)).toHaveLength(2);
expect(defers['.kibana'].promise).toEqual(defers['.kibana_cases'].promise);
expect(defers['.kibana'].resolve).not.toEqual(defers['.kibana_cases'].resolve);
expect(defers['.kibana'].reject).not.toEqual(defers['.kibana_cases'].reject);
});

it('the common Promise resolves when all defers resolve', async () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
let resolved = 0;
Object.values(defers).forEach((defer) => defer.promise.then(() => ++resolved));
defers['.kibana'].resolve();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,35 @@ import { TypeStatus, type TypeStatusDetails } from './kibana_migrator_constants'

// even though this utility class is present in @kbn/kibana-utils-plugin, we can't easily import it from Core
// aka. one does not simply reuse code
export class Defer<T> {
public resolve!: (data: T) => void;
class Defer<T> {
public resolve!: (data?: T) => void;
public reject!: (error: any) => void;
public promise: Promise<any> = new Promise<any>((resolve, reject) => {
(this as any).resolve = resolve;
(this as any).reject = reject;
});
}

export const defer = () => new Defer<void>();
// A WaitGroup is the Defer shape (resolve / reject / promise) exposed under a different name.
export type WaitGroup<T> = Defer<T>;

export function createMultiPromiseDefer(indices: string[]): Record<string, Defer<void>> {
const defers: Array<Defer<void>> = indices.map(defer);
const all = Promise.all(defers.map(({ promise }) => promise));
return indices.reduce<Record<string, Defer<any>>>((acc, indexName, i) => {
/**
 * Creates a new, independent wait group backed by a fresh `Defer` instance.
 *
 * @returns a `WaitGroup<T>` exposing `resolve`, `reject` and `promise`.
 */
export function waitGroup<T>(): WaitGroup<T> {
  const group: WaitGroup<T> = new Defer<T>();
  return group;
}

export function createWaitGroupMap<T, U>(
keys: string[],
thenHook: (res: T[]) => U = (res) => res as unknown as U
): Record<string, WaitGroup<T>> {
if (!keys?.length) {
return {};
}

const defers: Array<WaitGroup<T>> = keys.map(() => waitGroup<T>());

// every member of the WaitGroup will wait for all members to resolve
const all = Promise.all(defers.map(({ promise }) => promise)).then(thenHook);

return keys.reduce<Record<string, WaitGroup<T>>>((acc, indexName, i) => {
const { resolve, reject } = defers[i];
acc[indexName] = { resolve, reject, promise: all };
return acc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2970,10 +2970,10 @@ describe('migrations v2 model', () => {
sourceIndex: Option.none as Option.None,
targetIndex: '.kibana_7.11.0_001',
};
test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY', () => {
test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS', () => {
const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded');
const newState = model(createNewTargetState, res);
expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY');
expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
Expand All @@ -2987,7 +2987,7 @@ describe('migrations v2 model', () => {
expect(newState.retryCount).toEqual(1);
expect(newState.retryDelay).toEqual(2000);
});
test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY resets the retry count and delay', () => {
test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS resets the retry count and delay', () => {
const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded');
const testState = {
...createNewTargetState,
Expand All @@ -2996,7 +2996,7 @@ describe('migrations v2 model', () => {
};

const newState = model(testState, res);
expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY');
expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS');
expect(newState.retryCount).toEqual(0);
expect(newState.retryDelay).toEqual(0);
});
Expand Down
Loading

0 comments on commit b539b1a

Please sign in to comment.