[Reporting] Limit report document chunk size to 4MB (#181395)
## Summary

Closes #180829
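
Previously, `ContentStream` looked up the cluster's `http.max_content_length` setting and derived a maximum chunk size from it. This change removes that lookup and always splits report content into fixed chunks of roughly 4 MB, each stored as its own document. The sketch below is a simplified illustration of the new buffering and flushing behavior, not the actual `ContentStream` code; the `ChunkBuffer` class and its method names are made up for the example.

```ts
// Illustrative sketch only: buffer incoming writes and emit fixed-size
// pieces, each of which would be stored as its own chunk document.
const ONE_MB = 1024 * 1024;

class ChunkBuffer {
  // Fixed chunk size; tests can lower this (e.g. to 2 bytes) to force splitting.
  chunkSize = 4 * ONE_MB;

  private buffers: Buffer[] = [];
  private bytesBuffered = 0;

  write(data: Buffer): void {
    this.buffers.push(data);
    this.bytesBuffered += data.byteLength;
  }

  // Drain every complete chunk currently buffered, keeping any remainder
  // for the next write or the final flush.
  drainFullChunks(): Buffer[] {
    const chunks: Buffer[] = [];
    while (this.bytesBuffered >= this.chunkSize && this.buffers.length) {
      const joined = Buffer.concat(this.buffers);
      chunks.push(joined.subarray(0, this.chunkSize));
      const rest = joined.subarray(this.chunkSize);
      this.buffers = rest.length ? [rest] : [];
      this.bytesBuffered = rest.length;
    }
    return chunks;
  }
}
```

In the real diff this loop lives in `flushAllFullChunks()`, which now flushes `this.chunkSize` bytes at a time instead of a size computed from cluster settings, and the tests override `chunkSize` directly rather than mocking `cluster.getSettings`.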



### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios


### For maintainers

- [x] This was checked for breaking API changes and was [labeled appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

---------

Co-authored-by: Kibana Machine <[email protected]>
vadimkibana and kibanamachine authored Apr 24, 2024
1 parent 70c574d commit 15e5598
Showing 2 changed files with 13 additions and 58 deletions.
x-pack/plugins/reporting/server/lib/content_stream.test.ts (8 changes: 2 additions & 6 deletions)
@@ -280,9 +280,7 @@ describe('ContentStream', () => {
     });

     it('should split raw data into chunks', async () => {
-      client.cluster.getSettings.mockResponseOnce(
-        set<any>({}, 'defaults.http.max_content_length', 1028)
-      );
+      stream.chunkSize = 2;
       stream.end('123456');
       await new Promise((resolve) => stream.once('finish', resolve));

@@ -322,9 +320,7 @@ describe('ContentStream', () => {
     });

     it('should encode every chunk separately', async () => {
-      client.cluster.getSettings.mockResponseOnce(
-        set<any>({}, 'defaults.http.max_content_length', 1028)
-      );
+      base64Stream.chunkSize = 3;
       base64Stream.end('12345678');
       await new Promise((resolve) => base64Stream.once('finish', resolve));

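With the fixed chunk size, the tests above no longer mock `cluster.getSettings`; they simply lower `chunkSize` on the stream under test to force splitting. A condensed usage sketch, with the `stream` setup from the test file omitted:

```ts
// Shrinking chunkSize forces even a tiny payload to be split into several
// chunk documents; at chunkSize = 2, '123456' should end up as 2-byte pieces.
stream.chunkSize = 2;
stream.end('123456');
await new Promise((resolve) => stream.once('finish', resolve));
```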
x-pack/plugins/reporting/server/lib/content_stream.ts (63 changes: 11 additions & 52 deletions)
@@ -5,22 +5,15 @@
  * 2.0.
  */

-import { defaults, get } from 'lodash';
 import { Duplex } from 'stream';
 import { v4 as uuidv4 } from 'uuid';

 import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
-import { ByteSizeValue } from '@kbn/config-schema';
 import type { ElasticsearchClient, Logger } from '@kbn/core/server';
 import type { ReportSource } from '@kbn/reporting-common/types';
 import type { ReportingCore } from '..';

-/**
- * @note The Elasticsearch `http.max_content_length` is including the whole POST body.
- * But the update/index request also contains JSON-serialized query parameters.
- * 1Kb span should be enough for that.
- */
-const REQUEST_SPAN_SIZE_IN_BYTES = 1024;
+const ONE_MB = 1024 * 1024;

 type Callback = (error?: Error) => void;
 type SearchRequest = estypes.SearchRequest;
@@ -52,29 +45,13 @@ interface ContentStreamParameters {
 }

 export class ContentStream extends Duplex {
-  /**
-   * @see https://en.wikipedia.org/wiki/Base64#Output_padding
-   */
-  private static getMaxBase64EncodedSize(max: number) {
-    return Math.floor(max / 4) * 3;
-  }
-
-  /**
-   * @note Raw data might be escaped during JSON serialization.
-   * In the worst-case, every character is escaped, so the max raw data length is twice less.
-   */
-  private static getMaxJsonEscapedSize(max: number) {
-    return Math.floor(max / 2);
-  }
-
   private buffers: Buffer[] = [];
   private bytesBuffered = 0;

   private bytesRead = 0;
   private chunksRead = 0;
   private chunksWritten = 0;
   private jobSize?: number;
-  private maxChunkSize?: number;
   private parameters: Required<ContentStreamParameters>;
   private primaryTerm?: number;
   private seqNo?: number;
@@ -85,6 +62,14 @@ export class ContentStream extends Duplex {
    */
   bytesWritten = 0;

+  /**
+   * The chunking size of reporting files. Larger CSV files will be split into
+   * multiple documents, where the stream is chunked into pieces of approximately
+   * this size. The actual document size will be slightly larger due to Base64
+   * encoding and JSON metadata.
+   */
+  chunkSize = 4 * ONE_MB;
+
   constructor(
     private client: ElasticsearchClient,
     private logger: Logger,
@@ -103,30 +88,6 @@ export class ContentStream extends Duplex {
     return buffer.toString(this.parameters.encoding === 'base64' ? 'base64' : undefined);
   }

-  private async getMaxContentSize() {
-    const body = await this.client.cluster.getSettings({ include_defaults: true });
-    const { persistent, transient, defaults: defaultSettings } = body;
-    const settings = defaults({}, persistent, transient, defaultSettings);
-    const maxContentSize = get(settings, 'http.max_content_length', '100mb');
-
-    return ByteSizeValue.parse(maxContentSize).getValueInBytes();
-  }
-
-  private async getMaxChunkSize() {
-    if (!this.maxChunkSize) {
-      const maxContentSize = (await this.getMaxContentSize()) - REQUEST_SPAN_SIZE_IN_BYTES;
-
-      this.maxChunkSize =
-        this.parameters.encoding === 'base64'
-          ? ContentStream.getMaxBase64EncodedSize(maxContentSize)
-          : ContentStream.getMaxJsonEscapedSize(maxContentSize);
-
-      this.logger.debug(`Chunk size is ${this.maxChunkSize} bytes.`);
-    }
-
-    return this.maxChunkSize;
-  }
-
   private async readHead() {
     const { id, index } = this.document;
     const body: SearchRequest['body'] = {
@@ -306,10 +267,8 @@
   }

   private async flushAllFullChunks() {
-    const maxChunkSize = await this.getMaxChunkSize();
-
-    while (this.bytesBuffered >= maxChunkSize && this.buffers.length) {
-      await this.flush(maxChunkSize);
+    while (this.bytesBuffered >= this.chunkSize && this.buffers.length) {
+      await this.flush(this.chunkSize);
     }
   }

