Skip to content

Commit

Permalink
improve OpenCensusProtobufInputRowParser performance (#25)
Browse files Browse the repository at this point in the history
- remove the need to parse timestamps into their own column
- reduce the number of times we copy maps of labels
- pre-size hashmaps and arrays when possible
- use loops instead of streams in critical sections

Combined these changes improve parsing performance by about 15%
- added benchmark for reference
  • Loading branch information
xvrl authored and harinirajendran committed Apr 19, 2021
1 parent 848cf7e commit 301b68b
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 91 deletions.
13 changes: 13 additions & 0 deletions extensions-contrib/opencensus-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- jmh -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.27</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.27</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import io.opencensus.proto.metrics.v1.LabelKey;
import io.opencensus.proto.metrics.v1.Metric;
import io.opencensus.proto.metrics.v1.Point;
import io.opencensus.proto.metrics.v1.TimeSeries;
import org.apache.druid.data.input.ByteBufferInputRowParser;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;

import java.nio.ByteBuffer;
import java.time.Instant;
Expand All @@ -46,19 +49,18 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class OpenCensusProtobufInputRowParser implements ByteBufferInputRowParser
{
private static final Logger LOG = new Logger(OpenCensusProtobufInputRowParser.class);

private static final String SEPARATOR = "-";
private static final String VALUE_COLUMN = "value";
private static final String DEFAULT_METRIC_DIMENSION = "name";
private static final String VALUE = "value";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String DEFAULT_RESOURCE_PREFIX = "";

private final ParseSpec parseSpec;
private final List<String> dimensions;
private final DimensionsSpec dimensionsSpec;

private final String metricDimension;
private final String metricLabelPrefix;
Expand All @@ -73,12 +75,12 @@ public OpenCensusProtobufInputRowParser(
)
{
this.parseSpec = parseSpec;
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.dimensionsSpec = parseSpec.getDimensionsSpec();
this.metricDimension = Strings.isNullOrEmpty(metricDimension) ? DEFAULT_METRIC_DIMENSION : metricDimension;
this.metricLabelPrefix = StringUtils.nullToEmptyNonDruidDataString(metricPrefix);
this.resourceLabelPrefix = resourcePrefix != null ? resourcePrefix : DEFAULT_RESOURCE_PREFIX;

LOG.info("Creating Open Census Protobuf parser with spec:" + parseSpec);
LOG.info("Creating OpenCensus Protobuf parser with spec:" + parseSpec);
}

@Override
Expand Down Expand Up @@ -115,11 +117,16 @@ public OpenCensusProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
resourceLabelPrefix);
}


private interface LabelContext
{
void addRow(long millis, String metricName, Object value);
}

@Override
public List<InputRow> parseBatch(ByteBuffer input)
{

Metric metric;
final Metric metric;
try {
metric = Metric.parseFrom(input);
}
Expand All @@ -128,19 +135,22 @@ public List<InputRow> parseBatch(ByteBuffer input)
}

// Process metric descriptor labels map keys.
List<String> descriptorLabels = metric.getMetricDescriptor().getLabelKeysList().stream()
.map(s -> this.metricLabelPrefix + s.getKey())
.collect(Collectors.toList());
List<String> descriptorLabels = new ArrayList<>(metric.getMetricDescriptor().getLabelKeysCount());
for (LabelKey s : metric.getMetricDescriptor().getLabelKeysList()) {
descriptorLabels.add(this.metricLabelPrefix + s.getKey());
}

// Process resource labels map.
Map<String, String> resourceLabelsMap = metric.getResource().getLabelsMap().entrySet().stream()
.collect(Collectors.toMap(entry -> this.resourceLabelPrefix + entry.getKey(),
Map.Entry::getValue));
Map<String, String> resourceLabelsMap = CollectionUtils.mapKeys(
metric.getResource().getLabelsMap(),
key -> this.resourceLabelPrefix + key
);

final List<String> dimensions;
final List<String> schemaDimensions = dimensionsSpec.getDimensionNames();

