Skip to content

Commit

Permalink
[7.12] Migration v2 waits for yellow cluster (#96788) (#96888)
Browse files Browse the repository at this point in the history
* Migration v2 waits for yellow cluster (#96788)

* migrator waits for source index to be yellow

otherwise the next  request to Elasticsearch can fail

* unskip integration tests that failed due to a red cluster

* log how much the every step lasts

* use Date.now instead of performance.now migration cannot finish in ms

* update tests

* clean log file before running tests

* fix wrong type

* add an integration test for waitForIndexStatusYellow
# Conflicts:
#	src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts

* remove tests using so from v8

* skip as on master :(
  • Loading branch information
mshustov authored Apr 13, 2021
1 parent e3369df commit f078844
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 181 deletions.
4 changes: 2 additions & 2 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ export const removeWriteBlock = (
* yellow at any point in the future. So ultimately data-redundancy is up to
* users to maintain.
*/
const waitForIndexStatusYellow = (
export const waitForIndexStatusYellow = (
client: ElasticsearchClient,
index: string,
timeout: string
timeout = DEFAULT_TIMEOUT
): TaskEither.TaskEither<RetryableEsClientError, {}> => () => {
return client.cluster
.health({ index, wait_for_status: 'yellow', timeout })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
UpdateAndPickupMappingsResponse,
verifyReindex,
removeWriteBlock,
waitForIndexStatusYellow,
} from '../actions';
import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
Expand Down Expand Up @@ -207,6 +208,51 @@ describe('migration actions', () => {
});
});

describe('waitForIndexStatusYellow', () => {
afterAll(async () => {
await client.indices.delete({ index: 'red_then_yellow_index' });
});
it('resolves right after waiting for an index status to be yellow if the index already existed', async () => {
// Create a red index
await client.indices.create(
{
index: 'red_then_yellow_index',
timeout: '5s',
body: {
mappings: { properties: {} },
settings: {
// Allocate 1 replica so that this index stays yellow
number_of_replicas: '1',
// Disable all shard allocation so that the index status is red
index: { routing: { allocation: { enable: 'none' } } },
},
},
},
{ maxRetries: 0 /** handle retry ourselves for now */ }
);

// Start tracking the index status
const indexStatusPromise = waitForIndexStatusYellow(client, 'red_then_yellow_index')();

const redStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' });
expect(redStatusResponse.body.status).toBe('red');

client.indices.putSettings({
index: 'red_then_yellow_index',
body: {
// Enable all shard allocation so that the index status turns yellow
index: { routing: { allocation: { enable: 'all' } } },
},
});

await indexStatusPromise;
// Assert that the promise didn't resolve before the index became yellow

const yellowStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' });
expect(yellowStatusResponse.body.status).toBe('yellow');
});
});

describe('cloneIndex', () => {
afterAll(async () => {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
* Side Public License, v 1.
*/

import { join } from 'path';
import Path from 'path';
import Fs from 'fs';
import Util from 'util';
import Semver from 'semver';
import { REPO_ROOT } from '@kbn/dev-utils';
import { Env } from '@kbn/config';
Expand All @@ -19,6 +21,14 @@ import { Root } from '../../../root';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;

const logFilePath = Path.join(__dirname, 'migration_test_kibana.log');

const asyncUnlink = Util.promisify(Fs.unlink);
async function removeLogFile() {
// ignore errors if it doesn't exist
await asyncUnlink(logFilePath).catch(() => void 0);
}

describe('migration v2', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
let root: Root;
Expand Down Expand Up @@ -46,7 +56,7 @@ describe('migration v2', () => {
appenders: {
file: {
type: 'file',
fileName: join(__dirname, 'migration_test_kibana.log'),
fileName: logFilePath,
layout: {
type: 'json',
},
Expand Down Expand Up @@ -121,9 +131,10 @@ describe('migration v2', () => {
const migratedIndex = `.kibana_${kibanaVersion}_001`;

beforeAll(async () => {
await removeLogFile();
await startServers({
oss: false,
dataArchive: join(__dirname, 'archives', '7.3.0_xpack_sample_saved_objects.zip'),
dataArchive: Path.join(__dirname, 'archives', '7.3.0_xpack_sample_saved_objects.zip'),
});
});

Expand Down Expand Up @@ -162,7 +173,9 @@ describe('migration v2', () => {
const expectedVersions = getExpectedVersionPerType();
const res = await esClient.search({
index: migratedIndex,
sort: ['_doc'],
body: {
sort: ['_doc'],
},
size: 10000,
});
const allDocuments = res.body.hits.hits as SavedObjectsRawDoc[];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
* Side Public License, v 1.
*/

import { join } from 'path';
import Path from 'path';
import Fs from 'fs';
import Util from 'util';
import { REPO_ROOT } from '@kbn/dev-utils';
import { Env } from '@kbn/config';
import { getEnvOptions } from '@kbn/config/target/mocks';
Expand All @@ -16,6 +18,13 @@ import { InternalCoreStart } from '../../../internal_types';
import { Root } from '../../../root';

const kibanaVersion = Env.createDefault(REPO_ROOT, getEnvOptions()).packageInfo.version;
const logFilePath = Path.join(__dirname, 'migration_test_kibana.log');

const asyncUnlink = Util.promisify(Fs.unlink);
async function removeLogFile() {
// ignore errors if it doesn't exist
await asyncUnlink(logFilePath).catch(() => void 0);
}

describe.skip('migration from 7.7.2-xpack with 100k objects', () => {
let esServer: kbnTestServer.TestElasticsearchUtils;
Expand Down Expand Up @@ -48,7 +57,7 @@ describe.skip('migration from 7.7.2-xpack with 100k objects', () => {
appenders: {
file: {
type: 'file',
fileName: join(__dirname, 'migration_test_kibana.log'),
fileName: logFilePath,
layout: {
type: 'json',
},
Expand Down Expand Up @@ -93,9 +102,10 @@ describe.skip('migration from 7.7.2-xpack with 100k objects', () => {
const migratedIndex = `.kibana_${kibanaVersion}_001`;

beforeAll(async () => {
await removeLogFile();
await startServers({
oss: false,
dataArchive: join(__dirname, 'archives', '7.7.2_xpack_100k_obj.zip'),
dataArchive: Path.join(__dirname, 'archives', '7.7.2_xpack_100k_obj.zip'),
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { elasticsearchClientMock } from '../../elasticsearch/client/mocks';

describe('migrationsStateActionMachine', () => {
beforeAll(() => {
jest
.spyOn(global.Date, 'now')
.mockImplementation(() => new Date('2021-04-12T16:00:00.000Z').valueOf());
});
beforeEach(() => {
jest.clearAllMocks();
});
Expand Down Expand Up @@ -112,25 +117,25 @@ describe('migrationsStateActionMachine', () => {
"[.my-so-index] Log from LEGACY_REINDEX control state",
],
Array [
"[.my-so-index] INIT -> LEGACY_REINDEX",
"[.my-so-index] INIT -> LEGACY_REINDEX. took: 0ms.",
],
Array [
"[.my-so-index] Log from LEGACY_DELETE control state",
],
Array [
"[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE",
"[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE. took: 0ms.",
],
Array [
"[.my-so-index] Log from LEGACY_DELETE control state",
],
Array [
"[.my-so-index] LEGACY_DELETE -> LEGACY_DELETE",
"[.my-so-index] LEGACY_DELETE -> LEGACY_DELETE. took: 0ms.",
],
Array [
"[.my-so-index] Log from DONE control state",
],
Array [
"[.my-so-index] LEGACY_DELETE -> DONE",
"[.my-so-index] LEGACY_DELETE -> DONE. took: 0ms.",
],
],
"log": Array [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import { errors as EsErrors } from '@elastic/elasticsearch';
import * as Option from 'fp-ts/lib/Option';
import { performance } from 'perf_hooks';
import { Logger, LogMeta } from '../../logging';
import { CorruptSavedObjectError } from '../migrations/core/migrate_raw_docs';
import { Model, Next, stateActionMachine } from './state_action_machine';
Expand All @@ -32,15 +31,18 @@ const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
oldState: State,
newState: State
newState: State,
tookMs: number
) => {
if (newState.logs.length > oldState.logs.length) {
newState.logs
.slice(oldState.logs.length)
.forEach((log) => logger[log.level](logMessagePrefix + log.message));
}

logger.info(logMessagePrefix + `${oldState.controlState} -> ${newState.controlState}`);
logger.info(
logMessagePrefix + `${oldState.controlState} -> ${newState.controlState}. took: ${tookMs}ms.`
);
};

const logActionResponse = (
Expand Down Expand Up @@ -85,11 +87,12 @@ export async function migrationStateActionMachine({
model: Model<State>;
}) {
const executionLog: ExecutionLog = [];
const starteTime = performance.now();
const startTime = Date.now();
// Since saved object index names usually start with a `.` and can be
// configured by users to include several `.`'s we can't use a logger tag to
// indicate which messages come from which index upgrade.
const logMessagePrefix = `[${initialState.indexPrefix}] `;
let prevTimestamp = startTime;
try {
const finalState = await stateActionMachine<State>(
initialState,
Expand All @@ -116,12 +119,20 @@ export async function migrationStateActionMachine({
controlState: newState.controlState,
prevControlState: state.controlState,
});
logStateTransition(logger, logMessagePrefix, state, redactedNewState as State);
const now = Date.now();
logStateTransition(
logger,
logMessagePrefix,
state,
redactedNewState as State,
now - prevTimestamp
);
prevTimestamp = now;
return newState;
}
);

const elapsedMs = performance.now() - starteTime;
const elapsedMs = Date.now() - startTime;
if (finalState.controlState === 'DONE') {
logger.info(logMessagePrefix + `Migration completed after ${Math.round(elapsedMs)}ms`);
if (finalState.sourceIndex != null && Option.isSome(finalState.sourceIndex)) {
Expand Down
Loading

0 comments on commit f078844

Please sign in to comment.