Skip to content

Commit

Permalink
Prometheus Query Exemplars
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Jul 17, 2023
1 parent 8c8e08c commit f04e834
Show file tree
Hide file tree
Showing 34 changed files with 1,503 additions and 201 deletions.
79 changes: 73 additions & 6 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ PromQL Support for prometheus Connector

`query_range` Table Function
----------------------------
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
Arguments should be either passed by name or positionArguments should be either passed by name or position.
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
or
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
* Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
* The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
- `source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
Example::

> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
Expand All @@ -210,3 +209,71 @@ Example::
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+


Prometheus Connector Table Functions
==========================================

`query_exemplars` Table Function
----------------------------
* This table function can be used to fetch exemplars of a query in a specific time range.
* The function takes inputs similar to parameters mentioned for query exemplars api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)`
- `source=my_prometheus.query_exemplars(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130)`
Example::

> source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)
"schema": [
{
"name": "seriesLabels",
"type": "struct"
},
{
"name": "exemplars",
"type": "array"
}
],
"datarows": [
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "bar",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "EpTxMJ40fUus7aGY"
},
"timestamp": "2020-09-14 15:22:25.479",
"value": 6.0
}
]
],
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "foo",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "Olp9XHlq763ccsfa"
},
"timestamp": "2020-09-14 15:22:35.479",
"value": 19.0
},
{
"labels": {
"traceID": "hCtjygkIHwAN9vs4"
},
"timestamp": "2020-09-14 15:22:45.489",
"value": 20.0
}
]
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -218,4 +223,20 @@ public void testMetricSumAggregationCommand() {
}
}

@Test
public void testExplainForQueryExemplars() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_query_exemplars.json");
assertJsonEquals(
expected,
explainQueryToString("source = my_prometheus."
+ "query_exemplars('app_ads_ad_requests_total',1689228292,1689232299)")
);
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"root": {
"name": "QueryExemplarsFunctionTableScanOperator",
"description": {
"request": "query_exemplars(app_ads_ad_requests_total, 1689228292, 1689232299)"
},
"children": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.prometheus.request.system.model.MetricMetadata;

Expand All @@ -18,4 +19,6 @@ public interface PrometheusClient {
List<String> getLabels(String metricName) throws IOException;

Map<String, List<MetricMetadata>> getAllMetrics() throws IOException;

JSONArray queryExemplars(String query, Long start, Long end) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ public Map<String, List<MetricMetadata>> getAllMetrics() throws IOException {
return new ObjectMapper().readValue(jsonObject.getJSONObject("data").toString(), typeRef);
}

@Override
public JSONArray queryExemplars(String query, Long start, Long end) throws IOException {
String queryUrl = String.format("%s/api/v1/query_exemplars?query=%s&start=%s&end=%s",
uri.toString().replaceAll("/$", ""), URLEncoder.encode(query, StandardCharsets.UTF_8),
start, end);
logger.debug("queryUrl: " + queryUrl);
Request request = new Request.Builder()
.url(queryUrl)
.build();
Response response = this.okHttpClient.newCall(request).execute();
JSONObject jsonObject = readResponse(response);
return jsonObject.getJSONArray("data");
}

private List<String> toListOfLabels(JSONArray array) {
List<String> result = new ArrayList<>();
for (int i = 0; i < array.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,15 @@ public class PrometheusFieldConstants {
public static final String TIMESTAMP = "@timestamp";
public static final String VALUE = "@value";
public static final String LABELS = "@labels";
public static final String MATRIX_KEY = "matrix";
public static final String RESULT_TYPE_KEY = "resultType";
public static final String METRIC_KEY = "metric";
public static final String RESULT_KEY = "result";
public static final String VALUES_KEY = "values";
public static final String SERIES_LABELS_KEY = "seriesLabels";
public static final String EXEMPLARS_KEY = "exemplars";
public static final String TRACE_ID_KEY = "traceID";
public static final String LABELS_KEY = "labels";
public static final String TIMESTAMP_KEY = "timestamp";
public static final String VALUE_KEY = "value";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.functions.implementation;

import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.ENDTIME;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.QUERY;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.STARTTIME;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest;
import org.opensearch.sql.prometheus.storage.QueryExemplarsTable;
import org.opensearch.sql.storage.Table;

public class QueryExemplarFunctionImplementation extends FunctionExpression implements
TableFunctionImplementation {

private final FunctionName functionName;
private final List<Expression> arguments;
private final PrometheusClient prometheusClient;

/**
* Required argument constructor.
*
* @param functionName name of the function
* @param arguments a list of arguments provided
*/
public QueryExemplarFunctionImplementation(FunctionName functionName, List<Expression> arguments,
PrometheusClient prometheusClient) {
super(functionName, arguments);
this.functionName = functionName;
this.arguments = arguments;
this.prometheusClient = prometheusClient;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
throw new UnsupportedOperationException(String.format(
"Prometheus defined function [%s] is only "
+ "supported in SOURCE clause with prometheus connector catalog",
functionName));
}

@Override
public ExprType type() {
return ExprCoreType.STRUCT;
}

@Override
public String toString() {
List<String> args = arguments.stream()
.map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg)
.getArgName(), ((NamedArgumentExpression) arg).getValue().toString()))
.collect(Collectors.toList());
return String.format("%s(%s)", functionName, String.join(", ", args));
}

@Override
public Table applyArguments() {
return new QueryExemplarsTable(prometheusClient, buildExemplarsQueryRequest(arguments));
}

private PrometheusQueryExemplarsRequest buildExemplarsQueryRequest(List<Expression> arguments) {

PrometheusQueryExemplarsRequest request = new PrometheusQueryExemplarsRequest();
arguments.forEach(arg -> {
String argName = ((NamedArgumentExpression) arg).getArgName();
Expression argValue = ((NamedArgumentExpression) arg).getValue();
ExprValue literalValue = argValue.valueOf();
switch (argName) {
case QUERY:
request
.setQuery((String) literalValue.value());
break;
case STARTTIME:
request.setStartTime(((Number) literalValue.value()).longValue());
break;
case ENDTIME:
request.setEndTime(((Number) literalValue.value()).longValue());
break;
default:
throw new ExpressionEvaluationException(
String.format("Invalid Function Argument:%s", argName));
}
});
return request;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.prometheus.functions.resolver;

import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation;

/**
* This class is for query_exemplars table function resolver {@link FunctionResolver}.
* It takes care of validating function arguments and also creating
* required {@link org.opensearch.sql.expression.function.TableFunctionImplementation} Class.
*/
@RequiredArgsConstructor
public class QueryExemplarsTableFunctionResolver implements FunctionResolver {

private final PrometheusClient prometheusClient;
public static final String QUERY_EXEMPLARS = "query_exemplars";

public static final String QUERY = "query";
public static final String STARTTIME = "starttime";
public static final String ENDTIME = "endtime";

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
final FunctionName functionName = FunctionName.of(QUERY_EXEMPLARS);
FunctionSignature functionSignature =
new FunctionSignature(FunctionName.of(QUERY_EXEMPLARS), List.of(STRING, LONG, LONG));
FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
final List<String> argumentNames = List.of(QUERY, STARTTIME, ENDTIME);
validatePrometheusTableFunctionArguments(arguments, argumentNames);
List<Expression> namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames);
return new QueryExemplarFunctionImplementation(functionName,
namedArguments, prometheusClient);
};
return Pair.of(functionSignature, functionBuilder);
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of(QUERY_EXEMPLARS);
}
}
Loading

0 comments on commit f04e834

Please sign in to comment.