if (!this.dimensions.isEmpty()) {
dimensions = this.dimensions;
final List<String> dimensions;
if (!schemaDimensions.isEmpty()) {
dimensions = schemaDimensions;
} else {
Set<String> recordDimensions = new HashSet<>(descriptorLabels);

Expand All @@ -151,95 +161,83 @@ public List<InputRow> parseBatch(ByteBuffer input)
// map as they are derived dimensions, which get populated while parsing data for timeSeries
// hence add them to recordDimensions.
recordDimensions.add(metricDimension);
recordDimensions.add(VALUE);
recordDimensions.add(VALUE_COLUMN);

dimensions = Lists.newArrayList(
Sets.difference(recordDimensions, parseSpec.getDimensionsSpec().getDimensionExclusions())
Sets.difference(recordDimensions, dimensionsSpec.getDimensionExclusions())
);
}

// Flatten out the OpenCensus record into druid rows.
final int capacity = resourceLabelsMap.size()
+ descriptorLabels.size()
+ 2; // metric name + value columns

List<InputRow> rows = new ArrayList<>();
for (TimeSeries ts : metric.getTimeseriesList()) {
final LabelContext labelContext = (millis, metricName, value) -> {
// Add common resourceLabels.
Map<String, Object> event = new HashMap<>(capacity);
event.putAll(resourceLabelsMap);
// Add metric labels
for (int i = 0; i < metric.getMetricDescriptor().getLabelKeysCount(); i++) {
event.put(descriptorLabels.get(i), ts.getLabelValues(i).getValue());
}
// add metric name and value
event.put(metricDimension, metricName);
event.put(VALUE_COLUMN, value);
rows.add(new MapBasedInputRow(millis, dimensions, event));
};

// Add common resourceLabels.
Map<String, Object> labels = new HashMap<>(resourceLabelsMap);

// Add labels to record.
for (int i = 0; i < metric.getMetricDescriptor().getLabelKeysCount(); i++) {
labels.put(descriptorLabels.get(i), ts.getLabelValues(i).getValue());
}

// One row per timeSeries point.
for (Point point : ts.getPointsList()) {
// Time in millis
final Timestamp t = point.getTimestamp();
final long millis = Instant.ofEpochSecond(t.getSeconds(), t.getNanos()).toEpochMilli();
labels.put(TIMESTAMP_COLUMN, millis);

switch (point.getValueCase()) {
case DOUBLE_VALUE:
Map<String, Object> doubleGauge = new HashMap<>();
doubleGauge.putAll(labels);
doubleGauge.put(metricDimension, metric.getMetricDescriptor().getName());
doubleGauge.put(VALUE, point.getDoubleValue());
addDerivedMetricsRow(doubleGauge, dimensions, rows);
break;
case INT64_VALUE:
HashMap<String, Object> intGauge = new HashMap<>();
intGauge.putAll(labels);
intGauge.put(VALUE, point.getInt64Value());
intGauge.put(metricDimension, metric.getMetricDescriptor().getName());
addDerivedMetricsRow(intGauge, dimensions, rows);
break;
case SUMMARY_VALUE:
// count
Map<String, Object> summaryCount = new HashMap<>();
summaryCount.putAll(labels);
summaryCount.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "count");
summaryCount.put(VALUE, point.getSummaryValue().getCount().getValue());
addDerivedMetricsRow(summaryCount, dimensions, rows);

// sum
Map<String, Object> summarySum = new HashMap<>();
summarySum.putAll(labels);
summarySum.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "sum");
summarySum.put(VALUE, point.getSummaryValue().getSnapshot().getSum().getValue());
addDerivedMetricsRow(summarySum, dimensions, rows);

// TODO : Do we put percentiles into druid ?
break;
case DISTRIBUTION_VALUE:
// count
Map<String, Object> distCount = new HashMap<>();
distCount.putAll(labels);
distCount.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "count");
distCount.put(VALUE, point.getDistributionValue().getCount());
addDerivedMetricsRow(distCount, dimensions, rows);

// sum
Map<String, Object> distSum = new HashMap<>();
distSum.putAll(labels);
distSum.put(metricDimension, metric.getMetricDescriptor().getName() + SEPARATOR + "sum");
distSum.put(VALUE, point.getDistributionValue().getSum());
addDerivedMetricsRow(distSum, dimensions, rows);
// TODO: How to handle buckets ?
break;
}
addPointRows(point, metric, labelContext);
}
}

return rows;
}

private void addDerivedMetricsRow(Map<String, Object> derivedMetrics, List<String> dimensions,
List<InputRow> rows)
private void addPointRows(Point point, Metric metric, LabelContext labelContext)
{
rows.add(new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(derivedMetrics),
dimensions,
derivedMetrics
));
Timestamp timestamp = point.getTimestamp();
long millis = Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
String metricName = metric.getMetricDescriptor().getName();

switch (point.getValueCase()) {
case DOUBLE_VALUE:
labelContext.addRow(millis, metricName, point.getDoubleValue());
break;

case INT64_VALUE:
labelContext.addRow(millis, metricName, point.getInt64Value());
break;

case SUMMARY_VALUE:
// count
labelContext.addRow(
millis,
metricName + SEPARATOR + "count",
point.getSummaryValue().getCount().getValue()
);
// sum
labelContext.addRow(
millis,
metricName + SEPARATOR + "sum",
point.getSummaryValue().getSnapshot().getSum().getValue()
);
break;

// TODO : How to handle buckets and percentiles
case DISTRIBUTION_VALUE:
// count
labelContext.addRow(millis, metricName + SEPARATOR + "count", point.getDistributionValue().getCount());
// sum
labelContext.addRow(
millis,
metricName + SEPARATOR + "sum",
point.getDistributionValue().getSum()
);
break;
default:
}
}

@Override
Expand Down
Loading

0 comments on commit 301b68b

Please sign in to comment.