Skip to content

Commit

Permalink
introduction of View and first version of LabelsProessor
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Polyakov committed Feb 11, 2021
1 parent 339bbb8 commit c02d042
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import java.util.List;

abstract class AbstractAccumulator {
Expand All @@ -25,9 +26,17 @@ static <T> Aggregator<T> getAggregator(
return meterProviderSharedState
.getViewRegistry()
.findView(descriptor)
.getAggregatorFactory()
.create(
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
descriptor);
}

static LabelsProcessor getLabelsProcessor(MeterProviderSharedState meterProviderSharedState,
InstrumentDescriptor descriptor) {
return meterProviderSharedState.getViewRegistry().findView(descriptor)
.getLabelsProcessorFactory().create();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -102,6 +104,12 @@ public static SdkMeterProviderBuilder builder() {
* }</pre>
*/
public void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFactory) {
sharedState.getViewRegistry().registerView(selector, aggregatorFactory);
sharedState.getViewRegistry().registerView(selector, View.builder().setAggregatorFactory(aggregatorFactory).build());
}
public void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFactory,
LabelsProcessorFactory labelsProcessorFactory) {
sharedState.getViewRegistry().registerView(selector, View.builder()
.setAggregatorFactory(aggregatorFactory)
.setLabelsProcessorFactory(labelsProcessorFactory).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package io.opentelemetry.sdk.metrics;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -21,6 +23,7 @@ final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
private final ReentrantLock collectLock;
private final Aggregator<T> aggregator;
private final InstrumentProcessor<T> instrumentProcessor;
private final LabelsProcessor labelsProcessor;

static <T> SynchronousInstrumentAccumulator<T> create(
MeterProviderSharedState meterProviderSharedState,
Expand All @@ -30,19 +33,22 @@ static <T> SynchronousInstrumentAccumulator<T> create(
getAggregator(meterProviderSharedState, meterSharedState, descriptor);
return new SynchronousInstrumentAccumulator<>(
aggregator,
new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos()));
new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos()),
getLabelsProcessor(meterProviderSharedState, descriptor));
}

SynchronousInstrumentAccumulator(
Aggregator<T> aggregator, InstrumentProcessor<T> instrumentProcessor) {
Aggregator<T> aggregator, InstrumentProcessor<T> instrumentProcessor, LabelsProcessor labelsProcessor) {
aggregatorLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
this.aggregator = aggregator;
this.instrumentProcessor = instrumentProcessor;
this.labelsProcessor = labelsProcessor;
}

