Skip to content

Commit

Permalink
first pass on code
Browse files Browse the repository at this point in the history
  • Loading branch information
Bamieh committed Jun 4, 2023
1 parent 4a9d3f7 commit f1cff83
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
import { pickupUpdatedMappings } from './pickup_updated_mappings';
import { DEFAULT_TIMEOUT } from './constants';


/** @internal */
export interface UpdateAndPickupMappingsResponse {
taskId: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Semver from 'semver';
import type { NodeRoles } from '@kbn/core-node-server';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { State } from './state';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import {
MAIN_SAVED_OBJECT_INDEX,
Expand Down Expand Up @@ -79,6 +80,7 @@ export class KibanaMigrator implements IKibanaMigrator {
private readonly defaultIndexTypesMap: IndexTypesMap;
private readonly nodeRoles: NodeRoles;
public readonly kibanaVersion: string;
public readonly stateStatus$: BehaviorSubject<State | null>;

/**
* Creates an instance of KibanaMigrator.
Expand Down Expand Up @@ -116,6 +118,7 @@ export class KibanaMigrator implements IKibanaMigrator {
this.activeMappings = buildActiveMappings(this.mappingProperties);
this.docLinks = docLinks;
this.defaultIndexTypesMap = defaultIndexTypesMap;
this.stateStatus$ = new BehaviorSubject<State | null>(null);
}

public runMigrations({ rerun = false }: { rerun?: boolean } = {}): Promise<MigrationResult[]> {
Expand Down Expand Up @@ -145,6 +148,10 @@ export class KibanaMigrator implements IKibanaMigrator {
return this.status$.asObservable();
}

public getStateStatus$() {
return this.stateStatus$.asObservable();
}

private async runMigrationsInternal(): Promise<MigrationResult[]> {
const migrationAlgorithm = this.soMigrationsConfig.algorithm;
if (migrationAlgorithm === 'zdt') {
Expand Down Expand Up @@ -225,7 +232,6 @@ export class KibanaMigrator implements IKibanaMigrator {
const doneReindexing = doneReindexingDefers[indexName];
// check if this migrator's index is involved in some document redistribution
const mustRelocateDocuments = !!readyToReindex;

return runResilientMigrator({
client: this.client,
kibanaVersion: this.kibanaVersion,
Expand Down Expand Up @@ -255,6 +261,7 @@ export class KibanaMigrator implements IKibanaMigrator {
migrationsConfig: this.soMigrationsConfig,
typeRegistry: this.typeRegistry,
docLinks: this.docLinks,
stateStatus$: this.stateStatus$,
});
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import * as Option from 'fp-ts/lib/Option';
import * as Rx from 'rxjs';
import { omit } from 'lodash';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { Defer } from './kibana_migrator_utils';
Expand Down Expand Up @@ -75,6 +76,7 @@ export const nextActionMap = (
readyToReindex: Defer<void>,
doneReindexing: Defer<void>
) => {

return {
INIT: (state: InitState) =>
Actions.initAction({ client, indices: [state.currentAlias, state.versionAlias] }),
Expand Down Expand Up @@ -240,7 +242,7 @@ export const nextActionMap = (
*/
refresh: false,
}),
MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) =>
MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) =>
Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }),
MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) =>
Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }),
Expand Down Expand Up @@ -273,14 +275,16 @@ export const next = (
client: ElasticsearchClient,
transformRawDocs: TransformRawDocs,
readyToReindex: Defer<void>,
doneReindexing: Defer<void>
doneReindexing: Defer<void>,
stateStatus$?: Rx.BehaviorSubject<State | null>,
) => {
const map = nextActionMap(client, transformRawDocs, readyToReindex, doneReindexing);
return (state: State) => {
const delay = createDelayFn(state);

if (state.controlState === 'DONE' || state.controlState === 'FATAL') {
// Return null if we're in one of the terminating states
stateStatus$?.complete();
return null;
} else {
// Otherwise return the delayed action
Expand All @@ -290,6 +294,8 @@ export const next = (
const nextAction = map[state.controlState] as (
state: State
) => ReturnType<typeof map[AllActionStates]>;
stateStatus$?.next(state);

return delay(nextAction(state));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import * as Rx from 'rxjs';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SavedObjectsMigrationVersion } from '@kbn/core-saved-objects-common';
import type { ISavedObjectTypeRegistry } from '@kbn/core-saved-objects-server';
Expand Down Expand Up @@ -63,6 +64,7 @@ export async function runResilientMigrator({
migrationsConfig,
typeRegistry,
docLinks,
stateStatus$,
}: {
client: ElasticsearchClient;
kibanaVersion: string;
Expand All @@ -81,6 +83,7 @@ export async function runResilientMigrator({
migrationsConfig: SavedObjectsMigrationConfigType;
typeRegistry: ISavedObjectTypeRegistry;
docLinks: DocLinksServiceStart;
stateStatus$?: Rx.BehaviorSubject<State | null>,
}): Promise<MigrationResult> {
const initialState = createInitialState({
kibanaVersion,
Expand All @@ -101,7 +104,7 @@ export async function runResilientMigrator({
return migrationStateActionMachine({
initialState,
logger,
next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing),
next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing, stateStatus$),
model,
abort: async (state?: State) => {
// At this point, we could reject this migrator's defers and unblock other migrators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,72 @@ describe('split .kibana index into multiple system indices', () => {
await clearLog(logFilePath);
});

describe('failure cases', () => {
it.only('successfully performs the migrations even if a migrator fails', async () => {
esServer = await startElasticsearch({
dataArchive: Path.join(__dirname, '..', 'archives', '7.7.2_xpack_100k_obj.zip'),
});

const updatedTypeRegistry = overrideTypeRegistry(
typeRegistry,
(type: SavedObjectsType<any>) => {
return {
...type,
indexPattern: RELOCATE_TYPES[type.name] ?? MAIN_SAVED_OBJECT_INDEX,
};
}
);

const migratorTestKitFactory = ({ failAfterStep }: { failAfterStep?: string }) =>
getKibanaMigratorTestKit({
types: updatedTypeRegistry.getAllTypes(),
kibanaIndex: '.kibana',
logFilePath,
failAfterStep,
});


let firstRunFail = false;
let secondRunFail = false;

try {
const { runMigrations } = await migratorTestKitFactory({
failAfterStep: 'TRANSFORMED_DOCUMENTS_BULK_INDEX',
});
await runMigrations();
} catch(err) {
console.log('err::', err);
firstRunFail = true;
}

expect(firstRunFail).toBe(true);

try {
const { runMigrations } = await migratorTestKitFactory({});
const results = await runMigrations();
console.log('DONE!!@#!@');
expect(
results
.flat()
.every((result) => result.status === 'migrated' || result.status === 'patched')
).toEqual(true);

} catch(err) {
console.log('err BADDDDD::', err);
secondRunFail = true;
}
expect(secondRunFail).toBe(false);
console.log('COMPLETE RUN!');

});

afterAll(async () => {
await esServer?.stop();
await delay(2);
});
})


describe('when migrating from a legacy version', () => {
let migratorTestKitFactory: () => Promise<KibanaMigratorTestKit>;

Expand Down Expand Up @@ -455,3 +521,9 @@ describe('split .kibana index into multiple system indices', () => {
});
});
});


// fail es at alias change (final step)
// fail at the update target mappings (modifying operations, firs 2 ops)
// fail at any step in the clone target mappings
// update target mappings properties
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { NodeRoles } from '@kbn/core-node-server';
import { baselineDocuments, baselineTypes } from './kibana_migrator_test_kit.fixtures';
import { delay } from './test_utils';
import { Client } from '@elastic/elasticsearch';

export const defaultLogFilePath = Path.join(__dirname, 'kibana_migrator_test_kit.log');

Expand All @@ -74,6 +75,8 @@ export interface KibanaMigratorTestKitParams {
settings?: Record<string, any>;
types?: Array<SavedObjectsType<any>>;
logFilePath?: string;
esClientProxy?: (client: Client) => Client;
failAfterStep?: string;
}

export interface KibanaMigratorTestKit {
Expand Down Expand Up @@ -123,6 +126,7 @@ export const getEsClient = async ({
return await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
};


export const getKibanaMigratorTestKit = async ({
settings = {},
kibanaIndex = defaultKibanaIndex,
Expand All @@ -131,8 +135,12 @@ export const getKibanaMigratorTestKit = async ({
types = [],
logFilePath = defaultLogFilePath,
nodeRoles = defaultNodeRoles,
failAfterStep,
}: KibanaMigratorTestKitParams = {}): Promise<KibanaMigratorTestKit> => {
let hasRun = false;
let hasComplete = false;
let reachedTargetFailureStep = false
let controlState: string | undefined;
const loggingSystem = new LoggingSystem();
const loggerFactory = loggingSystem.asLoggerFactory();

Expand All @@ -142,7 +150,57 @@ export const getKibanaMigratorTestKit = async ({
const loggingConf = await firstValueFrom(configService.atPath<LoggingConfigType>('logging'));
loggingSystem.upgrade(loggingConf);

const client = await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
console.log('creating ES client with failAfterStep: ', failAfterStep);
const proxyClient = (rawClient: Client): Client => {
return new Proxy(rawClient, {
get(target, prop, receiver) {
if (!failAfterStep || !hasRun || hasComplete) {
return Reflect.get(...arguments);
}
if (reachedTargetFailureStep) {
throw new Error('SIMULATING ERROR');
}

console.log('prop::', prop);
console.log('stateStatus::', controlState);

switch (prop) {
case 'child': {
return new Proxy(Reflect.get(...arguments), {
apply(target, thisArg, argumentsList) {
const childClient = rawClient.child(argumentsList[0]);
console.log('reflected proxy child');
return proxyClient(childClient);
},
});
}
case 'indices': {
return new Proxy(Reflect.get(...arguments), {
get(target, prop, receiver) {
console.log('INSIDE CHILD!', hasComplete, prop);
if (!hasRun || hasComplete) {
return Reflect.get(...arguments);
}

if (prop === 'putMapping') {
console.log('putMapping called');
return Reflect.get(...arguments);
}

return Reflect.get(...arguments);
}
})
}
default: {
return Reflect.get(...arguments);
}
}
},
})
}

const rawClient = await getElasticsearchClient(configService, loggerFactory, kibanaVersion);
const client = proxyClient(rawClient);

const typeRegistry = new SavedObjectTypeRegistry();

Expand All @@ -159,6 +217,17 @@ export const getKibanaMigratorTestKit = async ({
kibanaBranch,
nodeRoles
);

if (failAfterStep) {
const stateStatus = migrator.getStateStatus$();
stateStatus.subscribe((result) => {
controlState = result?.controlState;
if (controlState === failAfterStep) {
console.log(`>>>> reachedTargetFailureStep true at ${failAfterStep} <<<<`);
reachedTargetFailureStep = true;
}
})
}

const runMigrations = async () => {
if (hasRun) {
Expand All @@ -169,6 +238,7 @@ export const getKibanaMigratorTestKit = async ({
try {
return await migrator.runMigrations();
} finally {
hasComplete = true;
await loggingSystem.stop();
}
};
Expand Down

0 comments on commit f1cff83

Please sign in to comment.