forked from elastic/kibana
[Dataset Quality] Apply chunking strategy for data stream stats retrieval (elastic#194816)

## 📓 Summary

Closes elastic#192169

This work fixes the issue where some requests hit the too-long HTTP line limit once all the dataset names are combined into a single request. We had a suggested strategy from the work done in elastic#171735, but it presented a couple of problems.

- The HTTP line length issue occurs when the request URL exceeds 4096 bytes (4096 characters). This includes the URL protocol, domain, path and any other parameters, so assuming the full 4096 characters are available for the `index` parameter is incorrect: we would still exceed the maximum in a worst-case scenario of a chunk of 16 values, each 255 characters long (see the quick check below).
- Always chunking the requests into groups of 16 items might not be optimal in the most common scenario, where data stream patterns are short.

I opted for a different chunking strategy that optimizes each chunk so that the requests triggered on the cluster are reduced to a minimum. I'll leave more notes in the code to help with the review.

---------

Co-authored-by: Marco Antonio Ghiani <[email protected]>
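A quick back-of-the-envelope check of that worst case (purely illustrative, not part of the change):

```ts
// Hypothetical worst case: a chunk of 16 data stream names, 255 characters each,
// comma-joined into the `index` parameter of a single request.
const names: string[] = Array(16).fill('a'.repeat(255));
const indexParam = names.join(','); // 16 * 255 + 15 commas = 4095 characters

// That alone nearly fills the 4096-character HTTP line, which also has to carry
// the method, path, protocol and any other query parameters, so the limit is exceeded.
console.log(indexParam.length); // 4095
```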
1 parent 3a3f130, commit bff69e2. Showing 6 changed files with 137 additions and 14 deletions.
58 changes: 58 additions & 0 deletions
x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.test.ts
@@ -0,0 +1,58 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { reduceAsyncChunks } from './reduce_async_chunks';

describe('reduceAsyncChunks', () => {
  const spyChunkExecutor = jest
    .fn()
    .mockImplementation((chunk: string[]) =>
      Promise.resolve(chunk.map((str) => str.toUpperCase()))
    );

  afterEach(() => {
    spyChunkExecutor.mockClear();
  });

  it('should run an iterator mapping callback on each chunk and merge the result', async () => {
    const input = Array(20).fill('logs-dataset-default');
    const expected = Array(20).fill('LOGS-DATASET-DEFAULT');

    const res = await reduceAsyncChunks(input, spyChunkExecutor);

    expect(res).toEqual(expected);
    expect(spyChunkExecutor).toHaveBeenCalledTimes(1);
  });

  it('should create chunks where the total strings length does not exceed the allowed maximum', async () => {
    const input = Array(1000).fill('logs-dataset-default'); // 20k chars => 20k/3072 => Expected 7 chunks
    const expected = Array(1000).fill('LOGS-DATASET-DEFAULT');
    const expectedChunks = 7;

    const res = await reduceAsyncChunks(input, spyChunkExecutor);

    expect(res).toEqual(expected);
    expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks);
  });

  it('should maximize the chunks length to minimize the chunks count', async () => {
    const input = [
      ...Array(1000).fill('logs-dataset_30letters-default'),
      ...Array(1000).fill('logs-dataset-default'),
    ]; // 30k chars + 20k chars + ~2k commas => 52k/3072 => Expected 17 chunks
    const expected = [
      ...Array(1000).fill('LOGS-DATASET_30LETTERS-DEFAULT'),
      ...Array(1000).fill('LOGS-DATASET-DEFAULT'),
    ];
    const expectedChunks = 17;

    const res = await reduceAsyncChunks(input, spyChunkExecutor);

    expect(res).toEqual(expected);
    expect(spyChunkExecutor).toHaveBeenCalledTimes(expectedChunks);
  });
});
59 changes: 59 additions & 0 deletions
x-pack/plugins/observability_solution/dataset_quality/server/utils/reduce_async_chunks.ts
@@ -0,0 +1,59 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */
import { Observable, OperatorFunction, from, lastValueFrom, mergeMap, reduce } from 'rxjs';
import deepmerge from 'deepmerge';

type CallbackFn<TResult> = (chunk: string[], id: number) => Promise<TResult>;

const MAX_HTTP_LINE_LENGTH = 4096;
// Apply a 75% threshold to the HTTP line max length to guarantee enough space for the URL and potentially other parameters.
// This value might need to vary, as it is an estimate of how much we can reserve for the chunked list length.
const MAX_CHUNK_LENGTH = MAX_HTTP_LINE_LENGTH * 0.75; // 4096 * 0.75 === 3072 characters, as 1 char = 1 byte

export const reduceAsyncChunks = <TResult>(list: string[], chunkExecutor: CallbackFn<TResult>) => {
  const result$ = from(list).pipe(
    bufferUntil(isLessThanMaxChunkLength),
    mergeMap((chunk, id) => from(chunkExecutor(chunk, id))),
    reduce((result, chunkResult) => deepmerge(result, chunkResult))
  );

  return lastValueFrom(result$);
};

/**
 * Support functions for reduceAsyncChunks
 */
const bufferUntil = <TItem>(
  predicate: (chunk: TItem[], currentItem: TItem) => boolean
): OperatorFunction<TItem, TItem[]> => {
  return (source) =>
    new Observable((observer) => {
      let chunk: TItem[] = [];

      return source.subscribe({
        next(currentItem) {
          if (predicate(chunk, currentItem)) {
            chunk.push(currentItem);
          } else {
            // Emit the current chunk, start a new one
            if (chunk.length > 0) observer.next(chunk);
            chunk = [currentItem]; // Reset the chunk with the current item
          }
        },
        complete() {
          // Emit the final chunk if it has any items
          if (chunk.length > 0) observer.next(chunk);
          observer.complete();
        },
      });
    });
};

const isLessThanMaxChunkLength = (chunk: string[], currentItem: string) => {
  const totalLength = [...chunk, currentItem].join().length;
  return totalLength <= MAX_CHUNK_LENGTH; // Allow the chunk until it exceeds the max chunk length
};
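For review context, a minimal usage sketch under stated assumptions: the `fetchDataStreamStats` wrapper, the `datasetNames` input and the `esClient.indices.dataStreamsStats` call are illustrative only, not the exact call sites touched by this commit.

```ts
import { Client } from '@elastic/elasticsearch';
import { reduceAsyncChunks } from './reduce_async_chunks';

// Illustrative inputs only: the real plugin code wires in its own scoped client
// and the dataset names resolved for the current request.
const esClient = new Client({ node: 'http://localhost:9200' });
const datasetNames = ['logs-nginx.access-default', 'logs-system.syslog-default'];

async function fetchDataStreamStats() {
  // Each chunk is kept under MAX_CHUNK_LENGTH, so the comma-joined `name`
  // parameter never pushes the request line past the HTTP limit; the
  // per-chunk responses are deep-merged into a single result.
  return reduceAsyncChunks(datasetNames, (chunk) =>
    esClient.indices.dataStreamsStats({ name: chunk.join(',') })
  );
}
```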