Rewriting SO id during migration #97222

Merged (40 commits) on Apr 26, 2021

Changes from 21 commits

Commits
19b94c8
some typos
mshustov Apr 13, 2021
59a3fed
implement an alternative client-side migration algorithm
mshustov Apr 15, 2021
8c54a03
update tests
mshustov Apr 15, 2021
2cf6d41
lol
mshustov Apr 15, 2021
4afb285
remove unnecessary param from request generic
mshustov Apr 15, 2021
6841668
remove unused parameter
mshustov Apr 15, 2021
f988f23
Merge branch 'master' into migration-id-gen
mshustov Apr 15, 2021
10fdb61
Merge branch 'master' into migration-id-gen
mshustov Apr 16, 2021
d3a2dd1
optimize search when quierying SO for migration
mshustov Apr 16, 2021
be9438e
fix wrong type in fixtures
mshustov Apr 16, 2021
4ebf73b
try shard_doc asc
mshustov Apr 16, 2021
8983403
Merge branch 'master' into migration-id-gen
mshustov Apr 16, 2021
fdd18e4
Merge branch 'master' into migration-id-gen
mshustov Apr 19, 2021
e40c824
add an integration test
mshustov Apr 19, 2021
8c03eaf
cleanup
mshustov Apr 19, 2021
dfdb8e8
Merge branch 'master' into migration-id-gen
mshustov Apr 19, 2021
61ef354
Merge branch 'master' into migration-id-gen
mshustov Apr 19, 2021
2fa72b9
Merge branch 'master' into migration-id-gen
mshustov Apr 20, 2021
e522a4b
track_total_hits: false to improve perf
mshustov Apr 20, 2021
ea3a1c4
add happy path test for transformDocs action
mshustov Apr 20, 2021
3aab116
remove unused types
mshustov Apr 20, 2021
0126616
fix wrong typing
mshustov Apr 21, 2021
aeb35cb
add cleanup phase
mshustov Apr 21, 2021
9c768a9
add an integration test for cleanup phase
mshustov Apr 21, 2021
8e331ce
add unit-tests for cleanup function
mshustov Apr 22, 2021
5a5c309
Merge branch 'master' into migration-id-gen
mshustov Apr 22, 2021
2b26a4e
Merge branch 'master' into migration-id-gen
mshustov Apr 22, 2021
d2dfc35
address comments
mshustov Apr 23, 2021
5a2c5fe
Merge branch 'master' into migration-id-gen
mshustov Apr 23, 2021
8d17ce0
Fix functional test
kertal Apr 23, 2021
abe3fff
Merge remote-tracking branch 'origin/migration-id-gen' into migration…
mshustov Apr 25, 2021
1332961
Merge branch 'master' into migration-id-gen
mshustov Apr 25, 2021
a128d7b
set defaultIndex before each test. otherwise it is deleted in the fir…
mshustov Apr 25, 2021
585bdd9
sourceIndex: Option.some<> for consistency
mshustov Apr 25, 2021
145244a
Revert "set defaultIndex before each test. otherwise it is deleted in…
mshustov Apr 25, 2021
44f7974
address comments from Pierre
mshustov Apr 25, 2021
97315b6
fix test
mshustov Apr 25, 2021
f7b4d7d
Revert "fix test"
mshustov Apr 26, 2021
c6ff34b
revert min convert version back to 8.0
mshustov Apr 26, 2021
94ea2cd
Merge branch 'master' into migration-id-gen
mshustov Apr 26, 2021
35 changes: 15 additions & 20 deletions src/core/server/saved_objects/migrations/core/elastic_index.ts
@@ -14,7 +14,6 @@
import _ from 'lodash';
import { estypes } from '@elastic/elasticsearch';
import { MigrationEsClient } from './migration_es_client';
import { CountResponse, SearchResponse } from '../../../elasticsearch';
mshustov (Contributor, Author) commented: no changes in the file, a simple cleanup

import { IndexMapping } from '../../mappings';
import { SavedObjectsMigrationVersion } from '../../types';
import { AliasAction, RawDoc } from './call_cluster';
@@ -95,11 +94,11 @@ export async function fetchInfo(client: MigrationEsClient, index: string): Promi
* Creates a reader function that serves up batches of documents from the index. We aren't using
* an async generator, as that feature currently breaks Kibana's tooling.
*
* @param {CallCluster} callCluster - The elastic search connection
* @param {string} - The index to be read from
* @param client - The elastic search connection
* @param index - The index to be read from
* @param {opts}
* @prop {number} batchSize - The number of documents to read at a time
* @prop {string} scrollDuration - The scroll duration used for scrolling through the index
* @prop batchSize - The number of documents to read at a time
* @prop scrollDuration - The scroll duration used for scrolling through the index
*/
export function reader(
client: MigrationEsClient,
@@ -111,11 +110,11 @@ export function reader(

const nextBatch = () =>
scrollId !== undefined
? client.scroll<SearchResponse<SavedObjectsRawDocSource>>({
? client.scroll<SavedObjectsRawDocSource>({
scroll,
scroll_id: scrollId,
})
: client.search<SearchResponse<SavedObjectsRawDocSource>>({
: client.search<SavedObjectsRawDocSource>({
body: {
size: batchSize,
query: excludeUnusedTypesQuery,
@@ -143,10 +142,6 @@
/**
* Writes the specified documents to the index, throws an exception
* if any of the documents fail to save.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {RawDoc[]} docs
*/
export async function write(client: MigrationEsClient, index: string, docs: RawDoc[]) {
const { body } = await client.bulk({
@@ -184,9 +179,9 @@ export async function write(client: MigrationEsClient, index: string, docs: RawD
* it performs the check *each* time it is called, rather than memoizing itself,
* as this is used to determine if migrations are complete.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {SavedObjectsMigrationVersion} migrationVersion - The latest versions of the migrations
* @param client - The connection to ElasticSearch
* @param index
* @param migrationVersion - The latest versions of the migrations
*/
export async function migrationsUpToDate(
client: MigrationEsClient,
@@ -207,7 +202,7 @@
return true;
}

const { body } = await client.count<CountResponse>({
const { body } = await client.count({
body: {
query: {
bool: {
@@ -271,9 +266,9 @@ export async function createIndex(
* is a concrete index. This function will reindex `alias` into a new index, delete the `alias`
* index, and then create an alias `alias` that points to the new index.
*
* @param {CallCluster} callCluster - The connection to ElasticSearch
* @param {FullIndexInfo} info - Information about the mappings and name of the new index
* @param {string} alias - The name of the index being converted to an alias
* @param client - The ElasticSearch connection
* @param info - Information about the mappings and name of the new index
* @param alias - The name of the index being converted to an alias
*/
export async function convertToAlias(
client: MigrationEsClient,
@@ -297,7 +292,7 @@
* alias, meaning that it will only point to one index at a time, so we
* remove any other indices from the alias.
*
* @param {CallCluster} callCluster
* @param {CallCluster} client
* @param {string} index
* @param {string} alias
* @param {AliasAction[]} aliasActions - Optional actions to be added to the updateAliases call
@@ -377,7 +372,7 @@ async function reindex(
) {
// We poll instead of having the request wait for completion, as for large indices,
// the request times out on the Elasticsearch side of things. We have a relatively tight
// polling interval, as the request is fairly efficent, and we don't
// polling interval, as the request is fairly efficient, and we don't
// want to block index migrations for too long on this.
const pollInterval = 250;
const { body: reindexBody } = await client.reindex({
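
The reader and write helpers in this file form the scroll-based batch pipeline described in the doc comments above. A minimal consumption sketch, assuming the thunk returned by reader() resolves to a RawDoc[] batch and to an empty array once the scroll is exhausted (index names and batch settings are illustrative only):

async function copyAllDocs(client: MigrationEsClient, source: string, dest: string) {
  // Sketch only: the options object follows the doc comment above; an empty batch is
  // assumed to signal that the scroll has been fully consumed.
  const read = reader(client, source, { batchSize: 100, scrollDuration: '15m' });
  while (true) {
    const batch = await read();
    if (batch.length === 0) {
      break; // scroll exhausted
    }
    await write(client, dest, batch); // throws if any document fails to save
  }
}
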
@@ -189,8 +189,7 @@ async function migrateSourceToDest(context: Context) {
serializer,
documentMigrator.migrateAndConvert,
// @ts-expect-error @elastic/elasticsearch `Hit._id` may be a string | number in ES, but we always expect strings in the SO index.
docs,
log
docs
)
);
}
@@ -11,7 +11,6 @@ import _ from 'lodash';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsSerializer } from '../../serialization';
import { migrateRawDocs } from './migrate_raw_docs';
import { createSavedObjectsMigrationLoggerMock } from '../../migrations/mocks';

describe('migrateRawDocs', () => {
test('converts raw docs to saved objects', async () => {
@@ -24,8 +23,7 @@
[
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
createSavedObjectsMigrationLoggerMock()
]
);

expect(result).toEqual([
@@ -59,7 +57,6 @@
});

test('throws when encountering a corrupt saved object document', async () => {
const logger = createSavedObjectsMigrationLoggerMock();
const transform = jest.fn<any, any>((doc: any) => [
set(_.cloneDeep(doc), 'attributes.name', 'TADA'),
]);
@@ -69,8 +66,7 @@
[
{ _id: 'foo:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
logger
]
);

expect(result).rejects.toMatchInlineSnapshot(
@@ -88,8 +84,7 @@
const result = await migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }]
);

expect(result).toEqual([
@@ -119,12 +114,9 @@
throw new Error('error during transform');
});
await expect(
migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
)
migrateRawDocs(new SavedObjectsSerializer(new SavedObjectTypeRegistry()), transform, [
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
])
).rejects.toThrowErrorMatchingInlineSnapshot(`"error during transform"`);
});
});
@@ -16,7 +16,6 @@ import {
SavedObjectUnsanitizedDoc,
} from '../../serialization';
import { MigrateAndConvertFn } from './document_migrator';
import { SavedObjectsMigrationLogger } from '.';

/**
* Error thrown when saved object migrations encounter a corrupt saved object.
@@ -46,8 +45,7 @@ export class CorruptSavedObjectError extends Error {
export async function migrateRawDocs(
serializer: SavedObjectsSerializer,
migrateDoc: MigrateAndConvertFn,
rawDocs: SavedObjectsRawDoc[],
log: SavedObjectsMigrationLogger
rawDocs: SavedObjectsRawDoc[]
): Promise<SavedObjectsRawDoc[]> {
const migrateDocWithoutBlocking = transformNonBlocking(migrateDoc);
const processedDocs = [];
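
For reference, a minimal sketch of the slimmed-down migrateRawDocs call (the logger argument is gone), mirroring the call site shown later in kibana_migrator.ts; the error handling here is illustrative only, not taken from this diff:

async function runDocMigration(
  serializer: SavedObjectsSerializer,
  migrateAndConvert: MigrateAndConvertFn,
  rawDocs: SavedObjectsRawDoc[]
): Promise<SavedObjectsRawDoc[]> {
  try {
    // New three-argument signature: serializer, transform function, raw documents.
    return await migrateRawDocs(serializer, migrateAndConvert, rawDocs);
  } catch (e) {
    if (e instanceof CorruptSavedObjectError) {
      // Raised when a raw document cannot be converted into a saved object,
      // e.g. a raw _id that does not match the document's type.
    }
    throw e;
  }
}
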
@@ -229,48 +229,6 @@ describe('KibanaMigrator', () => {
jest.clearAllMocks();
});

it('creates a V2 migrator that initializes a new index and migrates an existing index', async () => {
mshustov (Contributor, Author) commented: the implementation is tested in the integration tests

const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
const migratorStatus = migrator.getStatus$().pipe(take(3)).toPromise();
migrator.prepareMigrations();
await migrator.runMigrations();

// Basic assertions that we're creating and reindexing the expected indices
expect(options.client.indices.create).toHaveBeenCalledTimes(3);
expect(options.client.indices.create.mock.calls).toEqual(
expect.arrayContaining([
// LEGACY_CREATE_REINDEX_TARGET
expect.arrayContaining([expect.objectContaining({ index: '.my-index_pre8.2.3_001' })]),
// CREATE_REINDEX_TEMP
expect.arrayContaining([
expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
]),
// CREATE_NEW_TARGET
expect.arrayContaining([expect.objectContaining({ index: 'other-index_8.2.3_001' })]),
])
);
// LEGACY_REINDEX
expect(options.client.reindex.mock.calls[0][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index' }),
dest: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
}),
})
);
// REINDEX_SOURCE_TO_TEMP
expect(options.client.reindex.mock.calls[1][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
dest: expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
}),
})
);
const { status } = await migratorStatus;
return expect(status).toEqual('completed');
});
it('emits results on getMigratorResult$()', async () => {
const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
@@ -378,6 +336,24 @@ const mockV2MigrationOptions = () => {
} as estypes.GetTaskResponse)
);

options.client.search = jest
.fn()
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ hits: { hits: [] } })
);

options.client.openPointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ id: 'pit_id' })
);

options.client.closePointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ succeeded: true })
);

return options;
};

@@ -36,7 +36,6 @@ import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { runResilientMigrator } from '../../migrationsv2';
import { migrateRawDocs } from '../core/migrate_raw_docs';
import { MigrationLogger } from '../core/migration_logger';

export interface KibanaMigratorOptions {
client: ElasticsearchClient;
@@ -185,12 +184,7 @@ export class KibanaMigrator {
logger: this.log,
preMigrationScript: indexMap[index].script,
transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) =>
migrateRawDocs(
this.serializer,
this.documentMigrator.migrateAndConvert,
rawDocs,
new MigrationLogger(this.log)
),
migrateRawDocs(this.serializer, this.documentMigrator.migrateAndConvert, rawDocs),
migrationVersionPerType: this.documentMigrator.migrationVersion,
indexPrefix: index,
migrationsConfig: this.soMigrationsConfig,
50 changes: 49 additions & 1 deletion src/core/server/saved_objects/migrationsv2/actions/index.test.ts
@@ -78,6 +78,54 @@ describe('actions', () => {
});
});

describe('openPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.openPit(client, 'my_index');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('readWithPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.readWithPit(client, 'pitId', Option.none, 10_000);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('closePit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.closePit(client, 'pitId');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('transformDocs', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.transformDocs(client, () => Promise.resolve([]), [], 'my_index', false);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('reindex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.reindex(
@@ -205,7 +253,7 @@

describe('bulkOverwriteTransformedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', []);
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', [], 'wait_for');
try {
await task();
} catch (e) {
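
The openPit, readWithPit, closePit and transformDocs actions exercised above are the building blocks of the new client-side read/transform/write path. A rough sketch of chaining them directly, reusing the call shapes from the tests; in the real migrator these actions are driven by the state machine, and the resolved value shapes assumed below are not guaranteed by this diff:

// Illustrative chain only, not the actual migration state machine. Each Actions.* call
// returns a task (thunk) that is awaited, exactly as in the tests above. transformRawDocs
// is a hypothetical callback; literal ids mirror the test fixtures rather than real output.
async function readAndTransformOnce(
  client: ElasticsearchClient,
  transformRawDocs: (docs: SavedObjectsRawDoc[]) => Promise<SavedObjectsRawDoc[]>
) {
  const openResult = await Actions.openPit(client, 'my_index')(); // assumed to yield a PIT id
  // In real code the PIT id from openResult would be threaded through the calls below.
  const readResult = await Actions.readWithPit(client, 'pitId', Option.none, 10_000)();
  await Actions.transformDocs(client, transformRawDocs, [], 'my_index', false)();
  await Actions.closePit(client, 'pitId')(); // always release the PIT when finished
}
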