Skip to content

Commit

Permalink
Enable Table Function and PromQL function (#1719) (#1744)
Browse files Browse the repository at this point in the history
  • Loading branch information
opensearch-trigger-bot[bot] authored Jun 16, 2023
1 parent 7fe2fb4 commit a528031
Show file tree
Hide file tree
Showing 16 changed files with 598 additions and 15 deletions.
24 changes: 24 additions & 0 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,27 @@ Example queries
| 11 | "2022-11-03 07:18:64" | "/-/metrics" | 500 |
+------------+------------------------+--------------------------------+---------------+

PromQL Support for prometheus Connector
==========================================

`query_range` Table Function
----------------------------
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
Arguments should be either passed by name or positionArguments should be either passed by name or position.
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
or
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
Example::

> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
3 changes: 3 additions & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,11 @@ mlArg
fromClause
: SOURCE EQUAL tableSourceClause
| INDEX EQUAL tableSourceClause
| SOURCE EQUAL tableFunction
| INDEX EQUAL tableFunction
;


tableSourceClause
: tableSource (COMMA tableSource)*
;
Expand Down
24 changes: 18 additions & 6 deletions ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFromFilterContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortCommandContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsCommandContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableFunctionContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableSourceClauseContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TopCommandContext;
import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.WhereCommandContext;
Expand All @@ -33,7 +34,6 @@
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.RequiredArgsConstructor;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.Token;
Expand All @@ -46,6 +46,7 @@
import org.opensearch.sql.ast.expression.Map;
import org.opensearch.sql.ast.expression.ParseMethod;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedArgument;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
Expand All @@ -62,6 +63,7 @@
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser;
Expand Down Expand Up @@ -346,7 +348,11 @@ public UnresolvedPlan visitTopCommand(TopCommandContext ctx) {
*/
@Override
public UnresolvedPlan visitFromClause(FromClauseContext ctx) {
return visitTableSourceClause(ctx.tableSourceClause());
if (ctx.tableFunction() != null) {
return visitTableFunction(ctx.tableFunction());
} else {
return visitTableSourceClause(ctx.tableSourceClause());
}
}

@Override
Expand All @@ -357,10 +363,16 @@ public UnresolvedPlan visitTableSourceClause(TableSourceClauseContext ctx) {
}

@Override
@Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019
public UnresolvedPlan visitTableFunction(OpenSearchPPLParser.TableFunctionContext ctx) {
//<TODO>
return null;
public UnresolvedPlan visitTableFunction(TableFunctionContext ctx) {
ImmutableList.Builder<UnresolvedExpression> builder = ImmutableList.builder();
ctx.functionArgs().functionArg().forEach(arg
-> {
String argName = (arg.ident() != null) ? arg.ident().getText() : null;
builder.add(
new UnresolvedArgument(argName,
this.internalVisitExpression(arg.valueExpression())));
});
return new TableFunction(this.internalVisitExpression(ctx.qualifiedName()), builder.build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.opensearch.sql.ppl.parser;

import static org.opensearch.sql.ast.dsl.AstDSL.qualifiedName;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NOT_NULL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NULL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.POSITION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,13 @@ public String visitRelation(Relation node, String context) {
}

@Override
@Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019
public String visitTableFunction(TableFunction node, String context) {
//<TODO>
return null;
String arguments =
node.getArguments().stream()
.map(unresolvedExpression
-> this.expressionAnalyzer.analyze(unresolvedExpression, context))
.collect(Collectors.joining(","));
return StringUtils.format("source=%s(%s)", node.getFunctionName().toString(), arguments);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ public void testSearchCommandWithDotInIndexName() {
);
}

@Ignore
@Test
public void testSearchWithPrometheusQueryRangeWithPositionedArguments() {
assertEqual("search source = prometheus.query_range(\"test{code='200'}\",1234, 12345, 3)",
Expand All @@ -124,7 +123,6 @@ public void testSearchWithPrometheusQueryRangeWithPositionedArguments() {
));
}

@Ignore
@Test
public void testSearchWithPrometheusQueryRangeWithNamedArguments() {
assertEqual("search source = prometheus.query_range(query = \"test{code='200'}\", "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static org.opensearch.sql.ast.dsl.AstDSL.relation;

import java.util.Collections;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
Expand All @@ -36,7 +35,6 @@ public void testSearchCommand() {
}

@Test
@Ignore
public void testTableFunctionCommand() {
assertEquals("source=prometheus.query_range(***,***,***,***)",
anonymize("source=prometheus.query_range('afsd',123,123,3)")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.prometheus.functions.response;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants;

/**
* Default implementation of QueryRangeFunctionResponseHandle.
*/
public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFunctionResponseHandle {

private final JSONObject responseObject;
private Iterator<ExprValue> responseIterator;
private ExecutionEngine.Schema schema;

/**
* Constructor.
*
* @param responseObject Prometheus responseObject.
*/
public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) {
this.responseObject = responseObject;
constructIteratorAndSchema();
}

private void constructIteratorAndSchema() {
List<ExprValue> result = new ArrayList<>();
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
if ("matrix".equals(responseObject.getString("resultType"))) {
JSONArray itemArray = responseObject.getJSONArray("result");
for (int i = 0; i < itemArray.length(); i++) {
JSONObject item = itemArray.getJSONObject(i);
JSONObject metric = item.getJSONObject("metric");
JSONArray values = item.getJSONArray("values");
if (i == 0) {
columnList = getColumnList(metric);
}
for (int j = 0; j < values.length(); j++) {
LinkedHashMap<String, ExprValue> linkedHashMap =
extractRow(metric, values.getJSONArray(j), columnList);
result.add(new ExprTupleValue(linkedHashMap));
}
}
} else {
throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus "
+ "Response Parsing. 'matrix' resultType is expected",
responseObject.getString("resultType")));
}
this.schema = new ExecutionEngine.Schema(columnList);
this.responseIterator = result.iterator();
}

@NotNull
private static LinkedHashMap<String, ExprValue> extractRow(JSONObject metric,
JSONArray values, List<ExecutionEngine.Schema.Column> columnList) {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
for (ExecutionEngine.Schema.Column column : columnList) {
if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) {
linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP,
new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000))));
} else if (column.getName().equals(VALUE)) {
linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1)));
} else {
linkedHashMap.put(column.getName(),
new ExprStringValue(metric.getString(column.getName())));
}
}
return linkedHashMap;
}


