From 848cf7ef6aca3f1eefe7ce6c0bd2b120cbde7d54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 16 Dec 2020 13:55:58 -0800 Subject: [PATCH] fix timestamp milliseconds in OpenCensusProtobufInputRowParser - fix millisecond resolution being dropped when converting timestamps - remove unnecessary conversion of ByteBuffer to ByteString - make test code a little more concise --- .../OpenCensusProtobufInputRowParser.java | 9 +- .../OpenCensusProtobufInputRowParserTest.java | 116 ++++++------------ 2 files changed, 41 insertions(+), 84 deletions(-) diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java index d7e337997c51..24a36e3a190f 100644 --- a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java +++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java @@ -24,8 +24,8 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Timestamp; import io.opencensus.proto.metrics.v1.Metric; import io.opencensus.proto.metrics.v1.Point; import io.opencensus.proto.metrics.v1.TimeSeries; @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.parsers.ParseException; import java.nio.ByteBuffer; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -120,7 +121,7 @@ public List parseBatch(ByteBuffer input) Metric metric; try { - metric = Metric.parseFrom(ByteString.copyFrom(input)); + metric = Metric.parseFrom(input); } catch (InvalidProtocolBufferException e) { throw new ParseException(e, "Protobuf message could not be parsed"); @@ -172,7 +173,9 @@ public List parseBatch(ByteBuffer input) // One row per timeSeries point. for (Point point : ts.getPointsList()) { // Time in millis - labels.put(TIMESTAMP_COLUMN, point.getTimestamp().getSeconds() * 1000); + final Timestamp t = point.getTimestamp(); + final long millis = Instant.ofEpochSecond(t.getSeconds(), t.getNanos()).toEpochMilli(); + labels.put(TIMESTAMP_COLUMN, millis); switch (point.getValueCase()) { case DOUBLE_VALUE: diff --git a/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java index 9e0cf6881f53..503dc754a3cc 100644 --- a/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java +++ b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java @@ -45,21 +45,23 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.joda.time.DateTime; -import org.joda.time.chrono.ISOChronology; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; +import java.time.Instant; import java.util.Collections; import java.util.List; public class OpenCensusProtobufInputRowParserTest { + private static final Instant INSTANT = Instant.parse("2019-07-12T09:30:01.123Z"); + private static final Timestamp TIMESTAMP = Timestamp.newBuilder() + .setSeconds(INSTANT.getEpochSecond()) + .setNanos(INSTANT.getNano()).build(); @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -101,22 +103,15 @@ public void setUp() @Test - public void testDoubleGaugeParse() throws Exception + public void testDoubleGaugeParse() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, ""); - DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC()); + Metric metric = doubleGaugeMetric(TIMESTAMP); - Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000) - .setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build(); - - Metric metric = doubleGaugeMetric(timestamp); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); - - InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + InputRow row = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())).get(0); + Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch()); assertDimensionEquals(row, "name", "metric_gauge_double"); assertDimensionEquals(row, "foo_key", "foo_value"); @@ -126,22 +121,15 @@ public void testDoubleGaugeParse() throws Exception } @Test - public void testIntGaugeParse() throws Exception + public void testIntGaugeParse() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, ""); - DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC()); - - Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000) - .setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build(); + Metric metric = intGaugeMetric(TIMESTAMP); - Metric metric = intGaugeMetric(timestamp); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); - - InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + InputRow row = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())).get(0); + Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch()); assertDimensionEquals(row, "name", "metric_gauge_int64"); assertDimensionEquals(row, "foo_key", "foo_value"); @@ -150,85 +138,64 @@ public void testIntGaugeParse() throws Exception } @Test - public void testSummaryParse() throws Exception + public void testSummaryParse() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, ""); - DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC()); - - Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000) - .setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build(); - - Metric metric = summaryMetric(timestamp); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); + Metric metric = summaryMetric(TIMESTAMP); - List rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())); + List rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())); Assert.assertEquals(2, rows.size()); InputRow row = rows.get(0); - Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch()); assertDimensionEquals(row, "name", "metric_summary-count"); assertDimensionEquals(row, "foo_key", "foo_value"); Assert.assertEquals(40, row.getMetric("value").doubleValue(), 0.0); row = rows.get(1); - Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch()); assertDimensionEquals(row, "name", "metric_summary-sum"); assertDimensionEquals(row, "foo_key", "foo_value"); Assert.assertEquals(10, row.getMetric("value").doubleValue(), 0.0); } @Test - public void testDistributionParse() throws Exception + public void testDistributionParse() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, ""); - DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC()); + Metric metric = distributionMetric(TIMESTAMP); - Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000) - .setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build(); - - Metric metric = distributionMetric(timestamp); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); - - List rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())); + List rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())); Assert.assertEquals(2, rows.size()); InputRow row = rows.get(0); - Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch()); assertDimensionEquals(row, "name", "metric_distribution-count"); assertDimensionEquals(row, "foo_key", "foo_value"); Assert.assertEquals(100, row.getMetric("value").intValue()); row = rows.get(1); - Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch()); + Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch()); assertDimensionEquals(row, "name", "metric_distribution-sum"); assertDimensionEquals(row, "foo_key", "foo_value"); Assert.assertEquals(500, row.getMetric("value").doubleValue(), 0.0); } @Test - public void testDimensionsParseWithParseSpecDimensions() throws Exception + public void testDimensionsParseWithParseSpecDimensions() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpecWithDimensions, null, null, ""); - DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC()); - - Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000) - .setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build(); + Metric metric = summaryMetric(TIMESTAMP); - Metric metric = summaryMetric(timestamp); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); - - List rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())); + List rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())); Assert.assertEquals(2, rows.size()); @@ -245,21 +212,14 @@ public void testDimensionsParseWithParseSpecDimensions() throws Exception } @Test - public void testDimensionsParseWithoutParseSpecDimensions() throws Exception + public void testDimensionsParseWithoutParseSpecDimensions() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, ""); - DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC()); - - Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000) - .setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build(); - - Metric metric = summaryMetric(timestamp); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); + Metric metric = summaryMetric(TIMESTAMP); - List rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())); + List rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())); Assert.assertEquals(2, rows.size()); @@ -278,16 +238,14 @@ public void testDimensionsParseWithoutParseSpecDimensions() throws Exception } @Test - public void testMetricNameOverride() throws Exception + public void testMetricNameOverride() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, "dimension_name", null, ""); Metric metric = summaryMetric(Timestamp.getDefaultInstance()); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); - List rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())); + List rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())); Assert.assertEquals(2, rows.size()); @@ -305,16 +263,14 @@ public void testMetricNameOverride() throws Exception } @Test - public void testDefaultPrefix() throws Exception + public void testDefaultPrefix() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, null); Metric metric = summaryMetric(Timestamp.getDefaultInstance()); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); - List rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())); + List rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())); Assert.assertEquals(2, rows.size()); @@ -332,16 +288,14 @@ public void testDefaultPrefix() throws Exception } @Test - public void testCustomPrefix() throws Exception + public void testCustomPrefix() { //configure parser with desc file OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, "descriptor.", "custom."); Metric metric = summaryMetric(Timestamp.getDefaultInstance()); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - metric.writeTo(out); - List rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())); + List rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())); Assert.assertEquals(2, rows.size());