diff --git a/src/plugins/data/common/search/aggs/agg_config.ts b/src/plugins/data/common/search/aggs/agg_config.ts index 0e9cf6aeb1f2f..da42da79a87e1 100644 --- a/src/plugins/data/common/search/aggs/agg_config.ts +++ b/src/plugins/data/common/search/aggs/agg_config.ts @@ -17,6 +17,7 @@ import { SerializedFieldFormat, } from 'src/plugins/expressions/common'; +import moment from 'moment'; import { IAggType } from './agg_type'; import { writeParams } from './agg_params'; import { IAggConfigs } from './agg_configs'; @@ -172,6 +173,13 @@ export class AggConfig { return _.get(this.params, key); } + getTimeShift(): undefined | moment.Duration { + const rawTimeShift = this.getParam('timeShift'); + if (!rawTimeShift) return undefined; + const [, amount, unit] = rawTimeShift.match(/(\d+)(\w)/); + return moment.duration(Number(amount), unit); + } + write(aggs?: IAggConfigs) { return writeParams(this.type.params, this, aggs); } diff --git a/src/plugins/data/common/search/aggs/metrics/count.ts b/src/plugins/data/common/search/aggs/metrics/count.ts index 8a10d7edb3f83..2d2809e6bd4c8 100644 --- a/src/plugins/data/common/search/aggs/metrics/count.ts +++ b/src/plugins/data/common/search/aggs/metrics/count.ts @@ -25,6 +25,13 @@ export const getCountMetricAgg = () => defaultMessage: 'Count', }); }, + params: [ + { + name: 'timeShift', + type: 'string', + write: () => {}, + }, + ], getSerializedFormat(agg) { return { id: 'number', diff --git a/src/plugins/data/common/search/aggs/metrics/count_fn.ts b/src/plugins/data/common/search/aggs/metrics/count_fn.ts index 40c87db57eedc..c1b46f1a332d1 100644 --- a/src/plugins/data/common/search/aggs/metrics/count_fn.ts +++ b/src/plugins/data/common/search/aggs/metrics/count_fn.ts @@ -48,6 +48,10 @@ export const aggCount = (): FunctionDefinition => ({ defaultMessage: 'Schema to use for this aggregation', }), }, + timeShift: { + types: ['string'], + help: '', + }, customLabel: { types: ['string'], help: i18n.translate('data.search.aggs.metrics.count.customLabel.help', { diff --git a/src/plugins/data/common/search/aggs/types.ts b/src/plugins/data/common/search/aggs/types.ts index e57410962fc08..c11d4a6116d92 100644 --- a/src/plugins/data/common/search/aggs/types.ts +++ b/src/plugins/data/common/search/aggs/types.ts @@ -178,7 +178,7 @@ export interface AggParamsMapping { [BUCKET_TYPES.TERMS]: AggParamsTerms; [METRIC_TYPES.AVG]: AggParamsAvg; [METRIC_TYPES.CARDINALITY]: AggParamsCardinality; - [METRIC_TYPES.COUNT]: BaseAggParams; + [METRIC_TYPES.COUNT]: BaseAggParams & { timeShift?: string }; [METRIC_TYPES.GEO_BOUNDS]: AggParamsGeoBounds; [METRIC_TYPES.GEO_CENTROID]: AggParamsGeoCentroid; [METRIC_TYPES.MAX]: AggParamsMax; diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts index e2ee1a31757cb..42d68f41b9b1a 100644 --- a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts +++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts @@ -7,6 +7,8 @@ */ import { i18n } from '@kbn/i18n'; +import moment from 'moment'; +import { DatatableColumn, DatatableRow } from 'src/plugins/expressions'; import { Adapters } from 'src/plugins/inspector/common'; import { @@ -62,126 +64,261 @@ export const handleRequest = async ({ searchSource.setField('index', indexPattern); searchSource.setField('size', 0); - // Create a new search source that inherits the original search source - // but has the appropriate timeRange applied via a filter. - // This is a temporary solution until we properly pass down all required - // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641). - // Using callParentStartHandlers: true we make sure, that the parent searchSource - // onSearchRequestStart will be called properly even though we use an inherited - // search source. - const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true }); - const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true }); - - aggs.setTimeRange(timeRange as TimeRange); - - // For now we need to mirror the history of the passed search source, since - // the request inspector wouldn't work otherwise. - Object.defineProperty(requestSearchSource, 'history', { - get() { - return searchSource.history; - }, - set(history) { - return (searchSource.history = history); - }, - }); - - requestSearchSource.setField('aggs', function () { - return aggs.toDsl(metricsAtAllLevels); - }); - - requestSearchSource.onRequestStart((paramSearchSource, options) => { - return aggs.onSearchRequestStart(paramSearchSource, options); - }); - - // If timeFields have been specified, use the specified ones, otherwise use primary time field of index - // pattern if it's available. - const defaultTimeField = indexPattern?.getTimeField?.(); - const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : []; - const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields; - - // If a timeRange has been specified and we had at least one timeField available, create range - // filters for that those time fields - if (timeRange && allTimeFields.length > 0) { - timeFilterSearchSource.setField('filter', () => { - return allTimeFields - .map((fieldName) => getTime(indexPattern, timeRange, { fieldName, forceNow })) - .filter(isRangeFilter); + const timeShifts: Record = {}; + aggs + .getAll() + .filter((agg) => agg.schema === 'metric') + .map((agg) => agg.getTimeShift()) + .forEach((timeShift) => { + timeShifts[String(timeShift?.asMilliseconds() || 0)] = timeShift; }); - } - requestSearchSource.setField('filter', filters); - requestSearchSource.setField('query', query); - - let request; - if (inspectorAdapters.requests) { - inspectorAdapters.requests.reset(); - request = inspectorAdapters.requests.start( - i18n.translate('data.functions.esaggs.inspector.dataRequest.title', { - defaultMessage: 'Data', - }), - { - description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', { - defaultMessage: - 'This request queries Elasticsearch to fetch the data for the visualization.', - }), - searchSessionId, + const originalAggs = aggs; + + const partialResponses = await Promise.all( + Object.values(timeShifts).map(async (timeShift) => { + const currentAggs = aggs; + if (timeShift) { + // currentAggs = originalAggs.clone(); + // currentAggs.aggs = currentAggs.aggs.filter( + // (agg) => + // agg.schema !== 'metric' || + // (agg.getTimeShift() && + // agg.getTimeShift()!.asMilliseconds() === timeShift.asMilliseconds()) + // ); + } else { + // currentAggs = Object.values(timeShifts).length === 1 ? originalAggs : originalAggs.clone(); + // currentAggs.aggs = currentAggs.aggs.filter((agg) => !agg.getTimeShift()); } - ); - request.stats(getRequestInspectorStats(requestSearchSource)); - } + // Create a new search source that inherits the original search source + // but has the appropriate timeRange applied via a filter. + // This is a temporary solution until we properly pass down all required + // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641). + // Using callParentStartHandlers: true we make sure, that the parent searchSource + // onSearchRequestStart will be called properly even though we use an inherited + // search source. + const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true }); + const requestSearchSource = timeFilterSearchSource.createChild({ + callParentStartHandlers: true, + }); - try { - const response = await requestSearchSource.fetch({ - abortSignal, - sessionId: searchSessionId, - }); + const currentTimeRange: TimeRange | undefined = timeRange ? { ...timeRange } : undefined; - if (request) { - request.stats(getResponseInspectorStats(response, searchSource)).ok({ json: response }); - } - - (searchSource as any).rawResponse = response; - } catch (e) { - // Log any error during request to the inspector - if (request) { - request.error({ json: e }); - } - throw e; - } finally { - // Add the request body no matter if things went fine or not - if (request) { - request.json(await requestSearchSource.getSearchRequestBody()); - } - } + if (currentTimeRange) { + if (timeShift && currentTimeRange.from) { + currentTimeRange.from = moment(currentTimeRange.from).subtract(timeShift).toISOString(); + } - // Note that rawResponse is not deeply cloned here, so downstream applications using courier - // must take care not to mutate it, or it could have unintended side effects, e.g. displaying - // response data incorrectly in the inspector. - let response = (searchSource as any).rawResponse; - for (const agg of aggs.aggs) { - if (agg.enabled && typeof agg.type.postFlightRequest === 'function') { - response = await agg.type.postFlightRequest( - response, - aggs, - agg, - requestSearchSource, - inspectorAdapters.requests, - abortSignal, - searchSessionId - ); - } - } + if (timeShift && currentTimeRange.from) { + currentTimeRange.to = moment(currentTimeRange.to).subtract(timeShift).toISOString(); + } + } - const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null; - const tabifyParams = { - metricsAtAllLevels, - partialRows, - timeRange: parsedTimeRange - ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields } - : undefined, - }; + currentAggs.setTimeRange(currentTimeRange as TimeRange); - const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams); + // For now we need to mirror the history of the passed search source, since + // the request inspector wouldn't work otherwise. + Object.defineProperty(requestSearchSource, 'history', { + get() { + return searchSource.history; + }, + set(history) { + return (searchSource.history = history); + }, + }); + + requestSearchSource.setField('aggs', function () { + return currentAggs.toDsl(metricsAtAllLevels); + }); + + requestSearchSource.onRequestStart((paramSearchSource, options) => { + return currentAggs.onSearchRequestStart(paramSearchSource, options); + }); + + // If timeFields have been specified, use the specified ones, otherwise use primary time field of index + // pattern if it's available. + const defaultTimeField = indexPattern?.getTimeField?.(); + const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : []; + const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields; + + // If a timeRange has been specified and we had at least one timeField available, create range + // filters for that those time fields + if (currentTimeRange && allTimeFields.length > 0) { + timeFilterSearchSource.setField('filter', () => { + return allTimeFields + .map((fieldName) => getTime(indexPattern, currentTimeRange, { fieldName, forceNow })) + .filter(isRangeFilter); + }); + } + + requestSearchSource.setField('filter', filters); + requestSearchSource.setField('query', query); + + let request; + if (inspectorAdapters.requests) { + inspectorAdapters.requests.reset(); + request = inspectorAdapters.requests.start( + i18n.translate('data.functions.esaggs.inspector.dataRequest.title', { + defaultMessage: 'Data', + }), + { + description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', { + defaultMessage: + 'This request queries Elasticsearch to fetch the data for the visualization.', + }), + searchSessionId, + } + ); + request.stats(getRequestInspectorStats(requestSearchSource)); + } - return tabifiedResponse; + try { + const response = await requestSearchSource.fetch({ + abortSignal, + sessionId: searchSessionId, + }); + + if (request) { + request.stats(getResponseInspectorStats(response, searchSource)).ok({ json: response }); + } + + if (!timeShift) { + (searchSource as any).rawResponse = response; + } + (requestSearchSource as any).rawResponse = response; + } catch (e) { + // Log any error during request to the inspector + if (request) { + request.error({ json: e }); + } + throw e; + } finally { + // Add the request body no matter if things went fine or not + if (request) { + request.json(await requestSearchSource.getSearchRequestBody()); + } + } + + // Note that rawResponse is not deeply cloned here, so downstream applications using courier + // must take care not to mutate it, or it could have unintended side effects, e.g. displaying + // response data incorrectly in the inspector. + let response = (requestSearchSource as any).rawResponse; + for (const agg of currentAggs.aggs) { + if (agg.enabled && typeof agg.type.postFlightRequest === 'function') { + response = await agg.type.postFlightRequest( + response, + currentAggs, + agg, + requestSearchSource, + inspectorAdapters.requests, + abortSignal, + searchSessionId + ); + } + } + + const parsedTimeRange = currentTimeRange + ? calculateBounds(currentTimeRange, { forceNow }) + : null; + const tabifyParams = { + metricsAtAllLevels, + partialRows, + timeRange: parsedTimeRange + ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields } + : undefined, + }; + + const tabifiedResponse = tabifyAggResponse(currentAggs, response, tabifyParams); + console.log(tabifiedResponse); + + return tabifiedResponse; + }) + ); + + // todo - do an outer join on all partial responses + if (partialResponses.length === 1) { + return partialResponses[0]; + } else { + const fullResponse = partialResponses[0]; + const fullResponseTimeShift = Object.values(timeShifts)[0]; + fullResponse.rows.forEach((row) => { + fullResponse.columns.forEach((column) => { + const columnAgg = aggs.aggs.find((a) => a.id === column.meta.sourceParams.id)!; + if ( + columnAgg.getTimeShift()?.asMilliseconds() !== fullResponseTimeShift?.asMilliseconds() + ) { + delete row[column.id]; + } + }); + }); + const joinAggs = aggs + .bySchemaName('bucket') + .filter((agg) => !timeFields || !timeFields.includes(agg.fieldName())); + const joinColumns = fullResponse.columns.filter( + (c) => + c.meta.sourceParams.schema !== 'metric' && + (!timeFields || !timeFields.includes(c.meta.sourceParams?.params?.field)) + ); + const timeJoinAggs = aggs + .bySchemaName('bucket') + .filter((agg) => timeFields && timeFields.includes(agg.fieldName())); + const timeJoinColumns = fullResponse.columns.filter( + (c) => + c.meta.sourceParams.schema !== 'metric' && + timeFields && + timeFields.includes(c.meta.sourceParams?.params?.field) + ); + partialResponses.shift(); + partialResponses.forEach((partialResponse, index) => { + const timeShift = Object.values(timeShifts)[index + 1]; + const missingCols: DatatableColumn[] = []; + partialResponse.columns.forEach((column) => { + const columnAgg = aggs.aggs.find((a) => a.id === column.meta.sourceParams.id)!; + if (columnAgg.getTimeShift()?.asMilliseconds() === timeShift?.asMilliseconds()) { + missingCols.push(column); + } + }); + partialResponse.rows.forEach((row) => { + const targetRow = getColumnIdentifier(joinColumns, row, timeJoinColumns, timeShift); + const targetRowIndex = fullResponse.rows.findIndex((r) => { + return ( + getColumnIdentifier(joinColumns, r, timeJoinColumns, moment.duration(0, 'ms')) === + targetRow + ); + }); + if (targetRowIndex !== -1) { + missingCols.forEach((c) => { + fullResponse.rows[targetRowIndex][c.id] = row[c.id]; + }); + } else { + // add it to the bottom - this might be confusing in some cases + // can we insert it at the right place? + const updatedRow: DatatableRow = {}; + joinColumns.forEach((c) => { + updatedRow[c.id] = row[c.id]; + }); + timeJoinColumns.forEach((c) => { + updatedRow[c.id] = moment(row[c.id]).add(timeShift).valueOf(); + }); + missingCols.forEach((c) => { + updatedRow[c.id] = row[c.id]; + }); + fullResponse.rows.push(updatedRow); + } + }); + }); + return fullResponse; + } }; +function getColumnIdentifier( + joinColumns: DatatableColumn[], + row: DatatableRow, + timeJoinColumns: DatatableColumn[], + timeShift: moment.Duration | undefined +) { + const joinStr = joinColumns.map((c) => String(row[c.id])); + const timeJoinStr = timeJoinColumns.map((c) => + String(moment(row[c.id]).add(timeShift).valueOf()) + ); + return joinStr.join(',') + timeJoinStr.join(','); +} diff --git a/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx b/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx index a54901a2a2fe1..1595e88bf98ec 100644 --- a/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx +++ b/x-pack/plugins/lens/public/editor_frame_service/editor_frame/frame_layout.tsx @@ -45,7 +45,6 @@ export function FrameLayout(props: FrameLayoutProps) { {props.workspacePanel} - {props.suggestionsPanel}
{