diff --git a/packages/kbn-logging-mocks/src/logger.mock.ts b/packages/kbn-logging-mocks/src/logger.mock.ts index dd3303dda9410..90436fb4516d2 100644 --- a/packages/kbn-logging-mocks/src/logger.mock.ts +++ b/packages/kbn-logging-mocks/src/logger.mock.ts @@ -24,9 +24,10 @@ const createLoggerMock = (context: string[] = []) => { isLevelEnabled: jest.fn(), }; mockLog.get.mockImplementation((...ctx) => ({ - ctx, ...mockLog, + context: Array.isArray(context) ? context.concat(ctx) : [context, ...ctx].filter(Boolean), })); + mockLog.isLevelEnabled.mockReturnValue(true); return mockLog; diff --git a/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts b/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts index d94b2c78c5885..947d9eecd8fd0 100644 --- a/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts +++ b/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts @@ -7,19 +7,22 @@ */ import { Readable } from 'stream'; +import { encode } from 'cbor-x'; import { promisify } from 'util'; import { loggingSystemMock } from '@kbn/core-logging-server-mocks'; import { elasticsearchServiceMock } from '@kbn/core/server/mocks'; import { Semaphore } from '@kbn/std'; +import { errors } from '@elastic/elasticsearch'; +import type { GetResponse } from '@elastic/elasticsearch/lib/api/types'; import { ElasticsearchBlobStorageClient } from './es'; -import { errors } from '@elastic/elasticsearch'; const setImmediate = promisify(global.setImmediate); describe('ElasticsearchBlobStorageClient', () => { let esClient: ReturnType; - let semaphore: Semaphore; + let uploadSemaphore: Semaphore; + let downloadSemaphore: Semaphore; let logger: ReturnType; // Exposed `clearCache()` which resets the cache for the memoized `createIndexIfNotExists()` method @@ -38,20 +41,24 @@ describe('ElasticsearchBlobStorageClient', () => { index, undefined, logger, - semaphore, + uploadSemaphore, + downloadSemaphore, indexIsAlias ); }; beforeEach(() => { - semaphore = new Semaphore(1); + uploadSemaphore = new Semaphore(1); + downloadSemaphore = new Semaphore(1); logger = loggingSystemMock.createLogger(); esClient = elasticsearchServiceMock.createElasticsearchClient(); + + jest.clearAllMocks(); }); test('limits max concurrent uploads', async () => { const blobStoreClient = createBlobStoreClient(); - const acquireSpy = jest.spyOn(semaphore, 'acquire'); + const uploadAcquireSpy = jest.spyOn(uploadSemaphore, 'acquire'); esClient.index.mockImplementation(() => { return new Promise((res, rej) => setTimeout(() => rej('failed'), 100)); }); @@ -62,7 +69,7 @@ describe('ElasticsearchBlobStorageClient', () => { blobStoreClient.upload(Readable.from(['test'])).catch(() => {}), ]; await setImmediate(); - expect(acquireSpy).toHaveBeenCalledTimes(4); + expect(uploadAcquireSpy).toHaveBeenCalledTimes(4); await p1; expect(esClient.index).toHaveBeenCalledTimes(1); await p2; @@ -71,6 +78,96 @@ describe('ElasticsearchBlobStorageClient', () => { expect(esClient.index).toHaveBeenCalledTimes(4); }); + test('limits max concurrent downloads', async () => { + const index = 'someplace'; + + const blobStoreClient = createBlobStoreClient(index); + const downloadAcquireSpy = jest.spyOn(downloadSemaphore, 'acquire'); + + const downloadsToQueueCount = 4; + const documentsChunkCount = 2; + + const createDownloadContent = (documentId: number, chunkId: number) => { + return Buffer.concat([ + Buffer.from(`download content ${documentId}.${chunkId}`, 'utf8'), + Buffer.alloc(10 * 1028, `chunk ${chunkId}`), + ]); + }; + + const downloadContentMap = Array.from(new Array(downloadsToQueueCount)).map( + (_, documentIdx) => ({ + fileContent: Array.from(new Array(documentsChunkCount)).map((__, chunkIdx) => + createDownloadContent(documentIdx, chunkIdx) + ), + }) + ); + + esClient.get.mockImplementation(({ id: headChunkId }) => { + const [documentId, chunkId] = headChunkId.split(/\./); + + return new Promise(function (resolve) { + setTimeout( + () => + resolve( + Readable.from([ + encode({ + found: true, + _source: { + data: downloadContentMap[Number(documentId)].fileContent[Number(chunkId)], + }, + }), + ]) as unknown as GetResponse + ), + 100 + ); + }); + }); + + const getDownloadStreamContent = async (stream: Readable) => { + const chunks: Buffer[] = []; + + for await (const chunk of stream) { + chunks.push(chunk); + } + + /** + * we are guaranteed that the chunks for the complete document + * will equal the document chunk count specified within this test suite. + * See {@link ContentStream#isRead} + */ + expect(chunks.length).toBe(documentsChunkCount); + + return Buffer.concat(chunks).toString(); + }; + + const [p1, p2, ...rest] = downloadContentMap.map(({ fileContent }, idx) => { + // expected document size will be our returned mock file content + // will be the sum of the lengths of chunks the entire document is split into + const documentSize = fileContent.reduce((total, chunk) => total + chunk.length, 0); + + return blobStoreClient.download({ + id: String(idx), + size: documentSize, + }); + }); + + await setImmediate(); + expect(downloadAcquireSpy).toHaveBeenCalledTimes(downloadsToQueueCount); + + const p1DownloadStream = await p1; + const p1DownloadContent = await getDownloadStreamContent(p1DownloadStream); + expect(esClient.get).toHaveBeenCalledTimes(1 * documentsChunkCount); + expect(p1DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s0.*/)); + + const p2DownloadStream = await p2; + const p2DownloadContent = await getDownloadStreamContent(p2DownloadStream); + expect(esClient.get).toHaveBeenCalledTimes(2 * documentsChunkCount); + expect(p2DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s1.*/)); + + await Promise.all(rest.map((dp) => dp.then((ds) => getDownloadStreamContent(ds)))); + expect(esClient.get).toHaveBeenCalledTimes(downloadsToQueueCount * documentsChunkCount); + }); + describe('.createIndexIfNotExists()', () => { let data: Readable; diff --git a/src/plugins/files/server/blob_storage_service/adapters/es/es.ts b/src/plugins/files/server/blob_storage_service/adapters/es/es.ts index a08d220b1c8e2..5650d9ae29684 100644 --- a/src/plugins/files/server/blob_storage_service/adapters/es/es.ts +++ b/src/plugins/files/server/blob_storage_service/adapters/es/es.ts @@ -13,7 +13,7 @@ import { Semaphore } from '@kbn/std'; import { Readable, Transform } from 'stream'; import { pipeline } from 'stream/promises'; import { promisify } from 'util'; -import { lastValueFrom, defer } from 'rxjs'; +import { lastValueFrom, defer, firstValueFrom } from 'rxjs'; import { PerformanceMetricEvent, reportPerformanceMetricEvent } from '@kbn/ebt-tools'; import { memoize } from 'lodash'; import { FilesPlugin } from '../../../plugin'; @@ -38,14 +38,21 @@ interface UploadOptions { } export class ElasticsearchBlobStorageClient implements BlobStorageClient { - private static defaultSemaphore: Semaphore; + private static defaultUploadSemaphore: Semaphore; + private static defaultDownloadSemaphore: Semaphore; /** - * Call this function once to globally set a concurrent upload limit for + * Call this function once to globally set the concurrent transfer (upload/download) limit for * all {@link ElasticsearchBlobStorageClient} instances. */ - public static configureConcurrentUpload(capacity: number) { - this.defaultSemaphore = new Semaphore(capacity); + public static configureConcurrentTransfers(capacity: number | [number, number]) { + if (Array.isArray(capacity)) { + this.defaultUploadSemaphore = new Semaphore(capacity[0]); + this.defaultDownloadSemaphore = new Semaphore(capacity[1]); + } else { + this.defaultUploadSemaphore = new Semaphore(capacity); + this.defaultDownloadSemaphore = new Semaphore(capacity); + } } constructor( @@ -57,11 +64,23 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient { * Override the default concurrent upload limit by passing in a different * semaphore */ - private readonly uploadSemaphore = ElasticsearchBlobStorageClient.defaultSemaphore, + private readonly uploadSemaphore = ElasticsearchBlobStorageClient.defaultUploadSemaphore, + /** + * Override the default concurrent download limit by passing in a different + * semaphore + */ + private readonly downloadSemaphore = ElasticsearchBlobStorageClient.defaultDownloadSemaphore, /** Indicates that the index provided is an alias (changes how content is retrieved internally) */ private readonly indexIsAlias: boolean = false ) { - assert(this.uploadSemaphore, `No default semaphore provided and no semaphore was passed in.`); + assert( + this.uploadSemaphore, + `No default semaphore provided and no semaphore was passed in for uploads.` + ); + assert( + this.downloadSemaphore, + `No default semaphore provided and no semaphore was passed in for downloads.` + ); } /** @@ -187,7 +206,11 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient { // right after uploading it, we refresh the index before downloading the file. await this.esClient.indices.refresh({ index: this.index }); - return this.getReadableContentStream(id, size); + return firstValueFrom( + defer(() => Promise.resolve(this.getReadableContentStream(id, size))).pipe( + this.downloadSemaphore.acquire() + ) + ); } public async delete(id: string): Promise { diff --git a/src/plugins/files/server/blob_storage_service/adapters/es/integration_tests/es.test.ts b/src/plugins/files/server/blob_storage_service/adapters/es/integration_tests/es.test.ts index 1e6b357cbf874..981810e968f65 100644 --- a/src/plugins/files/server/blob_storage_service/adapters/es/integration_tests/es.test.ts +++ b/src/plugins/files/server/blob_storage_service/adapters/es/integration_tests/es.test.ts @@ -25,7 +25,7 @@ describe('Elasticsearch blob storage', () => { let esRefreshIndexSpy: jest.SpyInstance; beforeAll(async () => { - ElasticsearchBlobStorageClient.configureConcurrentUpload(Infinity); + ElasticsearchBlobStorageClient.configureConcurrentTransfers(Infinity); const { startES, startKibana } = createTestServers({ adjustTimeout: jest.setTimeout }); manageES = await startES(); manageKbn = await startKibana(); diff --git a/src/plugins/files/server/blob_storage_service/blob_storage_service.ts b/src/plugins/files/server/blob_storage_service/blob_storage_service.ts index 0cc4ba81997b8..3667957ba25d8 100644 --- a/src/plugins/files/server/blob_storage_service/blob_storage_service.ts +++ b/src/plugins/files/server/blob_storage_service/blob_storage_service.ts @@ -22,8 +22,16 @@ export class BlobStorageService { */ private readonly concurrentUploadsToES = 20; + /** + * The number of downloads per Kibana instance that can be running simultaneously + */ + private readonly concurrentDownloadsFromES = 5; + constructor(private readonly esClient: ElasticsearchClient, private readonly logger: Logger) { - ElasticsearchBlobStorageClient.configureConcurrentUpload(this.concurrentUploadsToES); + ElasticsearchBlobStorageClient.configureConcurrentTransfers([ + this.concurrentUploadsToES, + this.concurrentDownloadsFromES, + ]); } private createESBlobStorage({ diff --git a/src/plugins/files/server/file_client/create_es_file_client.test.ts b/src/plugins/files/server/file_client/create_es_file_client.test.ts index 68589b334c8ea..1cf7343824ff3 100644 --- a/src/plugins/files/server/file_client/create_es_file_client.test.ts +++ b/src/plugins/files/server/file_client/create_es_file_client.test.ts @@ -23,7 +23,7 @@ describe('When initializing file client via createESFileClient()', () => { let logger: MockedLogger; beforeEach(() => { - ElasticsearchBlobStorageClient.configureConcurrentUpload(Infinity); + ElasticsearchBlobStorageClient.configureConcurrentTransfers(Infinity); esClient = elasticsearchServiceMock.createElasticsearchClient(); logger = loggingSystemMock.createLogger(); }); diff --git a/src/plugins/files/server/file_client/create_es_file_client.ts b/src/plugins/files/server/file_client/create_es_file_client.ts index 47b044618efc2..755071d66328c 100644 --- a/src/plugins/files/server/file_client/create_es_file_client.ts +++ b/src/plugins/files/server/file_client/create_es_file_client.ts @@ -78,6 +78,7 @@ export function createEsFileClient(arg: CreateEsFileClientArgs): FileClient { undefined, logger, undefined, + undefined, indexIsAlias ), undefined,