Skip to content

Commit

Permalink
[Security Solution + Files + Fleet] Add option to Files client to han…
Browse files Browse the repository at this point in the history
…dle index alias and fix Endpoint/Fleet usage to set new option to true (elastic#153342)

## Summary

- Adds `indexIsAlias` to `Files` plugin client. Used when provided
indexes are Aliases (changes how the documents are retrieved internally)
- Changes security solution (endpoint) file service to use `.search()`
instead of `.get()` when retrieving a file metadata via `id`
- Changed Security Solution call to `createEsFileClient()` (`Files`
plugin service) to set `indexIsAlias` to `true`
- Changed Fleet call to `createEsFileClient()` (`Files` plugin service)
to set `indexIsAlias` to `true`

Addresses the following Issues that were raised for 8.7:

- Fixes elastic#153322
- Fixes elastic#153334

(cherry picked from commit 50cc574)

# Conflicts:
#	src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.ts
#	src/plugins/files/server/file_client/create_es_file_client.ts
  • Loading branch information
paul-tavares committed Mar 21, 2023
1 parent ee4b913 commit e10734a
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ describe('ContentStream', () => {
beforeEach(() => {
client = elasticsearchServiceMock.createClusterClient().asInternalUser;
logger = loggingSystemMock.createLogger();
client.get.mockResponse(toReadable(set({}, '_source.data', Buffer.from('some content'))));
client.get.mockResponse(
toReadable(set({ found: true }, '_source.data', Buffer.from('some content')))
);
});

describe('read', () => {
Expand Down Expand Up @@ -81,7 +83,7 @@ describe('ContentStream', () => {

it('should decode base64 encoded content', async () => {
client.get.mockResponseOnce(
toReadable(set({}, '_source.data', Buffer.from('encoded content')))
toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content')))
);
const data = await new Promise((resolve) => stream.once('data', resolve));

Expand All @@ -90,9 +92,9 @@ describe('ContentStream', () => {

it('should compound content from multiple chunks', async () => {
const [one, two, three] = ['12', '34', '56'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({}, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));

stream = getContentStream({
params: { size: 6 },
Expand All @@ -118,9 +120,9 @@ describe('ContentStream', () => {

it('should stop reading on empty chunk', async () => {
const [one, two, three] = ['12', '34', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({}, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
Expand All @@ -133,9 +135,9 @@ describe('ContentStream', () => {

it('should read while chunks are present when there is no size', async () => {
const [one, two] = ['12', '34'].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({}, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', two)));
client.get.mockResponseOnce(toReadable({}));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable({ found: true }));
stream = getContentStream({ params: { size: undefined } });
let data = '';
for await (const chunk of stream) {
Expand All @@ -148,10 +150,10 @@ describe('ContentStream', () => {

it('should decode every chunk separately', async () => {
const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from);
client.get.mockResponseOnce(toReadable(set({}, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({}, '_source.data', four)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three)));
client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four)));
stream = getContentStream({ params: { size: 12 } });
let data = '';
for await (const chunk of stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { ByteSizeValue } from '@kbn/config-schema';
import { defaults } from 'lodash';
import { Duplex, Writable, Readable } from 'stream';

import { GetResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { inspect } from 'util';
import type { FileChunkDocument } from '../mappings';

type Callback = (error?: Error) => void;
Expand Down Expand Up @@ -61,7 +63,8 @@ export class ContentStream extends Duplex {
private id: undefined | string,
private readonly index: string,
private readonly logger: Logger,
parameters: ContentStreamParameters = {}
parameters: ContentStreamParameters = {},
private readonly indexIsAlias: boolean = false
) {
super();
this.parameters = defaults(parameters, {
Expand All @@ -84,19 +87,45 @@ export class ContentStream extends Duplex {
return this.maxChunkSize;
}

/**
 * Resolves the concrete (backing) index that holds the given file chunk when
 * `this.index` is an alias. ES `get()` requires a real index name, so we first
 * `search()` the alias for the document's `_index`.
 *
 * @param id - The `_id` of the file chunk document
 * @returns The real index name containing the chunk
 * @throws Error when the chunk document cannot be found under the alias
 */
private async getChunkRealIndex(id: string): Promise<string> {
  const chunkDocMeta = await this.client.search({
    index: this.index,
    body: {
      size: 1,
      query: {
        term: {
          _id: id,
        },
      },
      _source: false, // suppress the document content
    },
  });

  // The search may return zero hits (chunk missing or not yet visible).
  // Optional chaining avoids a TypeError on `hits[0]` so the descriptive
  // error below is thrown instead.
  const docIndex = chunkDocMeta.hits.hits[0]?._index;

  if (!docIndex) {
    throw new Error(
      `Unable to determine index for file chunk id [${id}] in index (alias) [${this.index}]`
    );
  }

  return docIndex;
}

private async readChunk(): Promise<[data: null | Buffer, last?: boolean]> {
if (!this.id) {
throw new Error('No document ID provided. Cannot read chunk.');
}
const id = this.getChunkId(this.chunksRead);
const chunkIndex = this.indexIsAlias ? await this.getChunkRealIndex(id) : this.index;

this.logger.debug(`Reading chunk #${this.chunksRead}.`);
this.logger.debug(`Reading chunk #${this.chunksRead} from index [${chunkIndex}]`);

try {
const stream = await this.client.get(
{
id,
index: this.index,
index: chunkIndex,
_source_includes: ['data', 'last'],
},
{
Expand All @@ -110,10 +139,22 @@ export class ContentStream extends Duplex {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
const source: undefined | FileChunkDocument = buffer.byteLength
? cborx.decode(Buffer.concat(chunks))?._source
const decodedChunkDoc: GetResponse<FileChunkDocument> | undefined = buffer.byteLength
? (cborx.decode(buffer) as GetResponse<FileChunkDocument>)
: undefined;

// Because `asStream` was used in retrieving the document, errors are also not processed
// and thus are returned "as is", so we check to see if an ES error occurred while attempting
// to retrieve the chunk.
if (decodedChunkDoc && ('error' in decodedChunkDoc || !decodedChunkDoc.found)) {
const err = new Error(`Failed to retrieve chunk id [${id}] from index [${chunkIndex}]`);
this.logger.error(err);
this.logger.error(inspect(decodedChunkDoc, { depth: 5 }));
throw err;
}

const source: undefined | FileChunkDocument = decodedChunkDoc?._source;

const dataBuffer = source?.data as unknown as Buffer;
return [dataBuffer?.byteLength ? dataBuffer : null, source?.last];
} catch (e) {
Expand Down Expand Up @@ -361,10 +402,19 @@ export interface ContentStreamArgs {
*/
logger: Logger;
parameters?: ContentStreamParameters;
/** indicates the index provided is an alias (changes how the content is retrieved internally) */
indexIsAlias?: boolean;
}

function getContentStream({ client, id, index, logger, parameters }: ContentStreamArgs) {
return new ContentStream(client, id, index, logger, parameters);
/**
 * Factory helper that constructs a {@link ContentStream} from a single args
 * object, applying the default of `false` for `indexIsAlias` when omitted.
 */
function getContentStream(args: ContentStreamArgs) {
  const { client, id, index, logger, parameters } = args;
  const indexIsAlias = args.indexIsAlias ?? false;

  return new ContentStream(client, id, index, logger, parameters, indexIsAlias);
}

export type WritableContentStream = Writable &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
* Override the default concurrent upload limit by passing in a different
* semaphore
*/
private readonly uploadSemaphore = ElasticsearchBlobStorageClient.defaultSemaphore
private readonly uploadSemaphore = ElasticsearchBlobStorageClient.defaultSemaphore,
/** Indicates that the index provided is an alias (changes how content is retrieved internally) */
private readonly indexIsAlias: boolean = false
) {
assert(this.uploadSemaphore, `No default semaphore provided and no semaphore was passed in.`);
}
Expand Down Expand Up @@ -166,6 +168,7 @@ export class ElasticsearchBlobStorageClient implements BlobStorageClient {
parameters: {
size,
},
indexIsAlias: this.indexIsAlias,
});
}

Expand Down
29 changes: 25 additions & 4 deletions src/plugins/files/server/file_client/create_es_file_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ export interface CreateEsFileClientArgs {
*/
elasticsearchClient: ElasticsearchClient;
/**
* The maximum file size to be write
* Treat the indices provided as Aliases. If set to true, ES `search()` will be used to
* retrieve the file info and content instead of `get()`. This is needed to ensure the
* content can be retrieved in cases where an index may have rolled over (ES `get()`
* needs a "real" index)
*/
indexIsAlias?: boolean;
/**
* The maximum file size to be written.
*/
maxSizeBytes?: number;
/**
Expand All @@ -49,15 +56,29 @@ export interface CreateEsFileClientArgs {
* @param arg - See {@link CreateEsFileClientArgs}
*/
export function createEsFileClient(arg: CreateEsFileClientArgs): FileClient {
const { blobStorageIndex, elasticsearchClient, logger, metadataIndex, maxSizeBytes } = arg;
const {
blobStorageIndex,
elasticsearchClient,
logger,
metadataIndex,
maxSizeBytes,
indexIsAlias,
} = arg;
return new FileClientImpl(
{
id: NO_FILE_KIND,
http: {},
maxSizeBytes,
},
new EsIndexFilesMetadataClient(metadataIndex, elasticsearchClient, logger),
new ElasticsearchBlobStorageClient(elasticsearchClient, blobStorageIndex, undefined, logger),
new EsIndexFilesMetadataClient(metadataIndex, elasticsearchClient, logger, indexIsAlias),
new ElasticsearchBlobStorageClient(
elasticsearchClient,
blobStorageIndex,
undefined,
logger,
undefined,
indexIsAlias
),
undefined,
undefined,
logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Logger } from '@kbn/core/server';
import { toElasticsearchQuery } from '@kbn/es-query';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { MappingProperty, SearchTotalHits } from '@elastic/elasticsearch/lib/api/types';
import { fetchDoc } from '../../utils';
import type { FilesMetrics, FileMetadata, Pagination } from '../../../../common';
import type { FindFileArgs } from '../../../file_service';
import type {
Expand Down Expand Up @@ -43,7 +44,8 @@ export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClie
constructor(
private readonly index: string,
private readonly esClient: ElasticsearchClient,
private readonly logger: Logger
private readonly logger: Logger,
private readonly indexIsAlias: boolean = false
) {}

private createIfNotExists = once(async () => {
Expand Down Expand Up @@ -80,10 +82,8 @@ export class EsIndexFilesMetadataClient<M = unknown> implements FileMetadataClie
}

async get({ id }: GetArg): Promise<FileDescriptor<M>> {
const { _source: doc } = await this.esClient.get<FileDocument<M>>({
index: this.index,
id,
});
const { _source: doc } =
(await fetchDoc<FileDocument<M>>(this.esClient, this.index, id, this.indexIsAlias)) ?? {};

if (!doc) {
this.logger.error(`File with id "${id}" not found`);
Expand Down
30 changes: 30 additions & 0 deletions src/plugins/files/server/file_client/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* Side Public License, v 1.
*/

import { GetResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { FileMetadata } from '../../common';

export function createDefaultFileAttributes(): Pick<
Expand All @@ -19,3 +21,31 @@ export function createDefaultFileAttributes(): Pick<
Updated: dateString,
};
}

/**
 * Retrieves a single document by id from the given index.
 *
 * When `indexIsAlias` is `true`, a `search()` against the alias is used
 * instead of `get()` (ES `get()` requires a concrete index name), and the
 * first hit is returned shaped as a `GetResponse`.
 *
 * @param esClient - Scoped Elasticsearch client
 * @param index - Index (or alias, when `indexIsAlias` is true) to read from
 * @param docId - The `_id` of the document to fetch
 * @param indexIsAlias - Whether `index` refers to an alias (default `false`)
 * @returns The document response, or `undefined` when an alias search yields no hits
 */
export const fetchDoc = async <TDocument = unknown>(
  esClient: ElasticsearchClient,
  index: string,
  docId: string,
  indexIsAlias: boolean = false
): Promise<GetResponse<TDocument> | undefined> => {
  if (!indexIsAlias) {
    // Concrete index: a direct `get()` is sufficient.
    return esClient.get<TDocument>({
      index,
      id: docId,
    });
  }

  // Alias: look the document up by `_id` via search, limiting to one hit.
  const searchResponse = await esClient.search<TDocument>({
    index,
    body: {
      size: 1,
      query: {
        term: {
          _id: docId,
        },
      },
    },
  });

  return searchResponse.hits.hits[0] as GetResponse<TDocument>;
};
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/server/services/agents/uploads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ export async function getAgentUploadFile(
metadataIndex: FILE_STORAGE_METADATA_AGENT_INDEX,
elasticsearchClient: esClient,
logger: appContextService.getLogger(),
indexIsAlias: true,
});

const file = await fileClient.get({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,27 +235,29 @@ export const sendEndpointActionResponse = async (
// Index the file content (just one chunk)
// call to `.index()` copied from File plugin here:
// https://github.com/elastic/kibana/blob/main/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.ts#L195
await esClient.index(
{
index: FILE_STORAGE_DATA_INDEX,
id: `${fileMeta._id}.0`,
document: cborx.encode({
bid: fileMeta._id,
last: true,
data: Buffer.from(
'UEsDBAoACQAAAFZeRFWpAsDLHwAAABMAAAAMABwAYmFkX2ZpbGUudHh0VVQJAANTVjxjU1Y8Y3V4CwABBPUBAAAEFAAAAMOcoyEq/Q4VyG02U9O0LRbGlwP/y5SOCfRKqLz1rsBQSwcIqQLAyx8AAAATAAAAUEsBAh4DCgAJAAAAVl5EVakCwMsfAAAAEwAAAAwAGAAAAAAAAQAAAKSBAAAAAGJhZF9maWxlLnR4dFVUBQADU1Y8Y3V4CwABBPUBAAAEFAAAAFBLBQYAAAAAAQABAFIAAAB1AAAAAAA=',
'base64'
),
}),
refresh: 'wait_for',
},
{
headers: {
'content-type': 'application/cbor',
accept: 'application/json',
await esClient
.index(
{
index: FILE_STORAGE_DATA_INDEX,
id: `${fileMeta._id}.0`,
document: cborx.encode({
bid: fileMeta._id,
last: true,
data: Buffer.from(
'UEsDBAoACQAAAFZeRFWpAsDLHwAAABMAAAAMABwAYmFkX2ZpbGUudHh0VVQJAANTVjxjU1Y8Y3V4CwABBPUBAAAEFAAAAMOcoyEq/Q4VyG02U9O0LRbGlwP/y5SOCfRKqLz1rsBQSwcIqQLAyx8AAAATAAAAUEsBAh4DCgAJAAAAVl5EVakCwMsfAAAAEwAAAAwAGAAAAAAAAQAAAKSBAAAAAGJhZF9maWxlLnR4dFVUBQADU1Y8Y3V4CwABBPUBAAAEFAAAAFBLBQYAAAAAAQABAFIAAAB1AAAAAAA=',
'base64'
),
}),
refresh: 'wait_for',
},
}
);
{
headers: {
'content-type': 'application/cbor',
accept: 'application/json',
},
}
)
.then(() => sleep(2000));
}

return endpointResponse;
Expand Down
Loading

0 comments on commit e10734a

Please sign in to comment.