Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[baggage-processor] Add BaggageLogRecordProcessor #1576

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.baggage.processor;

import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import java.util.function.Predicate;

/**
* This log record processor copies attributes stored in {@link Baggage} into each newly created log
* record.
*/
public class BaggageLogRecordProcessor implements LogRecordProcessor {

/**
* Creates a new {@link BaggageLogRecordProcessor} that copies all baggage entries into the newly
* created log record.
*/
public static BaggageLogRecordProcessor allowAllBaggageKeys() {
return new BaggageLogRecordProcessor(baggageKey -> true);
}

private final Predicate<String> baggageKeyPredicate;

/**
* Creates a new {@link BaggageLogRecordProcessor} that copies only baggage entries with keys that
* pass the provided filter into the newly created log record.
*/
public BaggageLogRecordProcessor(Predicate<String> baggageKeyPredicate) {
this.baggageKeyPredicate = baggageKeyPredicate;
}

@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
Baggage.fromContext(context)
.forEach(
(s, baggageEntry) -> {
if (baggageKeyPredicate.test(s)) {
logRecord.setAttribute(AttributeKey.stringKey(s), baggageEntry.getValue());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.baggage.processor;

import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import java.util.List;

public class BaggageProcessorCustomizer implements AutoConfigurationCustomizerProvider {
@Override
public void customize(AutoConfigurationCustomizer autoConfigurationCustomizer) {
autoConfigurationCustomizer
.addTracerProviderCustomizer(
(sdkTracerProviderBuilder, config) -> {
addSpanProcessor(sdkTracerProviderBuilder, config);
return sdkTracerProviderBuilder;
})
.addLoggerProviderCustomizer(
(sdkLoggerProviderBuilder, config) -> {
addLogRecordProcessor(sdkLoggerProviderBuilder, config);
return sdkLoggerProviderBuilder;
});
}

private static void addSpanProcessor(
SdkTracerProviderBuilder sdkTracerProviderBuilder, ConfigProperties config) {
List<String> keys =
config.getList("otel.java.experimental.span-attributes.copy-from-baggage.include");

if (keys.isEmpty()) {
return;
}

sdkTracerProviderBuilder.addSpanProcessor(createBaggageSpanProcessor(keys));
}

static BaggageSpanProcessor createBaggageSpanProcessor(List<String> keys) {
if (keys.size() == 1 && keys.get(0).equals("*")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extract isWildcard method to be reused for logger

return BaggageSpanProcessor.allowAllBaggageKeys();
}
return new BaggageSpanProcessor(keys::contains);
}

private static void addLogRecordProcessor(
SdkLoggerProviderBuilder sdkLoggerProviderBuilder, ConfigProperties config) {
List<String> keys =
config.getList("otel.java.experimental.log-attributes.copy-from-baggage.include");

if (keys.isEmpty()) {
return;
}

sdkLoggerProviderBuilder.addLogRecordProcessor(createBaggageLogRecordProcessor(keys));
}

static BaggageLogRecordProcessor createBaggageLogRecordProcessor(List<String> keys) {
if (keys.size() == 1 && keys.get(0).equals("*")) {
return BaggageLogRecordProcessor.allowAllBaggageKeys();
}
return new BaggageLogRecordProcessor(keys::contains);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1 +1 @@
io.opentelemetry.contrib.baggage.processor.BaggageSpanProcessorCustomizer
io.opentelemetry.contrib.baggage.processor.BaggageProcessorCustomizer
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.contrib.baggage.processor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.verify;

Expand All @@ -20,14 +21,20 @@
import io.opentelemetry.sdk.autoconfigure.internal.ComponentLoader;
import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider;
import io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.testing.assertj.TracesAssert;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand All @@ -38,57 +45,69 @@
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class BaggageSpanProcessorCustomizerTest {
class BaggageProcessorCustomizerTest {

private static final String MEMORY_EXPORTER = "memory";

@Test
void test_customizer() {
assertCustomizer(Collections.emptyMap(), span -> span.hasTotalAttributeCount(0));
assertCustomizer(
Collections.singletonMap(
"otel.java.experimental.span-attributes.copy-from-baggage.include", "key"),
span -> span.hasAttribute(AttributeKey.stringKey("key"), "value"));
Map<String, String> properties = new HashMap<>();
properties.put("otel.java.experimental.span-attributes.copy-from-baggage.include", "key");
properties.put("otel.java.experimental.log-attributes.copy-from-baggage.include", "key");
// TODO try use
// AttributeAssertion attributeAssertion =
cyrille-leclerc marked this conversation as resolved.
Show resolved Hide resolved
// OpenTelemetryAssertions.equalTo(AttributeKey.stringKey("key"), "value");
assertCustomizer(properties, span -> span.hasAttribute(AttributeKey.stringKey("key"), "value"));
}

private static void assertCustomizer(
Map<String, String> properties, Consumer<SpanDataAssert> spanDataAssertConsumer) {

InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
InMemoryLogRecordExporter logExporter = InMemoryLogRecordExporter.create();

OpenTelemetrySdk sdk = getOpenTelemetrySdk(properties, spanExporter);
OpenTelemetrySdk sdk = getOpenTelemetrySdk(properties, spanExporter, logExporter);
try (Scope ignore = Baggage.current().toBuilder().put("key", "value").build().makeCurrent()) {
sdk.getTracer("test").spanBuilder("test").startSpan().end();
sdk.getLogsBridge().get("test").logRecordBuilder().setBody("test").emit();
}
// TODO verify log record attributes
cyrille-leclerc marked this conversation as resolved.
Show resolved Hide resolved
await()
.atMost(Duration.ofSeconds(1))
.untilAsserted(
() ->
TracesAssert.assertThat(spanExporter.getFinishedSpanItems())
.hasTracesSatisfyingExactly(
trace -> trace.hasSpansSatisfyingExactly(spanDataAssertConsumer)));
() -> {
TracesAssert.assertThat(spanExporter.getFinishedSpanItems())
.hasTracesSatisfyingExactly(
trace -> trace.hasSpansSatisfyingExactly(spanDataAssertConsumer));
List<LogRecordData> finishedLogRecordItems = logExporter.getFinishedLogRecordItems();
assertThat(finishedLogRecordItems).hasSize(1);
cyrille-leclerc marked this conversation as resolved.
Show resolved Hide resolved
});
}

private static OpenTelemetrySdk getOpenTelemetrySdk(
Map<String, String> properties, InMemorySpanExporter spanExporter) {
SpiHelper spiHelper =
SpiHelper.create(BaggageSpanProcessorCustomizerTest.class.getClassLoader());
Map<String, String> properties,
InMemorySpanExporter spanExporter,
InMemoryLogRecordExporter logRecordExporter) {
SpiHelper spiHelper = SpiHelper.create(BaggageProcessorCustomizerTest.class.getClassLoader());

AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder =
AutoConfiguredOpenTelemetrySdk.builder()
.addPropertiesSupplier(
() ->
ImmutableMap.of(
// We set the export interval of the spans to 100 ms. The default value is 5
// We set the export interval of the spans to 10 ms. The default value is 5
// seconds.
"otel.bsp.schedule.delay",
"otel.bsp.schedule.delay", // span exporter
"10",
"otel.blrp.schedule.delay", // log exporter
"10",
"otel.traces.exporter",
MEMORY_EXPORTER,
"otel.metrics.exporter",
"none",
"otel.logs.exporter",
"none"))
MEMORY_EXPORTER))
.addPropertiesSupplier(() -> properties);
AutoConfigureUtil.setComponentLoader(
sdkBuilder,
Expand All @@ -105,6 +124,20 @@ public SpanExporter createExporter(ConfigProperties configProperties) {
return spanExporter;
}

@Override
public String getName() {
return MEMORY_EXPORTER;
}
});
} else if (spiClass == ConfigurableLogRecordExporterProvider.class) {
return Collections.singletonList(
(T)
new ConfigurableLogRecordExporterProvider() {
@Override
public LogRecordExporter createExporter(ConfigProperties configProperties) {
return logRecordExporter;
}

@Override
public String getName() {
return MEMORY_EXPORTER;
Expand All @@ -120,7 +153,7 @@ public String getName() {
@Test
public void test_baggageSpanProcessor_adds_attributes_to_spans(@Mock ReadWriteSpan span) {
try (BaggageSpanProcessor processor =
BaggageSpanProcessorCustomizer.createProcessor(Collections.singletonList("*"))) {
BaggageProcessorCustomizer.createBaggageSpanProcessor(Collections.singletonList("*"))) {
try (Scope ignore = Baggage.current().toBuilder().put("key", "value").build().makeCurrent()) {
processor.onStart(Context.current(), span);
verify(span).setAttribute("key", "value");
Expand All @@ -132,7 +165,7 @@ public void test_baggageSpanProcessor_adds_attributes_to_spans(@Mock ReadWriteSp
public void test_baggageSpanProcessor_adds_attributes_to_spans_when_key_filter_matches(
@Mock ReadWriteSpan span) {
try (BaggageSpanProcessor processor =
BaggageSpanProcessorCustomizer.createProcessor(Collections.singletonList("key"))) {
BaggageProcessorCustomizer.createBaggageSpanProcessor(Collections.singletonList("key"))) {
try (Scope ignore =
Baggage.current().toBuilder()
.put("key", "value")
Expand All @@ -145,4 +178,36 @@ public void test_baggageSpanProcessor_adds_attributes_to_spans_when_key_filter_m
}
}
}

@Test
public void test_baggageLogRecordProcessor_adds_attributes_to_logRecord(
@Mock ReadWriteLogRecord logRecord) {
try (BaggageLogRecordProcessor processor =
BaggageProcessorCustomizer.createBaggageLogRecordProcessor(
Collections.singletonList("*"))) {
try (Scope ignore = Baggage.current().toBuilder().put("key", "value").build().makeCurrent()) {
processor.onEmit(Context.current(), logRecord);
verify(logRecord).setAttribute(AttributeKey.stringKey("key"), "value");
}
}
}

@Test
public void test_baggageLogRecordProcessor_adds_attributes_to_spans_when_key_filter_matches(
@Mock ReadWriteLogRecord logRecord) {
try (BaggageLogRecordProcessor processor =
BaggageProcessorCustomizer.createBaggageLogRecordProcessor(
Collections.singletonList("key"))) {
try (Scope ignore =
Baggage.current().toBuilder()
.put("key", "value")
.put("other", "value")
.build()
.makeCurrent()) {
processor.onEmit(Context.current(), logRecord);
verify(logRecord).setAttribute(AttributeKey.stringKey("key"), "value");
verify(logRecord, Mockito.never()).setAttribute(AttributeKey.stringKey("other"), "value");
}
}
}
}
Loading