Skip to content

Commit

Permalink
change approach for returning readable stream
Browse files Browse the repository at this point in the history
simplify approach to return content stream
  • Loading branch information
eokoneyo committed Oct 30, 2023
1 parent 304f5a7 commit 80943f5
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,29 @@ describe('ElasticsearchBlobStorageClient', () => {
const index = 'someplace';

const blobStoreClient = createBlobStoreClient(index);
const downloadAccquireSpy = jest.spyOn(downloadSemaphore, 'acquire');
const downloadAcquireSpy = jest.spyOn(downloadSemaphore, 'acquire');

const downloadsToQueueCount = 4;
const documentsChunkCount = 2;

const createDownloadContent = (documentId: number, chunkId: number) => {
return Buffer.concat([
Buffer.from(`download content ${documentId}.${chunkId}`, 'utf8'),
Buffer.alloc(10 * 1028, `chunk ${chunkId}`),
]);
};

const downloadContentMap = Array.from(new Array(downloadsToQueueCount)).map(
(_, documentIdx) => ({
fileContent: Array.from(new Array(documentsChunkCount)).map((__, chunkIdx) =>
createDownloadContent(documentIdx, chunkIdx)
),
})
);

const createDownloadContent = (headChunkId: string) =>
Buffer.from(`download content ${headChunkId}`, 'utf8');
esClient.get.mockImplementation(({ id: headChunkId }) => {
const [documentId, chunkId] = headChunkId.split(/\./);

esClient.get.mockImplementation(({ id }) => {
return new Promise(function (resolve) {
setTimeout(
() =>
Expand All @@ -98,7 +113,7 @@ describe('ElasticsearchBlobStorageClient', () => {
encode({
found: true,
_source: {
data: createDownloadContent(id),
data: downloadContentMap[Number(documentId)].fileContent[Number(chunkId)],
},
}),
]) as unknown as GetResponse
Expand All @@ -108,31 +123,49 @@ describe('ElasticsearchBlobStorageClient', () => {
});
});

const [p1, p2, ...rest] = Array.from(new Array(downloadsToQueueCount)).map((_, idx) =>
blobStoreClient.download({ id: String(idx), size: 1 }).then(async (stream) => {
const chunks = [];
const getDownloadStreamContent = async (stream: Readable) => {
const chunks: Buffer[] = [];

for await (const chunk of stream) {
chunks.push(chunk);
}
for await (const chunk of stream) {
chunks.push(chunk);
}

return Buffer.from(chunks).toString();
})
);
/**
* we are guaranteed that the chunks for the complete document
* will equal the document chunk count specified within this test suite.
* See {@link ContentStream#isRead}
*/
expect(chunks.length).toBe(documentsChunkCount);

return Buffer.concat(chunks).toString();
};

const [p1, p2, ...rest] = downloadContentMap.map(({ fileContent }, idx) => {
// the expected document size of our returned mock file content
// is the sum of the lengths of the chunks the entire document is split into
const documentSize = fileContent.reduce((total, chunk) => total + chunk.length, 0);

return blobStoreClient.download({
id: String(idx),
size: documentSize,
});
});

await setImmediate();
expect(downloadAccquireSpy).toHaveBeenCalledTimes(downloadsToQueueCount);
expect(downloadAcquireSpy).toHaveBeenCalledTimes(downloadsToQueueCount);

expect(esClient.get).toHaveBeenCalledTimes(1);
const p1DownloadContent = await p1;
expect(p1DownloadContent).toEqual(expect.stringContaining('download content 0'));
const p1DownloadStream = await p1;
const p1DownloadContent = await getDownloadStreamContent(p1DownloadStream);
expect(esClient.get).toHaveBeenCalledTimes(1 * documentsChunkCount);
expect(p1DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s0.*/));

expect(esClient.get).toHaveBeenCalledTimes(2);
const p2DownloadContent = await p2;
expect(p2DownloadContent).toEqual(expect.stringContaining('download content 1'));
const p2DownloadStream = await p2;
const p2DownloadContent = await getDownloadStreamContent(p2DownloadStream);
expect(esClient.get).toHaveBeenCalledTimes(2 * documentsChunkCount);
expect(p2DownloadContent).toEqual(expect.stringMatching(/^download\scontent\s1.*/));

await Promise.all(rest);
expect(esClient.get).toHaveBeenCalledTimes(downloadsToQueueCount);
await Promise.all(rest.map((dp) => dp.then((ds) => getDownloadStreamContent(ds))));
expect(esClient.get).toHaveBeenCalledTimes(downloadsToQueueCount * documentsChunkCount);
});

describe('.createIndexIfNotExists()', () => {
Expand Down
12 changes: 7 additions & 5 deletions src/plugins/files/server/blob_storage_service/adapters/es/es.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Semaphore } from '@kbn/std';
import { Readable, Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { promisify } from 'util';
import { lastValueFrom, defer } from 'rxjs';
import { lastValueFrom, defer, firstValueFrom } from 'rxjs';
import { PerformanceMetricEvent, reportPerformanceMetricEvent } from '@kbn/ebt-tools';
import { memoize } from 'lodash';
import { FilesPlugin } from '../../../plugin';
Expand Down Expand Up @@ -188,8 +188,8 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
return lastValueFrom(defer(processUpload).pipe(this.uploadSemaphore.acquire()));
}

private getReadableContentStream(id: string, size?: number): () => ReadableContentStream {
return getReadableContentStream.bind(this, {
private getReadableContentStream(id: string, size?: number): ReadableContentStream {
return getReadableContentStream({
id,
client: this.esClient,
index: this.index,
Expand All @@ -206,8 +206,10 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
// right after uploading it, we refresh the index before downloading the file.
await this.esClient.indices.refresh({ index: this.index });

return lastValueFrom(
defer(this.getReadableContentStream(id, size)).pipe(this.downloadSemaphore.acquire())
return firstValueFrom(
defer(() => Promise.resolve(this.getReadableContentStream(id, size))).pipe(
this.downloadSemaphore.acquire()
)
);
}

Expand Down

0 comments on commit 80943f5

Please sign in to comment.