diff --git a/docs/settings/alert-action-settings.asciidoc b/docs/settings/alert-action-settings.asciidoc index e02c7f212277e..88858c36643ec 100644 --- a/docs/settings/alert-action-settings.asciidoc +++ b/docs/settings/alert-action-settings.asciidoc @@ -37,12 +37,12 @@ You can configure the following settings in the `kibana.yml` file. [cols="2*<"] |=== -| `xpack.actions.whitelistedHosts` +| `xpack.actions.whitelistedHosts` {ess-icon} | A list of hostnames that {kib} is allowed to connect to when built-in actions are triggered. It defaults to `[*]`, allowing any host, but keep in mind the potential for SSRF attacks when hosts are not explicitly whitelisted. An empty list `[]` can be used to block built-in actions from making any external connections. + + Note that hosts associated with built-in actions, such as Slack and PagerDuty, are not automatically whitelisted. If you are not using the default `[*]` setting, you must ensure that the corresponding endpoints are whitelisted as well. -| `xpack.actions.enabledActionTypes` +| `xpack.actions.enabledActionTypes` {ess-icon} | A list of action types that are enabled. It defaults to `[*]`, enabling all types. The names for built-in {kib} action types are prefixed with a `.` and include: `.server-log`, `.slack`, `.email`, `.index`, `.pagerduty`, and `.webhook`. An empty list `[]` will disable all action types. + + Disabled action types will not appear as an option when creating new connectors, but existing connectors and actions of that type will remain in {kib} and will not function. diff --git a/packages/kbn-es-archiver/src/lib/streams/concat_stream.test.js b/packages/kbn-es-archiver/src/lib/streams/concat_stream.test.js new file mode 100644 index 0000000000000..1498334013d1a --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/concat_stream.test.js @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createListStream, createPromiseFromStreams, createConcatStream } from './'; + +describe('concatStream', () => { + test('accepts an initial value', async () => { + const output = await createPromiseFromStreams([ + createListStream([1, 2, 3]), + createConcatStream([0]), + ]); + + expect(output).toEqual([0, 1, 2, 3]); + }); + + describe(`combines using the previous value's concat method`, () => { + test('works with strings', async () => { + const output = await createPromiseFromStreams([ + createListStream(['a', 'b', 'c']), + createConcatStream(), + ]); + expect(output).toEqual('abc'); + }); + + test('works with arrays', async () => { + const output = await createPromiseFromStreams([ + createListStream([[1], [2, 3, 4], [10]]), + createConcatStream(), + ]); + expect(output).toEqual([1, 2, 3, 4, 10]); + }); + + test('works with a mixture, starting with array', async () => { + const output = await createPromiseFromStreams([ + createListStream([[], 1, 2, 3, 4, [5, 6, 7]]), + createConcatStream(), + ]); + expect(output).toEqual([1, 2, 3, 4, 5, 6, 7]); + }); + + test('fails when the value does not have a concat method', async () => { + let promise; + try { + promise = createPromiseFromStreams([createListStream([1, '1']), createConcatStream()]); + } catch (err) { + throw new Error('createPromiseFromStreams() should not fail synchronously'); + } + + try { + await promise; + throw new Error('Promise should have rejected'); + } catch (err) { + expect(err).toBeInstanceOf(Error); + expect(err.message).toContain('concat'); + } + }); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/concat_stream.ts b/packages/kbn-es-archiver/src/lib/streams/concat_stream.ts new file mode 100644 index 0000000000000..03dd894067afc --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/concat_stream.ts @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createReduceStream } from './reduce_stream'; + +/** + * Creates a Transform stream that consumes all provided + * values and concatenates them using each values `concat` + * method. + * + * Concatenate strings: + * createListStream(['f', 'o', 'o']) + * .pipe(createConcatStream()) + * .on('data', console.log) + * // logs "foo" + * + * Concatenate values into an array: + * createListStream([1,2,3]) + * .pipe(createConcatStream([])) + * .on('data', console.log) + * // logs "[1,2,3]" + */ +export function createConcatStream(initial: any) { + return createReduceStream((acc, chunk) => acc.concat(chunk), initial); +} diff --git a/packages/kbn-es-archiver/src/lib/streams/concat_stream_providers.test.js b/packages/kbn-es-archiver/src/lib/streams/concat_stream_providers.test.js new file mode 100644 index 0000000000000..b742a770b70c8 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/concat_stream_providers.test.js @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Readable } from 'stream'; + +import { concatStreamProviders } from './concat_stream_providers'; +import { createListStream } from './list_stream'; +import { createConcatStream } from './concat_stream'; +import { createPromiseFromStreams } from './promise_from_streams'; + +describe('concatStreamProviders() helper', () => { + test('writes the data from an array of stream providers into a destination stream in order', async () => { + const results = await createPromiseFromStreams([ + concatStreamProviders([ + () => createListStream(['foo', 'bar']), + () => createListStream(['baz']), + () => createListStream(['bug']), + ]), + createConcatStream(''), + ]); + + expect(results).toBe('foobarbazbug'); + }); + + test('emits the errors from a sub-stream to the destination', async () => { + const dest = concatStreamProviders([ + () => createListStream(['foo', 'bar']), + () => + new Readable({ + read() { + this.emit('error', new Error('foo')); + }, + }), + ]); + + const errorListener = jest.fn(); + dest.on('error', errorListener); + + await expect(createPromiseFromStreams([dest])).rejects.toThrowErrorMatchingInlineSnapshot( + `"foo"` + ); + expect(errorListener.mock.calls).toMatchInlineSnapshot(` +Array [ + Array [ + [Error: foo], + ], +] +`); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/concat_stream_providers.ts b/packages/kbn-es-archiver/src/lib/streams/concat_stream_providers.ts new file mode 100644 index 0000000000000..4794d76cc7f84 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/concat_stream_providers.ts @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { PassThrough, TransformOptions } from 'stream'; + +/** + * Write the data and errors from a list of stream providers + * to a single stream in order. Stream providers are only + * called right before they will be consumed, and only one + * provider will be active at a time. + */ +export function concatStreamProviders( + sourceProviders: Array<() => NodeJS.ReadableStream>, + options: TransformOptions = {} +) { + const destination = new PassThrough(options); + const queue = sourceProviders.slice(); + + (function pipeNext() { + const provider = queue.shift(); + + if (!provider) { + return; + } + + const source = provider(); + const isLast = !queue.length; + + // if there are more sources to pipe, hook + // into the source completion + if (!isLast) { + source.once('end', pipeNext); + } + + source + // proxy errors from the source to the destination + .once('error', (error) => destination.emit('error', error)) + // pipe the source to the destination but only proxy the + // end event if this is the last source + .pipe(destination, { end: isLast }); + })(); + + return destination; +} diff --git a/packages/kbn-es-archiver/src/lib/streams/filter_stream.test.ts b/packages/kbn-es-archiver/src/lib/streams/filter_stream.test.ts new file mode 100644 index 0000000000000..28b7f2588628e --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/filter_stream.test.ts @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { + createConcatStream, + createFilterStream, + createListStream, + createPromiseFromStreams, +} from './'; + +describe('createFilterStream()', () => { + test('calls the function with each item in the source stream', async () => { + const filter = jest.fn().mockReturnValue(true); + + await createPromiseFromStreams([createListStream(['a', 'b', 'c']), createFilterStream(filter)]); + + expect(filter).toMatchInlineSnapshot(` + [MockFunction] { + "calls": Array [ + Array [ + "a", + ], + Array [ + "b", + ], + Array [ + "c", + ], + ], + "results": Array [ + Object { + "type": "return", + "value": true, + }, + Object { + "type": "return", + "value": true, + }, + Object { + "type": "return", + "value": true, + }, + ], + } + `); + }); + + test('send the filtered values on the output stream', async () => { + const result = await createPromiseFromStreams([ + createListStream([1, 2, 3]), + createFilterStream((n) => n % 2 === 0), + createConcatStream([]), + ]); + + expect(result).toMatchInlineSnapshot(` + Array [ + 2, + ] + `); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams.ts b/packages/kbn-es-archiver/src/lib/streams/filter_stream.ts similarity index 71% rename from packages/kbn-es-archiver/src/lib/streams.ts rename to packages/kbn-es-archiver/src/lib/streams/filter_stream.ts index a90afbe0c4d25..738b9d5793d06 100644 --- a/packages/kbn-es-archiver/src/lib/streams.ts +++ b/packages/kbn-es-archiver/src/lib/streams/filter_stream.ts @@ -17,4 +17,17 @@ * under the License. */ -export * from '../../../../src/legacy/utils/streams'; +import { Transform } from 'stream'; + +export function createFilterStream(fn: (obj: T) => boolean) { + return new Transform({ + objectMode: true, + async transform(obj, _, done) { + const canPushDownStream = fn(obj); + if (canPushDownStream) { + this.push(obj); + } + done(); + }, + }); +} diff --git a/packages/kbn-es-archiver/src/lib/streams/index.ts b/packages/kbn-es-archiver/src/lib/streams/index.ts new file mode 100644 index 0000000000000..447d1ed5b1c53 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/index.ts @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export { concatStreamProviders } from './concat_stream_providers'; +export { createIntersperseStream } from './intersperse_stream'; +export { createSplitStream } from './split_stream'; +export { createListStream } from './list_stream'; +export { createReduceStream } from './reduce_stream'; +export { createPromiseFromStreams } from './promise_from_streams'; +export { createConcatStream } from './concat_stream'; +export { createMapStream } from './map_stream'; +export { createReplaceStream } from './replace_stream'; +export { createFilterStream } from './filter_stream'; diff --git a/packages/kbn-es-archiver/src/lib/streams/intersperse_stream.test.js b/packages/kbn-es-archiver/src/lib/streams/intersperse_stream.test.js new file mode 100644 index 0000000000000..e11b36d77106a --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/intersperse_stream.test.js @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { + createPromiseFromStreams, + createListStream, + createIntersperseStream, + createConcatStream, +} from './'; + +describe('intersperseStream', () => { + test('places the intersperse value between each provided value', async () => { + expect( + await createPromiseFromStreams([ + createListStream(['to', 'be', 'or', 'not', 'to', 'be']), + createIntersperseStream(' '), + createConcatStream(), + ]) + ).toBe('to be or not to be'); + }); + + test('emits values as soon as possible, does not needlessly buffer', async () => { + const str = createIntersperseStream('y'); + const onData = jest.fn(); + str.on('data', onData); + + str.write('a'); + expect(onData).toHaveBeenCalledTimes(1); + expect(onData.mock.calls[0]).toEqual(['a']); + onData.mockClear(); + + str.write('b'); + expect(onData).toHaveBeenCalledTimes(2); + expect(onData.mock.calls[0]).toEqual(['y']); + expect(onData).toHaveBeenCalledTimes(2); + expect(onData.mock.calls[1]).toEqual(['b']); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/intersperse_stream.ts b/packages/kbn-es-archiver/src/lib/streams/intersperse_stream.ts new file mode 100644 index 0000000000000..eb2e3d3087d4a --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/intersperse_stream.ts @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Transform } from 'stream'; + +/** + * Create a Transform stream that receives values in object mode, + * and intersperses a chunk between each object received. + * + * This is useful for writing lists: + * + * createListStream(['foo', 'bar']) + * .pipe(createIntersperseStream('\n')) + * .pipe(process.stdout) // outputs "foo\nbar" + * + * Combine with a concat stream to get "join" like functionality: + * + * await createPromiseFromStreams([ + * createListStream(['foo', 'bar']), + * createIntersperseStream(' '), + * createConcatStream() + * ]) // produces a single value "foo bar" + */ +export function createIntersperseStream(intersperseChunk: any) { + let first = true; + + return new Transform({ + writableObjectMode: true, + readableObjectMode: true, + transform(chunk, _, callback) { + try { + if (first) { + first = false; + } else { + this.push(intersperseChunk); + } + + this.push(chunk); + callback(undefined); + } catch (err) { + callback(err); + } + }, + }); +} diff --git a/packages/kbn-es-archiver/src/lib/streams/list_stream.test.js b/packages/kbn-es-archiver/src/lib/streams/list_stream.test.js new file mode 100644 index 0000000000000..12e20696b0510 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/list_stream.test.js @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createListStream } from './'; + +describe('listStream', () => { + test('provides the values in the initial list', async () => { + const str = createListStream([1, 2, 3, 4]); + const onData = jest.fn(); + str.on('data', onData); + + await new Promise((resolve) => str.on('end', resolve)); + + expect(onData).toHaveBeenCalledTimes(4); + expect(onData.mock.calls[0]).toEqual([1]); + expect(onData.mock.calls[1]).toEqual([2]); + expect(onData.mock.calls[2]).toEqual([3]); + expect(onData.mock.calls[3]).toEqual([4]); + }); + + test('does not modify the list passed', async () => { + const list = [1, 2, 3, 4]; + const str = createListStream(list); + str.resume(); + await new Promise((resolve) => str.on('end', resolve)); + expect(list).toEqual([1, 2, 3, 4]); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/list_stream.ts b/packages/kbn-es-archiver/src/lib/streams/list_stream.ts new file mode 100644 index 0000000000000..c061b969b3c09 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/list_stream.ts @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Readable } from 'stream'; + +/** + * Create a Readable stream that provides the items + * from a list as objects to subscribers + */ +export function createListStream(items: any | any[] = []) { + const queue: any[] = [].concat(items); + + return new Readable({ + objectMode: true, + read(size) { + queue.splice(0, size).forEach((item) => { + this.push(item); + }); + + if (!queue.length) { + this.push(null); + } + }, + }); +} diff --git a/packages/kbn-es-archiver/src/lib/streams/map_stream.test.js b/packages/kbn-es-archiver/src/lib/streams/map_stream.test.js new file mode 100644 index 0000000000000..d86da178f0c1b --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/map_stream.test.js @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { delay } from 'bluebird'; + +import { createPromiseFromStreams } from './promise_from_streams'; +import { createListStream } from './list_stream'; +import { createMapStream } from './map_stream'; +import { createConcatStream } from './concat_stream'; + +describe('createMapStream()', () => { + test('calls the function with each item in the source stream', async () => { + const mapper = jest.fn(); + + await createPromiseFromStreams([createListStream(['a', 'b', 'c']), createMapStream(mapper)]); + + expect(mapper).toHaveBeenCalledTimes(3); + expect(mapper).toHaveBeenCalledWith('a', 0); + expect(mapper).toHaveBeenCalledWith('b', 1); + expect(mapper).toHaveBeenCalledWith('c', 2); + }); + + test('send the return value from the mapper on the output stream', async () => { + const result = await createPromiseFromStreams([ + createListStream([1, 2, 3]), + createMapStream((n) => n * 100), + createConcatStream([]), + ]); + + expect(result).toEqual([100, 200, 300]); + }); + + test('supports async mappers', async () => { + const result = await createPromiseFromStreams([ + createListStream([1, 2, 3]), + createMapStream(async (n, i) => { + await delay(n); + return n * i; + }), + createConcatStream([]), + ]); + + expect(result).toEqual([0, 2, 6]); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/map_stream.ts b/packages/kbn-es-archiver/src/lib/streams/map_stream.ts new file mode 100644 index 0000000000000..e88c512a38653 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/map_stream.ts @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Transform } from 'stream'; + +export function createMapStream(fn: (chunk: any, i: number) => T | Promise) { + let i = 0; + + return new Transform({ + objectMode: true, + async transform(value, _, done) { + try { + this.push(await fn(value, i++)); + done(); + } catch (err) { + done(err); + } + }, + }); +} diff --git a/packages/kbn-es-archiver/src/lib/streams/promise_from_streams.test.js b/packages/kbn-es-archiver/src/lib/streams/promise_from_streams.test.js new file mode 100644 index 0000000000000..e4d9835106f12 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/promise_from_streams.test.js @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Readable, Writable, Duplex, Transform } from 'stream'; + +import { createListStream, createPromiseFromStreams, createReduceStream } from './'; + +describe('promiseFromStreams', () => { + test('pipes together an array of streams', async () => { + const str1 = createListStream([1, 2, 3]); + const str2 = createReduceStream((acc, n) => acc + n, 0); + const sumPromise = new Promise((resolve) => str2.once('data', resolve)); + createPromiseFromStreams([str1, str2]); + await new Promise((resolve) => str2.once('end', resolve)); + expect(await sumPromise).toBe(6); + }); + + describe('last stream is writable', () => { + test('waits for the last stream to finish writing', async () => { + let written = ''; + + await createPromiseFromStreams([ + createListStream(['a']), + new Writable({ + write(chunk, enc, cb) { + setTimeout(() => { + written += chunk; + cb(); + }, 100); + }, + }), + ]); + + expect(written).toBe('a'); + }); + + test('resolves to undefined', async () => { + const result = await createPromiseFromStreams([ + createListStream(['a']), + new Writable({ + write(chunk, enc, cb) { + cb(); + }, + }), + ]); + + expect(result).toBe(undefined); + }); + }); + + describe('last stream is readable', () => { + test(`resolves to it's final value`, async () => { + const result = await createPromiseFromStreams([createListStream(['a', 'b', 'c'])]); + + expect(result).toBe('c'); + }); + }); + + describe('last stream is duplex', () => { + test('waits for writing and resolves to final value', async () => { + let written = ''; + + const duplexReadQueue = []; + const duplexItemsToPush = ['foo', 'bar', null]; + const result = await createPromiseFromStreams([ + createListStream(['a', 'b', 'c']), + new Duplex({ + async read() { + const result = await duplexReadQueue.shift(); + this.push(result); + }, + + write(chunk, enc, cb) { + duplexReadQueue.push( + new Promise((resolve) => { + setTimeout(() => { + written += chunk; + cb(); + resolve(duplexItemsToPush.shift()); + }, 50); + }) + ); + }, + }).setEncoding('utf8'), + ]); + + expect(written).toEqual('abc'); + expect(result).toBe('bar'); + }); + }); + + describe('error handling', () => { + test('read stream gets destroyed when transform stream fails', async () => { + let destroyCalled = false; + const readStream = new Readable({ + read() { + this.push('a'); + this.push('b'); + this.push('c'); + this.push(null); + }, + destroy() { + destroyCalled = true; + }, + }); + const transformStream = new Transform({ + transform(chunk, enc, done) { + done(new Error('Test error')); + }, + }); + try { + await createPromiseFromStreams([readStream, transformStream]); + throw new Error('Should fail'); + } catch (e) { + expect(e.message).toBe('Test error'); + expect(destroyCalled).toBe(true); + } + }); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/promise_from_streams.ts b/packages/kbn-es-archiver/src/lib/streams/promise_from_streams.ts new file mode 100644 index 0000000000000..fefb18be14780 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/promise_from_streams.ts @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Take an array of streams, pipe the output + * from each one into the next, listening for + * errors from any of the streams, and then resolve + * the promise once the final stream has finished + * writing/reading. + * + * If the last stream is readable, it's final value + * will be provided as the promise value. + * + * Errors emitted from any stream will cause + * the promise to be rejected with that error. + */ + +import { pipeline, Writable } from 'stream'; +import { promisify } from 'util'; + +const asyncPipeline = promisify(pipeline); + +export async function createPromiseFromStreams(streams: any): Promise { + let finalChunk: any; + const last = streams[streams.length - 1]; + if (typeof last.read !== 'function' && streams.length === 1) { + // For a nicer error than what stream.pipeline throws + throw new Error('A minimum of 2 streams is required when a non-readable stream is given'); + } + if (typeof last.read === 'function') { + // We are pushing a writable stream to capture the last chunk + streams.push( + new Writable({ + // Use object mode even when "last" stream isn't. This allows to + // capture the last chunk as-is. + objectMode: true, + write(chunk, _, done) { + finalChunk = chunk; + done(); + }, + }) + ); + } + + await asyncPipeline(...(streams as [any])); + + return finalChunk; +} diff --git a/packages/kbn-es-archiver/src/lib/streams/reduce_stream.test.js b/packages/kbn-es-archiver/src/lib/streams/reduce_stream.test.js new file mode 100644 index 0000000000000..2c073f67f82a8 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/reduce_stream.test.js @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createReduceStream, createPromiseFromStreams, createListStream } from './'; + +const promiseFromEvent = (name, emitter) => + new Promise((resolve) => emitter.on(name, () => resolve(name))); + +describe('reduceStream', () => { + test('calls the reducer for each item provided', async () => { + const stub = jest.fn(); + await createPromiseFromStreams([ + createListStream([1, 2, 3]), + createReduceStream((val, chunk, enc) => { + stub(val, chunk, enc); + return chunk; + }, 0), + ]); + expect(stub).toHaveBeenCalledTimes(3); + expect(stub.mock.calls[0]).toEqual([0, 1, 'utf8']); + expect(stub.mock.calls[1]).toEqual([1, 2, 'utf8']); + expect(stub.mock.calls[2]).toEqual([2, 3, 'utf8']); + }); + + test('provides the return value of the last iteration of the reducer', async () => { + const result = await createPromiseFromStreams([ + createListStream('abcdefg'.split('')), + createReduceStream((acc) => acc + 1, 0), + ]); + expect(result).toBe(7); + }); + + test('emits an error if an iteration fails', async () => { + const reduce = createReduceStream((acc, i) => expect(i).toBe(1), 0); + const errorEvent = promiseFromEvent('error', reduce); + + reduce.write(1); + reduce.write(2); + reduce.resume(); + await errorEvent; + }); + + test('stops calling the reducer if an iteration fails, emits no data', async () => { + const reducer = jest.fn((acc, i) => { + if (i < 100) return acc + i; + else throw new Error(i); + }); + const reduce$ = createReduceStream(reducer, 0); + + const dataStub = jest.fn(); + const errorStub = jest.fn(); + reduce$.on('data', dataStub); + reduce$.on('error', errorStub); + const endEvent = promiseFromEvent('end', reduce$); + + reduce$.write(1); + reduce$.write(2); + reduce$.write(300); + reduce$.write(400); + reduce$.write(1000); + reduce$.end(); + + await endEvent; + expect(reducer).toHaveBeenCalledTimes(3); + expect(dataStub).toHaveBeenCalledTimes(0); + expect(errorStub).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/reduce_stream.ts b/packages/kbn-es-archiver/src/lib/streams/reduce_stream.ts new file mode 100644 index 0000000000000..d9458e9a11c33 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/reduce_stream.ts @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Transform } from 'stream'; + +/** + * Create a transform stream that consumes each chunk it receives + * and passes it to the reducer, which will return the new value + * for the stream. Once all chunks have been received the reduce + * stream provides the result of final call to the reducer to + * subscribers. + */ +export function createReduceStream( + reducer: (acc: any, chunk: any, env: string) => any, + initial: any +) { + let i = -1; + let value = initial; + + // if the reducer throws an error then the value is + // considered invalid and the stream will never provide + // it to subscribers. We will also stop calling the + // reducer for any new data that is provided to us + let failed = false; + + if (typeof reducer !== 'function') { + throw new TypeError('reducer must be a function'); + } + + return new Transform({ + readableObjectMode: true, + writableObjectMode: true, + async transform(chunk, enc, callback) { + try { + if (failed) { + return callback(); + } + + i += 1; + if (i === 0 && initial === undefined) { + value = chunk; + } else { + value = await reducer(value, chunk, enc); + } + + callback(); + } catch (err) { + failed = true; + callback(err); + } + }, + + flush(callback) { + if (!failed) { + this.push(value); + } + + callback(); + }, + }); +} diff --git a/packages/kbn-es-archiver/src/lib/streams/replace_stream.test.js b/packages/kbn-es-archiver/src/lib/streams/replace_stream.test.js new file mode 100644 index 0000000000000..01b89f93e5af0 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/replace_stream.test.js @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { + createReplaceStream, + createConcatStream, + createPromiseFromStreams, + createListStream, + createMapStream, +} from './'; + +async function concatToString(streams) { + return await createPromiseFromStreams([ + ...streams, + createMapStream((buff) => buff.toString('utf8')), + createConcatStream(''), + ]); +} + +describe('replaceStream', () => { + test('produces buffers when it receives buffers', async () => { + const chunks = await createPromiseFromStreams([ + createListStream([Buffer.from('foo'), Buffer.from('bar')]), + createReplaceStream('o', '0'), + createConcatStream([]), + ]); + + chunks.forEach((chunk) => { + expect(chunk).toBeInstanceOf(Buffer); + }); + }); + + test('produces buffers when it receives strings', async () => { + const chunks = await createPromiseFromStreams([ + createListStream(['foo', 'bar']), + createReplaceStream('o', '0'), + createConcatStream([]), + ]); + + chunks.forEach((chunk) => { + expect(chunk).toBeInstanceOf(Buffer); + }); + }); + + test('expects toReplace to be a string', () => { + expect(() => createReplaceStream(Buffer.from('foo'))).toThrowError(/be a string/); + }); + + test('replaces multiple single-char instances in a single chunk', async () => { + expect( + await concatToString([ + createListStream([Buffer.from('f00 bar')]), + createReplaceStream('0', 'o'), + ]) + ).toBe('foo bar'); + }); + + test('replaces multiple single-char instances in multiple chunks', async () => { + expect( + await concatToString([ + createListStream([Buffer.from('f0'), Buffer.from('0 bar')]), + createReplaceStream('0', 'o'), + ]) + ).toBe('foo bar'); + }); + + test('replaces single multi-char instances in single chunks', async () => { + expect( + await concatToString([ + createListStream([Buffer.from('f0'), Buffer.from('0 bar')]), + createReplaceStream('0', 'o'), + ]) + ).toBe('foo bar'); + }); + + test('replaces multiple multi-char instances in single chunks', async () => { + expect( + await concatToString([ + createListStream([Buffer.from('foo ba'), Buffer.from('r b'), Buffer.from('az bar')]), + createReplaceStream('bar', '*'), + ]) + ).toBe('foo * baz *'); + }); + + test('replaces multi-char instance that stretches multiple chunks', async () => { + expect( + await concatToString([ + createListStream([ + Buffer.from('foo supe'), + Buffer.from('rcalifra'), + Buffer.from('gilistic'), + Buffer.from('expialid'), + Buffer.from('ocious bar'), + ]), + createReplaceStream('supercalifragilisticexpialidocious', '*'), + ]) + ).toBe('foo * bar'); + }); + + test('ignores missing multi-char instance', async () => { + expect( + await concatToString([ + createListStream([ + Buffer.from('foo supe'), + Buffer.from('rcalifra'), + Buffer.from('gili stic'), + Buffer.from('expialid'), + Buffer.from('ocious bar'), + ]), + createReplaceStream('supercalifragilisticexpialidocious', '*'), + ]) + ).toBe('foo supercalifragili sticexpialidocious bar'); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/replace_stream.ts b/packages/kbn-es-archiver/src/lib/streams/replace_stream.ts new file mode 100644 index 0000000000000..fe2ba1fcdf31c --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/replace_stream.ts @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Transform } from 'stream'; + +export function createReplaceStream(toReplace: string, replacement: string) { + if (typeof toReplace !== 'string') { + throw new TypeError('toReplace must be a string'); + } + + let buffer = Buffer.alloc(0); + return new Transform({ + objectMode: false, + async transform(value, _, done) { + try { + buffer = Buffer.concat([buffer, value], buffer.length + value.length); + + while (true) { + // try to find the next instance of `toReplace` in buffer + const index = buffer.indexOf(toReplace); + + // if there is no next instance, break + if (index === -1) { + break; + } + + // flush everything to the left of the next instance + // of `toReplace` + this.push(buffer.slice(0, index)); + + // then flush an instance of `replacement` + this.push(replacement); + + // and finally update the buffer to include everything + // to the right of `toReplace`, dropping to replace from the buffer + buffer = buffer.slice(index + toReplace.length); + } + + // until now we have only flushed data that is to the left + // of a discovered instance of `toReplace`. If `toReplace` is + // never found this would lead to us buffering the entire stream. + // + // Instead, we only keep enough buffer to complete a potentially + // partial instance of `toReplace` + if (buffer.length > toReplace.length) { + // the entire buffer except the last `toReplace.length` bytes + // so that if all but one byte from `toReplace` is in the buffer, + // and the next chunk delivers the necessary byte, the buffer will then + // contain a complete `toReplace` token. + this.push(buffer.slice(0, buffer.length - toReplace.length)); + buffer = buffer.slice(-toReplace.length); + } + + done(); + } catch (err) { + done(err); + } + }, + + flush(callback) { + if (buffer.length) { + this.push(buffer); + } + + callback(); + }, + }); +} diff --git a/packages/kbn-es-archiver/src/lib/streams/split_stream.test.js b/packages/kbn-es-archiver/src/lib/streams/split_stream.test.js new file mode 100644 index 0000000000000..e0736d220ba5c --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/split_stream.test.js @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createSplitStream, createConcatStream, createPromiseFromStreams } from './'; + +async function split(stream, input) { + const concat = createConcatStream(); + concat.write([]); + stream.pipe(concat); + const output = createPromiseFromStreams([concat]); + + input.forEach((i) => { + stream.write(i); + }); + stream.end(); + + return await output; +} + +describe('splitStream', () => { + test('splits buffers, produces strings', async () => { + const output = await split(createSplitStream('&'), [Buffer.from('foo&bar')]); + expect(output).toEqual(['foo', 'bar']); + }); + + test('supports mixed input', async () => { + const output = await split(createSplitStream('&'), [Buffer.from('foo&b'), 'ar']); + expect(output).toEqual(['foo', 'bar']); + }); + + test('supports buffer split chunks', async () => { + const output = await split(createSplitStream(Buffer.from('&')), ['foo&b', 'ar']); + expect(output).toEqual(['foo', 'bar']); + }); + + test('splits provided values by a delimiter', async () => { + const output = await split(createSplitStream('&'), ['foo&b', 'ar']); + expect(output).toEqual(['foo', 'bar']); + }); + + test('handles multi-character delimiters', async () => { + const output = await split(createSplitStream('oo'), ['foo&b', 'ar']); + expect(output).toEqual(['f', '&bar']); + }); + + test('handles delimiters that span multiple chunks', async () => { + const output = await split(createSplitStream('ba'), ['foo&b', 'ar']); + expect(output).toEqual(['foo&', 'r']); + }); + + test('produces an empty chunk if the split char is at the end of the input', async () => { + const output = await split(createSplitStream('&bar'), ['foo&b', 'ar']); + expect(output).toEqual(['foo', '']); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/streams/split_stream.ts b/packages/kbn-es-archiver/src/lib/streams/split_stream.ts new file mode 100644 index 0000000000000..1c9b59449bd92 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/streams/split_stream.ts @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Transform } from 'stream'; + +/** + * Creates a Transform stream that consumes a stream of Buffers + * and produces a stream of strings (in object mode) by splitting + * the received bytes using the splitChunk. + * + * Ways this is behaves like String#split: + * - instances of splitChunk are removed from the input + * - splitChunk can be on any size + * - if there are no bytes found after the last splitChunk + * a final empty chunk is emitted + * + * Ways this deviates from String#split: + * - splitChunk cannot be a regexp + * - an empty string or Buffer will not produce a stream of individual + * bytes like `string.split('')` would + */ +export function createSplitStream(splitChunk: string) { + let unsplitBuffer = Buffer.alloc(0); + + return new Transform({ + writableObjectMode: false, + readableObjectMode: true, + transform(chunk, _, callback) { + try { + let i; + let toSplit = Buffer.concat([unsplitBuffer, chunk]); + while ((i = toSplit.indexOf(splitChunk)) !== -1) { + const slice = toSplit.slice(0, i); + toSplit = toSplit.slice(i + splitChunk.length); + this.push(slice.toString('utf8')); + } + + unsplitBuffer = toSplit; + callback(undefined); + } catch (err) { + callback(err); + } + }, + + flush(callback) { + try { + this.push(unsplitBuffer.toString('utf8')); + + callback(undefined); + } catch (err) { + callback(err); + } + }, + }); +} diff --git a/packages/kbn-pm/dist/index.js b/packages/kbn-pm/dist/index.js index eb2d0d2581a34..9a3bb1c687032 100644 --- a/packages/kbn-pm/dist/index.js +++ b/packages/kbn-pm/dist/index.js @@ -27651,6 +27651,7 @@ var eos = function(stream, opts, callback) { var rs = stream._readableState; var readable = opts.readable || (opts.readable !== false && stream.readable); var writable = opts.writable || (opts.writable !== false && stream.writable); + var cancelled = false; var onlegacyfinish = function() { if (!stream.writable) onfinish(); @@ -27675,8 +27676,13 @@ var eos = function(stream, opts, callback) { }; var onclose = function() { - if (readable && !(rs && rs.ended)) return callback.call(stream, new Error('premature close')); - if (writable && !(ws && ws.ended)) return callback.call(stream, new Error('premature close')); + process.nextTick(onclosenexttick); + }; + + var onclosenexttick = function() { + if (cancelled) return; + if (readable && !(rs && (rs.ended && !rs.destroyed))) return callback.call(stream, new Error('premature close')); + if (writable && !(ws && (ws.ended && !ws.destroyed))) return callback.call(stream, new Error('premature close')); }; var onrequest = function() { @@ -27701,6 +27707,7 @@ var eos = function(stream, opts, callback) { stream.on('close', onclose); return function() { + cancelled = true; stream.removeListener('complete', onfinish); stream.removeListener('abort', onclose); stream.removeListener('request', onrequest); diff --git a/src/plugins/visualizations/public/wizard/__snapshots__/new_vis_modal.test.tsx.snap b/src/plugins/visualizations/public/wizard/__snapshots__/new_vis_modal.test.tsx.snap index 6aed16e937713..3c4c983efa9fa 100644 --- a/src/plugins/visualizations/public/wizard/__snapshots__/new_vis_modal.test.tsx.snap +++ b/src/plugins/visualizations/public/wizard/__snapshots__/new_vis_modal.test.tsx.snap @@ -248,7 +248,7 @@ exports[`NewVisModal filter for visualization types should render as expected 1`