Skip to content

Commit

Permalink
Filter Out Metrics with NoRecordedValue Flag Set (#157)
Browse files Browse the repository at this point in the history
Metrics that contain the NoRecordedValue Flag are being written to Druid with a 0 value. We should properly handle them in the backend
  • Loading branch information
Michael Li authored and pagrawal10 committed Nov 27, 2023
1 parent d5cf05b commit 9aa75cf
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.metrics.v1.DataPointFlags;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
Expand Down Expand Up @@ -146,14 +147,22 @@ private List<InputRow> parseMetric(Metric metric, Map<String, Object> resourceAt
inputRows = new ArrayList<>(metric.getSum().getDataPointsCount());
metric.getSum()
.getDataPointsList()
.forEach(dataPoint -> inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName)));
.forEach(dataPoint -> {
if (hasRecordedValue(dataPoint)) {
inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName));
}
});
break;
}
case GAUGE: {
inputRows = new ArrayList<>(metric.getGauge().getDataPointsCount());
metric.getGauge()
.getDataPointsList()
.forEach(dataPoint -> inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName)));
.forEach(dataPoint -> {
if (hasRecordedValue(dataPoint)) {
inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName));
}
});
break;
}
// TODO Support HISTOGRAM and SUMMARY metrics
Expand All @@ -167,6 +176,11 @@ private List<InputRow> parseMetric(Metric metric, Map<String, Object> resourceAt
return inputRows;
}

private static boolean hasRecordedValue(NumberDataPoint d)
{
return (d.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) == 0;
}

private InputRow parseNumberDataPoint(NumberDataPoint dataPoint,
Map<String, Object> resourceAttributes,
String metricName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.common.v1.KeyValueList;
import io.opentelemetry.proto.metrics.v1.DataPointFlags;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import org.apache.druid.data.input.InputRow;
Expand Down Expand Up @@ -404,6 +405,32 @@ public void testInvalidMetricType()
Assert.assertEquals(0, rowList.size());
}

@Test
public void testNoRecordedValueMetric()
{
metricBuilder.setName("example_gauge")
.getGaugeBuilder()
.addDataPointsBuilder()
.setAsInt(6)
.setFlags(DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
.setTimeUnixNano(TIMESTAMP);

MetricsData metricsData = metricsDataBuilder.build();

SettableByteEntity<ByteEntity> settableByteEntity = new SettableByteEntity<>();
settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray()));
CloseableIterator<InputRow> rows = new OpenTelemetryMetricsProtobufReader(
dimensionsSpec,
settableByteEntity,
"metric.name",
"raw.value",
"descriptor.",
"custom."
).read();

Assert.assertFalse(rows.hasNext());
}

private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
Expand Down

0 comments on commit 9aa75cf

Please sign in to comment.