Skip to content

Commit

Permalink
Fix translate metrics without rate (elastic#110614) (elastic#110621)
Browse files Browse the repository at this point in the history
Currently, we incorrectly remove the `@timestamp` attribute from
the EsRelation when translating metric aggregates.
  • Loading branch information
dnhatn authored Jul 9, 2024
1 parent 41c662d commit c23f124
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private static Aggregate toStandardAggregate(Aggregate metrics) {
final LogicalPlan child = metrics.child().transformDown(EsRelation.class, r -> {
var attributes = new ArrayList<>(new AttributeSet(metrics.inputSet()));
attributes.removeIf(a -> a.name().equals(MetadataAttribute.TSID_FIELD));
if (attributes.stream().noneMatch(a -> a.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) == false) {
if (attributes.stream().noneMatch(a -> a.name().equals(MetadataAttribute.TIMESTAMP_FIELD))) {
attributes.removeIf(a -> a.name().equals(MetadataAttribute.TIMESTAMP_FIELD));
}
return new EsRelation(r.source(), r.index(), new ArrayList<>(attributes), IndexMode.STANDARD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5477,6 +5477,56 @@ METRICS k8s avg(round(1.05 * rate(network.total_bytes_in))) BY bucket(@timestamp
assertThat(Expressions.attribute(values.field()).name(), equalTo("cluster"));
}

public void testMetricsWithoutRate() {
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
List<String> queries = List.of("""
METRICS k8s count(to_long(network.total_bytes_in)) BY bucket(@timestamp, 1 minute)
| LIMIT 10
""", """
METRICS k8s | STATS count(to_long(network.total_bytes_in)) BY bucket(@timestamp, 1 minute)
| LIMIT 10
""", """
FROM k8s | STATS count(to_long(network.total_bytes_in)) BY bucket(@timestamp, 1 minute)
| LIMIT 10
""");
List<LogicalPlan> plans = new ArrayList<>();
for (String query : queries) {
var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query)));
plans.add(plan);
}
for (LogicalPlan plan : plans) {
Limit limit = as(plan, Limit.class);
Aggregate aggregate = as(limit.child(), Aggregate.class);
assertThat(aggregate.aggregateType(), equalTo(Aggregate.AggregateType.STANDARD));
assertThat(aggregate.aggregates(), hasSize(2));
assertThat(aggregate.groupings(), hasSize(1));
Eval eval = as(aggregate.child(), Eval.class);
assertThat(eval.fields(), hasSize(2));
assertThat(Alias.unwrap(eval.fields().get(0)), instanceOf(Bucket.class));
assertThat(Alias.unwrap(eval.fields().get(1)), instanceOf(ToLong.class));
EsRelation relation = as(eval.child(), EsRelation.class);
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
}
for (int i = 1; i < plans.size(); i++) {
assertThat(plans.get(i), equalTo(plans.get(0)));
}
}

public void testRateInStats() {
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
var query = """
METRICS k8s | STATS max(rate(network.total_bytes_in)) BY bucket(@timestamp, 1 minute)
| LIMIT 10
""";
VerificationException error = expectThrows(
VerificationException.class,
() -> logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query)))
);
assertThat(error.getMessage(), equalTo("""
Found 1 problem
line 1:25: the rate aggregate[rate(network.total_bytes_in)] can only be used within the metrics command"""));
}

private Literal nullOf(DataType dataType) {
return new Literal(Source.EMPTY, null, dataType);
}
Expand Down

0 comments on commit c23f124

Please sign in to comment.