diff --git a/x-pack/plugins/reporting/server/lib/content_stream.test.ts b/x-pack/plugins/reporting/server/lib/content_stream.test.ts
index 288d528c722bf..d4f179c5c9359 100644
--- a/x-pack/plugins/reporting/server/lib/content_stream.test.ts
+++ b/x-pack/plugins/reporting/server/lib/content_stream.test.ts
@@ -280,9 +280,7 @@ describe('ContentStream', () => {
     });
 
     it('should split raw data into chunks', async () => {
-      client.cluster.getSettings.mockResponseOnce(
-        set({}, 'defaults.http.max_content_length', 1028)
-      );
+      stream.chunkSize = 2;
       stream.end('123456');
       await new Promise((resolve) => stream.once('finish', resolve));
 
@@ -322,9 +320,7 @@ describe('ContentStream', () => {
     });
 
     it('should encode every chunk separately', async () => {
-      client.cluster.getSettings.mockResponseOnce(
-        set({}, 'defaults.http.max_content_length', 1028)
-      );
+      base64Stream.chunkSize = 3;
       base64Stream.end('12345678');
       await new Promise((resolve) => base64Stream.once('finish', resolve));
 
diff --git a/x-pack/plugins/reporting/server/lib/content_stream.ts b/x-pack/plugins/reporting/server/lib/content_stream.ts
index 0be61705d84b5..17362516d2c9d 100644
--- a/x-pack/plugins/reporting/server/lib/content_stream.ts
+++ b/x-pack/plugins/reporting/server/lib/content_stream.ts
@@ -5,22 +5,15 @@
  * 2.0.
  */
 
-import { defaults, get } from 'lodash';
 import { Duplex } from 'stream';
 import { v4 as uuidv4 } from 'uuid';
 import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
-import { ByteSizeValue } from '@kbn/config-schema';
 import type { ElasticsearchClient, Logger } from '@kbn/core/server';
 import type { ReportSource } from '@kbn/reporting-common/types';
 
 import type { ReportingCore } from '..';
 
-/**
- * @note The Elasticsearch `http.max_content_length` is including the whole POST body.
- * But the update/index request also contains JSON-serialized query parameters.
- * 1Kb span should be enough for that.
- */
-const REQUEST_SPAN_SIZE_IN_BYTES = 1024;
+const ONE_MB = 1024 * 1024;
 
 type Callback = (error?: Error) => void;
 type SearchRequest = estypes.SearchRequest;
@@ -52,21 +45,6 @@ interface ContentStreamParameters {
 }
 
 export class ContentStream extends Duplex {
-  /**
-   * @see https://en.wikipedia.org/wiki/Base64#Output_padding
-   */
-  private static getMaxBase64EncodedSize(max: number) {
-    return Math.floor(max / 4) * 3;
-  }
-
-  /**
-   * @note Raw data might be escaped during JSON serialization.
-   * In the worst-case, every character is escaped, so the max raw data length is twice less.
-   */
-  private static getMaxJsonEscapedSize(max: number) {
-    return Math.floor(max / 2);
-  }
-
   private buffers: Buffer[] = [];
   private bytesBuffered = 0;
 
@@ -74,7 +52,6 @@ export class ContentStream extends Duplex {
   private chunksRead = 0;
   private chunksWritten = 0;
   private jobSize?: number;
-  private maxChunkSize?: number;
   private parameters: Required<ContentStreamParameters>;
   private primaryTerm?: number;
   private seqNo?: number;
@@ -85,6 +62,14 @@ export class ContentStream extends Duplex {
    */
   bytesWritten = 0;
 
+  /**
+   * The chunking size of reporting files. Larger CSV files will be split into
+   * multiple documents, where the stream is chunked into pieces of approximately
+   * this size. The actual document size will be slightly larger due to Base64
+   * encoding and JSON metadata.
+   */
+  chunkSize = 4 * ONE_MB;
+
   constructor(
     private client: ElasticsearchClient,
     private logger: Logger,
@@ -103,30 +88,6 @@ export class ContentStream extends Duplex {
     return buffer.toString(this.parameters.encoding === 'base64' ? 'base64' : undefined);
   }
 
-  private async getMaxContentSize() {
-    const body = await this.client.cluster.getSettings({ include_defaults: true });
-    const { persistent, transient, defaults: defaultSettings } = body;
-    const settings = defaults({}, persistent, transient, defaultSettings);
-    const maxContentSize = get(settings, 'http.max_content_length', '100mb');
-
-    return ByteSizeValue.parse(maxContentSize).getValueInBytes();
-  }
-
-  private async getMaxChunkSize() {
-    if (!this.maxChunkSize) {
-      const maxContentSize = (await this.getMaxContentSize()) - REQUEST_SPAN_SIZE_IN_BYTES;
-
-      this.maxChunkSize =
-        this.parameters.encoding === 'base64'
-          ? ContentStream.getMaxBase64EncodedSize(maxContentSize)
-          : ContentStream.getMaxJsonEscapedSize(maxContentSize);
-
-      this.logger.debug(`Chunk size is ${this.maxChunkSize} bytes.`);
-    }
-
-    return this.maxChunkSize;
-  }
-
   private async readHead() {
     const { id, index } = this.document;
     const body: SearchRequest['body'] = {
@@ -306,10 +267,8 @@ export class ContentStream extends Duplex {
   }
 
   private async flushAllFullChunks() {
-    const maxChunkSize = await this.getMaxChunkSize();
-
-    while (this.bytesBuffered >= maxChunkSize && this.buffers.length) {
-      await this.flush(maxChunkSize);
+    while (this.bytesBuffered >= this.chunkSize && this.buffers.length) {
+      await this.flush(this.chunkSize);
     }
   }
 
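
Note (not part of the patch): the change replaces the cluster-setting lookup with a fixed per-instance `chunkSize`, and `flushAllFullChunks` now simply flushes the buffer in `chunkSize`-sized pieces. The standalone sketch below only illustrates that splitting rule under the same assumption the updated tests use (`stream.chunkSize = 2`); `splitIntoChunks` is a hypothetical helper, not code from the plugin.

```ts
// Illustrative sketch only. Splits a buffer into pieces of at most `chunkSize`
// bytes; the final piece may be shorter. This mirrors the loop in
// flushAllFullChunks, which flushes while bytesBuffered >= chunkSize.
function splitIntoChunks(data: Buffer, chunkSize: number): Buffer[] {
  const chunks: Buffer[] = [];
  for (let offset = 0; offset < data.length; offset += chunkSize) {
    chunks.push(data.subarray(offset, offset + chunkSize));
  }
  return chunks;
}

// As in the updated test: with chunkSize = 2, '123456' is stored as three documents.
console.log(splitIntoChunks(Buffer.from('123456'), 2).map((c) => c.toString()));
// [ '12', '34', '56' ]
```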