Skip to content

Commit

Permalink
fix timestamp milliseconds in OpenCensusProtobufInputRowParser
Browse files Browse the repository at this point in the history
- fix millisecond resolution being dropped when converting timestamps
- remove unnecessary conversion of ByteBuffer to ByteString
- make test code a little more concise
  • Loading branch information
xvrl authored and harinirajendran committed Apr 19, 2021
1 parent 643631f commit 848cf7e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import io.opencensus.proto.metrics.v1.Metric;
import io.opencensus.proto.metrics.v1.Point;
import io.opencensus.proto.metrics.v1.TimeSeries;
Expand All @@ -38,6 +38,7 @@
import org.apache.druid.java.util.common.parsers.ParseException;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -120,7 +121,7 @@ public List<InputRow> parseBatch(ByteBuffer input)

Metric metric;
try {
metric = Metric.parseFrom(ByteString.copyFrom(input));
metric = Metric.parseFrom(input);
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(e, "Protobuf message could not be parsed");
Expand Down Expand Up @@ -172,7 +173,9 @@ public List<InputRow> parseBatch(ByteBuffer input)
// One row per timeSeries point.
for (Point point : ts.getPointsList()) {
// Time in millis
labels.put(TIMESTAMP_COLUMN, point.getTimestamp().getSeconds() * 1000);
final Timestamp t = point.getTimestamp();
final long millis = Instant.ofEpochSecond(t.getSeconds(), t.getNanos()).toEpochMilli();
labels.put(TIMESTAMP_COLUMN, millis);

switch (point.getValueCase()) {
case DOUBLE_VALUE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,23 @@
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
import java.util.List;

public class OpenCensusProtobufInputRowParserTest
{
private static final Instant INSTANT = Instant.parse("2019-07-12T09:30:01.123Z");
private static final Timestamp TIMESTAMP = Timestamp.newBuilder()
.setSeconds(INSTANT.getEpochSecond())
.setNanos(INSTANT.getNano()).build();
@Rule
public ExpectedException expectedException = ExpectedException.none();

Expand Down Expand Up @@ -101,22 +103,15 @@ public void setUp()


@Test
public void testDoubleGaugeParse() throws Exception
public void testDoubleGaugeParse()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());
Metric metric = doubleGaugeMetric(TIMESTAMP);

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();

Metric metric = doubleGaugeMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
InputRow row = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())).get(0);
Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch());

assertDimensionEquals(row, "name", "metric_gauge_double");
assertDimensionEquals(row, "foo_key", "foo_value");
Expand All @@ -126,22 +121,15 @@ public void testDoubleGaugeParse() throws Exception
}

@Test
public void testIntGaugeParse() throws Exception
public void testIntGaugeParse()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();
Metric metric = intGaugeMetric(TIMESTAMP);

Metric metric = intGaugeMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
InputRow row = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray())).get(0);
Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch());

assertDimensionEquals(row, "name", "metric_gauge_int64");
assertDimensionEquals(row, "foo_key", "foo_value");
Expand All @@ -150,85 +138,64 @@ public void testIntGaugeParse() throws Exception
}

@Test
public void testSummaryParse() throws Exception
public void testSummaryParse()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();

Metric metric = summaryMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);
Metric metric = summaryMetric(TIMESTAMP);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray()));

Assert.assertEquals(2, rows.size());

InputRow row = rows.get(0);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "name", "metric_summary-count");
assertDimensionEquals(row, "foo_key", "foo_value");
Assert.assertEquals(40, row.getMetric("value").doubleValue(), 0.0);

row = rows.get(1);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "name", "metric_summary-sum");
assertDimensionEquals(row, "foo_key", "foo_value");
Assert.assertEquals(10, row.getMetric("value").doubleValue(), 0.0);
}

@Test
public void testDistributionParse() throws Exception
public void testDistributionParse()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());
Metric metric = distributionMetric(TIMESTAMP);

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();

Metric metric = distributionMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray()));

Assert.assertEquals(2, rows.size());

InputRow row = rows.get(0);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "name", "metric_distribution-count");
assertDimensionEquals(row, "foo_key", "foo_value");
Assert.assertEquals(100, row.getMetric("value").intValue());

row = rows.get(1);
Assert.assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());
Assert.assertEquals(INSTANT.toEpochMilli(), row.getTimestampFromEpoch());
assertDimensionEquals(row, "name", "metric_distribution-sum");
assertDimensionEquals(row, "foo_key", "foo_value");
Assert.assertEquals(500, row.getMetric("value").doubleValue(), 0.0);
}

@Test
public void testDimensionsParseWithParseSpecDimensions() throws Exception
public void testDimensionsParseWithParseSpecDimensions()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpecWithDimensions, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();
Metric metric = summaryMetric(TIMESTAMP);

Metric metric = summaryMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray()));

Assert.assertEquals(2, rows.size());

Expand All @@ -245,21 +212,14 @@ public void testDimensionsParseWithParseSpecDimensions() throws Exception
}

@Test
public void testDimensionsParseWithoutParseSpecDimensions() throws Exception
public void testDimensionsParseWithoutParseSpecDimensions()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");

DateTime dateTime = new DateTime(2019, 07, 12, 9, 30, ISOChronology.getInstanceUTC());

Timestamp timestamp = Timestamp.newBuilder().setSeconds(dateTime.getMillis() / 1000)
.setNanos((int) ((dateTime.getMillis() % 1000) * 1000000)).build();

Metric metric = summaryMetric(timestamp);
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);
Metric metric = summaryMetric(TIMESTAMP);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray()));

Assert.assertEquals(2, rows.size());

Expand All @@ -278,16 +238,14 @@ public void testDimensionsParseWithoutParseSpecDimensions() throws Exception
}

@Test
public void testMetricNameOverride() throws Exception
public void testMetricNameOverride()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, "dimension_name", null, "");

Metric metric = summaryMetric(Timestamp.getDefaultInstance());
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray()));

Assert.assertEquals(2, rows.size());

Expand All @@ -305,16 +263,14 @@ public void testMetricNameOverride() throws Exception
}

@Test
public void testDefaultPrefix() throws Exception
public void testDefaultPrefix()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, null);

Metric metric = summaryMetric(Timestamp.getDefaultInstance());
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray()));

Assert.assertEquals(2, rows.size());

Expand All @@ -332,16 +288,14 @@ public void testDefaultPrefix() throws Exception
}

@Test
public void testCustomPrefix() throws Exception
public void testCustomPrefix()
{
//configure parser with desc file
OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, "descriptor.", "custom.");

Metric metric = summaryMetric(Timestamp.getDefaultInstance());
ByteArrayOutputStream out = new ByteArrayOutputStream();
metric.writeTo(out);

List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(out.toByteArray()));
List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(metric.toByteArray()));

Assert.assertEquals(2, rows.size());

Expand Down

0 comments on commit 848cf7e

Please sign in to comment.