Skip to content

Commit

Permalink
Merge branch '7.x' into backport/7.x/pr-95767
Browse files Browse the repository at this point in the history
  • Loading branch information
kibanamachine authored Apr 7, 2021
2 parents 8a7f393 + 0e3943d commit 6f1ed5e
Show file tree
Hide file tree
Showing 142 changed files with 4,598 additions and 763 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
"dependencies": {
"@elastic/apm-rum": "^5.6.1",
"@elastic/apm-rum-react": "^1.2.5",
"@elastic/charts": "26.1.0",
"@elastic/charts": "28.0.0",
"@elastic/datemath": "link:bazel-bin/packages/elastic-datemath/npm_module",
"@elastic/elasticsearch": "npm:@elastic/[email protected]",
"@elastic/ems-client": "7.12.0",
Expand Down Expand Up @@ -441,7 +441,7 @@
"@bazel/ibazel": "^0.14.0",
"@bazel/typescript": "^3.2.3",
"@cypress/snapshot": "^2.1.7",
"@cypress/webpack-preprocessor": "^5.5.0",
"@cypress/webpack-preprocessor": "^5.6.0",
"@elastic/apm-rum": "^5.6.1",
"@elastic/apm-rum-react": "^1.2.5",
"@elastic/eslint-config-kibana": "link:packages/elastic-eslint-config-kibana",
Expand Down Expand Up @@ -679,7 +679,7 @@
"copy-webpack-plugin": "^6.0.2",
"cpy": "^8.1.1",
"css-loader": "^3.4.2",
"cypress": "^6.2.1",
"cypress": "^6.8.0",
"cypress-cucumber-preprocessor": "^2.5.2",
"cypress-multi-reporters": "^1.4.0",
"cypress-pipe": "^2.0.0",
Expand Down
5 changes: 5 additions & 0 deletions src/core/server/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ export async function bootstrap({ configs, cliArgs, applyConfigOverrides }: Boot
try {
await root.setup();
await root.start();

// notify parent process know when we are ready for dev mode.
if (process.send) {
process.send(['SERVER_LISTENING']);
}
} catch (err) {
await shutdown(err);
}
Expand Down
30 changes: 29 additions & 1 deletion src/core/server/saved_objects/migrationsv2/actions/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ describe('actions', () => {

describe('searchForOutdatedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.searchForOutdatedDocuments(client, 'new_index', { properties: {} });
const task = Actions.searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'new_index',
outdatedDocumentsQuery: {},
});

try {
await task();
} catch (e) {
Expand All @@ -172,6 +177,29 @@ describe('actions', () => {

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('configures request according to given parameters', async () => {
const esClient = elasticsearchClientMock.createInternalClient();
const query = {};
const targetIndex = 'new_index';
const batchSize = 1000;
const task = Actions.searchForOutdatedDocuments(esClient, {
batchSize,
targetIndex,
outdatedDocumentsQuery: query,
});

await task();

expect(esClient.search).toHaveBeenCalledTimes(1);
expect(esClient.search).toHaveBeenCalledWith(
expect.objectContaining({
index: targetIndex,
size: batchSize,
body: expect.objectContaining({ query }),
})
);
});
});

describe('bulkOverwriteTransformedDocuments', () => {
Expand Down
30 changes: 16 additions & 14 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Option from 'fp-ts/lib/Option';
import { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import type { estypes } from '@elastic/elasticsearch';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { ElasticsearchClientError, ResponseError } from '@elastic/elasticsearch/lib/errors';
import { pipe } from 'fp-ts/lib/pipeable';
import { flow } from 'fp-ts/lib/function';
import type { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization';
Expand All @@ -24,13 +24,10 @@ import {
export type { RetryableEsClientError };

/**
* Batch size for updateByQuery, reindex & search operations. Smaller batches
* reduce the memory pressure on Elasticsearch and Kibana so are less likely
* to cause failures.
* TODO (profile/tune): How much smaller can we make this number before it
* starts impacting how long migrations take to perform?
* Batch size for updateByQuery and reindex operations.
* Uses the default value of 1000 for Elasticsearch reindex operation.
*/
const BATCH_SIZE = 1000;
const BATCH_SIZE = 1_000;
const DEFAULT_TIMEOUT = '60s';
/** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
Expand Down Expand Up @@ -839,25 +836,30 @@ export interface SearchResponse {
outdatedDocuments: SavedObjectsRawDoc[];
}

interface SearchForOutdatedDocumentsOptions {
batchSize: number;
targetIndex: string;
outdatedDocumentsQuery?: estypes.QueryContainer;
}

/**
* Search for outdated saved object documents with the provided query. Will
* return one batch of documents. Searching should be repeated until no more
* outdated documents can be found.
*/
export const searchForOutdatedDocuments = (
client: ElasticsearchClient,
index: string,
query: Record<string, unknown>
options: SearchForOutdatedDocumentsOptions
): TaskEither.TaskEither<RetryableEsClientError, SearchResponse> => () => {
return client
.search<SavedObjectsRawDocSource>({
index,
index: options.targetIndex,
// Return the _seq_no and _primary_term so we can use optimistic
// concurrency control for updates
seq_no_primary_term: true,
size: BATCH_SIZE,
size: options.batchSize,
body: {
query,
query: options.outdatedDocumentsQuery,
// Optimize search performance by sorting by the "natural" index order
sort: ['_doc'],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('migration actions', () => {

// Create test fixture data:
await createIndex(client, 'existing_index_with_docs', {
dynamic: true as any,
dynamic: true,
properties: {},
})();
const sourceDocs = ([
Expand Down Expand Up @@ -337,7 +337,6 @@ describe('migration actions', () => {
// Reindex doesn't return any errors on it's own, so we have to test
// together with waitForReindexTask
describe('reindex & waitForReindexTask', () => {
expect.assertions(2);
it('resolves right when reindex succeeds without reindex script', async () => {
const res = (await reindex(
client,
Expand All @@ -354,11 +353,11 @@ describe('migration actions', () => {
}
`);

const results = ((await searchForOutdatedDocuments(
client,
'reindex_target',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1",
Expand All @@ -384,11 +383,11 @@ describe('migration actions', () => {
"right": "reindex_succeeded",
}
`);
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_2',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_2',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
Expand Down Expand Up @@ -432,12 +431,12 @@ describe('migration actions', () => {
}
`);

// Assert that documents weren't overrided by the second, unscripted reindex
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_3',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
// Assert that documents weren't overridden by the second, unscripted reindex
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_3',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1_updated",
Expand All @@ -452,11 +451,11 @@ describe('migration actions', () => {
// Simulate a reindex that only adds some of the documents from the
// source index into the target index
await createIndex(client, 'reindex_target_4', { properties: {} })();
const sourceDocs = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments
const sourceDocs = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments
.slice(0, 2)
.map(({ _id, _source }) => ({
_id,
Expand All @@ -479,13 +478,13 @@ describe('migration actions', () => {
"right": "reindex_succeeded",
}
`);
// Assert that existing documents weren't overrided, but that missing
// Assert that existing documents weren't overridden, but that missing
// documents were added by the reindex
const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_4',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'reindex_target_4',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1",
Expand Down Expand Up @@ -701,26 +700,30 @@ describe('migration actions', () => {
describe('searchForOutdatedDocuments', () => {
it('only returns documents that match the outdatedDocumentsQuery', async () => {
expect.assertions(2);
const resultsWithQuery = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
{
const resultsWithQuery = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
}
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(resultsWithQuery.length).toBe(3);

const resultsWithoutQuery = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const resultsWithoutQuery = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(resultsWithoutQuery.length).toBe(4);
});
it('resolves with _id, _source, _seq_no and _primary_term', async () => {
expect.assertions(1);
const results = ((await searchForOutdatedDocuments(client, 'existing_index_with_docs', {
match: { title: { query: 'doc' } },
const results = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results).toEqual(
expect.arrayContaining([
Expand Down Expand Up @@ -805,7 +808,7 @@ describe('migration actions', () => {
it('resolves right when mappings were updated and picked up', async () => {
// Create an index without any mappings and insert documents into it
await createIndex(client, 'existing_index_without_mappings', {
dynamic: false as any,
dynamic: false,
properties: {},
})();
const sourceDocs = ([
Expand All @@ -821,11 +824,13 @@ describe('migration actions', () => {
)();

// Assert that we can't search over the unmapped fields of the document
const originalSearchResults = ((await searchForOutdatedDocuments(
client,
'existing_index_without_mappings',
{ match: { title: { query: 'doc' } } }
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const originalSearchResults = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_without_mappings',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(originalSearchResults.length).toBe(0);

// Update and pickup mappings so that the title field is searchable
Expand All @@ -839,11 +844,13 @@ describe('migration actions', () => {
await waitForPickupUpdatedMappingsTask(client, taskId, '60s')();

// Repeat the search expecting to be able to find the existing documents
const pickedUpSearchResults = ((await searchForOutdatedDocuments(
client,
'existing_index_without_mappings',
{ match: { title: { query: 'doc' } } }
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const pickedUpSearchResults = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_without_mappings',
outdatedDocumentsQuery: {
match: { title: { query: 'doc' } },
},
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(pickedUpSearchResults.length).toBe(4);
});
});
Expand Down Expand Up @@ -1050,11 +1057,11 @@ describe('migration actions', () => {
`);
});
it('resolves right even if there were some version_conflict_engine_exception', async () => {
const existingDocs = ((await searchForOutdatedDocuments(
client,
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
const existingDocs = ((await searchForOutdatedDocuments(client, {
batchSize: 1000,
targetIndex: 'existing_index_with_docs',
outdatedDocumentsQuery: undefined,
})()) as Either.Right<SearchResponse>).right.outdatedDocuments;

const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', [
...existingDocs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] INIT -> LEGACY_DELETE",
Object {
"batchSize": 1000,
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down Expand Up @@ -262,6 +263,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] LEGACY_DELETE -> FATAL",
Object {
"batchSize": 1000,
"controlState": "FATAL",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down Expand Up @@ -413,6 +415,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] INIT -> LEGACY_REINDEX",
Object {
"batchSize": 1000,
"controlState": "LEGACY_REINDEX",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down Expand Up @@ -464,6 +467,7 @@ describe('migrationsStateActionMachine', () => {
Array [
"[.my-so-index] LEGACY_REINDEX -> LEGACY_DELETE",
Object {
"batchSize": 1000,
"controlState": "LEGACY_DELETE",
"currentAlias": ".my-so-index",
"indexPrefix": ".my-so-index",
Expand Down
Loading

0 comments on commit 6f1ed5e

Please sign in to comment.