Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

labels processor #2964

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import java.util.List;

abstract class AbstractAccumulator {
Expand All @@ -31,4 +32,18 @@ static <T> Aggregator<T> getAggregator(
meterSharedState.getInstrumentationLibraryInfo(),
descriptor);
}

static LabelsProcessor getLabelsProcessor(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
InstrumentDescriptor descriptor) {
return meterProviderSharedState
.getViewRegistry()
.findView(descriptor)
.getLabelsProcessorFactory()
.create(
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
descriptor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package io.opentelemetry.sdk.metrics;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -21,6 +23,7 @@ final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
private final ReentrantLock collectLock;
private final Aggregator<T> aggregator;
private final InstrumentProcessor<T> instrumentProcessor;
private final LabelsProcessor labelsProcessor;

static <T> SynchronousInstrumentAccumulator<T> create(
MeterProviderSharedState meterProviderSharedState,
Expand All @@ -30,19 +33,24 @@ static <T> SynchronousInstrumentAccumulator<T> create(
getAggregator(meterProviderSharedState, meterSharedState, descriptor);
return new SynchronousInstrumentAccumulator<>(
aggregator,
new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos()));
new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos()),
getLabelsProcessor(meterProviderSharedState, meterSharedState, descriptor));
}

SynchronousInstrumentAccumulator(
Aggregator<T> aggregator, InstrumentProcessor<T> instrumentProcessor) {
Aggregator<T> aggregator,
InstrumentProcessor<T> instrumentProcessor,
LabelsProcessor labelsProcessor) {
aggregatorLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
this.aggregator = aggregator;
this.instrumentProcessor = instrumentProcessor;
this.labelsProcessor = labelsProcessor;
}