AggregatorHandle<?> bind(Labels labels) {
Objects.requireNonNull(labels, "labels");
labels = labelsProcessor.onLabelsBound(Context.current(), labels);
AggregatorHandle<T> aggregatorHandle = aggregatorLabels.get(labels);
if (aggregatorHandle != null && aggregatorHandle.acquire()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -23,15 +24,15 @@
* never blocked.
*/
final class ViewRegistry {
private static final LinkedHashMap<Pattern, AggregatorFactory> EMPTY_CONFIG =
private static final LinkedHashMap<Pattern, View> EMPTY_CONFIG =
new LinkedHashMap<>();
static final AggregatorFactory CUMULATIVE_SUM = AggregatorFactory.sum(true);
static final AggregatorFactory SUMMARY = AggregatorFactory.minMaxSumCount();
static final AggregatorFactory LAST_VALUE = AggregatorFactory.lastValue();
static final View CUMULATIVE_SUM = View.builder().setAggregatorFactory(AggregatorFactory.sum(true)).build();
static final View SUMMARY = View.builder().setAggregatorFactory(AggregatorFactory.minMaxSumCount()).build();
static final View LAST_VALUE = View.builder().setAggregatorFactory(AggregatorFactory.lastValue()).build();

// The lock is used to ensure only one updated to the configuration happens at any moment.
private final ReentrantLock lock = new ReentrantLock();
private volatile EnumMap<InstrumentType, LinkedHashMap<Pattern, AggregatorFactory>> configuration;
private volatile EnumMap<InstrumentType, LinkedHashMap<Pattern, View>> configuration;

ViewRegistry() {
this.configuration = new EnumMap<>(InstrumentType.class);
Expand All @@ -43,10 +44,10 @@ final class ViewRegistry {
configuration.put(InstrumentType.VALUE_OBSERVER, EMPTY_CONFIG);
}

void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFactory) {
void registerView(InstrumentSelector selector, View aggregatorFactory) {
lock.lock();
try {
EnumMap<InstrumentType, LinkedHashMap<Pattern, AggregatorFactory>> newConfiguration =
EnumMap<InstrumentType, LinkedHashMap<Pattern, View>> newConfiguration =
new EnumMap<>(configuration);
newConfiguration.put(
selector.getInstrumentType(),
Expand All @@ -60,10 +61,10 @@ void registerView(InstrumentSelector selector, AggregatorFactory aggregatorFacto
}
}

AggregatorFactory findView(InstrumentDescriptor descriptor) {
LinkedHashMap<Pattern, AggregatorFactory> configPerType =
View findView(InstrumentDescriptor descriptor) {
LinkedHashMap<Pattern, View> configPerType =
configuration.get(descriptor.getType());
for (Map.Entry<Pattern, AggregatorFactory> entry : configPerType.entrySet()) {
for (Map.Entry<Pattern, View> entry : configPerType.entrySet()) {
if (entry.getKey().matcher(descriptor.getName()).matches()) {
return entry.getValue();
}
Expand All @@ -72,7 +73,7 @@ AggregatorFactory findView(InstrumentDescriptor descriptor) {
return getDefaultSpecification(descriptor);
}

private static AggregatorFactory getDefaultSpecification(InstrumentDescriptor descriptor) {
private static View getDefaultSpecification(InstrumentDescriptor descriptor) {
switch (descriptor.getType()) {
case COUNTER:
case UP_DOWN_COUNTER:
Expand All @@ -87,11 +88,11 @@ private static AggregatorFactory getDefaultSpecification(InstrumentDescriptor de
throw new IllegalArgumentException("Unknown descriptor type: " + descriptor.getType());
}

private static LinkedHashMap<Pattern, AggregatorFactory> newLinkedHashMap(
private static LinkedHashMap<Pattern, View> newLinkedHashMap(
Pattern pattern,
AggregatorFactory aggregatorFactory,
LinkedHashMap<Pattern, AggregatorFactory> parentConfiguration) {
LinkedHashMap<Pattern, AggregatorFactory> result = new LinkedHashMap<>();
View aggregatorFactory,
LinkedHashMap<Pattern, View> parentConfiguration) {
LinkedHashMap<Pattern, View> result = new LinkedHashMap<>();
result.put(pattern, aggregatorFactory);
result.putAll(parentConfiguration);
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.opentelemetry.sdk.metrics.processor;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.api.metrics.common.LabelsBuilder;
import io.opentelemetry.context.Context;

public class BaggageLabelsProcessor implements LabelsProcessor {
private final BaggageMetricsLabelsExtractor baggageMetricsLabelsExtractor;

public BaggageLabelsProcessor(BaggageMetricsLabelsExtractor baggageMetricsLabelsExtractor) {
this.baggageMetricsLabelsExtractor = baggageMetricsLabelsExtractor;
}

@Override
public Labels onLabelsBound(Context ctx, Labels labels) {
LabelsBuilder labelsBuilder = labels.toBuilder();
baggageMetricsLabelsExtractor.fromBaggage(ctx).forEach(labelsBuilder::put);

return labelsBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.opentelemetry.sdk.metrics.processor;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;

public interface BaggageMetricsLabelsExtractor {

Labels fromBaggage(Context ctx);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.opentelemetry.sdk.metrics.processor;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;

public interface LabelsProcessor {
/**
* Called when bind() method is called. Allows to manipulate labels which this
* instrument is bound to. Particular use case includes enriching lables and/or adding more labels
* depending on the Context
*
* @param ctx context of the operation
* @param labels immutable labels. When processors are chained output labels of the previous one is passed as
* an input to the next one. Last labels returned by a chain of processors are used for bind() operation.
* @return labels to be used as an input to the next processor in chain or bind() operation if this is the last processor
*/
Labels onLabelsBound(Context ctx, Labels labels);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.opentelemetry.sdk.metrics.processor;

public interface LabelsProcessorFactory {
static LabelsProcessorFactory noop() {
return NoopLabelsProcessor::new;
}
static LabelsProcessorFactory baggageExtractor(BaggageMetricsLabelsExtractor labelsExtractor) {
return () -> new BaggageLabelsProcessor(labelsExtractor);
}

/**
* Returns a new {@link LabelsProcessorFactory}
*
* @return new {@link LabelsProcessorFactory}
*/
LabelsProcessor create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.opentelemetry.sdk.metrics.processor;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;

public class NoopLabelsProcessor implements LabelsProcessor {

@Override
public Labels onLabelsBound(Context c, Labels labels) {
return labels;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.opentelemetry.sdk.metrics.view;

import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import javax.annotation.concurrent.Immutable;

@AutoValue
@Immutable
public abstract class View {
public abstract AggregatorFactory getAggregatorFactory();

public abstract LabelsProcessorFactory getLabelsProcessorFactory();

public static Builder builder() {
return new AutoValue_View.Builder().setLabelsProcessorFactory(LabelsProcessorFactory.noop());
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setAggregatorFactory(AggregatorFactory aggregatorFactory);
public abstract Builder setLabelsProcessorFactory(LabelsProcessorFactory labelsProcessorFactory);

public abstract View build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;

import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.metrics.common.Labels;
Expand All @@ -28,12 +32,13 @@ public class SynchronousInstrumentAccumulatorTest {
AggregatorFactory.lastValue()
.create(
Resource.getEmpty(), InstrumentationLibraryInfo.create("test", "1.0"), DESCRIPTOR);
private final LabelsProcessor labelsProcessor = LabelsProcessorFactory.noop().create();

@Test
void sameAggregator_ForSameLabelSet() {
SynchronousInstrumentAccumulator<?> accumulator =
new SynchronousInstrumentAccumulator<>(
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()));
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), labelsProcessor);
AggregatorHandle<?> aggregatorHandle = accumulator.bind(Labels.of("K", "V"));
AggregatorHandle<?> duplicateAggregatorHandle = accumulator.bind(Labels.of("K", "V"));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.sdk.metrics.view.View;

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
Expand All @@ -18,14 +20,15 @@ class ViewRegistryTest {
@Test
void selection_onType() {
AggregatorFactory factory = AggregatorFactory.lastValue();
View view = View.builder().setAggregatorFactory(factory).build();

ViewRegistry viewRegistry = new ViewRegistry();
viewRegistry.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.COUNTER)
.setInstrumentNameRegex(".*")
.build(),
factory);
view);
assertThat(
viewRegistry.findView(
InstrumentDescriptor.create(
Expand All @@ -42,14 +45,15 @@ void selection_onType() {
@Test
void selection_onName() {
AggregatorFactory factory = AggregatorFactory.lastValue();
View view = View.builder().setAggregatorFactory(factory).build();

ViewRegistry viewRegistry = new ViewRegistry();
viewRegistry.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.COUNTER)
.setInstrumentNameRegex("overridden")
.build(),
factory);
view);
assertThat(
viewRegistry.findView(
InstrumentDescriptor.create(
Expand All @@ -66,21 +70,23 @@ void selection_onName() {
@Test
void selection_LastAddedViewWins() {
AggregatorFactory factory1 = AggregatorFactory.lastValue();
View view1 = View.builder().setAggregatorFactory(factory1).build();
AggregatorFactory factory2 = AggregatorFactory.minMaxSumCount();
View view2 = View.builder().setAggregatorFactory(factory2).build();

ViewRegistry viewRegistry = new ViewRegistry();
viewRegistry.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.COUNTER)
.setInstrumentNameRegex(".*")
.build(),
factory1);
view1);
viewRegistry.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.COUNTER)
.setInstrumentNameRegex("overridden")
.build(),
factory2);
view2);

assertThat(
viewRegistry.findView(
Expand All @@ -97,14 +103,15 @@ void selection_LastAddedViewWins() {
@Test
void selection_regex() {
AggregatorFactory factory = AggregatorFactory.lastValue();
View view = View.builder().setAggregatorFactory(factory).build();

ViewRegistry viewRegistry = new ViewRegistry();
viewRegistry.registerView(
InstrumentSelector.builder()
.setInstrumentNameRegex("overrid(es|den)")
.setInstrumentType(InstrumentType.COUNTER)
.build(),
factory);
view);

assertThat(
viewRegistry.findView(
Expand Down

0 comments on commit c02d042

Please sign in to comment.