Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch: Add PPL response parser for logs and table format #7

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 99 additions & 11 deletions public/app/plugins/datasource/elasticsearch/elastic_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ import {
MutableDataFrame,
PreferredVisualisationType,
} from '@grafana/data';
import { ElasticsearchAggregation } from './types';
import { ElasticsearchAggregation, ElasticsearchQueryType } from './types';

export class ElasticResponse {
constructor(private targets: any, private response: any) {
constructor(
private targets: any,
private response: any,
private targetType: ElasticsearchQueryType = ElasticsearchQueryType.Lucene
) {
this.targets = targets;
this.response = response;
this.targetType = targetType;
}

processMetrics(esAgg: any, target: any, seriesList: any, props: any) {
Expand Down Expand Up @@ -402,9 +407,16 @@ export class ElasticResponse {
}

getLogs(logMessageField?: string, logLevelField?: string): DataQueryResponse {
if (this.targetType === ElasticsearchQueryType.PPL) {
return this.processPPLResponseToDataFrames(true, logMessageField, logLevelField);
}
return this.processResponseToDataFrames(true, logMessageField, logLevelField);
}

getTable() {
return this.processPPLResponseToDataFrames(false);
}

processResponseToDataFrames(
isLogsRequest: boolean,
logMessageField?: string,
Expand All @@ -425,6 +437,7 @@ export class ElasticResponse {
propNames,
this.targets[0].timeField,
isLogsRequest,
this.targetType,
logMessageField,
logLevelField
);
Expand Down Expand Up @@ -478,7 +491,7 @@ export class ElasticResponse {
}
}

return { data: dataFrame };
return { data: dataFrame, key: this.targets[0]?.refId };
}

processResponseToSeries = () => {
Expand Down Expand Up @@ -516,8 +529,55 @@ export class ElasticResponse {
}
}

return { data: seriesList };
return { data: seriesList, key: this.targets[0]?.refId };
};

processPPLResponseToDataFrames(
isLogsRequest: boolean,
logMessageField?: string,
logLevelField?: string
): DataQueryResponse {
const dataFrame: DataFrame[] = [];

// Each target is inputted separately from Elasticdatasource for PPL
const target = this.targets;
//map the schema into an array of string containing its name
const schema = this.response.schema.map((a: { name: any }) => a.name);
//combine the schema key and response value
const response = _.map(this.response.datarows, arr => _.zipObject(schema, arr));
//flatten the response
const { flattenSchema, docs } = flattenResponses(response);

if (this.response.datarows.error) {
throw this.getErrorFromElasticResponse(this.response, this.response.datarows.error);
}

if (response.length > 0) {
let series = createEmptyDataFrame(
flattenSchema,
target.timeField,
isLogsRequest,
this.targetType,
logMessageField,
logLevelField
);
// Add a row for each document
for (const doc of docs) {
if (logLevelField) {
// Remap level field based on the datasource config. This field is then used in explore to figure out the
// log level. We may rewrite some actual data in the level field if they are different.
doc['level'] = doc[logLevelField];
}
series.add(doc);
}
if (isLogsRequest) {
series = addPreferredVisualisationType(series, 'logs');
}
series.refId = target.refId;
dataFrame.push(series);
}
return { data: dataFrame, key: target?.refId };
}
}

