Modify the OpenTelemetry ProtobufReader's Handling of Attribute Types (#77)

* Only handle INT_VALUE, BOOL_VALUE, DOUBLE_VALUE, and STRING_VALUE attribute types; return null for anything else (see the sketch below)
* Fix the wrong class in the DruidModule service provider definition
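
Why the stream collector changes along with parseAnyValue: Collectors.toMap rejects null values (its internal HashMap.merge call throws a NullPointerException), so once parseAnyValue can return null for unsupported attribute types, the attributes have to be gathered with a collector that skips nulls. Below is a minimal, self-contained sketch of that pattern; the Attribute record, the "resource." prefix, and the class name are simplified stand-ins for the OTLP KeyValue/AnyValue types, not the reader's actual classes.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullSkippingCollectSketch
{
  // Stand-in for an OTLP attribute whose parsed value may be null
  // when its value type is unsupported (e.g. KVLIST_VALUE).
  record Attribute(String key, Object parsedValue) {}

  public static void main(String[] args)
  {
    List<Attribute> attributes = List.of(
        new Attribute("env", "prod"),
        new Attribute("labels", null) // unsupported type, parsed to null
    );

    // Collectors.toMap(...) would throw here because of the null value,
    // so collect into a HashMap manually and drop nulls instead.
    Map<String, Object> resourceAttributes = attributes.stream().collect(
        HashMap::new,
        (map, att) -> {
          if (att.parsedValue() != null) {
            map.put("resource." + att.key(), att.parsedValue());
          }
        },
        HashMap::putAll
    );

    System.out.println(resourceAttributes); // prints {resource.env=prod}
  }
}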
marcusgreer authored and m-ghazanfar committed May 29, 2023
1 parent e4c5aa8 commit 2e73c16
Showing 3 changed files with 86 additions and 15 deletions.
@@ -36,8 +36,10 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;

+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -94,8 +96,14 @@ private List<InputRow> parseMetricsData(final MetricsData metricsData)
    Map<String, Object> resourceAttributes = resourceMetrics.getResource()
        .getAttributesList()
        .stream()
-        .collect(Collectors.toMap(kv -> resourceAttributePrefix + kv.getKey(),
-                                  kv -> parseAnyValue(kv.getValue())));
+        .collect(HashMap::new,
+                 (m, kv) -> {
+                   Object value = parseAnyValue(kv.getValue());
+                   if (value != null) {
+                     m.put(resourceAttributePrefix + kv.getKey(), value);
+                   }
+                 },
+                 HashMap::putAll);
    return resourceMetrics.getInstrumentationLibraryMetricsList()
        .stream()
        .flatMap(libraryMetrics -> libraryMetrics.getMetricsList()
@@ -142,8 +150,8 @@ private InputRow parseNumberDataPoint(NumberDataPoint dataPoint,
{

int capacity = resourceAttributes.size()
+ dataPoint.getAttributesCount()
+ 2; // metric name + value columns
+ dataPoint.getAttributesCount()
+ 2; // metric name + value columns
Map<String, Object> event = Maps.newHashMapWithExpectedSize(capacity);
event.put(metricDimension, metricName);

@@ -154,32 +162,34 @@ private InputRow parseNumberDataPoint(NumberDataPoint dataPoint,
    }

    event.putAll(resourceAttributes);
-    dataPoint.getAttributesList().forEach(att -> event.put(metricAttributePrefix + att.getKey(),
-                                                            parseAnyValue(att.getValue())));
+    dataPoint.getAttributesList().forEach(att -> {
+      Object value = parseAnyValue(att.getValue());
+      if (value != null) {
+        event.put(metricAttributePrefix + att.getKey(), value);
+      }
+    });

    return createRow(TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()), event);
  }

+  @Nullable
  private static Object parseAnyValue(AnyValue value)
  {
    switch (value.getValueCase()) {
      case INT_VALUE:
        return value.getIntValue();
      case BOOL_VALUE:
        return value.getBoolValue();
-      case ARRAY_VALUE:
-        return value.getArrayValue();
-      case BYTES_VALUE:
-        return value.getBytesValue();
      case DOUBLE_VALUE:
        return value.getDoubleValue();
-      case KVLIST_VALUE:
-        return value.getKvlistValue();
      case STRING_VALUE:
        return value.getStringValue();
+
+      // TODO: Support KVLIST_VALUE, ARRAY_VALUE and BYTES_VALUE
+
      default:
-        // VALUE_NOT_SET:
-        return "";
+        // VALUE_NOT_SET
+        return null;
    }
  }

@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-org.apache.druid.data.input.opencensus.protobuf.OpenCensusProtobufExtensionsModule
+org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufInputFormat
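
For context on the one-line fix above: Druid discovers extension classes through the standard JDK ServiceLoader mechanism. A provider file under META-INF/services/, named after the service interface, lists one fully qualified implementation class per line, so registering the wrong class there either wires up the wrong implementation or fails at load time if that class is not on the classpath. A generic sketch of the mechanism follows; the nested MyService interface and ServiceDiscoverySketch class are hypothetical, not Druid's actual DruidModule plumbing.

import java.util.ServiceLoader;

// Generic ServiceLoader sketch (illustration only, not Druid's bootstrap code).
// A provider file on the classpath at META-INF/services/ServiceDiscoverySketch$MyService
// (the interface's binary name) would list implementation class names, one per line,
// just like the service file changed above.
public final class ServiceDiscoverySketch
{
  public interface MyService
  {
    String describe();
  }

  public static void main(String[] args)
  {
    for (MyService impl : ServiceLoader.load(MyService.class)) {
      System.out.println("Discovered: " + impl.describe());
    }
  }
}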
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.common.v1.KeyValueList;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import org.apache.druid.data.input.InputRow;
@@ -282,6 +283,66 @@ public void testDimensionSpecExclusions()
Assert.assertFalse(row.getDimensions().contains("descriptor.color"));
}

@Test
public void testUnsupportedValueTypes()
{
KeyValueList kvList = KeyValueList.newBuilder()
.addValues(
KeyValue.newBuilder()
.setKey("foo")
.setValue(AnyValue.newBuilder().setStringValue("bar").build()))
.build();

metricsDataBuilder.getResourceMetricsBuilder(0)
.getResourceBuilder()
.addAttributesBuilder()
.setKey(RESOURCE_ATTRIBUTE_ENV)
.setValue(AnyValue.newBuilder().setKvlistValue(kvList).build());

metricBuilder
.setName("example_sum")
.getSumBuilder()
.addDataPointsBuilder()
.setAsInt(6)
.setTimeUnixNano(TIMESTAMP)
.addAllAttributes(ImmutableList.of(
KeyValue.newBuilder()
.setKey(METRIC_ATTRIBUTE_COLOR)
.setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()).build(),
KeyValue.newBuilder()
.setKey(METRIC_ATTRIBUTE_FOO_KEY)
.setValue(AnyValue.newBuilder().setKvlistValue(kvList).build()).build()));

MetricsData metricsData = metricsDataBuilder.build();

CloseableIterator<InputRow> rows = new OpenTelemetryMetricsProtobufReader(
dimensionsSpec,
new ByteEntity(metricsData.toByteArray()),
"metric.name",
"raw.value",
"descriptor.",
"custom."
).read();

List<InputRow> rowList = new ArrayList<>();
rows.forEachRemaining(rowList::add);
Assert.assertEquals(1, rowList.size());

InputRow row = rowList.get(0);
Assert.assertEquals(4, row.getDimensions().size());
assertDimensionEquals(row, "metric.name", "example_sum");
assertDimensionEquals(row, "custom.country", "usa");
assertDimensionEquals(row, "descriptor.color", "red");

// Unsupported resource attribute type is omitted
Assert.assertEquals(0, row.getDimension("custom.env").size());

// Unsupported metric attribute type is omitted
Assert.assertEquals(0, row.getDimension("descriptor.foo_key").size());

assertDimensionEquals(row, "raw.value", "6");
}

private void assertDimensionEquals(InputRow row, String dimension, Object expected)
{
List<String> values = row.getDimension(dimension);
