diff --git a/x-pack/plugins/lists/common/constants.mock.ts b/x-pack/plugins/lists/common/constants.mock.ts index 185de02d555b7..7f7a90eeba5a2 100644 --- a/x-pack/plugins/lists/common/constants.mock.ts +++ b/x-pack/plugins/lists/common/constants.mock.ts @@ -41,6 +41,8 @@ export const OPERATOR = 'included'; export const ENTRY_VALUE = 'some host name'; export const MATCH = 'match'; export const MATCH_ANY = 'match_any'; +export const MAX_IMPORT_PAYLOAD_BYTES = 40000000; +export const IMPORT_BUFFER_SIZE = 1000; export const LIST = 'list'; export const EXISTS = 'exists'; export const NESTED = 'nested'; diff --git a/x-pack/plugins/lists/server/config.mock.ts b/x-pack/plugins/lists/server/config.mock.ts new file mode 100644 index 0000000000000..3cf5040c73675 --- /dev/null +++ b/x-pack/plugins/lists/server/config.mock.ts @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + IMPORT_BUFFER_SIZE, + LIST_INDEX, + LIST_ITEM_INDEX, + MAX_IMPORT_PAYLOAD_BYTES, +} from '../common/constants.mock'; + +import { ConfigType } from './config'; + +export const getConfigMock = (): Partial => ({ + listIndex: LIST_INDEX, + listItemIndex: LIST_ITEM_INDEX, +}); + +export const getConfigMockDecoded = (): ConfigType => ({ + enabled: true, + importBufferSize: IMPORT_BUFFER_SIZE, + listIndex: LIST_INDEX, + listItemIndex: LIST_ITEM_INDEX, + maxImportPayloadBytes: MAX_IMPORT_PAYLOAD_BYTES, +}); diff --git a/x-pack/plugins/lists/server/config.test.ts b/x-pack/plugins/lists/server/config.test.ts new file mode 100644 index 0000000000000..60501322dcfa2 --- /dev/null +++ b/x-pack/plugins/lists/server/config.test.ts @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { ConfigSchema, ConfigType } from './config'; +import { getConfigMock, getConfigMockDecoded } from './config.mock'; + +describe('config_schema', () => { + test('it works with expected basic mock data set and defaults', () => { + expect(ConfigSchema.validate(getConfigMock())).toEqual(getConfigMockDecoded()); + }); + + test('it throws if given an invalid value', () => { + const mock: Partial & { madeUpValue: string } = { + madeUpValue: 'something', + ...getConfigMock(), + }; + expect(() => ConfigSchema.validate(mock)).toThrow( + '[madeUpValue]: definition for this key is missing' + ); + }); + + test('it throws if the "maxImportPayloadBytes" value is 0', () => { + const mock: ConfigType = { + ...getConfigMockDecoded(), + maxImportPayloadBytes: 0, + }; + expect(() => ConfigSchema.validate(mock)).toThrow( + '[maxImportPayloadBytes]: Value must be equal to or greater than [1].' + ); + }); + + test('it throws if the "maxImportPayloadBytes" value is less than 0', () => { + const mock: ConfigType = { + ...getConfigMockDecoded(), + maxImportPayloadBytes: -1, + }; + expect(() => ConfigSchema.validate(mock)).toThrow( + '[maxImportPayloadBytes]: Value must be equal to or greater than [1].' + ); + }); + + test('it throws if the "importBufferSize" value is 0', () => { + const mock: ConfigType = { + ...getConfigMockDecoded(), + importBufferSize: 0, + }; + expect(() => ConfigSchema.validate(mock)).toThrow( + '[importBufferSize]: Value must be equal to or greater than [1].' + ); + }); + + test('it throws if the "importBufferSize" value is less than 0', () => { + const mock: ConfigType = { + ...getConfigMockDecoded(), + importBufferSize: -1, + }; + expect(() => ConfigSchema.validate(mock)).toThrow( + '[importBufferSize]: Value must be equal to or greater than [1].' + ); + }); +}); diff --git a/x-pack/plugins/lists/server/config.ts b/x-pack/plugins/lists/server/config.ts index f2fa7e8801033..0fcc68419f8fe 100644 --- a/x-pack/plugins/lists/server/config.ts +++ b/x-pack/plugins/lists/server/config.ts @@ -8,8 +8,10 @@ import { TypeOf, schema } from '@kbn/config-schema'; export const ConfigSchema = schema.object({ enabled: schema.boolean({ defaultValue: true }), + importBufferSize: schema.number({ defaultValue: 1000, min: 1 }), listIndex: schema.string({ defaultValue: '.lists' }), listItemIndex: schema.string({ defaultValue: '.items' }), + maxImportPayloadBytes: schema.number({ defaultValue: 40000000, min: 1 }), }); export type ConfigType = TypeOf; diff --git a/x-pack/plugins/lists/server/create_config.ts b/x-pack/plugins/lists/server/create_config.ts index 7e2e639ce7a35..e46c71798eb9f 100644 --- a/x-pack/plugins/lists/server/create_config.ts +++ b/x-pack/plugins/lists/server/create_config.ts @@ -12,12 +12,6 @@ import { ConfigType } from './config'; export const createConfig$ = ( context: PluginInitializerContext -): Observable< - Readonly<{ - enabled: boolean; - listIndex: string; - listItemIndex: string; - }> -> => { +): Observable> => { return context.config.create().pipe(map((config) => config)); }; diff --git a/x-pack/plugins/lists/server/plugin.ts b/x-pack/plugins/lists/server/plugin.ts index cdd674a19ceb6..b4f2639f24923 100644 --- a/x-pack/plugins/lists/server/plugin.ts +++ b/x-pack/plugins/lists/server/plugin.ts @@ -48,7 +48,7 @@ export class ListPlugin core.http.registerRouteHandlerContext('lists', this.createRouteHandlerContext()); const router = core.http.createRouter(); - initRoutes(router); + initRoutes(router, config); return { getExceptionListClient: (savedObjectsClient, user): ExceptionListClient => { diff --git a/x-pack/plugins/lists/server/routes/import_list_item_route.ts b/x-pack/plugins/lists/server/routes/import_list_item_route.ts index d75199140ea8e..2e629d7516dd1 100644 --- a/x-pack/plugins/lists/server/routes/import_list_item_route.ts +++ b/x-pack/plugins/lists/server/routes/import_list_item_route.ts @@ -4,50 +4,40 @@ * you may not use this file except in compliance with the Elastic License. */ -import { Readable } from 'stream'; - import { IRouter } from 'kibana/server'; +import { schema } from '@kbn/config-schema'; import { LIST_ITEM_URL } from '../../common/constants'; import { buildRouteValidation, buildSiemResponse, transformError } from '../siem_server_deps'; import { validate } from '../../common/siem_common_deps'; -import { importListItemQuerySchema, importListItemSchema, listSchema } from '../../common/schemas'; - -import { getListClient } from '.'; +import { importListItemQuerySchema, listSchema } from '../../common/schemas'; +import { ConfigType } from '../config'; -export interface HapiReadableStream extends Readable { - hapi: { - filename: string; - }; -} +import { createStreamFromBuffer } from './utils/create_stream_from_buffer'; -/** - * Special interface since we are streaming in a file through a reader - */ -export interface ImportListItemHapiFileSchema { - file: HapiReadableStream; -} +import { getListClient } from '.'; -export const importListItemRoute = (router: IRouter): void => { +export const importListItemRoute = (router: IRouter, config: ConfigType): void => { router.post( { options: { body: { - output: 'stream', + accepts: ['multipart/form-data'], + maxBytes: config.maxImportPayloadBytes, + parse: false, }, tags: ['access:lists'], }, path: `${LIST_ITEM_URL}/_import`, validate: { - body: buildRouteValidation( - importListItemSchema - ), + body: schema.buffer(), query: buildRouteValidation(importListItemQuerySchema), }, }, async (context, request, response) => { const siemResponse = buildSiemResponse(response); try { + const stream = createStreamFromBuffer(request.body); const { deserializer, list_id: listId, serializer, type } = request.query; const lists = getListClient(context); if (listId != null) { @@ -63,7 +53,7 @@ export const importListItemRoute = (router: IRouter): void => { listId, meta: undefined, serializer: list.serializer, - stream: request.body.file, + stream, type: list.type, }); @@ -74,26 +64,21 @@ export const importListItemRoute = (router: IRouter): void => { return response.ok({ body: validated ?? {} }); } } else if (type != null) { - const { filename } = request.body.file.hapi; - // TODO: Should we prevent the same file from being uploaded multiple times? - const list = await lists.createListIfItDoesNotExist({ - description: `File uploaded from file system of ${filename}`, + const importedList = await lists.importListItemsToStream({ deserializer, - id: filename, + listId: undefined, meta: undefined, - name: filename, serializer, + stream, type, }); - await lists.importListItemsToStream({ - deserializer: list.deserializer, - listId: list.id, - meta: undefined, - serializer: list.serializer, - stream: request.body.file, - type: list.type, - }); - const [validated, errors] = validate(list, listSchema); + if (importedList == null) { + return siemResponse.error({ + body: 'Unable to parse a valid fileName during import', + statusCode: 400, + }); + } + const [validated, errors] = validate(importedList, listSchema); if (errors != null) { return siemResponse.error({ body: errors, statusCode: 500 }); } else { diff --git a/x-pack/plugins/lists/server/routes/init_routes.ts b/x-pack/plugins/lists/server/routes/init_routes.ts index e74fa471734b0..ffd8afd54913f 100644 --- a/x-pack/plugins/lists/server/routes/init_routes.ts +++ b/x-pack/plugins/lists/server/routes/init_routes.ts @@ -6,6 +6,8 @@ import { IRouter } from 'kibana/server'; +import { ConfigType } from '../config'; + import { createExceptionListItemRoute, createExceptionListRoute, @@ -36,7 +38,7 @@ import { updateListRoute, } from '.'; -export const initRoutes = (router: IRouter): void => { +export const initRoutes = (router: IRouter, config: ConfigType): void => { // lists createListRoute(router); readListRoute(router); @@ -52,7 +54,7 @@ export const initRoutes = (router: IRouter): void => { deleteListItemRoute(router); patchListItemRoute(router); exportListItemRoute(router); - importListItemRoute(router); + importListItemRoute(router, config); findListItemRoute(router); // indexes of lists diff --git a/x-pack/plugins/lists/server/routes/utils/create_stream_from_buffer.ts b/x-pack/plugins/lists/server/routes/utils/create_stream_from_buffer.ts new file mode 100644 index 0000000000000..3dcf03617bcbc --- /dev/null +++ b/x-pack/plugins/lists/server/routes/utils/create_stream_from_buffer.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { Readable } from 'stream'; + +export const createStreamFromBuffer = (buffer: Buffer): Readable => { + const stream = new Readable(); + stream.push(buffer); + stream.push(null); + return stream; +}; diff --git a/x-pack/plugins/lists/server/services/items/buffer_lines.test.ts b/x-pack/plugins/lists/server/services/items/buffer_lines.test.ts index a283269271bd0..ad1511e28f80a 100644 --- a/x-pack/plugins/lists/server/services/items/buffer_lines.test.ts +++ b/x-pack/plugins/lists/server/services/items/buffer_lines.test.ts @@ -4,15 +4,44 @@ * you may not use this file except in compliance with the Elastic License. */ +import { IMPORT_BUFFER_SIZE } from '../../../common/constants.mock'; + import { BufferLines } from './buffer_lines'; import { TestReadable } from './test_readable.mock'; describe('buffer_lines', () => { + test('it will throw if given a buffer size of zero', () => { + expect(() => { + new BufferLines({ bufferSize: 0, input: new TestReadable() }); + }).toThrow('bufferSize must be greater than zero'); + }); + + test('it will throw if given a buffer size of -1', () => { + expect(() => { + new BufferLines({ bufferSize: -1, input: new TestReadable() }); + }).toThrow('bufferSize must be greater than zero'); + }); + test('it can read a single line', (done) => { const input = new TestReadable(); input.push('line one\n'); input.push(null); - const bufferedLine = new BufferLines({ input }); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); + let linesToTest: string[] = []; + bufferedLine.on('lines', (lines: string[]) => { + linesToTest = [...linesToTest, ...lines]; + }); + bufferedLine.on('close', () => { + expect(linesToTest).toEqual(['line one']); + done(); + }); + }); + + test('it can read a single line using a buffer size of 1', (done) => { + const input = new TestReadable(); + input.push('line one\n'); + input.push(null); + const bufferedLine = new BufferLines({ bufferSize: 1, input }); let linesToTest: string[] = []; bufferedLine.on('lines', (lines: string[]) => { linesToTest = [...linesToTest, ...lines]; @@ -28,7 +57,23 @@ describe('buffer_lines', () => { input.push('line one\n'); input.push('line two\n'); input.push(null); - const bufferedLine = new BufferLines({ input }); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); + let linesToTest: string[] = []; + bufferedLine.on('lines', (lines: string[]) => { + linesToTest = [...linesToTest, ...lines]; + }); + bufferedLine.on('close', () => { + expect(linesToTest).toEqual(['line one', 'line two']); + done(); + }); + }); + + test('it can read two lines using a buffer size of 1', (done) => { + const input = new TestReadable(); + input.push('line one\n'); + input.push('line two\n'); + input.push(null); + const bufferedLine = new BufferLines({ bufferSize: 1, input }); let linesToTest: string[] = []; bufferedLine.on('lines', (lines: string[]) => { linesToTest = [...linesToTest, ...lines]; @@ -44,7 +89,7 @@ describe('buffer_lines', () => { input.push('line one\n'); input.push('line one\n'); input.push(null); - const bufferedLine = new BufferLines({ input }); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); let linesToTest: string[] = []; bufferedLine.on('lines', (lines: string[]) => { linesToTest = [...linesToTest, ...lines]; @@ -58,7 +103,7 @@ describe('buffer_lines', () => { test('it can close out without writing any lines', (done) => { const input = new TestReadable(); input.push(null); - const bufferedLine = new BufferLines({ input }); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); let linesToTest: string[] = []; bufferedLine.on('lines', (lines: string[]) => { linesToTest = [...linesToTest, ...lines]; @@ -71,7 +116,7 @@ describe('buffer_lines', () => { test('it can read 200 lines', (done) => { const input = new TestReadable(); - const bufferedLine = new BufferLines({ input }); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); let linesToTest: string[] = []; const size200: string[] = new Array(200).fill(null).map((_, index) => `${index}\n`); size200.forEach((element) => input.push(element)); @@ -84,4 +129,66 @@ describe('buffer_lines', () => { done(); }); }); + + test('it can read an example multi-part message', (done) => { + const input = new TestReadable(); + input.push('--boundary\n'); + input.push('Content-type: text/plain\n'); + input.push('Content-Disposition: form-data; name="fieldName"; filename="filename.text"\n'); + input.push('\n'); + input.push('127.0.0.1\n'); + input.push('127.0.0.2\n'); + input.push('127.0.0.3\n'); + input.push('\n'); + input.push('--boundary--\n'); + input.push(null); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); + let linesToTest: string[] = []; + bufferedLine.on('lines', (lines: string[]) => { + linesToTest = [...linesToTest, ...lines]; + }); + bufferedLine.on('close', () => { + expect(linesToTest).toEqual(['127.0.0.1', '127.0.0.2', '127.0.0.3']); + done(); + }); + }); + + test('it can read an empty multi-part message', (done) => { + const input = new TestReadable(); + input.push('--boundary\n'); + input.push('Content-type: text/plain\n'); + input.push('Content-Disposition: form-data; name="fieldName"; filename="filename.text"\n'); + input.push('\n'); + input.push('\n'); + input.push('--boundary--\n'); + input.push(null); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); + let linesToTest: string[] = []; + bufferedLine.on('lines', (lines: string[]) => { + linesToTest = [...linesToTest, ...lines]; + }); + bufferedLine.on('close', () => { + expect(linesToTest).toEqual([]); + done(); + }); + }); + + test('it can read a fileName from a multipart message', (done) => { + const input = new TestReadable(); + input.push('--boundary\n'); + input.push('Content-type: text/plain\n'); + input.push('Content-Disposition: form-data; name="fieldName"; filename="filename.text"\n'); + input.push('\n'); + input.push('--boundary--\n'); + input.push(null); + const bufferedLine = new BufferLines({ bufferSize: IMPORT_BUFFER_SIZE, input }); + let fileNameToTest: string; + bufferedLine.on('fileName', (fileName: string) => { + fileNameToTest = fileName; + }); + bufferedLine.on('close', () => { + expect(fileNameToTest).toEqual('filename.text'); + done(); + }); + }); }); diff --git a/x-pack/plugins/lists/server/services/items/buffer_lines.ts b/x-pack/plugins/lists/server/services/items/buffer_lines.ts index 4ff84268f5e0c..dc257eadb7438 100644 --- a/x-pack/plugins/lists/server/services/items/buffer_lines.ts +++ b/x-pack/plugins/lists/server/services/items/buffer_lines.ts @@ -7,18 +7,50 @@ import readLine from 'readline'; import { Readable } from 'stream'; -const BUFFER_SIZE = 100; - export class BufferLines extends Readable { private set = new Set(); - constructor({ input }: { input: NodeJS.ReadableStream }) { + private boundary: string | null = null; + private readableText: boolean = false; + private paused: boolean = false; + private bufferSize: number; + constructor({ input, bufferSize }: { input: NodeJS.ReadableStream; bufferSize: number }) { super({ encoding: 'utf-8' }); + if (bufferSize <= 0) { + throw new RangeError('bufferSize must be greater than zero'); + } + this.bufferSize = bufferSize; + const readline = readLine.createInterface({ input, }); + // We are parsing multipart/form-data involving boundaries as fast as we can to get + // * The filename if it exists and emit it + // * The actual content within the multipart/form-data readline.on('line', (line) => { - this.push(line); + if (this.boundary == null && line.startsWith('--')) { + this.boundary = `${line}--`; + } else if (this.boundary != null && !this.readableText && line.trim() !== '') { + if (line.startsWith('Content-Disposition')) { + const fileNameMatch = RegExp('filename="(?.+)"'); + const matches = fileNameMatch.exec(line); + if (matches?.groups?.fileName != null) { + this.emit('fileName', matches.groups.fileName); + } + } + } else if (this.boundary != null && !this.readableText && line.trim() === '') { + // we are ready to be readable text now for parsing + this.readableText = true; + } else if (this.readableText && line.trim() === '') { + // skip and do nothing as this is either a empty line or an upcoming end is about to happen + } else if (this.boundary != null && this.readableText && line === this.boundary) { + // we are at the end of the stream + this.boundary = null; + this.readableText = false; + } else { + // we have actual content to push + this.push(line); + } }); readline.on('close', () => { @@ -26,23 +58,54 @@ export class BufferLines extends Readable { }); } - public _read(): void { - // No operation but this is required to be implemented + public _read(): void {} + + public pause(): this { + this.paused = true; + return this; } - public push(line: string | null): boolean { - if (line == null) { - this.emit('lines', Array.from(this.set)); - this.set.clear(); - this.emit('close'); - return true; + public resume(): this { + this.paused = false; + return this; + } + + private emptyBuffer(): void { + const arrayFromSet = Array.from(this.set); + if (arrayFromSet.length === 0) { + this.emit('lines', []); } else { + while (arrayFromSet.length) { + const spliced = arrayFromSet.splice(0, this.bufferSize); + this.emit('lines', spliced); + } + } + this.set.clear(); + } + + public push(line: string | null): boolean { + if (line != null) { this.set.add(line); - if (this.set.size > BUFFER_SIZE) { - this.emit('lines', Array.from(this.set)); - this.set.clear(); + if (this.paused) { + return false; + } else { + if (this.set.size > this.bufferSize) { + this.emptyBuffer(); + } return true; + } + } else { + if (this.paused) { + // If we paused but have buffered all of the available data + // we should do wait for 10(ms) and check again if we are paused + // or not. + setTimeout(() => { + this.push(line); + }, 10); + return false; } else { + this.emptyBuffer(); + this.emit('close'); return true; } } diff --git a/x-pack/plugins/lists/server/services/items/create_list_items_bulk.ts b/x-pack/plugins/lists/server/services/items/create_list_items_bulk.ts index 447c0f6bf95cc..463b9735b2578 100644 --- a/x-pack/plugins/lists/server/services/items/create_list_items_bulk.ts +++ b/x-pack/plugins/lists/server/services/items/create_list_items_bulk.ts @@ -80,9 +80,12 @@ export const createListItemsBulk = async ({ }, [] ); - - await callCluster('bulk', { - body, - index: listItemIndex, - }); + try { + await callCluster('bulk', { + body, + index: listItemIndex, + }); + } catch (error) { + // TODO: Log out the error with return values from the bulk insert into another index or saved object + } }; diff --git a/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.mock.ts b/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.mock.ts index b7e30e0a1c308..d868351fc4b33 100644 --- a/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.mock.ts +++ b/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.mock.ts @@ -5,14 +5,24 @@ */ import { getCallClusterMock } from '../../../common/get_call_cluster.mock'; import { ImportListItemsToStreamOptions, WriteBufferToItemsOptions } from '../items'; -import { LIST_ID, LIST_ITEM_INDEX, META, TYPE, USER } from '../../../common/constants.mock'; +import { + LIST_ID, + LIST_INDEX, + LIST_ITEM_INDEX, + META, + TYPE, + USER, +} from '../../../common/constants.mock'; +import { getConfigMockDecoded } from '../../config.mock'; import { TestReadable } from './test_readable.mock'; export const getImportListItemsToStreamOptionsMock = (): ImportListItemsToStreamOptions => ({ callCluster: getCallClusterMock(), + config: getConfigMockDecoded(), deserializer: undefined, listId: LIST_ID, + listIndex: LIST_INDEX, listItemIndex: LIST_ITEM_INDEX, meta: META, serializer: undefined, diff --git a/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.ts b/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.ts index 31b2b74c88431..2bffe338e9075 100644 --- a/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.ts +++ b/x-pack/plugins/lists/server/services/items/write_lines_to_bulk_list_items.ts @@ -8,20 +8,26 @@ import { Readable } from 'stream'; import { LegacyAPICaller } from 'kibana/server'; +import { createListIfItDoesNotExist } from '../lists/create_list_if_it_does_not_exist'; import { DeserializerOrUndefined, + ListIdOrUndefined, + ListSchema, MetaOrUndefined, SerializerOrUndefined, Type, } from '../../../common/schemas'; +import { ConfigType } from '../../config'; import { BufferLines } from './buffer_lines'; import { createListItemsBulk } from './create_list_items_bulk'; export interface ImportListItemsToStreamOptions { + listId: ListIdOrUndefined; + config: ConfigType; + listIndex: string; deserializer: DeserializerOrUndefined; serializer: SerializerOrUndefined; - listId: string; stream: Readable; callCluster: LegacyAPICaller; listItemIndex: string; @@ -31,34 +37,72 @@ export interface ImportListItemsToStreamOptions { } export const importListItemsToStream = ({ + config, deserializer, serializer, listId, stream, callCluster, listItemIndex, + listIndex, type, user, meta, -}: ImportListItemsToStreamOptions): Promise => { - return new Promise((resolve) => { - const readBuffer = new BufferLines({ input: stream }); +}: ImportListItemsToStreamOptions): Promise => { + return new Promise((resolve) => { + const readBuffer = new BufferLines({ bufferSize: config.importBufferSize, input: stream }); + let fileName: string | undefined; + let list: ListSchema | null = null; + readBuffer.on('fileName', async (fileNameEmitted: string) => { + readBuffer.pause(); + fileName = fileNameEmitted; + if (listId == null) { + list = await createListIfItDoesNotExist({ + callCluster, + description: `File uploaded from file system of ${fileNameEmitted}`, + deserializer, + id: fileNameEmitted, + listIndex, + meta, + name: fileNameEmitted, + serializer, + type, + user, + }); + } + readBuffer.resume(); + }); + readBuffer.on('lines', async (lines: string[]) => { - await writeBufferToItems({ - buffer: lines, - callCluster, - deserializer, - listId, - listItemIndex, - meta, - serializer, - type, - user, - }); + if (listId != null) { + await writeBufferToItems({ + buffer: lines, + callCluster, + deserializer, + listId, + listItemIndex, + meta, + serializer, + type, + user, + }); + } else if (fileName != null) { + await writeBufferToItems({ + buffer: lines, + callCluster, + deserializer, + listId: fileName, + listItemIndex, + meta, + serializer, + type, + user, + }); + } }); readBuffer.on('close', () => { - resolve(); + resolve(list); }); }); }; diff --git a/x-pack/plugins/lists/server/services/lists/create_list_if_it_does_not_exist.ts b/x-pack/plugins/lists/server/services/lists/create_list_if_it_does_not_exist.ts new file mode 100644 index 0000000000000..84f5ac0308191 --- /dev/null +++ b/x-pack/plugins/lists/server/services/lists/create_list_if_it_does_not_exist.ts @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { LegacyAPICaller } from 'kibana/server'; + +import { + Description, + DeserializerOrUndefined, + Id, + ListSchema, + MetaOrUndefined, + Name, + SerializerOrUndefined, + Type, +} from '../../../common/schemas'; + +import { getList } from './get_list'; +import { createList } from './create_list'; + +export interface CreateListIfItDoesNotExistOptions { + id: Id; + type: Type; + name: Name; + deserializer: DeserializerOrUndefined; + serializer: SerializerOrUndefined; + description: Description; + callCluster: LegacyAPICaller; + listIndex: string; + user: string; + meta: MetaOrUndefined; + dateNow?: string; + tieBreaker?: string; +} + +export const createListIfItDoesNotExist = async ({ + id, + name, + type, + description, + deserializer, + callCluster, + listIndex, + user, + meta, + serializer, + dateNow, + tieBreaker, +}: CreateListIfItDoesNotExistOptions): Promise => { + const list = await getList({ callCluster, id, listIndex }); + if (list == null) { + return createList({ + callCluster, + dateNow, + description, + deserializer, + id, + listIndex, + meta, + name, + serializer, + tieBreaker, + type, + user, + }); + } else { + return list; + } +}; diff --git a/x-pack/plugins/lists/server/services/lists/list_client.mock.ts b/x-pack/plugins/lists/server/services/lists/list_client.mock.ts index 43a01a3ca62dc..e5036d561ddc6 100644 --- a/x-pack/plugins/lists/server/services/lists/list_client.mock.ts +++ b/x-pack/plugins/lists/server/services/lists/list_client.mock.ts @@ -9,7 +9,12 @@ import { getFoundListSchemaMock } from '../../../common/schemas/response/found_l import { getListItemResponseMock } from '../../../common/schemas/response/list_item_schema.mock'; import { getListResponseMock } from '../../../common/schemas/response/list_schema.mock'; import { getCallClusterMock } from '../../../common/get_call_cluster.mock'; -import { LIST_INDEX, LIST_ITEM_INDEX } from '../../../common/constants.mock'; +import { + IMPORT_BUFFER_SIZE, + LIST_INDEX, + LIST_ITEM_INDEX, + MAX_IMPORT_PAYLOAD_BYTES, +} from '../../../common/constants.mock'; import { ListClient } from './list_client'; @@ -59,8 +64,10 @@ export const getListClientMock = (): ListClient => { callCluster: getCallClusterMock(), config: { enabled: true, + importBufferSize: IMPORT_BUFFER_SIZE, listIndex: LIST_INDEX, listItemIndex: LIST_ITEM_INDEX, + maxImportPayloadBytes: MAX_IMPORT_PAYLOAD_BYTES, }, spaceId: 'default', user: 'elastic', diff --git a/x-pack/plugins/lists/server/services/lists/list_client.ts b/x-pack/plugins/lists/server/services/lists/list_client.ts index be9da1a1c69f5..4acc2e7092491 100644 --- a/x-pack/plugins/lists/server/services/lists/list_client.ts +++ b/x-pack/plugins/lists/server/services/lists/list_client.ts @@ -70,6 +70,7 @@ import { UpdateListItemOptions, UpdateListOptions, } from './list_client_types'; +import { createListIfItDoesNotExist } from './create_list_if_it_does_not_exist'; export class ListClient { private readonly spaceId: string; @@ -140,12 +141,20 @@ export class ListClient { type, meta, }: CreateListIfItDoesNotExistOptions): Promise => { - const list = await this.getList({ id }); - if (list == null) { - return this.createList({ description, deserializer, id, meta, name, serializer, type }); - } else { - return list; - } + const { callCluster, user } = this; + const listIndex = this.getListIndex(); + return createListIfItDoesNotExist({ + callCluster, + description, + deserializer, + id, + listIndex, + meta, + name, + serializer, + type, + user, + }); }; public getListIndexExists = async (): Promise => { @@ -325,13 +334,16 @@ export class ListClient { listId, stream, meta, - }: ImportListItemsToStreamOptions): Promise => { - const { callCluster, user } = this; + }: ImportListItemsToStreamOptions): Promise => { + const { callCluster, user, config } = this; const listItemIndex = this.getListItemIndex(); + const listIndex = this.getListIndex(); return importListItemsToStream({ callCluster, + config, deserializer, listId, + listIndex, listItemIndex, meta, serializer, diff --git a/x-pack/plugins/lists/server/services/lists/list_client_types.ts b/x-pack/plugins/lists/server/services/lists/list_client_types.ts index 26e147a6fa130..68a018fa2fc16 100644 --- a/x-pack/plugins/lists/server/services/lists/list_client_types.ts +++ b/x-pack/plugins/lists/server/services/lists/list_client_types.ts @@ -16,6 +16,7 @@ import { Id, IdOrUndefined, ListId, + ListIdOrUndefined, MetaOrUndefined, Name, NameOrUndefined, @@ -86,9 +87,9 @@ export interface ExportListItemsToStreamOptions { } export interface ImportListItemsToStreamOptions { + listId: ListIdOrUndefined; deserializer: DeserializerOrUndefined; serializer: SerializerOrUndefined; - listId: string; type: Type; stream: Readable; meta: MetaOrUndefined;