Skip to content

Commit

Permalink
[Logs UI] Logs overview queries for the observability dashboard (#70413)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alejandro Fernández authored Jul 3, 2020
1 parent bbda3f9 commit f3573f3
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 69 deletions.
7 changes: 7 additions & 0 deletions x-pack/plugins/infra/common/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export const DEFAULT_SOURCE_ID = 'default';
268 changes: 199 additions & 69 deletions x-pack/plugins/infra/public/utils/logs_overview_fetchers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,90 +4,220 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { InfraClientCoreSetup } from '../types';
import { LogsFetchDataResponse } from '../../../observability/public';
import { encode } from 'rison-node';
import { i18n } from '@kbn/i18n';
import { DEFAULT_SOURCE_ID } from '../../common/constants';
import { InfraClientCoreSetup, InfraClientStartDeps } from '../types';
import {
FetchData,
LogsFetchDataResponse,
HasData,
FetchDataParams,
} from '../../../observability/public';
import { callFetchLogSourceConfigurationAPI } from '../containers/logs/log_source/api/fetch_log_source_configuration';
import { callFetchLogSourceStatusAPI } from '../containers/logs/log_source/api/fetch_log_source_status';

export function getLogsHasDataFetcher(getStartServices: InfraClientCoreSetup['getStartServices']) {
return async () => {
// if you need the data plugin, this is how you get it
// const [, startPlugins] = await getStartServices();
// const { data } = startPlugins;
interface StatsAggregation {
buckets: Array<{ key: string; doc_count: number }>;
}

interface SeriesAggregation {
buckets: Array<{
key_as_string: string;
key: number;
doc_count: number;
dataset: StatsAggregation;
}>;
}

interface LogParams {
index: string;
timestampField: string;
}

// if you need a core dep, we need to pass in more than just getStartServices
type StatsAndSeries = Pick<LogsFetchDataResponse, 'stats' | 'series'>;

// perform query
return true;
export function getLogsHasDataFetcher(
getStartServices: InfraClientCoreSetup['getStartServices']
): HasData {
return async () => {
const [core] = await getStartServices();
const sourceStatus = await callFetchLogSourceStatusAPI(DEFAULT_SOURCE_ID, core.http.fetch);
return sourceStatus.data.logIndicesExist;
};
}

export function getLogsOverviewDataFetcher(
getStartServices: InfraClientCoreSetup['getStartServices']
) {
return async (): Promise<LogsFetchDataResponse> => {
// if you need the data plugin, this is how you get it
// const [, startPlugins] = await getStartServices();
// const { data } = startPlugins;
): FetchData<LogsFetchDataResponse> {
return async (params) => {
const [core, startPlugins] = await getStartServices();
const { data } = startPlugins;

const sourceConfiguration = await callFetchLogSourceConfigurationAPI(
DEFAULT_SOURCE_ID,
core.http.fetch
);

const { stats, series } = await fetchLogsOverview(
{
index: sourceConfiguration.data.configuration.logAlias,
timestampField: sourceConfiguration.data.configuration.fields.timestamp,
},
params,
data
);

// if you need a core dep, we need to pass in more than just getStartServices
const timeSpanInMinutes =
(Date.parse(params.endTime).valueOf() - Date.parse(params.startTime).valueOf()) / (1000 * 60);

// perform query
return {
title: 'Log rate',
appLink: 'TBD', // TODO: what format should this be in, relative I assume?
stats: {
nginx: {
type: 'number',
label: 'nginx',
value: 345341,
},
'elasticsearch.audit': {
type: 'number',
label: 'elasticsearch.audit',
value: 164929,
title: i18n.translate('xpack.infra.logs.logOverview.logOverviewTitle', {
defaultMessage: 'Logs',
}),
appLink: `/app/logs/stream?logPosition=(end:${encode(params.endTime)},start:${encode(
params.startTime
)})`,
stats: normalizeStats(stats, timeSpanInMinutes),
series: normalizeSeries(series),
};
};
}

async function fetchLogsOverview(
logParams: LogParams,
params: FetchDataParams,
dataPlugin: InfraClientStartDeps['data']
): Promise<StatsAndSeries> {
const esSearcher = dataPlugin.search.getSearchStrategy('es');
return new Promise((resolve, reject) => {
esSearcher
.search({
params: {
index: logParams.index,
body: {
size: 0,
query: buildLogOverviewQuery(logParams, params),
aggs: buildLogOverviewAggregations(logParams, params),
},
},
'haproxy.log': {
type: 'number',
label: 'haproxy.log',
value: 51101,
})
.subscribe(
(response) => {
if (response.rawResponse.aggregations) {
resolve(processLogsOverviewAggregations(response.rawResponse.aggregations));
} else {
resolve({ stats: {}, series: {} });
}
},
(error) => reject(error)
);
});
}

function buildLogOverviewQuery(logParams: LogParams, params: FetchDataParams) {
return {
range: {
[logParams.timestampField]: {
gt: params.startTime,
lte: params.endTime,
format: 'strict_date_optional_time',
},
// Note: My understanding is that these series coordinates will be
// combined into objects that look like:
// { x: timestamp, y: value, g: label (e.g. nginx) }
// so they fit the stacked bar chart API
// https://elastic.github.io/elastic-charts/?path=/story/bar-chart--stacked-with-axis-and-legend
series: {
nginx: {
label: 'nginx',
coordinates: [
{ x: 1593000000000, y: 10014 },
{ x: 1593000900000, y: 12827 },
{ x: 1593001800000, y: 2946 },
{ x: 1593002700000, y: 14298 },
{ x: 1593003600000, y: 4096 },
],
},
'elasticsearch.audit': {
label: 'elasticsearch.audit',
coordinates: [
{ x: 1593000000000, y: 5676 },
{ x: 1593000900000, y: 6783 },
{ x: 1593001800000, y: 2394 },
{ x: 1593002700000, y: 4554 },
{ x: 1593003600000, y: 5659 },
],
},
'haproxy.log': {
label: 'haproxy.log',
coordinates: [
{ x: 1593000000000, y: 9085 },
{ x: 1593000900000, y: 9002 },
{ x: 1593001800000, y: 3940 },
{ x: 1593002700000, y: 5451 },
{ x: 1593003600000, y: 9133 },
],
},
};
}

function buildLogOverviewAggregations(logParams: LogParams, params: FetchDataParams) {
return {
stats: {
terms: {
field: 'event.dataset',
size: 4,
},
},
series: {
date_histogram: {
field: logParams.timestampField,
fixed_interval: params.bucketSize,
},
aggs: {
dataset: {
terms: {
field: 'event.dataset',
size: 4,
},
},
},
};
},
};
}

function processLogsOverviewAggregations(aggregations: {
stats: StatsAggregation;
series: SeriesAggregation;
}): StatsAndSeries {
const processedStats = aggregations.stats.buckets.reduce<StatsAndSeries['stats']>(
(result, bucket) => {
result[bucket.key] = {
type: 'number',
label: bucket.key,
value: bucket.doc_count,
};

return result;
},
{}
);

const processedSeries = aggregations.series.buckets.reduce<StatsAndSeries['series']>(
(result, bucket) => {
const x = bucket.key; // the timestamp of the bucket
bucket.dataset.buckets.forEach((b) => {
const label = b.key;
result[label] = result[label] || { label, coordinates: [] };
result[label].coordinates.push({ x, y: b.doc_count });
});

return result;
},
{}
);

return {
stats: processedStats,
series: processedSeries,
};
}

function normalizeStats(
stats: LogsFetchDataResponse['stats'],
timeSpanInMinutes: number
): LogsFetchDataResponse['stats'] {
return Object.keys(stats).reduce<LogsFetchDataResponse['stats']>((normalized, key) => {
normalized[key] = {
...stats[key],
value: stats[key].value / timeSpanInMinutes,
};
return normalized;
}, {});
}

function normalizeSeries(series: LogsFetchDataResponse['series']): LogsFetchDataResponse['series'] {
const seriesKeys = Object.keys(series);
const timestamps = seriesKeys.flatMap((key) => series[key].coordinates.map((c) => c.x));
const [first, second] = [...new Set(timestamps)].sort();
const timeSpanInMinutes = (second - first) / (1000 * 60);

return seriesKeys.reduce<LogsFetchDataResponse['series']>((normalized, key) => {
normalized[key] = {
...series[key],
coordinates: series[key].coordinates.map((c) => {
if (c.y) {
return { ...c, y: c.y / timeSpanInMinutes };
}
return c;
}),
};
return normalized;
}, {});
}
75 changes: 75 additions & 0 deletions x-pack/plugins/infra/public/utils/logs_overview_fetches.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { coreMock } from 'src/core/public/mocks';
import { dataPluginMock } from 'src/plugins/data/public/mocks';
import { CoreStart } from 'kibana/public';
import { getLogsHasDataFetcher } from './logs_overview_fetchers';
import { InfraClientStartDeps, InfraClientStartExports } from '../types';
import { callFetchLogSourceStatusAPI } from '../containers/logs/log_source/api/fetch_log_source_status';

// Note
// Calls to `.mock*` functions will fail the typecheck because how jest does the mocking.
// The calls will be preluded with a `@ts-expect-error`
jest.mock('../containers/logs/log_source/api/fetch_log_source_status');

function setup() {
const core = coreMock.createStart();
const data = dataPluginMock.createStartContract();

const mockedGetStartServices = jest.fn(() => {
const deps = { data };
return Promise.resolve([
core as CoreStart,
deps as InfraClientStartDeps,
void 0 as InfraClientStartExports,
]) as Promise<[CoreStart, InfraClientStartDeps, InfraClientStartExports]>;
});
return { core, mockedGetStartServices };
}

describe('Logs UI Observability Homepage Functions', () => {
describe('getLogsHasDataFetcher()', () => {
beforeEach(() => {
// @ts-expect-error
callFetchLogSourceStatusAPI.mockReset();
});
it('should return true when some index is present', async () => {
const { mockedGetStartServices } = setup();

// @ts-expect-error
callFetchLogSourceStatusAPI.mockResolvedValue({
data: { logIndexFields: [], logIndicesExist: true },
});

const hasData = getLogsHasDataFetcher(mockedGetStartServices);
const response = await hasData();

expect(callFetchLogSourceStatusAPI).toHaveBeenCalledTimes(1);
expect(response).toBe(true);
});

it('should return false when no index is present', async () => {
const { mockedGetStartServices } = setup();

// @ts-expect-error
callFetchLogSourceStatusAPI.mockResolvedValue({
data: { logIndexFields: [], logIndicesExist: false },
});

const hasData = getLogsHasDataFetcher(mockedGetStartServices);
const response = await hasData();

expect(callFetchLogSourceStatusAPI).toHaveBeenCalledTimes(1);
expect(response).toBe(false);
});
});

describe('getLogsOverviewDataFetcher()', () => {
it.skip('should work', async () => {
// Pending
});
});
});

0 comments on commit f3573f3

Please sign in to comment.