Skip to content

Commit

Permalink
[Files] Tests for createEsFileClient() usage when indexAsAlias op…
Browse files Browse the repository at this point in the history
…tion and minor refactor (#153815)

## Summary

This PR builds on top of #153342
and:

- adds test for the `indexAsAlias` option that was added to
`createEsFileClient()`
- removes `fetchDoc()` utility (not needed)

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
paul-tavares and kibanamachine authored Mar 28, 2023
1 parent 2cb9628 commit 8c6e58c
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { encode } from 'cbor-x';
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { ContentStream, ContentStreamEncoding, ContentStreamParameters } from './content_stream';
import type { GetResponse } from '@elastic/elasticsearch/lib/api/types';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { FileDocument } from '../../../../file_client/file_metadata_client/adapters/es_index';

describe('ContentStream', () => {
let client: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
Expand All @@ -30,8 +32,9 @@ describe('ContentStream', () => {
encoding: 'base64' as ContentStreamEncoding,
size: 1,
} as ContentStreamParameters,
indexIsAlias = false,
} = {}) => {
return new ContentStream(client, id, index, logger, params);
return new ContentStream(client, id, index, logger, params, indexIsAlias);
};

beforeEach(() => {
Expand All @@ -43,124 +46,193 @@ describe('ContentStream', () => {
});

describe('read', () => {
beforeEach(() => {
stream = getContentStream({ params: { size: 1 } });
});
describe('with `indexIsAlias` set to `true`', () => {
let searchResponse: estypes.SearchResponse<FileDocument<{}>>;

beforeEach(() => {
searchResponse = {
took: 3,
timed_out: false,
_shards: {
total: 2,
successful: 2,
skipped: 0,
failed: 0,
},
hits: {
total: {
value: 1,
relation: 'eq',
},
max_score: 0,
hits: [
{
_index: 'foo',
_id: '123',
_score: 1.0,
},
],
},
};

client.search.mockResolvedValue(searchResponse);
});

it('should perform a search using index and the document id', async () => {
await new Promise((resolve) => stream.once('data', resolve));
it('should use es.search() to find chunk index', async () => {
stream = getContentStream({ params: { size: 1 }, indexIsAlias: true });
const data = await new Promise((resolve) => stream.once('data', resolve));

expect(client.search).toHaveBeenCalledWith({
body: {
_source: false,
query: {
term: {
_id: 'something.0',
},
},
size: 1,
},
index: 'somewhere',
});
expect(data).toEqual(Buffer.from('some content'));
});

expect(client.get).toHaveBeenCalledTimes(1);
it('should throw if chunk is not found', async () => {
searchResponse.hits.hits = [];
stream = getContentStream({ params: { size: 1 }, indexIsAlias: true });

const [[request]] = client.get.mock.calls;
expect(request).toHaveProperty('index', 'somewhere');
expect(request).toHaveProperty('id', 'something.0');
});
const readPromise = new Promise((resolve, reject) => {
stream.once('data', resolve);
stream.once('error', reject);
});

it('should read the document contents', async () => {
const data = await new Promise((resolve) => stream.once('data', resolve));
expect(data).toEqual(Buffer.from('some content'));
await expect(readPromise).rejects.toHaveProperty(
'message',
'Unable to determine index for file chunk id [something.0] in index (alias) [somewhere]'
);
});
});

it('should be an empty stream on empty response', async () => {
client.get.mockResponseOnce(toReadable());
const onData = jest.fn();
describe('with `indexIsAlias` set to `false`', () => {
beforeEach(() => {
stream = getContentStream({ params: { size: 1 } });
});

stream.on('data', onData);
await new Promise((resolve) => stream.once('end', resolve));
it('should perform a search using index and the document id', async () => {
await new Promise((resolve) => stream.once('data', resolve));

expect(onData).not.toHaveBeenCalled();
});
expect(client.get).toHaveBeenCalledTimes(1);

it('should emit an error event', async () => {
client.get.mockRejectedValueOnce('some error');
const [[request]] = client.get.mock.calls;
expect(request).toHaveProperty('index', 'somewhere');
expect(request).toHaveProperty('id', 'something.0');
});

stream.read();
const error = await new Promise((resolve) => stream.once('error', resolve));
it('should read the document contents', async () => {
const data = await new Promise((resolve) => stream.once('data', resolve));
expect(data).toEqual(Buffer.from('some content'));
});

expect(error).toBe('some error');
});
it('should be an empty stream on empty response', async () => {
client.get.mockResponseOnce(toReadable());
const onData = jest.fn();

it('should decode base64 encoded content', async () => {
client.get.mockResponseOnce(
toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content')))
);
const data = await new Promise((resolve) => stream.once('data', resolve));
stream.on('data', onData);
await new Promise((resolve) => stream.once('end', resolve));

expect(data).toEqual(Buffer.from('encoded content'));
});
expect(onData).not.toHaveBeenCalled();
});

it('should emit an error event', async () => {
client.get.mockRejectedValueOnce('some error');

it('should compound content from multiple chunks', async () => {
const [one, two, three] = ['12', '34', '56'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
stream.read();
const error = await new Promise((resolve) => stream.once('error', resolve));

stream = getContentStream({
params: { size: 6 },
expect(error).toBe('some error');
});

let data = '';
for await (const chunk of stream) {
data += chunk;
}
it('should decode base64 encoded content', async () => {
client.get.mockResponseOnce(
toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content')))
);
const data = await new Promise((resolve) => stream.once('data', resolve));

expect(data).toEqual('123456');
expect(client.get).toHaveBeenCalledTimes(3);
expect(data).toEqual(Buffer.from('encoded content'));
});

const [[request1], [request2], [request3]] = client.get.mock.calls;
it('should compound content from multiple chunks', async () => {
const [one, two, three] = ['12', '34', '56'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));

expect(request1).toHaveProperty('index', 'somewhere');
expect(request1).toHaveProperty('id', 'something.0');
expect(request2).toHaveProperty('index', 'somewhere');
expect(request2).toHaveProperty('id', 'something.1');
expect(request3).toHaveProperty('index', 'somewhere');
expect(request3).toHaveProperty('id', 'something.2');
});
stream = getContentStream({
params: { size: 6 },
});

it('should stop reading on empty chunk', async () => {
const [one, two, three] = ['12', '34', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});
let data = '';
for await (const chunk of stream) {
data += chunk;
}

it('should read while chunks are present when there is no size', async () => {
const [one, two] = ['12', '34'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable({ found: true }));
stream = getContentStream({ params: { size: undefined } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});
expect(data).toEqual('123456');
expect(client.get).toHaveBeenCalledTimes(3);

it('should decode every chunk separately', async () => {
const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('123456');
const [[request1], [request2], [request3]] = client.get.mock.calls;

expect(request1).toHaveProperty('index', 'somewhere');
expect(request1).toHaveProperty('id', 'something.0');
expect(request2).toHaveProperty('index', 'somewhere');
expect(request2).toHaveProperty('id', 'something.1');
expect(request3).toHaveProperty('index', 'somewhere');
expect(request3).toHaveProperty('id', 'something.2');
});

it('should stop reading on empty chunk', async () => {
const [one, two, three] = ['12', '34', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});

it('should read while chunks are present when there is no size', async () => {
const [one, two] = ['12', '34'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable({ found: true }));
stream = getContentStream({ params: { size: undefined } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});

it('should decode every chunk separately', async () => {
const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('123456');
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,15 @@ export class ContentStream extends Duplex {
},
});

const docIndex = chunkDocMeta.hits.hits[0]._index;
const docIndex = chunkDocMeta.hits.hits?.[0]?._index;

if (!docIndex) {
throw new Error(
const err = new Error(
`Unable to determine index for file chunk id [${id}] in index (alias) [${this.index}]`
);

this.logger.error(err);
throw err;
}

return docIndex;
Expand Down
Loading

0 comments on commit 8c6e58c

Please sign in to comment.