diff --git a/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts b/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts
index 47c113fed6c88..947d9eecd8fd0 100644
--- a/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts
+++ b/src/plugins/files/server/blob_storage_service/adapters/es/es.test.ts
@@ -82,14 +82,29 @@ describe('ElasticsearchBlobStorageClient', () => {
     const index = 'someplace';
     const blobStoreClient = createBlobStoreClient(index);
-    const downloadAccquireSpy = jest.spyOn(downloadSemaphore, 'acquire');
+    const downloadAcquireSpy = jest.spyOn(downloadSemaphore, 'acquire');
     const downloadsToQueueCount = 4;
+    const documentsChunkCount = 2;
+
+    const createDownloadContent = (documentId: number, chunkId: number) => {
+      return Buffer.concat([
+        Buffer.from(`download content ${documentId}.${chunkId}`, 'utf8'),
+        Buffer.alloc(10 * 1028, `chunk ${chunkId}`),
+      ]);
+    };
+
+    const downloadContentMap = Array.from(new Array(downloadsToQueueCount)).map(
+      (_, documentIdx) => ({
+        fileContent: Array.from(new Array(documentsChunkCount)).map((__, chunkIdx) =>
+          createDownloadContent(documentIdx, chunkIdx)
+        ),
+      })
+    );
 
-    const createDownloadContent = (headChunkId: string) =>
-      Buffer.from(`download content ${headChunkId}`, 'utf8');
+    esClient.get.mockImplementation(({ id: headChunkId }) => {
+      const [documentId, chunkId] = headChunkId.split(/\./);
 
-    esClient.get.mockImplementation(({ id }) => {
       return new Promise(function (resolve) {
         setTimeout(
           () =>
@@ -98,7 +113,7 @@ describe('ElasticsearchBlobStorageClient', () => {
               encode({
                 found: true,
                 _source: {
-                  data: createDownloadContent(id),
+                  data: downloadContentMap[Number(documentId)].fileContent[Number(chunkId)],
                 },
               }),
             ]) as unknown as GetResponse
@@ -108,31 +123,49 @@ describe('ElasticsearchBlobStorageClient', () => {
       });
     });
 
-    const [p1, p2, ...rest] = Array.from(new Array(downloadsToQueueCount)).map((_, idx) =>
-      blobStoreClient.download({ id: String(idx), size: 1 }).then(async (stream) => {
-        const chunks = [];
+    const getDownloadStreamContent = async (stream: Readable) => {
+      const chunks: Buffer[] = [];
 
-        for await (const chunk of stream) {
-          chunks.push(chunk);
-        }
+      for await (const chunk of stream) {
+        chunks.push(chunk);
+      }
 
-        return Buffer.from(chunks).toString();
-      })
-    );
+      /**
+       * we are guaranteed that the number of chunks for the complete document
+       * will equal the document chunk count specified within this test suite.
+       * See {@link ContentStream#isRead}
+       */
+      expect(chunks.length).toBe(documentsChunkCount);
+
+      return Buffer.concat(chunks).toString();
+    };
+
+    const [p1, p2, ...rest] = downloadContentMap.map(({ fileContent }, idx) => {
+      // the expected document size matches our returned mock file content:
+      // the sum of the lengths of the chunks the entire document is split into
+      const documentSize = fileContent.reduce((total, chunk) => total + chunk.length, 0);
+
+      return blobStoreClient.download({
+        id: String(idx),
+        size: documentSize,
+      });
+    });
 
     await setImmediate();
 
-    expect(downloadAccquireSpy).toHaveBeenCalledTimes(downloadsToQueueCount);
+    expect(downloadAcquireSpy).toHaveBeenCalledTimes(downloadsToQueueCount);
 
-    expect(esClient.get).toHaveBeenCalledTimes(1);
-    const p1DownloadContent = await p1;
-    expect(p1DownloadContent).toEqual(expect.stringContaining('download content 0'));
+    const p1DownloadStream = await p1;
+    const p1DownloadContent = await getDownloadStreamContent(p1DownloadStream);
+    expect(esClient.get).toHaveBeenCalledTimes(1 * documentsChunkCount);
+    expect(p1DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s0.*/));
 
-    expect(esClient.get).toHaveBeenCalledTimes(2);
-    const p2DownloadContent = await p2;
-    expect(p2DownloadContent).toEqual(expect.stringContaining('download content 1'));
+    const p2DownloadStream = await p2;
+    const p2DownloadContent = await getDownloadStreamContent(p2DownloadStream);
+    expect(esClient.get).toHaveBeenCalledTimes(2 * documentsChunkCount);
+    expect(p2DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s1.*/));
 
-    await Promise.all(rest);
-    expect(esClient.get).toHaveBeenCalledTimes(downloadsToQueueCount);
+    await Promise.all(rest.map((dp) => dp.then((ds) => getDownloadStreamContent(ds))));
+    expect(esClient.get).toHaveBeenCalledTimes(downloadsToQueueCount * documentsChunkCount);
   });
 
   describe('.createIndexIfNotExists()', () => {
diff --git a/src/plugins/files/server/blob_storage_service/adapters/es/es.ts b/src/plugins/files/server/blob_storage_service/adapters/es/es.ts
index a3643eb9ee767..5650d9ae29684 100644
--- a/src/plugins/files/server/blob_storage_service/adapters/es/es.ts
+++ b/src/plugins/files/server/blob_storage_service/adapters/es/es.ts
@@ -13,7 +13,7 @@ import { Semaphore } from '@kbn/std';
 import { Readable, Transform } from 'stream';
 import { pipeline } from 'stream/promises';
 import { promisify } from 'util';
-import { lastValueFrom, defer } from 'rxjs';
+import { lastValueFrom, defer, firstValueFrom } from 'rxjs';
 import { PerformanceMetricEvent, reportPerformanceMetricEvent } from '@kbn/ebt-tools';
 import { memoize } from 'lodash';
 import { FilesPlugin } from '../../../plugin';
@@ -188,8 +188,8 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
     return lastValueFrom(defer(processUpload).pipe(this.uploadSemaphore.acquire()));
   }
 
-  private getReadableContentStream(id: string, size?: number): () => ReadableContentStream {
-    return getReadableContentStream.bind(this, {
+  private getReadableContentStream(id: string, size?: number): ReadableContentStream {
+    return getReadableContentStream({
       id,
       client: this.esClient,
       index: this.index,
@@ -206,8 +206,10 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
     // right after uploading it, we refresh the index before downloading the file.
     await this.esClient.indices.refresh({ index: this.index });
 
-    return lastValueFrom(
-      defer(this.getReadableContentStream(id, size)).pipe(this.downloadSemaphore.acquire())
+    return firstValueFrom(
+      defer(() => Promise.resolve(this.getReadableContentStream(id, size))).pipe(
+        this.downloadSemaphore.acquire()
+      )
     );
   }
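
Note on the download() change above: in RxJS 7 a Node Readable is a valid ObservableInput (it is an AsyncIterable), so the previous lastValueFrom(defer(<content stream factory>)) flattened the content stream into its individual chunks and resolved only once the whole blob had been read, with the last chunk rather than the stream itself. Wrapping the stream in Promise.resolve(...) makes the deferred observable emit the ReadableContentStream exactly once, which firstValueFrom hands back to the caller, while downloadSemaphore.acquire() still gates when the deferred factory runs. The snippet below is a minimal standalone sketch of that RxJS behaviour only; makeStream and demo are hypothetical helpers and the @kbn/std Semaphore from the diff is intentionally left out.

import { Readable } from 'stream';
import { defer, firstValueFrom, lastValueFrom } from 'rxjs';

// Hypothetical helper: a small two-chunk stream standing in for ReadableContentStream.
const makeStream = () =>
  Readable.from([Buffer.from('chunk-0', 'utf8'), Buffer.from('chunk-1', 'utf8')]);

async function demo() {
  // Handing the Readable straight to defer(): RxJS consumes it as an AsyncIterable,
  // so the observable emits each chunk and lastValueFrom() resolves with the LAST
  // chunk, and only after the whole stream has been drained.
  const lastChunk = await lastValueFrom(defer(() => makeStream()));
  console.log(lastChunk.toString()); // "chunk-1"

  // Wrapping the Readable in a resolved promise: the observable emits the stream
  // object itself exactly once, so firstValueFrom() resolves with the Readable and
  // the caller can consume it at its own pace.
  const stream = await firstValueFrom(defer(() => Promise.resolve(makeStream())));
  const chunks: Buffer[] = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  console.log(Buffer.concat(chunks).toString()); // "chunk-0chunk-1"
}

void demo();

This is also why the test now computes size from the summed lengths of the mock chunks and drains the returned stream via getDownloadStreamContent instead of awaiting the flattened content directly.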