type Doc = {
Expand Down Expand Up @@ -562,6 +622,30 @@ const flattenHits = (hits: Doc[]): { docs: Array<Record<string, any>>; propNames
return { docs, propNames };
};

/**
* Flatten the response which can be nested. This flattens it so that it is one level deep and the keys are:
* `level1Name.level2Name...`. Also returns list of all schemas from all the response
* @param responses
*/
const flattenResponses = (responses: any): { docs: Array<Record<string, any>>; flattenSchema: string[] } => {
const docs: any[] = [];
// We keep a list of all schemas so that we can create all the fields in the dataFrame, this can lead
// to wide sparse dataframes in case the scheme is different per document.
let flattenSchema: string[] = [];

for (const response of responses) {
const doc = flatten(response);

for (const schema of Object.keys(doc)) {
if (flattenSchema.indexOf(schema) === -1) {
flattenSchema.push(schema);
}
}
docs.push(doc);
}
return { docs, flattenSchema };
};

/**
* Create empty dataframe but with created fields. Fields are based from propNames (should be from the response) and
* also from configuration specified fields for message, time, and level.
Expand All @@ -574,18 +658,22 @@ const createEmptyDataFrame = (
propNames: string[],
timeField: string,
isLogsRequest: boolean,
targetType: ElasticsearchQueryType,
logMessageField?: string,
logLevelField?: string
): MutableDataFrame => {
const series = new MutableDataFrame({ fields: [] });

series.addField({
config: {
filterable: true,
},
name: timeField,
type: FieldType.time,
});
//PPL table response should add time field only when it is part of the query response
if (targetType === ElasticsearchQueryType.Lucene || isLogsRequest) {
series.addField({
config: {
filterable: true,
},
name: timeField,
type: FieldType.time,
});
}

if (logMessageField) {
series.addField({
Expand Down
10 changes: 10 additions & 0 deletions public/app/plugins/datasource/elasticsearch/query_def.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ export const movingAvgModelSettings: any = {
],
};

export const pplFormatTypes = [
{ text: 'Table', value: 'table' },
{ text: 'Logs', value: 'logs' },
{ text: 'Time series', value: 'time_series' },
];

export function getMetricAggTypes(esVersion: any) {
return _.filter(metricAggTypes, f => {
if (f.minVersion || f.maxVersion) {
Expand Down Expand Up @@ -299,6 +305,10 @@ export function defaultBucketAgg() {
return { type: 'date_histogram', id: '2', settings: { interval: 'auto' } };
}

export function defaultPPLFormat() {
return 'table';
}

export const findMetricById = (metrics: any[], id: any) => {
return _.find(metrics, { id: id });
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DataFrameView, FieldCache, KeyValue, MutableDataFrame } from '@grafana/data';
import { ElasticResponse } from '../elastic_response';
import flatten from 'app/core/utils/flatten';
import { ElasticsearchQueryType } from '../types';

describe('ElasticResponse', () => {
let targets: any;
Expand Down Expand Up @@ -1320,4 +1321,110 @@ describe('ElasticResponse', () => {
expect(field?.values.toArray()).toEqual(['debug', 'info']);
});
});

describe('PPL log query response', () => {
const targets: any = [
{
refId: 'A',
isLogsQuery: true,
queryType: ElasticsearchQueryType.PPL,
timeField: 'timestamp',
format: 'table',
query: 'source=sample_data_logs',
},
];
const response = {
datarows: [
['test-data1', 'message1', { coordinates: { lat: 5, lon: 10 } }],
['test-data2', 'message2', { coordinates: { lat: 6, lon: 11 } }],
['test-data3', 'message3', { coordinates: { lat: 7, lon: 12 } }],
],
schema: [
{ name: 'data', type: 'string' },
{ name: 'message', type: 'string' },
{ name: 'geo', type: 'struct' },
],
};
const targetType = ElasticsearchQueryType.PPL;
it('should return all data', () => {
const result = new ElasticResponse(targets, response, targetType).getLogs();
expect(result.data.length).toBe(1);
const logResults = result.data[0] as MutableDataFrame;
const fields = logResults.fields.map(f => {
return {
name: f.name,
type: f.type,
};
});
expect(fields).toContainEqual({ name: 'data', type: 'string' });
expect(fields).toContainEqual({ name: 'message', type: 'string' });
expect(fields).toContainEqual({ name: 'geo.coordinates.lat', type: 'string' });
expect(fields).toContainEqual({ name: 'geo.coordinates.lon', type: 'string' });

let rows = new DataFrameView(logResults);
expect(rows.length).toBe(3);
for (let i = 0; i < rows.length; i++) {
const r = rows.get(i);
expect(r.data).toEqual(response.datarows[i][0]);
expect(r.message).toEqual(response.datarows[i][1]);
}
});
});

describe('PPL table query response', () => {
const targets: any = [
{
refId: 'A',
context: 'explore',
interval: '10s',
isLogsQuery: false,
query: 'source=sample_data | stats count(test) by timestamp',
queryType: ElasticsearchQueryType.PPL,
timeField: 'timestamp',
format: 'table',
},
];
const response = {
datarows: [
[5, '2020-11-01 00:39:02.912Z'],
[1, '2020-11-01 03:26:21.326Z'],
[4, '2020-11-01 03:34:43.399Z'],
],
schema: [
{ name: 'test', type: 'string' },
{ name: 'timestamp', type: 'timestamp' },
],
};
const targetType = ElasticsearchQueryType.PPL;

it('should create dataframes with filterable fields', () => {
const result = new ElasticResponse(targets, response, targetType).getTable();
for (const field of result.data[0].fields) {
expect(field.config.filterable).toBe(true);
}
});
it('should return all data', () => {
const result = new ElasticResponse(targets, response, targetType).getTable();
expect(result.data.length).toBe(1);
const logResults = result.data[0] as MutableDataFrame;
const fields = logResults.fields.map(f => {
return {
name: f.name,
type: f.type,
};
});
expect(fields).toEqual([
{ name: 'test', type: 'string' },
{ name: 'timestamp', type: 'string' },
]);

let rows = new DataFrameView(logResults);
expect(rows.length).toBe(3);
for (let i = 0; i < rows.length; i++) {
const r = rows.get(i);
expect(r.test).toEqual(response.datarows[i][0]);
expect(r.timestamp).toEqual(response.datarows[i][1]);
}
});
});
});
8 changes: 8 additions & 0 deletions public/app/plugins/datasource/elasticsearch/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface ElasticsearchOptions extends DataSourceJsonData {
logMessageField?: string;
logLevelField?: string;
dataLinks?: DataLinkConfig[];
pplSupportEnabled?: boolean;
}

export interface ElasticsearchAggregation {
Expand All @@ -23,12 +24,19 @@ export interface ElasticsearchQuery extends DataQuery {
isLogsQuery: boolean;
alias?: string;
query?: string;
queryType?: ElasticsearchQueryType;
bucketAggs?: ElasticsearchAggregation[];
metrics?: ElasticsearchAggregation[];
format?: string;
}

export type DataLinkConfig = {
field: string;
url: string;
datasourceUid?: string;
};

export enum ElasticsearchQueryType {
Lucene = 'lucene',
PPL = 'PPL',
}