diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractAccumulator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractAccumulator.java index 0d8e71535c3..871ecd40e0f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractAccumulator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractAccumulator.java @@ -8,6 +8,7 @@ import io.opentelemetry.sdk.metrics.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessor; import java.util.List; abstract class AbstractAccumulator { @@ -25,9 +26,17 @@ static Aggregator getAggregator( return meterProviderSharedState .getViewRegistry() .findView(descriptor) + .getAggregatorFactory() .create( meterProviderSharedState.getResource(), meterSharedState.getInstrumentationLibraryInfo(), descriptor); } + + static LabelsProcessor getLabelsProcessor(MeterProviderSharedState meterProviderSharedState, + InstrumentDescriptor descriptor) { + return meterProviderSharedState.getViewRegistry().findView(descriptor) + .getLabelsProcessorFactory().create(); + } + } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java index a925e76e7fa..faec742196c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java @@ -12,7 +12,9 @@ import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricProducer; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory; import io.opentelemetry.sdk.metrics.view.InstrumentSelector; +import io.opentelemetry.sdk.metrics.view.View; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.Collection; @@ -102,6 +104,12 @@ public static SdkMeterProviderBuilder builder() { * } */ public void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFactory) { - sharedState.getViewRegistry().registerView(selector, aggregatorFactory); + sharedState.getViewRegistry().registerView(selector, View.builder().setAggregatorFactory(aggregatorFactory).build()); + } + public void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFactory, + LabelsProcessorFactory labelsProcessorFactory) { + sharedState.getViewRegistry().registerView(selector, View.builder() + .setAggregatorFactory(aggregatorFactory) + .setLabelsProcessorFactory(labelsProcessorFactory).build()); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulator.java index fb9335df65d..61bb654bfd2 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulator.java @@ -6,10 +6,12 @@ package io.opentelemetry.sdk.metrics; import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.metrics.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessor; import java.util.List; import java.util.Map; import java.util.Objects; @@ -21,6 +23,7 @@ final class SynchronousInstrumentAccumulator extends AbstractAccumulator { private final ReentrantLock collectLock; private final Aggregator aggregator; private final InstrumentProcessor instrumentProcessor; + private final LabelsProcessor labelsProcessor; static SynchronousInstrumentAccumulator create( MeterProviderSharedState meterProviderSharedState, @@ -30,19 +33,22 @@ static SynchronousInstrumentAccumulator create( getAggregator(meterProviderSharedState, meterSharedState, descriptor); return new SynchronousInstrumentAccumulator<>( aggregator, - new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos())); + new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos()), + getLabelsProcessor(meterProviderSharedState, descriptor)); } SynchronousInstrumentAccumulator( - Aggregator aggregator, InstrumentProcessor instrumentProcessor) { + Aggregator aggregator, InstrumentProcessor instrumentProcessor, LabelsProcessor labelsProcessor) { aggregatorLabels = new ConcurrentHashMap<>(); collectLock = new ReentrantLock(); this.aggregator = aggregator; this.instrumentProcessor = instrumentProcessor; + this.labelsProcessor = labelsProcessor; } AggregatorHandle bind(Labels labels) { Objects.requireNonNull(labels, "labels"); + labels = labelsProcessor.onLabelsBound(Context.current(), labels); AggregatorHandle aggregatorHandle = aggregatorLabels.get(labels); if (aggregatorHandle != null && aggregatorHandle.acquire()) { // At this moment it is guaranteed that the Bound is in the map and will not be removed. diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java index ff18d057f37..fcb520f39d4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java @@ -9,6 +9,7 @@ import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.view.InstrumentSelector; +import io.opentelemetry.sdk.metrics.view.View; import java.util.EnumMap; import java.util.LinkedHashMap; import java.util.Map; @@ -23,15 +24,15 @@ * never blocked. */ final class ViewRegistry { - private static final LinkedHashMap EMPTY_CONFIG = + private static final LinkedHashMap EMPTY_CONFIG = new LinkedHashMap<>(); - static final AggregatorFactory CUMULATIVE_SUM = AggregatorFactory.sum(true); - static final AggregatorFactory SUMMARY = AggregatorFactory.minMaxSumCount(); - static final AggregatorFactory LAST_VALUE = AggregatorFactory.lastValue(); + static final View CUMULATIVE_SUM = View.builder().setAggregatorFactory(AggregatorFactory.sum(true)).build(); + static final View SUMMARY = View.builder().setAggregatorFactory(AggregatorFactory.minMaxSumCount()).build(); + static final View LAST_VALUE = View.builder().setAggregatorFactory(AggregatorFactory.lastValue()).build(); // The lock is used to ensure only one updated to the configuration happens at any moment. private final ReentrantLock lock = new ReentrantLock(); - private volatile EnumMap> configuration; + private volatile EnumMap> configuration; ViewRegistry() { this.configuration = new EnumMap<>(InstrumentType.class); @@ -43,10 +44,10 @@ final class ViewRegistry { configuration.put(InstrumentType.VALUE_OBSERVER, EMPTY_CONFIG); } - void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFactory) { + void registerView(InstrumentSelector selector, View aggregatorFactory) { lock.lock(); try { - EnumMap> newConfiguration = + EnumMap> newConfiguration = new EnumMap<>(configuration); newConfiguration.put( selector.getInstrumentType(), @@ -60,10 +61,10 @@ void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFacto } } - AggregatorFactory findView(InstrumentDescriptor descriptor) { - LinkedHashMap configPerType = + View findView(InstrumentDescriptor descriptor) { + LinkedHashMap configPerType = configuration.get(descriptor.getType()); - for (Map.Entry entry : configPerType.entrySet()) { + for (Map.Entry entry : configPerType.entrySet()) { if (entry.getKey().matcher(descriptor.getName()).matches()) { return entry.getValue(); } @@ -72,7 +73,7 @@ AggregatorFactory findView(InstrumentDescriptor descriptor) { return getDefaultSpecification(descriptor); } - private static AggregatorFactory getDefaultSpecification(InstrumentDescriptor descriptor) { + private static View getDefaultSpecification(InstrumentDescriptor descriptor) { switch (descriptor.getType()) { case COUNTER: case UP_DOWN_COUNTER: @@ -87,11 +88,11 @@ private static AggregatorFactory getDefaultSpecification(InstrumentDescriptor de throw new IllegalArgumentException("Unknown descriptor type: " + descriptor.getType()); } - private static LinkedHashMap newLinkedHashMap( + private static LinkedHashMap newLinkedHashMap( Pattern pattern, - AggregatorFactory aggregatorFactory, - LinkedHashMap parentConfiguration) { - LinkedHashMap result = new LinkedHashMap<>(); + View aggregatorFactory, + LinkedHashMap parentConfiguration) { + LinkedHashMap result = new LinkedHashMap<>(); result.put(pattern, aggregatorFactory); result.putAll(parentConfiguration); return result; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/BaggageLabelsProcessor.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/BaggageLabelsProcessor.java new file mode 100644 index 00000000000..fb611b11693 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/BaggageLabelsProcessor.java @@ -0,0 +1,21 @@ +package io.opentelemetry.sdk.metrics.processor; + +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.api.metrics.common.LabelsBuilder; +import io.opentelemetry.context.Context; + +public class BaggageLabelsProcessor implements LabelsProcessor { + private final BaggageMetricsLabelsExtractor baggageMetricsLabelsExtractor; + + public BaggageLabelsProcessor(BaggageMetricsLabelsExtractor baggageMetricsLabelsExtractor) { + this.baggageMetricsLabelsExtractor = baggageMetricsLabelsExtractor; + } + + @Override + public Labels onLabelsBound(Context ctx, Labels labels) { + LabelsBuilder labelsBuilder = labels.toBuilder(); + baggageMetricsLabelsExtractor.fromBaggage(ctx).forEach(labelsBuilder::put); + + return labelsBuilder.build(); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/BaggageMetricsLabelsExtractor.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/BaggageMetricsLabelsExtractor.java new file mode 100644 index 00000000000..85a12f12b8b --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/BaggageMetricsLabelsExtractor.java @@ -0,0 +1,9 @@ +package io.opentelemetry.sdk.metrics.processor; + +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.context.Context; + +public interface BaggageMetricsLabelsExtractor { + + Labels fromBaggage(Context ctx); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/LabelsProcessor.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/LabelsProcessor.java new file mode 100644 index 00000000000..dcf230be89b --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/LabelsProcessor.java @@ -0,0 +1,18 @@ +package io.opentelemetry.sdk.metrics.processor; + +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.context.Context; + +public interface LabelsProcessor { + /** + * Called when bind() method is called. Allows to manipulate labels which this + * instrument is bound to. Particular use case includes enriching lables and/or adding more labels + * depending on the Context + * + * @param ctx context of the operation + * @param labels immutable labels. When processors are chained output labels of the previous one is passed as + * an input to the next one. Last labels returned by a chain of processors are used for bind() operation. + * @return labels to be used as an input to the next processor in chain or bind() operation if this is the last processor + */ + Labels onLabelsBound(Context ctx, Labels labels); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/LabelsProcessorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/LabelsProcessorFactory.java new file mode 100644 index 00000000000..d9d3ec195e9 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/LabelsProcessorFactory.java @@ -0,0 +1,17 @@ +package io.opentelemetry.sdk.metrics.processor; + +public interface LabelsProcessorFactory { + static LabelsProcessorFactory noop() { + return NoopLabelsProcessor::new; + } + static LabelsProcessorFactory baggageExtractor(BaggageMetricsLabelsExtractor labelsExtractor) { + return () -> new BaggageLabelsProcessor(labelsExtractor); + } + + /** + * Returns a new {@link LabelsProcessorFactory} + * + * @return new {@link LabelsProcessorFactory} + */ + LabelsProcessor create(); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/NoopLabelsProcessor.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/NoopLabelsProcessor.java new file mode 100644 index 00000000000..2220b418f36 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/processor/NoopLabelsProcessor.java @@ -0,0 +1,12 @@ +package io.opentelemetry.sdk.metrics.processor; + +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.context.Context; + +public class NoopLabelsProcessor implements LabelsProcessor { + + @Override + public Labels onLabelsBound(Context c, Labels labels) { + return labels; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/view/View.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/view/View.java new file mode 100644 index 00000000000..022e83b8f01 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/view/View.java @@ -0,0 +1,26 @@ +package io.opentelemetry.sdk.metrics.view; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory; +import javax.annotation.concurrent.Immutable; + +@AutoValue +@Immutable +public abstract class View { + public abstract AggregatorFactory getAggregatorFactory(); + + public abstract LabelsProcessorFactory getLabelsProcessorFactory(); + + public static Builder builder() { + return new AutoValue_View.Builder().setLabelsProcessorFactory(LabelsProcessorFactory.noop()); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setAggregatorFactory(AggregatorFactory aggregatorFactory); + public abstract Builder setLabelsProcessorFactory(LabelsProcessorFactory labelsProcessorFactory); + + public abstract View build(); + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulatorTest.java index 32872585b13..1b96938ed8f 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentAccumulatorTest.java @@ -5,6 +5,10 @@ package io.opentelemetry.sdk.metrics; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessor; + +import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory; + import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.metrics.common.Labels; @@ -28,12 +32,13 @@ public class SynchronousInstrumentAccumulatorTest { AggregatorFactory.lastValue() .create( Resource.getEmpty(), InstrumentationLibraryInfo.create("test", "1.0"), DESCRIPTOR); + private final LabelsProcessor labelsProcessor = LabelsProcessorFactory.noop().create(); @Test void sameAggregator_ForSameLabelSet() { SynchronousInstrumentAccumulator accumulator = new SynchronousInstrumentAccumulator<>( - aggregator, new InstrumentProcessor<>(aggregator, testClock.now())); + aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), labelsProcessor); AggregatorHandle aggregatorHandle = accumulator.bind(Labels.of("K", "V")); AggregatorHandle duplicateAggregatorHandle = accumulator.bind(Labels.of("K", "V")); try { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewRegistryTest.java index 73f2195a57e..ee2d6b0b596 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewRegistryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewRegistryTest.java @@ -5,6 +5,8 @@ package io.opentelemetry.sdk.metrics; +import io.opentelemetry.sdk.metrics.view.View; + import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; @@ -18,6 +20,7 @@ class ViewRegistryTest { @Test void selection_onType() { AggregatorFactory factory = AggregatorFactory.lastValue(); + View view = View.builder().setAggregatorFactory(factory).build(); ViewRegistry viewRegistry = new ViewRegistry(); viewRegistry.registerView( @@ -25,7 +28,7 @@ void selection_onType() { .setInstrumentType(InstrumentType.COUNTER) .setInstrumentNameRegex(".*") .build(), - factory); + view); assertThat( viewRegistry.findView( InstrumentDescriptor.create( @@ -42,6 +45,7 @@ void selection_onType() { @Test void selection_onName() { AggregatorFactory factory = AggregatorFactory.lastValue(); + View view = View.builder().setAggregatorFactory(factory).build(); ViewRegistry viewRegistry = new ViewRegistry(); viewRegistry.registerView( @@ -49,7 +53,7 @@ void selection_onName() { .setInstrumentType(InstrumentType.COUNTER) .setInstrumentNameRegex("overridden") .build(), - factory); + view); assertThat( viewRegistry.findView( InstrumentDescriptor.create( @@ -66,7 +70,9 @@ void selection_onName() { @Test void selection_LastAddedViewWins() { AggregatorFactory factory1 = AggregatorFactory.lastValue(); + View view1 = View.builder().setAggregatorFactory(factory1).build(); AggregatorFactory factory2 = AggregatorFactory.minMaxSumCount(); + View view2 = View.builder().setAggregatorFactory(factory2).build(); ViewRegistry viewRegistry = new ViewRegistry(); viewRegistry.registerView( @@ -74,13 +80,13 @@ void selection_LastAddedViewWins() { .setInstrumentType(InstrumentType.COUNTER) .setInstrumentNameRegex(".*") .build(), - factory1); + view1); viewRegistry.registerView( InstrumentSelector.builder() .setInstrumentType(InstrumentType.COUNTER) .setInstrumentNameRegex("overridden") .build(), - factory2); + view2); assertThat( viewRegistry.findView( @@ -97,6 +103,7 @@ void selection_LastAddedViewWins() { @Test void selection_regex() { AggregatorFactory factory = AggregatorFactory.lastValue(); + View view = View.builder().setAggregatorFactory(factory).build(); ViewRegistry viewRegistry = new ViewRegistry(); viewRegistry.registerView( @@ -104,7 +111,7 @@ void selection_regex() { .setInstrumentNameRegex("overrid(es|den)") .setInstrumentType(InstrumentType.COUNTER) .build(), - factory); + view); assertThat( viewRegistry.findView(