AggregatorHandle<?> bind(Labels labels) {
Objects.requireNonNull(labels, "labels");
labels = labelsProcessor.onLabelsBound(Context.current(), labels);
as-polyakov marked this conversation as resolved.
Show resolved Hide resolved
AggregatorHandle<T> aggregatorHandle = aggregatorLabels.get(labels);
if (aggregatorHandle != null && aggregatorHandle.acquire()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.processor;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;

/**
* Labels processor is an abstraction to manipulate instrument labels during metrics capture
* process.
*/
public interface LabelsProcessor {
/**
* Called when bound synchronous instrument is created or metrics are recorded for non-bound
* synchronous instrument. Allows to manipulate labels which this instrument is bound to in case
* of binding operation or labels used for recording values in case of non-bound synchronous
* instrument. Particular use case includes enriching labels and/or adding more labels depending
* on the Context
*
* <p>Please note, this is an experimental API. In case of bound instruments, it will be only
* invoked upon instrument binding and not when measurements are recorded.
*
* @param ctx context of the operation
* @param labels immutable labels. When processors are chained output labels of the previous one
* is passed as an input to the next one. Last labels returned by a chain of processors are
* used for bind() operation.
* @return labels to be used as an input to the next processor in chain or bind() operation if
* this is the last processor
*/
Labels onLabelsBound(Context ctx, Labels labels);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.processor;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.resources.Resource;

public interface LabelsProcessorFactory {
static LabelsProcessorFactory noop() {
return (resource, instrumentationLibraryInfo, descriptor) -> new NoopLabelsProcessor();
}

/**
* Returns a new {@link LabelsProcessorFactory}.
*
* @return new {@link LabelsProcessorFactory}
*/
LabelsProcessor create(
Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor descriptor);
as-polyakov marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.processor;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;

public class NoopLabelsProcessor implements LabelsProcessor {

@Override
public Labels onLabelsBound(Context c, Labels labels) {
return labels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import javax.annotation.concurrent.Immutable;

/** TODO: javadoc. */
Expand All @@ -15,11 +16,14 @@
public abstract class View {
public abstract AggregatorFactory getAggregatorFactory();

public abstract LabelsProcessorFactory getLabelsProcessorFactory();

public static ViewBuilder builder() {
return new ViewBuilder();
}

static View create(AggregatorFactory aggregatorFactory) {
return new AutoValue_View(aggregatorFactory);
static View create(
AggregatorFactory aggregatorFactory, LabelsProcessorFactory labelsProcessorFactory) {
return new AutoValue_View(aggregatorFactory, labelsProcessorFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,37 @@
package io.opentelemetry.sdk.metrics.view;

import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import java.util.Objects;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;

public final class ViewBuilder {
private AggregatorFactory aggregatorFactory;
private LabelsProcessorFactory labelsProcessorFactory = LabelsProcessorFactory.noop();

ViewBuilder() {}
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved

/**
* sets {@link AggregatorFactory}.
*
* @param aggregatorFactory aggregator factory.
* @return this Builder.
*/
public ViewBuilder setAggregatorFactory(AggregatorFactory aggregatorFactory) {
this.aggregatorFactory = Objects.requireNonNull(aggregatorFactory, "aggregatorFactory");
this.aggregatorFactory = aggregatorFactory;
return this;
}

/**
* sets {@link LabelsProcessorFactory}.
*
* @param labelsProcessorFactory labels processor factory.
* @return this Builder.
*/
public ViewBuilder setLabelsProcessorFactory(LabelsProcessorFactory labelsProcessorFactory) {
this.labelsProcessorFactory = labelsProcessorFactory;
return this;
}

public View build() {
return View.create(this.aggregatorFactory);
return View.create(this.aggregatorFactory, this.labelsProcessorFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.TestClock;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
Expand All @@ -16,8 +17,13 @@
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class SynchronousInstrumentAccumulatorTest {
private static final InstrumentDescriptor DESCRIPTOR =
Expand All @@ -27,12 +33,50 @@ public class SynchronousInstrumentAccumulatorTest {
private final Aggregator<Long> aggregator =
AggregatorFactory.lastValue()
.create(Resource.empty(), InstrumentationLibraryInfo.create("test", "1.0"), DESCRIPTOR);
private final LabelsProcessor labelsProcessor =
LabelsProcessorFactory.noop()
.create(Resource.empty(), InstrumentationLibraryInfo.create("test", "1.0"), DESCRIPTOR);

@Test
void labelsProcessor_used() {
LabelsProcessor spyLabelsProcessor = Mockito.spy(this.labelsProcessor);
SynchronousInstrumentAccumulator<?> accumulator =
new SynchronousInstrumentAccumulator<>(
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), spyLabelsProcessor);
accumulator.bind(Labels.empty());
Mockito.verify(spyLabelsProcessor).onLabelsBound(Context.current(), Labels.empty());
}

@Test
void labelsProcessor_applied() {
final Labels labels = Labels.of("K", "V");
LabelsProcessor labelsProcessor =
new LabelsProcessor() {
@Override
public Labels onLabelsBound(Context ctx, Labels lbls) {
return lbls.toBuilder().put("modifiedK", "modifiedV").build();
}
};
LabelsProcessor spyLabelsProcessor = Mockito.spy(labelsProcessor);
SynchronousInstrumentAccumulator<?> accumulator =
new SynchronousInstrumentAccumulator<>(
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), spyLabelsProcessor);
AggregatorHandle<?> aggregatorHandle = accumulator.bind(labels);
aggregatorHandle.recordDouble(1);
List<MetricData> md = accumulator.collectAll(testClock.now());
md.stream()
.flatMap(m -> m.getLongGaugeData().getPoints().stream())
.forEach(
p ->
assertThat(p.getLabels().asMap())
.isEqualTo(labels.toBuilder().put("modifiedK", "modifiedV").build().asMap()));
}

@Test
void sameAggregator_ForSameLabelSet() {
SynchronousInstrumentAccumulator<?> accumulator =
new SynchronousInstrumentAccumulator<>(
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()));
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), labelsProcessor);
AggregatorHandle<?> aggregatorHandle = accumulator.bind(Labels.of("K", "V"));
AggregatorHandle<?> duplicateAggregatorHandle = accumulator.bind(Labels.of("K", "V"));
try {
Expand Down