From cd86b81b82f3cf28edc47f56bab900bd43214d9c Mon Sep 17 00:00:00 2001 From: Mikhail Shustov Date: Thu, 3 Sep 2020 11:57:03 +0300 Subject: [PATCH] Move streams utils to the core (#76397) * move utils/streams to the KP * allow imports from src/core/server/utils * ts-ify * import streams from KP * remove unnecessary ts-expect-error * fix kbn-es-archiver build * lost export * copy array in createListStream * remove streams from legacy utils Co-authored-by: spalger --- .eslintrc.js | 2 ++ src/cli_keystore/add.js | 2 +- .../get_sorted_objects_for_export.test.ts | 2 +- .../export/get_sorted_objects_for_export.ts | 2 +- .../import/collect_saved_objects.ts | 2 +- .../import/create_limit_stream.test.ts | 2 +- .../server/saved_objects/routes/export.ts | 6 +--- .../routes/integration_tests/export.test.ts | 2 +- .../server/saved_objects/routes/utils.test.ts | 2 +- src/core/server/saved_objects/routes/utils.ts | 6 +--- src/core/server/utils/index.ts | 1 + .../utils/streams/concat_stream.test.ts} | 2 +- .../server/utils/streams/concat_stream.ts} | 2 +- .../streams/concat_stream_providers.test.ts} | 0 .../utils/streams/concat_stream_providers.ts} | 7 ++-- .../utils/streams/filter_stream.test.ts | 2 +- .../server}/utils/streams/filter_stream.ts | 0 .../server/utils/streams/index.ts} | 0 .../utils/streams/intersperse_stream.test.ts} | 2 +- .../utils/streams/intersperse_stream.ts} | 4 +-- .../server/utils/streams/list_stream.test.ts} | 2 +- .../server/utils/streams/list_stream.ts} | 4 +-- .../server/utils/streams/map_stream.test.ts} | 4 +-- .../server/utils/streams/map_stream.ts} | 2 +- .../streams/promise_from_streams.test.ts} | 7 ++-- .../utils/streams/promise_from_streams.ts} | 16 ++++++--- .../utils/streams/reduce_stream.test.ts} | 11 +++--- .../server/utils/streams/reduce_stream.ts} | 5 ++- .../utils/streams/replace_stream.test.ts} | 12 ++++--- .../server/utils/streams/replace_stream.ts} | 3 +- .../utils/streams/split_stream.test.ts} | 8 ++--- .../server/utils/streams/split_stream.ts} | 6 ++-- src/dev/build/lib/watch_stdio_for_line.ts | 2 +- src/legacy/server/i18n/index.ts | 1 - .../server/logging/log_format_json.test.js | 2 +- .../server/logging/log_format_string.test.js | 2 +- src/legacy/utils/artifact_type.ts | 1 - src/legacy/utils/index.d.ts | 13 ------- src/legacy/utils/index.js | 12 ------- src/legacy/utils/streams/index.d.ts | 36 ------------------- .../routes/rules/import_rules_route.ts | 2 +- .../routes/rules/utils.test.ts | 2 +- .../create_rules_stream_from_ndjson.test.ts | 2 +- .../rules/create_rules_stream_from_ndjson.ts | 2 +- .../create_timelines_stream_from_ndjson.ts | 2 +- .../routes/import_timelines_route.test.ts | 4 +-- .../lib/timeline/routes/utils/common.ts | 2 +- .../timeline/routes/utils/import_timelines.ts | 2 +- .../utils/install_prepacked_timelines.test.ts | 2 +- .../read_stream/create_stream_from_ndjson.ts | 2 +- .../create_copy_to_space_mocks.ts | 2 +- 51 files changed, 85 insertions(+), 136 deletions(-) rename src/{legacy/utils/streams/concat_stream.test.js => core/server/utils/streams/concat_stream.test.ts} (98%) rename src/{legacy/utils/streams/concat_stream.js => core/server/utils/streams/concat_stream.ts} (96%) rename src/{legacy/utils/streams/concat_stream_providers.test.js => core/server/utils/streams/concat_stream_providers.test.ts} (100%) rename src/{legacy/utils/streams/concat_stream_providers.js => core/server/utils/streams/concat_stream_providers.ts} (91%) rename src/{legacy => core/server}/utils/streams/filter_stream.test.ts (99%) rename src/{legacy => core/server}/utils/streams/filter_stream.ts (100%) rename src/{legacy/utils/streams/index.js => core/server/utils/streams/index.ts} (100%) rename src/{legacy/utils/streams/intersperse_stream.test.js => core/server/utils/streams/intersperse_stream.test.ts} (99%) rename src/{legacy/utils/streams/intersperse_stream.js => core/server/utils/streams/intersperse_stream.ts} (95%) rename src/{legacy/utils/streams/list_stream.test.js => core/server/utils/streams/list_stream.test.ts} (97%) rename src/{legacy/utils/streams/list_stream.js => core/server/utils/streams/list_stream.ts} (90%) rename src/{legacy/utils/streams/map_stream.test.js => core/server/utils/streams/map_stream.test.ts} (95%) rename src/{legacy/utils/streams/map_stream.js => core/server/utils/streams/map_stream.ts} (93%) rename src/{legacy/utils/streams/promise_from_streams.test.js => core/server/utils/streams/promise_from_streams.test.ts} (96%) rename src/{legacy/utils/streams/promise_from_streams.js => core/server/utils/streams/promise_from_streams.ts} (80%) rename src/{legacy/utils/streams/reduce_stream.test.js => core/server/utils/streams/reduce_stream.test.ts} (92%) rename src/{legacy/utils/streams/reduce_stream.js => core/server/utils/streams/reduce_stream.ts} (95%) rename src/{legacy/utils/streams/replace_stream.test.js => core/server/utils/streams/replace_stream.test.ts} (91%) rename src/{legacy/utils/streams/replace_stream.js => core/server/utils/streams/replace_stream.ts} (96%) rename src/{legacy/utils/streams/split_stream.test.js => core/server/utils/streams/split_stream.test.ts} (93%) rename src/{legacy/utils/streams/split_stream.js => core/server/utils/streams/split_stream.ts} (95%) delete mode 100644 src/legacy/utils/streams/index.d.ts diff --git a/.eslintrc.js b/.eslintrc.js index 9b75c36c95abd..3161a25b70870 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -305,6 +305,8 @@ module.exports = { '!src/core/server/mocks{,.ts}', '!src/core/server/types{,.ts}', '!src/core/server/test_utils{,.ts}', + '!src/core/server/utils', // ts alias + '!src/core/server/utils/**/*', // for absolute imports until fixed in // https://github.com/elastic/kibana/issues/36096 '!src/core/server/*.test.mocks{,.ts}', diff --git a/src/cli_keystore/add.js b/src/cli_keystore/add.js index 44737e387c2d2..462259ec942dd 100644 --- a/src/cli_keystore/add.js +++ b/src/cli_keystore/add.js @@ -19,7 +19,7 @@ import { Logger } from '../cli_plugin/lib/logger'; import { confirm, question } from '../legacy/server/utils'; -import { createPromiseFromStreams, createConcatStream } from '../legacy/utils'; +import { createPromiseFromStreams, createConcatStream } from '../core/server/utils'; /** * @param {Keystore} keystore diff --git a/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts b/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts index 85b3a281aef7f..c084125f43127 100644 --- a/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts +++ b/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts @@ -20,7 +20,7 @@ import { exportSavedObjectsToStream } from './get_sorted_objects_for_export'; import { savedObjectsClientMock } from '../service/saved_objects_client.mock'; import { Readable } from 'stream'; -import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams'; +import { createPromiseFromStreams, createConcatStream } from '../../utils/streams'; async function readStreamToCompletion(stream: Readable) { return createPromiseFromStreams([stream, createConcatStream([])]); diff --git a/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts b/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts index 94f727e238ecf..214b51db7dd6b 100644 --- a/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts +++ b/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts @@ -18,7 +18,7 @@ */ import Boom from 'boom'; -import { createListStream } from '../../../../legacy/utils/streams'; +import { createListStream } from '../../utils/streams'; import { SavedObjectsClientContract, SavedObject } from '../types'; import { fetchNestedDependencies } from './inject_nested_depdendencies'; import { sortObjects } from './sort_objects'; diff --git a/src/core/server/saved_objects/import/collect_saved_objects.ts b/src/core/server/saved_objects/import/collect_saved_objects.ts index f55e6bf0d2af4..8e84f864cf449 100644 --- a/src/core/server/saved_objects/import/collect_saved_objects.ts +++ b/src/core/server/saved_objects/import/collect_saved_objects.ts @@ -23,7 +23,7 @@ import { createFilterStream, createMapStream, createPromiseFromStreams, -} from '../../../../legacy/utils/streams'; +} from '../../utils/streams'; import { SavedObject } from '../types'; import { createLimitStream } from './create_limit_stream'; import { SavedObjectsImportError } from './types'; diff --git a/src/core/server/saved_objects/import/create_limit_stream.test.ts b/src/core/server/saved_objects/import/create_limit_stream.test.ts index 736cfadcb6222..a7e689710a564 100644 --- a/src/core/server/saved_objects/import/create_limit_stream.test.ts +++ b/src/core/server/saved_objects/import/create_limit_stream.test.ts @@ -21,7 +21,7 @@ import { createConcatStream, createListStream, createPromiseFromStreams, -} from '../../../../legacy/utils/streams'; +} from '../../utils/streams'; import { createLimitStream } from './create_limit_stream'; describe('createLimitStream()', () => { diff --git a/src/core/server/saved_objects/routes/export.ts b/src/core/server/saved_objects/routes/export.ts index 9445c144ecda4..35a65d8d9651f 100644 --- a/src/core/server/saved_objects/routes/export.ts +++ b/src/core/server/saved_objects/routes/export.ts @@ -19,11 +19,7 @@ import { schema } from '@kbn/config-schema'; import stringify from 'json-stable-stringify'; -import { - createPromiseFromStreams, - createMapStream, - createConcatStream, -} from '../../../../legacy/utils/streams'; +import { createPromiseFromStreams, createMapStream, createConcatStream } from '../../utils/streams'; import { IRouter } from '../../http'; import { SavedObjectConfig } from '../saved_objects_config'; import { exportSavedObjectsToStream } from '../export'; diff --git a/src/core/server/saved_objects/routes/integration_tests/export.test.ts b/src/core/server/saved_objects/routes/integration_tests/export.test.ts index d47f7c6050d8f..a3891712fd22b 100644 --- a/src/core/server/saved_objects/routes/integration_tests/export.test.ts +++ b/src/core/server/saved_objects/routes/integration_tests/export.test.ts @@ -22,7 +22,7 @@ jest.mock('../../export', () => ({ })); import * as exportMock from '../../export'; -import { createListStream } from '../../../../../legacy/utils/streams'; +import { createListStream } from '../../../utils/streams'; import supertest from 'supertest'; import { UnwrapPromise } from '@kbn/utility-types'; import { SavedObjectConfig } from '../../saved_objects_config'; diff --git a/src/core/server/saved_objects/routes/utils.test.ts b/src/core/server/saved_objects/routes/utils.test.ts index 24719724785af..fd3bdad8606ed 100644 --- a/src/core/server/saved_objects/routes/utils.test.ts +++ b/src/core/server/saved_objects/routes/utils.test.ts @@ -19,7 +19,7 @@ import { createSavedObjectsStreamFromNdJson, validateTypes, validateObjects } from './utils'; import { Readable } from 'stream'; -import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams'; +import { createPromiseFromStreams, createConcatStream } from '../../utils/streams'; async function readStreamToCompletion(stream: Readable) { return createPromiseFromStreams([stream, createConcatStream([])]); diff --git a/src/core/server/saved_objects/routes/utils.ts b/src/core/server/saved_objects/routes/utils.ts index 3963833a9c718..f16a6e471257d 100644 --- a/src/core/server/saved_objects/routes/utils.ts +++ b/src/core/server/saved_objects/routes/utils.ts @@ -19,11 +19,7 @@ import { Readable } from 'stream'; import { SavedObject, SavedObjectsExportResultDetails } from 'src/core/server'; -import { - createSplitStream, - createMapStream, - createFilterStream, -} from '../../../../legacy/utils/streams'; +import { createSplitStream, createMapStream, createFilterStream } from '../../utils/streams'; export function createSavedObjectsStreamFromNdJson(ndJsonStream: Readable) { return ndJsonStream diff --git a/src/core/server/utils/index.ts b/src/core/server/utils/index.ts index b01a4c4e04899..d9c4217c4117f 100644 --- a/src/core/server/utils/index.ts +++ b/src/core/server/utils/index.ts @@ -20,3 +20,4 @@ export * from './crypto'; export * from './from_root'; export * from './package_json'; +export * from './streams'; diff --git a/src/legacy/utils/streams/concat_stream.test.js b/src/core/server/utils/streams/concat_stream.test.ts similarity index 98% rename from src/legacy/utils/streams/concat_stream.test.js rename to src/core/server/utils/streams/concat_stream.test.ts index 1498334013d1a..e964ab2a7a97e 100644 --- a/src/legacy/utils/streams/concat_stream.test.js +++ b/src/core/server/utils/streams/concat_stream.test.ts @@ -17,7 +17,7 @@ * under the License. */ -import { createListStream, createPromiseFromStreams, createConcatStream } from './'; +import { createListStream, createPromiseFromStreams, createConcatStream } from './index'; describe('concatStream', () => { test('accepts an initial value', async () => { diff --git a/src/legacy/utils/streams/concat_stream.js b/src/core/server/utils/streams/concat_stream.ts similarity index 96% rename from src/legacy/utils/streams/concat_stream.js rename to src/core/server/utils/streams/concat_stream.ts index e3f8f7261d2b7..03450cb51b832 100644 --- a/src/legacy/utils/streams/concat_stream.js +++ b/src/core/server/utils/streams/concat_stream.ts @@ -41,6 +41,6 @@ import { createReduceStream } from './reduce_stream'; * items will concat with * @return {Transform} */ -export function createConcatStream(initial) { +export function createConcatStream(initial?: T) { return createReduceStream((acc, chunk) => acc.concat(chunk), initial); } diff --git a/src/legacy/utils/streams/concat_stream_providers.test.js b/src/core/server/utils/streams/concat_stream_providers.test.ts similarity index 100% rename from src/legacy/utils/streams/concat_stream_providers.test.js rename to src/core/server/utils/streams/concat_stream_providers.test.ts diff --git a/src/legacy/utils/streams/concat_stream_providers.js b/src/core/server/utils/streams/concat_stream_providers.ts similarity index 91% rename from src/legacy/utils/streams/concat_stream_providers.js rename to src/core/server/utils/streams/concat_stream_providers.ts index 11dfb84284df3..bb836e3d73787 100644 --- a/src/legacy/utils/streams/concat_stream_providers.js +++ b/src/core/server/utils/streams/concat_stream_providers.ts @@ -17,7 +17,7 @@ * under the License. */ -import { PassThrough } from 'stream'; +import { Readable, PassThrough, TransformOptions } from 'stream'; /** * Write the data and errors from a list of stream providers @@ -29,7 +29,10 @@ import { PassThrough } from 'stream'; * @param {PassThroughOptions} options options passed to the PassThrough constructor * @return {WritableStream} combined stream */ -export function concatStreamProviders(sourceProviders, options = {}) { +export function concatStreamProviders( + sourceProviders: Array<() => Readable>, + options?: TransformOptions +) { const destination = new PassThrough(options); const queue = sourceProviders.slice(); diff --git a/src/legacy/utils/streams/filter_stream.test.ts b/src/core/server/utils/streams/filter_stream.test.ts similarity index 99% rename from src/legacy/utils/streams/filter_stream.test.ts rename to src/core/server/utils/streams/filter_stream.test.ts index 28b7f2588628e..41073e54b0a84 100644 --- a/src/legacy/utils/streams/filter_stream.test.ts +++ b/src/core/server/utils/streams/filter_stream.test.ts @@ -22,7 +22,7 @@ import { createFilterStream, createListStream, createPromiseFromStreams, -} from './'; +} from './index'; describe('createFilterStream()', () => { test('calls the function with each item in the source stream', async () => { diff --git a/src/legacy/utils/streams/filter_stream.ts b/src/core/server/utils/streams/filter_stream.ts similarity index 100% rename from src/legacy/utils/streams/filter_stream.ts rename to src/core/server/utils/streams/filter_stream.ts diff --git a/src/legacy/utils/streams/index.js b/src/core/server/utils/streams/index.ts similarity index 100% rename from src/legacy/utils/streams/index.js rename to src/core/server/utils/streams/index.ts diff --git a/src/legacy/utils/streams/intersperse_stream.test.js b/src/core/server/utils/streams/intersperse_stream.test.ts similarity index 99% rename from src/legacy/utils/streams/intersperse_stream.test.js rename to src/core/server/utils/streams/intersperse_stream.test.ts index e11b36d77106a..9aa15035d2a1c 100644 --- a/src/legacy/utils/streams/intersperse_stream.test.js +++ b/src/core/server/utils/streams/intersperse_stream.test.ts @@ -22,7 +22,7 @@ import { createListStream, createIntersperseStream, createConcatStream, -} from './'; +} from './index'; describe('intersperseStream', () => { test('places the intersperse value between each provided value', async () => { diff --git a/src/legacy/utils/streams/intersperse_stream.js b/src/core/server/utils/streams/intersperse_stream.ts similarity index 95% rename from src/legacy/utils/streams/intersperse_stream.js rename to src/core/server/utils/streams/intersperse_stream.ts index 5f9f0b03cd7eb..272507221caff 100644 --- a/src/legacy/utils/streams/intersperse_stream.js +++ b/src/core/server/utils/streams/intersperse_stream.ts @@ -40,7 +40,7 @@ import { Transform } from 'stream'; * @param {String|Buffer} intersperseChunk * @return {Transform} */ -export function createIntersperseStream(intersperseChunk) { +export function createIntersperseStream(intersperseChunk: string | Buffer) { let first = true; return new Transform({ @@ -55,7 +55,7 @@ export function createIntersperseStream(intersperseChunk) { } this.push(chunk); - callback(null); + callback(); } catch (err) { callback(err); } diff --git a/src/legacy/utils/streams/list_stream.test.js b/src/core/server/utils/streams/list_stream.test.ts similarity index 97% rename from src/legacy/utils/streams/list_stream.test.js rename to src/core/server/utils/streams/list_stream.test.ts index 12e20696b0510..2a20c929db6b9 100644 --- a/src/legacy/utils/streams/list_stream.test.js +++ b/src/core/server/utils/streams/list_stream.test.ts @@ -17,7 +17,7 @@ * under the License. */ -import { createListStream } from './'; +import { createListStream } from './index'; describe('listStream', () => { test('provides the values in the initial list', async () => { diff --git a/src/legacy/utils/streams/list_stream.js b/src/core/server/utils/streams/list_stream.ts similarity index 90% rename from src/legacy/utils/streams/list_stream.js rename to src/core/server/utils/streams/list_stream.ts index a614620b054b7..e62f6d3fa930b 100644 --- a/src/legacy/utils/streams/list_stream.js +++ b/src/core/server/utils/streams/list_stream.ts @@ -26,8 +26,8 @@ import { Readable } from 'stream'; * @param {Array} items - the list of items to provide * @return {Readable} */ -export function createListStream(items = []) { - const queue = [].concat(items); +export function createListStream(items: T | T[] = []) { + const queue = Array.isArray(items) ? [...items] : [items]; return new Readable({ objectMode: true, diff --git a/src/legacy/utils/streams/map_stream.test.js b/src/core/server/utils/streams/map_stream.test.ts similarity index 95% rename from src/legacy/utils/streams/map_stream.test.js rename to src/core/server/utils/streams/map_stream.test.ts index d86da178f0c1b..bf0cab39c21f4 100644 --- a/src/legacy/utils/streams/map_stream.test.js +++ b/src/core/server/utils/streams/map_stream.test.ts @@ -39,7 +39,7 @@ describe('createMapStream()', () => { test('send the return value from the mapper on the output stream', async () => { const result = await createPromiseFromStreams([ createListStream([1, 2, 3]), - createMapStream((n) => n * 100), + createMapStream((n: number) => n * 100), createConcatStream([]), ]); @@ -49,7 +49,7 @@ describe('createMapStream()', () => { test('supports async mappers', async () => { const result = await createPromiseFromStreams([ createListStream([1, 2, 3]), - createMapStream(async (n, i) => { + createMapStream(async (n: number, i: number) => { await delay(n); return n * i; }), diff --git a/src/legacy/utils/streams/map_stream.js b/src/core/server/utils/streams/map_stream.ts similarity index 93% rename from src/legacy/utils/streams/map_stream.js rename to src/core/server/utils/streams/map_stream.ts index 4e906471330f1..aad53cc526626 100644 --- a/src/legacy/utils/streams/map_stream.js +++ b/src/core/server/utils/streams/map_stream.ts @@ -19,7 +19,7 @@ import { Transform } from 'stream'; -export function createMapStream(fn) { +export function createMapStream(fn: (value: T, i: number) => void) { let i = 0; return new Transform({ diff --git a/src/legacy/utils/streams/promise_from_streams.test.js b/src/core/server/utils/streams/promise_from_streams.test.ts similarity index 96% rename from src/legacy/utils/streams/promise_from_streams.test.js rename to src/core/server/utils/streams/promise_from_streams.test.ts index e4d9835106f12..1f2596c16a6fa 100644 --- a/src/legacy/utils/streams/promise_from_streams.test.js +++ b/src/core/server/utils/streams/promise_from_streams.test.ts @@ -19,7 +19,7 @@ import { Readable, Writable, Duplex, Transform } from 'stream'; -import { createListStream, createPromiseFromStreams, createReduceStream } from './'; +import { createListStream, createPromiseFromStreams, createReduceStream } from './index'; describe('promiseFromStreams', () => { test('pipes together an array of streams', async () => { @@ -76,14 +76,13 @@ describe('promiseFromStreams', () => { test('waits for writing and resolves to final value', async () => { let written = ''; - const duplexReadQueue = []; + const duplexReadQueue: Array> = []; const duplexItemsToPush = ['foo', 'bar', null]; const result = await createPromiseFromStreams([ createListStream(['a', 'b', 'c']), new Duplex({ async read() { - const result = await duplexReadQueue.shift(); - this.push(result); + this.push(await duplexReadQueue.shift()); }, write(chunk, enc, cb) { diff --git a/src/legacy/utils/streams/promise_from_streams.js b/src/core/server/utils/streams/promise_from_streams.ts similarity index 80% rename from src/legacy/utils/streams/promise_from_streams.js rename to src/core/server/utils/streams/promise_from_streams.ts index 05f6a08aa1a09..f5fc4af62bc83 100644 --- a/src/legacy/utils/streams/promise_from_streams.js +++ b/src/core/server/utils/streams/promise_from_streams.ts @@ -34,16 +34,20 @@ * @return {Promise} */ -import { pipeline, Writable } from 'stream'; +import { pipeline, Writable, Readable } from 'stream'; -export async function createPromiseFromStreams(streams) { - let finalChunk; +function isReadable(stream: Readable | Writable): stream is Readable { + return 'read' in stream && typeof stream.read === 'function'; +} + +export async function createPromiseFromStreams(streams: [Readable, ...Writable[]]): Promise { + let finalChunk: any; const last = streams[streams.length - 1]; - if (typeof last.read !== 'function' && streams.length === 1) { + if (!isReadable(last) && streams.length === 1) { // For a nicer error than what stream.pipeline throws throw new Error('A minimum of 2 streams is required when a non-readable stream is given'); } - if (typeof last.read === 'function') { + if (isReadable(last)) { // We are pushing a writable stream to capture the last chunk streams.push( new Writable({ @@ -57,7 +61,9 @@ export async function createPromiseFromStreams(streams) { }) ); } + return new Promise((resolve, reject) => { + // @ts-expect-error 'pipeline' doesn't support variable length of arguments pipeline(...streams, (err) => { if (err) return reject(err); resolve(finalChunk); diff --git a/src/legacy/utils/streams/reduce_stream.test.js b/src/core/server/utils/streams/reduce_stream.test.ts similarity index 92% rename from src/legacy/utils/streams/reduce_stream.test.js rename to src/core/server/utils/streams/reduce_stream.test.ts index 2c073f67f82a8..e4a7dc1cef491 100644 --- a/src/legacy/utils/streams/reduce_stream.test.js +++ b/src/core/server/utils/streams/reduce_stream.test.ts @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ +import { Transform } from 'stream'; +import { createReduceStream, createPromiseFromStreams, createListStream } from './index'; -import { createReduceStream, createPromiseFromStreams, createListStream } from './'; - -const promiseFromEvent = (name, emitter) => +const promiseFromEvent = (name: string, emitter: Transform) => new Promise((resolve) => emitter.on(name, () => resolve(name))); describe('reduceStream', () => { @@ -47,7 +47,10 @@ describe('reduceStream', () => { }); test('emits an error if an iteration fails', async () => { - const reduce = createReduceStream((acc, i) => expect(i).toBe(1), 0); + const reduce = createReduceStream((acc, i) => { + expect(i).toBe(1); + return acc; + }, 0); const errorEvent = promiseFromEvent('error', reduce); reduce.write(1); diff --git a/src/legacy/utils/streams/reduce_stream.js b/src/core/server/utils/streams/reduce_stream.ts similarity index 95% rename from src/legacy/utils/streams/reduce_stream.js rename to src/core/server/utils/streams/reduce_stream.ts index d66b0124d1dab..9129df096ad13 100644 --- a/src/legacy/utils/streams/reduce_stream.js +++ b/src/core/server/utils/streams/reduce_stream.ts @@ -32,7 +32,10 @@ import { Transform } from 'stream'; * initial value. * @return {Transform} */ -export function createReduceStream(reducer, initial) { +export function createReduceStream( + reducer: (value: any, chunk: T, enc: string) => T, + initial?: T +) { let i = -1; let value = initial; diff --git a/src/legacy/utils/streams/replace_stream.test.js b/src/core/server/utils/streams/replace_stream.test.ts similarity index 91% rename from src/legacy/utils/streams/replace_stream.test.js rename to src/core/server/utils/streams/replace_stream.test.ts index 01b89f93e5af0..c9da42395fb85 100644 --- a/src/legacy/utils/streams/replace_stream.test.js +++ b/src/core/server/utils/streams/replace_stream.test.ts @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +import { Writable, Readable } from 'stream'; import { createReplaceStream, @@ -23,19 +24,19 @@ import { createPromiseFromStreams, createListStream, createMapStream, -} from './'; +} from './index'; -async function concatToString(streams) { +async function concatToString(streams: [Readable, ...Writable[]]) { return await createPromiseFromStreams([ ...streams, - createMapStream((buff) => buff.toString('utf8')), + createMapStream((buff: Buffer) => buff.toString('utf8')), createConcatStream(''), ]); } describe('replaceStream', () => { test('produces buffers when it receives buffers', async () => { - const chunks = await createPromiseFromStreams([ + const chunks = await createPromiseFromStreams([ createListStream([Buffer.from('foo'), Buffer.from('bar')]), createReplaceStream('o', '0'), createConcatStream([]), @@ -47,7 +48,7 @@ describe('replaceStream', () => { }); test('produces buffers when it receives strings', async () => { - const chunks = await createPromiseFromStreams([ + const chunks = await createPromiseFromStreams([ createListStream(['foo', 'bar']), createReplaceStream('o', '0'), createConcatStream([]), @@ -59,6 +60,7 @@ describe('replaceStream', () => { }); test('expects toReplace to be a string', () => { + // @ts-expect-error expect(() => createReplaceStream(Buffer.from('foo'))).toThrowError(/be a string/); }); diff --git a/src/legacy/utils/streams/replace_stream.js b/src/core/server/utils/streams/replace_stream.ts similarity index 96% rename from src/legacy/utils/streams/replace_stream.js rename to src/core/server/utils/streams/replace_stream.ts index 7309bd241fa52..05391bb3341c2 100644 --- a/src/legacy/utils/streams/replace_stream.js +++ b/src/core/server/utils/streams/replace_stream.ts @@ -19,7 +19,7 @@ import { Transform } from 'stream'; -export function createReplaceStream(toReplace, replacement) { +export function createReplaceStream(toReplace: string, replacement: string | Buffer) { if (typeof toReplace !== 'string') { throw new TypeError('toReplace must be a string'); } @@ -78,6 +78,7 @@ export function createReplaceStream(toReplace, replacement) { this.push(buffer); } + // @ts-expect-error buffer = null; callback(); }, diff --git a/src/legacy/utils/streams/split_stream.test.js b/src/core/server/utils/streams/split_stream.test.ts similarity index 93% rename from src/legacy/utils/streams/split_stream.test.js rename to src/core/server/utils/streams/split_stream.test.ts index e0736d220ba5c..f131bd0661e54 100644 --- a/src/legacy/utils/streams/split_stream.test.js +++ b/src/core/server/utils/streams/split_stream.test.ts @@ -16,16 +16,16 @@ * specific language governing permissions and limitations * under the License. */ +import { Transform } from 'stream'; +import { createSplitStream, createConcatStream, createPromiseFromStreams } from './index'; -import { createSplitStream, createConcatStream, createPromiseFromStreams } from './'; - -async function split(stream, input) { +async function split(stream: Transform, input: Array) { const concat = createConcatStream(); concat.write([]); stream.pipe(concat); const output = createPromiseFromStreams([concat]); - input.forEach((i) => { + input.forEach((i: any) => { stream.write(i); }); stream.end(); diff --git a/src/legacy/utils/streams/split_stream.js b/src/core/server/utils/streams/split_stream.ts similarity index 95% rename from src/legacy/utils/streams/split_stream.js rename to src/core/server/utils/streams/split_stream.ts index f55cbc7bd290d..ae820f60abbf6 100644 --- a/src/legacy/utils/streams/split_stream.js +++ b/src/core/server/utils/streams/split_stream.ts @@ -38,7 +38,7 @@ import { Transform } from 'stream'; * @param {String} splitChunk * @return {Transform} */ -export function createSplitStream(splitChunk) { +export function createSplitStream(splitChunk: string | Uint8Array) { let unsplitBuffer = Buffer.alloc(0); return new Transform({ @@ -55,7 +55,7 @@ export function createSplitStream(splitChunk) { } unsplitBuffer = toSplit; - callback(null); + callback(); } catch (err) { callback(err); } @@ -65,7 +65,7 @@ export function createSplitStream(splitChunk) { try { this.push(unsplitBuffer.toString('utf8')); - callback(null); + callback(); } catch (err) { callback(err); } diff --git a/src/dev/build/lib/watch_stdio_for_line.ts b/src/dev/build/lib/watch_stdio_for_line.ts index 2322d017abc61..3d7929ccfc33a 100644 --- a/src/dev/build/lib/watch_stdio_for_line.ts +++ b/src/dev/build/lib/watch_stdio_for_line.ts @@ -24,7 +24,7 @@ import { createPromiseFromStreams, createSplitStream, createMapStream, -} from '../../../legacy/utils/streams'; +} from '../../../core/server/utils'; // creates a stream that skips empty lines unless they are followed by // another line, preventing the empty lines produced by splitStream diff --git a/src/legacy/server/i18n/index.ts b/src/legacy/server/i18n/index.ts index 09f7022436049..e895f83fe6901 100644 --- a/src/legacy/server/i18n/index.ts +++ b/src/legacy/server/i18n/index.ts @@ -20,7 +20,6 @@ import { i18n, i18nLoader } from '@kbn/i18n'; import { basename } from 'path'; import { Server } from 'hapi'; -// eslint-disable-next-line @kbn/eslint/no-restricted-paths import { fromRoot } from '../../../core/server/utils'; import { getTranslationPaths } from './get_translations_path'; import { I18N_RC } from './constants'; diff --git a/src/legacy/server/logging/log_format_json.test.js b/src/legacy/server/logging/log_format_json.test.js index 31e622ecae611..f4fb939750566 100644 --- a/src/legacy/server/logging/log_format_json.test.js +++ b/src/legacy/server/logging/log_format_json.test.js @@ -21,7 +21,7 @@ import moment from 'moment'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { attachMetaData } from '../../../../src/core/server/legacy/logging/legacy_logging_server'; -import { createListStream, createPromiseFromStreams } from '../../utils'; +import { createListStream, createPromiseFromStreams } from '../../../core/server/utils'; import KbnLoggerJsonFormat from './log_format_json'; diff --git a/src/legacy/server/logging/log_format_string.test.js b/src/legacy/server/logging/log_format_string.test.js index 067ad70380961..842325865cce2 100644 --- a/src/legacy/server/logging/log_format_string.test.js +++ b/src/legacy/server/logging/log_format_string.test.js @@ -21,7 +21,7 @@ import moment from 'moment'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { attachMetaData } from '../../../../src/core/server/legacy/logging/legacy_logging_server'; -import { createListStream, createPromiseFromStreams } from '../../utils'; +import { createListStream, createPromiseFromStreams } from '../../../core/server/utils'; import KbnLoggerStringFormat from './log_format_string'; diff --git a/src/legacy/utils/artifact_type.ts b/src/legacy/utils/artifact_type.ts index 69f728e9e2220..ef471ef8e050d 100644 --- a/src/legacy/utils/artifact_type.ts +++ b/src/legacy/utils/artifact_type.ts @@ -17,7 +17,6 @@ * under the License. */ -// eslint-disable-next-line @kbn/eslint/no-restricted-paths import { pkg } from '../../core/server/utils'; export const IS_KIBANA_DISTRIBUTABLE = pkg.build && pkg.build.distributable === true; export const IS_KIBANA_RELEASE = pkg.build && pkg.build.release === true; diff --git a/src/legacy/utils/index.d.ts b/src/legacy/utils/index.d.ts index c294c79542bbe..a57caad1d34bf 100644 --- a/src/legacy/utils/index.d.ts +++ b/src/legacy/utils/index.d.ts @@ -18,16 +18,3 @@ */ export function unset(object: object, rawPath: string): void; - -export { - concatStreamProviders, - createConcatStream, - createFilterStream, - createIntersperseStream, - createListStream, - createMapStream, - createPromiseFromStreams, - createReduceStream, - createReplaceStream, - createSplitStream, -} from './streams'; diff --git a/src/legacy/utils/index.js b/src/legacy/utils/index.js index 4274fb2e4901a..529b1ddfd8a4d 100644 --- a/src/legacy/utils/index.js +++ b/src/legacy/utils/index.js @@ -23,15 +23,3 @@ export { deepCloneWithBuffers } from './deep_clone_with_buffers'; export { unset } from './unset'; export { IS_KIBANA_DISTRIBUTABLE } from './artifact_type'; export { IS_KIBANA_RELEASE } from './artifact_type'; - -export { - concatStreamProviders, - createConcatStream, - createIntersperseStream, - createListStream, - createPromiseFromStreams, - createReduceStream, - createSplitStream, - createMapStream, - createReplaceStream, -} from './streams'; diff --git a/src/legacy/utils/streams/index.d.ts b/src/legacy/utils/streams/index.d.ts deleted file mode 100644 index 470b5d9fa3505..0000000000000 --- a/src/legacy/utils/streams/index.d.ts +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { Readable, Writable, Transform, TransformOptions } from 'stream'; - -export function concatStreamProviders( - sourceProviders: Array<() => Readable>, - options: TransformOptions -): Transform; -export function createIntersperseStream(intersperseChunk: string | Buffer): Transform; -export function createSplitStream(splitChunk: T): Transform; -export function createListStream(items: any | any[]): Readable; -export function createReduceStream(reducer: (value: any, chunk: T, enc: string) => T): Transform; -export function createPromiseFromStreams([first, ...rest]: [Readable, ...Writable[]]): Promise< - T ->; -export function createConcatStream(initial?: any): Transform; -export function createMapStream(fn: (value: T, i: number) => void): Transform; -export function createReplaceStream(toReplace: string, replacement: string | Buffer): Transform; -export function createFilterStream(fn: (obj: T) => boolean): Transform; diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/import_rules_route.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/import_rules_route.ts index 18eea7c45585f..8a7215e5a5bad 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/import_rules_route.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/import_rules_route.ts @@ -20,7 +20,7 @@ import { importRulesSchema as importRulesResponseSchema, } from '../../../../../common/detection_engine/schemas/response/import_rules_schema'; import { IRouter } from '../../../../../../../../src/core/server'; -import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils/streams'; +import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils/'; import { DETECTION_ENGINE_RULES_URL } from '../../../../../common/constants'; import { ConfigType } from '../../../../config'; import { SetupPlugins } from '../../../../plugin'; diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils.test.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils.test.ts index 3122db1919c3c..11f74c264ae0c 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils.test.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils.test.ts @@ -22,7 +22,7 @@ import { INTERNAL_IDENTIFIER } from '../../../../../common/constants'; import { RuleTypeParams } from '../../types'; import { BulkError, ImportSuccessError } from '../utils'; import { getOutputRuleAlertForRest } from '../__mocks__/utils'; -import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils/streams'; +import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils'; import { PartialAlert } from '../../../../../../alerts/server'; import { SanitizedAlert } from '../../../../../../alerts/server/types'; import { createRulesStreamFromNdJson } from '../../rules/create_rules_stream_from_ndjson'; diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.test.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.test.ts index f2061ce1d36de..60071bc2cef41 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.test.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.test.ts @@ -5,7 +5,7 @@ */ import { Readable } from 'stream'; import { createRulesStreamFromNdJson } from './create_rules_stream_from_ndjson'; -import { createPromiseFromStreams } from 'src/legacy/utils/streams'; +import { createPromiseFromStreams } from 'src/core/server/utils'; import { BadRequestError } from '../errors/bad_request_error'; import { ImportRulesSchemaDecoded } from '../../../../common/detection_engine/schemas/request/import_rules_schema'; diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.ts index d7723232ca921..cd574a8d95615 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/rules/create_rules_stream_from_ndjson.ts @@ -19,7 +19,7 @@ import { createSplitStream, createMapStream, createConcatStream, -} from '../../../../../../../src/legacy/utils/streams'; +} from '../../../../../../../src/core/server/utils'; import { BadRequestError } from '../errors/bad_request_error'; import { parseNdjsonStrings, diff --git a/x-pack/plugins/security_solution/server/lib/timeline/create_timelines_stream_from_ndjson.ts b/x-pack/plugins/security_solution/server/lib/timeline/create_timelines_stream_from_ndjson.ts index 6b4017b5e4d5c..2827cd373d5e7 100644 --- a/x-pack/plugins/security_solution/server/lib/timeline/create_timelines_stream_from_ndjson.ts +++ b/x-pack/plugins/security_solution/server/lib/timeline/create_timelines_stream_from_ndjson.ts @@ -13,7 +13,7 @@ import { createConcatStream, createSplitStream, createMapStream, -} from '../../../../../../src/legacy/utils'; +} from '../../../../../../src/core/server/utils'; import { parseNdjsonStrings, filterExportedCounts, diff --git a/x-pack/plugins/security_solution/server/lib/timeline/routes/import_timelines_route.test.ts b/x-pack/plugins/security_solution/server/lib/timeline/routes/import_timelines_route.test.ts index ff76045db90cb..388ab5db3e852 100644 --- a/x-pack/plugins/security_solution/server/lib/timeline/routes/import_timelines_route.test.ts +++ b/x-pack/plugins/security_solution/server/lib/timeline/routes/import_timelines_route.test.ts @@ -79,7 +79,7 @@ describe('import timelines', () => { }; }); - jest.doMock('../../../../../../../src/legacy/utils', () => { + jest.doMock('../../../../../../../src/core/server/utils', () => { return { createPromiseFromStreams: jest.fn().mockReturnValue(mockParsedObjects), }; @@ -543,7 +543,7 @@ describe('import timeline templates', () => { }; }); - jest.doMock('../../../../../../../src/legacy/utils', () => { + jest.doMock('../../../../../../../src/core/server/utils', () => { return { createPromiseFromStreams: jest.fn().mockReturnValue(mockParsedTemplateTimelineObjects), }; diff --git a/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/common.ts b/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/common.ts index fc25f1a48194e..9a3dbf365e026 100644 --- a/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/common.ts +++ b/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/common.ts @@ -10,7 +10,7 @@ import { Readable } from 'stream'; import { KibanaRequest, RequestHandlerContext } from 'src/core/server'; -import { createListStream } from '../../../../../../../../src/legacy/utils'; +import { createListStream } from '../../../../../../../../src/core/server/utils'; import { SetupPlugins } from '../../../../plugin'; diff --git a/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/import_timelines.ts b/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/import_timelines.ts index f62f02cc7bba9..a19b18e7d89b1 100644 --- a/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/import_timelines.ts +++ b/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/import_timelines.ts @@ -21,7 +21,7 @@ import { createBulkErrorObject, BulkError } from '../../../detection_engine/rout import { createTimelines } from './create_timelines'; import { FrameworkRequest } from '../../../framework'; import { createTimelinesStreamFromNdJson } from '../../create_timelines_stream_from_ndjson'; -import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils'; +import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils'; import { getTupleDuplicateErrorsAndUniqueTimeline } from './get_timelines_from_stream'; import { CompareTimelinesStatus } from './compare_timelines_status'; diff --git a/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/install_prepacked_timelines.test.ts b/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/install_prepacked_timelines.test.ts index 66f16db01a508..c63978a1f046e 100644 --- a/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/install_prepacked_timelines.test.ts +++ b/x-pack/plugins/security_solution/server/lib/timeline/routes/utils/install_prepacked_timelines.test.ts @@ -5,7 +5,7 @@ */ import { join, resolve } from 'path'; -import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils/streams'; +import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils'; import { SecurityPluginSetup } from '../../../../../../security/server'; import { FrameworkRequest } from '../../../framework'; diff --git a/x-pack/plugins/security_solution/server/utils/read_stream/create_stream_from_ndjson.ts b/x-pack/plugins/security_solution/server/utils/read_stream/create_stream_from_ndjson.ts index 0eb021bfe2a83..4446e82f99de4 100644 --- a/x-pack/plugins/security_solution/server/utils/read_stream/create_stream_from_ndjson.ts +++ b/x-pack/plugins/security_solution/server/utils/read_stream/create_stream_from_ndjson.ts @@ -16,7 +16,7 @@ import { ImportRulesSchema, } from '../../../common/detection_engine/schemas/request/import_rules_schema'; import { exactCheck } from '../../../common/exact_check'; -import { createMapStream, createFilterStream } from '../../../../../../src/legacy/utils/streams'; +import { createMapStream, createFilterStream } from '../../../../../../src/core/server/utils'; import { BadRequestError } from '../../lib/detection_engine/errors/bad_request_error'; export interface RulesObjectsExportResultDetails { diff --git a/x-pack/plugins/spaces/server/routes/api/__fixtures__/create_copy_to_space_mocks.ts b/x-pack/plugins/spaces/server/routes/api/__fixtures__/create_copy_to_space_mocks.ts index 0e117b3f16e3f..ef6f5e1541a46 100644 --- a/x-pack/plugins/spaces/server/routes/api/__fixtures__/create_copy_to_space_mocks.ts +++ b/x-pack/plugins/spaces/server/routes/api/__fixtures__/create_copy_to_space_mocks.ts @@ -5,7 +5,7 @@ */ import { Readable } from 'stream'; -import { createPromiseFromStreams, createConcatStream } from 'src/legacy/utils'; +import { createPromiseFromStreams, createConcatStream } from 'src/core/server/utils'; async function readStreamToCompletion(stream: Readable) { return (await (createPromiseFromStreams([stream, createConcatStream([])]) as unknown)) as any[];