Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Files] Tests for createEsFileClient() usage when indexAsAlias option and minor refactor #153815

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { encode } from 'cbor-x';
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { ContentStream, ContentStreamEncoding, ContentStreamParameters } from './content_stream';
import type { GetResponse } from '@elastic/elasticsearch/lib/api/types';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { FileDocument } from '../../../../file_client/file_metadata_client/adapters/es_index';

describe('ContentStream', () => {
let client: ReturnType<typeof elasticsearchServiceMock.createElasticsearchClient>;
Expand All @@ -30,8 +32,9 @@ describe('ContentStream', () => {
encoding: 'base64' as ContentStreamEncoding,
size: 1,
} as ContentStreamParameters,
indexIsAlias = false,
} = {}) => {
return new ContentStream(client, id, index, logger, params);
return new ContentStream(client, id, index, logger, params, indexIsAlias);
};

beforeEach(() => {
Expand All @@ -43,124 +46,193 @@ describe('ContentStream', () => {
});

describe('read', () => {
beforeEach(() => {
stream = getContentStream({ params: { size: 1 } });
});
describe('with `indexIsAlias` set to `true`', () => {
let searchResponse: estypes.SearchResponse<FileDocument<{}>>;

beforeEach(() => {
searchResponse = {
took: 3,
timed_out: false,
_shards: {
total: 2,
successful: 2,
skipped: 0,
failed: 0,
},
hits: {
total: {
value: 1,
relation: 'eq',
},
max_score: 0,
hits: [
{
_index: 'foo',
_id: '123',
_score: 1.0,
},
],
},
};

client.search.mockResolvedValue(searchResponse);
});

it('should perform a search using index and the document id', async () => {
await new Promise((resolve) => stream.once('data', resolve));
it('should use es.search() to find chunk index', async () => {
stream = getContentStream({ params: { size: 1 }, indexIsAlias: true });
const data = await new Promise((resolve) => stream.once('data', resolve));

expect(client.search).toHaveBeenCalledWith({
body: {
_source: false,
query: {
term: {
_id: 'something.0',
},
},
size: 1,
},
index: 'somewhere',
});
expect(data).toEqual(Buffer.from('some content'));
});

expect(client.get).toHaveBeenCalledTimes(1);
it('should throw if chunk is not found', async () => {
searchResponse.hits.hits = [];
stream = getContentStream({ params: { size: 1 }, indexIsAlias: true });

const [[request]] = client.get.mock.calls;
expect(request).toHaveProperty('index', 'somewhere');
expect(request).toHaveProperty('id', 'something.0');
});
const readPromise = new Promise((resolve, reject) => {
stream.once('data', resolve);
stream.once('error', reject);
});

it('should read the document contents', async () => {
const data = await new Promise((resolve) => stream.once('data', resolve));
expect(data).toEqual(Buffer.from('some content'));
await expect(readPromise).rejects.toHaveProperty(
'message',
'Unable to determine index for file chunk id [something.0] in index (alias) [somewhere]'
);
});
});

it('should be an empty stream on empty response', async () => {
client.get.mockResponseOnce(toReadable());
const onData = jest.fn();
describe('with `indexIsAlias` set to `false`', () => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here down, everything is the same as before. All I did was wrap the test cases in a describe() whose title is more descriptive of that block of tests.

Here is a screen capture from my IDE:

image

beforeEach(() => {
stream = getContentStream({ params: { size: 1 } });
});

stream.on('data', onData);
await new Promise((resolve) => stream.once('end', resolve));
it('should perform a search using index and the document id', async () => {
await new Promise((resolve) => stream.once('data', resolve));

expect(onData).not.toHaveBeenCalled();
});
expect(client.get).toHaveBeenCalledTimes(1);

it('should emit an error event', async () => {
client.get.mockRejectedValueOnce('some error');
const [[request]] = client.get.mock.calls;
expect(request).toHaveProperty('index', 'somewhere');
expect(request).toHaveProperty('id', 'something.0');
});

stream.read();
const error = await new Promise((resolve) => stream.once('error', resolve));
it('should read the document contents', async () => {
const data = await new Promise((resolve) => stream.once('data', resolve));
expect(data).toEqual(Buffer.from('some content'));
});

expect(error).toBe('some error');
});
it('should be an empty stream on empty response', async () => {
client.get.mockResponseOnce(toReadable());
const onData = jest.fn();

it('should decode base64 encoded content', async () => {
client.get.mockResponseOnce(
toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content')))
);
const data = await new Promise((resolve) => stream.once('data', resolve));
stream.on('data', onData);
await new Promise((resolve) => stream.once('end', resolve));

expect(data).toEqual(Buffer.from('encoded content'));
});
expect(onData).not.toHaveBeenCalled();
});

it('should emit an error event', async () => {
client.get.mockRejectedValueOnce('some error');

it('should compound content from multiple chunks', async () => {
const [one, two, three] = ['12', '34', '56'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
stream.read();
const error = await new Promise((resolve) => stream.once('error', resolve));

stream = getContentStream({
params: { size: 6 },
expect(error).toBe('some error');
});

let data = '';
for await (const chunk of stream) {
data += chunk;
}
it('should decode base64 encoded content', async () => {
client.get.mockResponseOnce(
toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content')))
);
const data = await new Promise((resolve) => stream.once('data', resolve));

expect(data).toEqual('123456');
expect(client.get).toHaveBeenCalledTimes(3);
expect(data).toEqual(Buffer.from('encoded content'));
});

const [[request1], [request2], [request3]] = client.get.mock.calls;
it('should compound content from multiple chunks', async () => {
const [one, two, three] = ['12', '34', '56'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));

expect(request1).toHaveProperty('index', 'somewhere');
expect(request1).toHaveProperty('id', 'something.0');
expect(request2).toHaveProperty('index', 'somewhere');
expect(request2).toHaveProperty('id', 'something.1');
expect(request3).toHaveProperty('index', 'somewhere');
expect(request3).toHaveProperty('id', 'something.2');
});
stream = getContentStream({
params: { size: 6 },
});

it('should stop reading on empty chunk', async () => {
const [one, two, three] = ['12', '34', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});
let data = '';
for await (const chunk of stream) {
data += chunk;
}

it('should read while chunks are present when there is no size', async () => {
const [one, two] = ['12', '34'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable({ found: true }));
stream = getContentStream({ params: { size: undefined } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});
expect(data).toEqual('123456');
expect(client.get).toHaveBeenCalledTimes(3);

it('should decode every chunk separately', async () => {
const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('123456');
const [[request1], [request2], [request3]] = client.get.mock.calls;

expect(request1).toHaveProperty('index', 'somewhere');
expect(request1).toHaveProperty('id', 'something.0');
expect(request2).toHaveProperty('index', 'somewhere');
expect(request2).toHaveProperty('id', 'something.1');
expect(request3).toHaveProperty('index', 'somewhere');
expect(request3).toHaveProperty('id', 'something.2');
});

it('should stop reading on empty chunk', async () => {
const [one, two, three] = ['12', '34', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});

it('should read while chunks are present when there is no size', async () => {
const [one, two] = ['12', '34'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable({ found: true }));
stream = getContentStream({ params: { size: undefined } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('1234');
expect(client.get).toHaveBeenCalledTimes(3);
});

it('should decode every chunk separately', async () => {
const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
data += chunk;
}

expect(data).toEqual('123456');
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,15 @@ export class ContentStream extends Duplex {
},
});

const docIndex = chunkDocMeta.hits.hits[0]._index;
const docIndex = chunkDocMeta.hits.hits?.[0]?._index;

if (!docIndex) {
throw new Error(
const err = new Error(
`Unable to determine index for file chunk id [${id}] in index (alias) [${this.index}]`
);

this.logger.error(err);
throw err;
}

return docIndex;
Expand Down
Loading