[Reporting] Limit report document chunk size to 4MB #181395

Merged · 4 commits · Apr 24, 2024
8 changes: 2 additions & 6 deletions x-pack/plugins/reporting/server/lib/content_stream.test.ts
@@ -280,9 +280,7 @@ describe('ContentStream', () => {
   });

   it('should split raw data into chunks', async () => {
-    client.cluster.getSettings.mockResponseOnce(
-      set<any>({}, 'defaults.http.max_content_length', 1028)
-    );
+    stream.chunkSize = 2;
     stream.end('123456');
     await new Promise((resolve) => stream.once('finish', resolve));

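With the chunk size now a plain property, the test no longer mocks cluster settings: `chunkSize = 2` should split the six-byte write into three two-byte documents. A minimal sketch of that split, mirroring the flush loop's arithmetic (the helper name is illustrative, not part of the test):

```ts
// Sketch: cutting a buffer into chunkSize pieces, as the flush loop does.
function splitIntoChunks(data: Buffer, chunkSize: number): Buffer[] {
  const chunks: Buffer[] = [];
  for (let offset = 0; offset < data.length; offset += chunkSize) {
    chunks.push(data.subarray(offset, offset + chunkSize));
  }
  return chunks;
}

// '123456' with chunkSize = 2 -> ['12', '34', '56']
console.log(splitIntoChunks(Buffer.from('123456'), 2).map((c) => c.toString()));
```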
@@ -322,9 +320,7 @@ describe('ContentStream', () => {
   });

   it('should encode every chunk separately', async () => {
-    client.cluster.getSettings.mockResponseOnce(
-      set<any>({}, 'defaults.http.max_content_length', 1028)
-    );
+    base64Stream.chunkSize = 3;
     base64Stream.end('12345678');
     await new Promise((resolve) => base64Stream.once('finish', resolve));

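The Base64 variant encodes each chunk independently, so with `chunkSize = 3` the eight raw bytes split 3 + 3 + 2 and the final short chunk carries its own padding. A rough sketch of the expected encoding (illustrative, not the test's literal assertions):

```ts
// Sketch: Base64-encoding each chunk separately, per the test's description.
const raw = Buffer.from('12345678');
const chunkSize = 3;
const encoded: string[] = [];
for (let offset = 0; offset < raw.length; offset += chunkSize) {
  encoded.push(raw.subarray(offset, offset + chunkSize).toString('base64'));
}
console.log(encoded); // ['MTIz', 'NDU2', 'Nzg='], padding only on the last chunk
```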
63 changes: 11 additions & 52 deletions x-pack/plugins/reporting/server/lib/content_stream.ts
@@ -5,22 +5,15 @@
  * 2.0.
  */

-import { defaults, get } from 'lodash';
 import { Duplex } from 'stream';
 import { v4 as uuidv4 } from 'uuid';

 import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
-import { ByteSizeValue } from '@kbn/config-schema';
 import type { ElasticsearchClient, Logger } from '@kbn/core/server';
 import type { ReportSource } from '@kbn/reporting-common/types';
 import type { ReportingCore } from '..';

-/**
- * @note The Elasticsearch `http.max_content_length` is including the whole POST body.
- * But the update/index request also contains JSON-serialized query parameters.
- * 1Kb span should be enough for that.
- */
-const REQUEST_SPAN_SIZE_IN_BYTES = 1024;
+const ONE_MB = 1024 * 1024;

 type Callback = (error?: Error) => void;
 type SearchRequest = estypes.SearchRequest;
@@ -52,29 +45,13 @@ interface ContentStreamParameters {
 }

 export class ContentStream extends Duplex {
-  /**
-   * @see https://en.wikipedia.org/wiki/Base64#Output_padding
-   */
-  private static getMaxBase64EncodedSize(max: number) {
-    return Math.floor(max / 4) * 3;
-  }
-
-  /**
-   * @note Raw data might be escaped during JSON serialization.
-   * In the worst-case, every character is escaped, so the max raw data length is twice less.
-   */
-  private static getMaxJsonEscapedSize(max: number) {
-    return Math.floor(max / 2);
-  }
-
   private buffers: Buffer[] = [];
   private bytesBuffered = 0;

   private bytesRead = 0;
   private chunksRead = 0;
   private chunksWritten = 0;
   private jobSize?: number;
-  private maxChunkSize?: number;
   private parameters: Required<ContentStreamParameters>;
   private primaryTerm?: number;
   private seqNo?: number;
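The deleted helpers encoded two worst-case bounds: Base64 emits 4 output bytes per 3 raw bytes, so a `max`-byte payload holds at most `floor(max / 4) * 3` raw bytes, and JSON escaping can at worst double every character, halving the budget. A quick worked check of the removed formulas (the 1027-byte budget is arbitrary):

```ts
// Worked example of the two removed sizing bounds.
const max = 1027;

// Base64: every 4 encoded bytes decode to 3 raw bytes.
const maxBase64EncodedSize = Math.floor(max / 4) * 3; // 256 * 3 = 768 raw bytes

// JSON escaping: worst case turns 1 character into 2 (e.g. '"' -> '\"').
const maxJsonEscapedSize = Math.floor(max / 2); // 513 raw bytes

console.log({ maxBase64EncodedSize, maxJsonEscapedSize });
```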
@@ -85,6 +62,14 @@ export class ContentStream extends Duplex {
    */
   bytesWritten = 0;

+  /**
+   * The chunking size of reporting files. Larger CSV files will be split into
+   * multiple documents, where the stream is chunked into pieces of approximately
+   * this size. The actual document size will be slightly larger due to Base64
+   * encoding and JSON metadata.
+   */
+  chunkSize = 4 * ONE_MB;
+
   constructor(
     private client: ElasticsearchClient,
     private logger: Logger,
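The new doc comment is the behavioral contract: chunks are cut from the raw stream at 4 MB, and the stored document ends up somewhat larger once the chunk is Base64-encoded (a 4/3 expansion) and wrapped in JSON. A rough size estimate under those stated assumptions:

```ts
// Rough estimate of one stored chunk's size after encoding.
const ONE_MB = 1024 * 1024;
const chunkSize = 4 * ONE_MB; // 4,194,304 raw bytes

// Base64 expands every 3 raw bytes into 4 encoded bytes (plus padding).
const base64Size = Math.ceil(chunkSize / 3) * 4;

// Plus JSON metadata overhead per document; still far below the
// Elasticsearch http.max_content_length default of 100mb.
console.log(`encoded chunk ~ ${(base64Size / ONE_MB).toFixed(2)} MB`); // ~ 5.33 MB
```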
@@ -103,30 +88,6 @@ export class ContentStream extends Duplex {
     return buffer.toString(this.parameters.encoding === 'base64' ? 'base64' : undefined);
   }

-  private async getMaxContentSize() {
-    const body = await this.client.cluster.getSettings({ include_defaults: true });
-    const { persistent, transient, defaults: defaultSettings } = body;
-    const settings = defaults({}, persistent, transient, defaultSettings);
-    const maxContentSize = get(settings, 'http.max_content_length', '100mb');
-
-    return ByteSizeValue.parse(maxContentSize).getValueInBytes();
-  }
-
-  private async getMaxChunkSize() {
-    if (!this.maxChunkSize) {
-      const maxContentSize = (await this.getMaxContentSize()) - REQUEST_SPAN_SIZE_IN_BYTES;
-
-      this.maxChunkSize =
-        this.parameters.encoding === 'base64'
-          ? ContentStream.getMaxBase64EncodedSize(maxContentSize)
-          : ContentStream.getMaxJsonEscapedSize(maxContentSize);
-
-      this.logger.debug(`Chunk size is ${this.maxChunkSize} bytes.`);
-    }
-
-    return this.maxChunkSize;
-  }
-
   private async readHead() {
     const { id, index } = this.document;
     const body: SearchRequest['body'] = {
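For contrast, this removed path sized chunks from the cluster's `http.max_content_length` (default `100mb`) minus the 1 KB request span, then applied the encoding-specific bound. Roughly what it yielded under default settings (plain arithmetic, not a live settings lookup):

```ts
// What the removed getMaxChunkSize() worked out to with defaults.
const maxContentLength = 100 * 1024 * 1024; // http.max_content_length: 100mb
const requestSpan = 1024; // REQUEST_SPAN_SIZE_IN_BYTES
const budget = maxContentLength - requestSpan; // 104,856,576 bytes

const base64Chunk = Math.floor(budget / 4) * 3; // ~ 75 MB per chunk
const jsonChunk = Math.floor(budget / 2); // ~ 50 MB per chunk

console.log({ base64Chunk, jsonChunk });
```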
@@ -306,10 +267,8 @@
   }

   private async flushAllFullChunks() {
-    const maxChunkSize = await this.getMaxChunkSize();
-
-    while (this.bytesBuffered >= maxChunkSize && this.buffers.length) {
-      await this.flush(maxChunkSize);
+    while (this.bytesBuffered >= this.chunkSize && this.buffers.length) {
+      await this.flush(this.chunkSize);
     }
   }
