Skip to content

Commit

Permalink
Saved Objects - make import/export stream based (#39674)
Browse files Browse the repository at this point in the history
* transform ndjson within route handlers for SO import/export APIs

* convert export saved objects to return a stream

* fix stream creation
  • Loading branch information
legrego authored Jul 1, 2019
1 parent 8807427 commit 3e8687c
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

import { getSortedObjectsForExport } from './get_sorted_objects_for_export';
import { SavedObjectsClientMock } from '../service/saved_objects_client.mock';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams';

async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);
}

describe('getSortedObjectsForExport()', () => {
const savedObjectsClient = SavedObjectsClientMock.create();
Expand Down Expand Up @@ -59,11 +65,14 @@ describe('getSortedObjectsForExport()', () => {
per_page: 1,
page: 0,
});
const response = await getSortedObjectsForExport({
const exportStream = await getSortedObjectsForExport({
savedObjectsClient,
exportSizeLimit: 500,
types: ['index-pattern', 'search'],
});

const response = await readStreamToCompletion(exportStream);

expect(response).toMatchInlineSnapshot(`
Array [
Object {
Expand Down Expand Up @@ -169,7 +178,7 @@ Array [
},
],
});
const response = await getSortedObjectsForExport({
const exportStream = await getSortedObjectsForExport({
exportSizeLimit: 10000,
savedObjectsClient,
types: ['index-pattern', 'search'],
Expand All @@ -184,6 +193,7 @@ Array [
},
],
});
const response = await readStreamToCompletion(exportStream);
expect(response).toMatchInlineSnapshot(`
Array [
Object {
Expand Down Expand Up @@ -259,7 +269,7 @@ Array [
},
],
});
const response = await getSortedObjectsForExport({
const exportStream = await getSortedObjectsForExport({
exportSizeLimit: 10000,
savedObjectsClient,
types: ['index-pattern', 'search'],
Expand All @@ -271,6 +281,7 @@ Array [
],
includeReferencesDeep: true,
});
const response = await readStreamToCompletion(exportStream);
expect(response).toMatchInlineSnapshot(`
Array [
Object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

import Boom from 'boom';
import { createListStream } from '../../../../legacy/utils/streams';
import { SavedObjectsClientContract } from '../';
import { injectNestedDependencies } from './inject_nested_depdendencies';
import { sortObjects } from './sort_objects';
Expand Down Expand Up @@ -86,9 +87,12 @@ export async function getSortedObjectsForExport({
savedObjectsClient,
exportSizeLimit,
});
return sortObjects(

const exportedObjects = sortObjects(
includeReferencesDeep
? await injectNestedDependencies(objectsToExport, savedObjectsClient)
: objectsToExport
);

return createListStream(exportedObjects);
}
6 changes: 3 additions & 3 deletions src/core/server/saved_objects/export/sort_objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import Boom from 'boom';
import { SavedObject } from '../service/saved_objects_client';

export function sortObjects(savedObjects: SavedObject[]) {
const path = new Set();
const sorted = new Set();
export function sortObjects(savedObjects: SavedObject[]): SavedObject[] {
const path = new Set<SavedObject>();
const sorted = new Set<SavedObject>();
const objectsByTypeId = new Map(
savedObjects.map(object => [`${object.type}:${object.id}`, object] as [string, SavedObject])
);
Expand Down
45 changes: 12 additions & 33 deletions src/core/server/saved_objects/import/collect_saved_objects.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { collectSavedObjects } from './collect_saved_objects';
describe('collectSavedObjects()', () => {
test('collects nothing when stream is empty', async () => {
const readStream = new Readable({
objectMode: true,
read() {
this.push(null);
},
Expand All @@ -38,34 +39,9 @@ Object {

test('collects objects from stream', async () => {
const readStream = new Readable({
objectMode: true,
read() {
this.push('{"foo":true,"type":"a"}');
this.push(null);
},
});
const result = await collectSavedObjects({
readStream,
objectLimit: 1,
supportedTypes: ['a'],
});
expect(result).toMatchInlineSnapshot(`
Object {
"collectedObjects": Array [
Object {
"foo": true,
"migrationVersion": Object {},
"type": "a",
},
],
"errors": Array [],
}
`);
});

test('filters out empty lines', async () => {
const readStream = new Readable({
read() {
this.push('{"foo":true,"type":"a"}\n\n');
this.push({ foo: true, type: 'a' });
this.push(null);
},
});
Expand All @@ -90,9 +66,10 @@ Object {

test('throws error when object limit is reached', async () => {
const readStream = new Readable({
objectMode: true,
read() {
this.push('{"foo":true,"type":"a"}\n');
this.push('{"bar":true,"type":"a"}\n');
this.push({ foo: true, type: 'a' });
this.push({ bar: true, type: 'a' });
this.push(null);
},
});
Expand All @@ -107,9 +84,10 @@ Object {

test('unsupported types return as import errors', async () => {
const readStream = new Readable({
objectMode: true,
read() {
this.push('{"id":"1","type":"a","attributes":{"title":"my title"}}\n');
this.push('{"id":"2","type":"b","attributes":{"title":"my title 2"}}\n');
this.push({ id: '1', type: 'a', attributes: { title: 'my title' } });
this.push({ id: '2', type: 'b', attributes: { title: 'my title 2' } });
this.push(null);
},
});
Expand Down Expand Up @@ -141,9 +119,10 @@ Object {

test('unsupported types still count towards object limit', async () => {
const readStream = new Readable({
objectMode: true,
read() {
this.push('{"foo":true,"type":"a"}\n');
this.push('{"bar":true,"type":"b"}\n');
this.push({ foo: true, type: 'a' });
this.push({ bar: true, type: 'b' });
this.push(null);
},
});
Expand Down
8 changes: 0 additions & 8 deletions src/core/server/saved_objects/import/collect_saved_objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
createFilterStream,
createMapStream,
createPromiseFromStreams,
createSplitStream,
} from '../../../../legacy/utils/streams';
import { SavedObject } from '../service';
import { createLimitStream } from './create_limit_stream';
Expand All @@ -45,13 +44,6 @@ export async function collectSavedObjects({
const errors: ImportError[] = [];
const collectedObjects: SavedObject[] = await createPromiseFromStreams([
readStream,
createSplitStream('\n'),
createMapStream((str: string) => {
if (str && str !== '') {
return JSON.parse(str);
}
}),
createFilterStream<SavedObject>(obj => !!obj),
createLimitStream(objectLimit),
createFilterStream<SavedObject>(obj => {
if (supportedTypes.includes(obj.type)) {
Expand Down
72 changes: 37 additions & 35 deletions src/core/server/saved_objects/import/import_saved_objects.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ describe('importSavedObjects()', () => {

test('returns early when no objects exist', async () => {
const readStream = new Readable({
objectMode: true,
read() {
this.push(null);
},
Expand All @@ -94,8 +95,9 @@ Object {

test('calls bulkCreate without overwrite', async () => {
const readStream = new Readable({
objectMode: true,
read() {
savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n'));
savedObjects.forEach(obj => this.push(obj));
this.push(null);
},
});
Expand Down Expand Up @@ -175,8 +177,9 @@ Object {

test('calls bulkCreate with overwrite', async () => {
const readStream = new Readable({
objectMode: true,
read() {
savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n'));
savedObjects.forEach(obj => this.push(obj));
this.push(null);
},
});
Expand Down Expand Up @@ -256,8 +259,9 @@ Object {

test('extracts errors for conflicts', async () => {
const readStream = new Readable({
objectMode: true,
read() {
savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n'));
savedObjects.forEach(obj => this.push(obj));
this.push(null);
},
});
Expand Down Expand Up @@ -323,39 +327,36 @@ Object {

test('validates references', async () => {
const readStream = new Readable({
objectMode: true,
read() {
this.push(
JSON.stringify({
id: '1',
type: 'search',
attributes: {
title: 'My Search',
this.push({
id: '1',
type: 'search',
attributes: {
title: 'My Search',
},
references: [
{
name: 'ref_0',
type: 'index-pattern',
id: '2',
},
references: [
{
name: 'ref_0',
type: 'index-pattern',
id: '2',
},
],
}) + '\n'
);
this.push(
JSON.stringify({
id: '3',
type: 'visualization',
attributes: {
title: 'My Visualization',
],
});
this.push({
id: '3',
type: 'visualization',
attributes: {
title: 'My Visualization',
},
references: [
{
name: 'ref_0',
type: 'search',
id: '1',
},
references: [
{
name: 'ref_0',
type: 'search',
id: '1',
},
],
}) + '\n'
);
],
});
this.push(null);
},
});
Expand Down Expand Up @@ -433,9 +434,10 @@ Object {

test('validates supported types', async () => {
const readStream = new Readable({
objectMode: true,
read() {
savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n'));
this.push('{"id":"1","type":"wigwags","attributes":{"title":"my title"},"references":[]}');
savedObjects.forEach(obj => this.push(obj));
this.push({ id: '1', type: 'wigwags', attributes: { title: 'my title' }, references: [] });
this.push(null);
},
});
Expand Down
Loading

0 comments on commit 3e8687c

Please sign in to comment.