private List<ExecutionEngine.Schema.Column> getColumnList(JSONObject metric) {
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP,
PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP));
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
for (String key : metric.keySet()) {
columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING));
}
return columnList;
}

@Override
public boolean hasNext() {
return responseIterator.hasNext();
}

@Override
public ExprValue next() {
return responseIterator.next();
}

@Override
public ExecutionEngine.Schema schema() {
return schema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.prometheus.functions.response;

import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;

/**
* Handle Prometheus response.
*/
public interface QueryRangeFunctionResponseHandle {

/**
* Return true if Prometheus response has more result.
*/
boolean hasNext();

/**
* Return Prometheus response as {@link ExprValue}. Attention, the method must been called when
* hasNext return true.
*/
ExprValue next();

/**
* Return ExecutionEngine.Schema of the Prometheus response.
*/
ExecutionEngine.Schema schema();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.functions.scan;

import lombok.AllArgsConstructor;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* TableScanBuilder for query_range table function of prometheus connector.
* we can merge this when we refactor for existing
* ppl queries based on prometheus connector.
*/
@AllArgsConstructor
public class QueryRangeFunctionTableScanBuilder extends TableScanBuilder {

private final PrometheusClient prometheusClient;

private final PrometheusQueryRequest prometheusQueryRequest;

@Override
public TableScanOperator build() {
return new QueryRangeFunctionTableScanOperator(prometheusClient, prometheusQueryRequest);
}

@Override
public boolean pushDownProject(LogicalProject project) {
return true;
}
}
Loading

0 comments on commit a528031

Please sign in to comment.