diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index 6c2b60e5db5..fcc43badc8c 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -32,6 +32,7 @@ import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.MessageProperties; import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.Config; import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -162,7 +163,10 @@ public static void setResourceNameAddHeaders( Map<String, Object> headers = props.getHeaders(); headers = (headers == null) ? new HashMap<String, Object>() : new HashMap<>(headers); - propagate().inject(span, headers, SETTER); + if (Config.get().isRabbitPropagationEnabled() + && !Config.get().getRabbitPropagationDisabledExchanges().contains(exchange)) { + propagate().inject(span, headers, SETTER); + } props = new AMQP.BasicProperties( diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java index 13ce7acb6c7..919e0fc1afc 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java @@ -15,6 +15,7 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; +import datadog.trace.api.Config; import datadog.trace.api.DDTags; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -35,10 +36,12 @@ public class TracedDelegatingConsumer implements Consumer { private final String queue; private final Consumer delegate; private final boolean traceStartTimeEnabled; + private final boolean propagate; public TracedDelegatingConsumer( final String queue, final Consumer delegate, boolean traceStartTimeEnabled) { this.queue = queue; + this.propagate = !Config.get().getRabbitPropagationDisabledQueues().contains(queue); this.delegate = delegate; this.traceStartTimeEnabled = traceStartTimeEnabled; } @@ -79,12 +82,17 @@ public void handleDelivery( try { final Map<String, Object> headers = properties.getHeaders(); final Context context = - headers == null ? null : propagate().extract(headers, ContextVisitors.objectValuesMap()); + (headers == null || !propagate) + ? null + : propagate().extract(headers, ContextVisitors.objectValuesMap()); + // TODO: check dynamically bound queues - + // https://github.com/DataDog/dd-trace-java/pull/2955#discussion_r677787875 final AgentSpan span = startSpan(AMQP_COMMAND, context) .setTag(MESSAGE_SIZE, body == null ? 0 : body.length) .setMeasured(true); + CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.onDeliver(span, queue, envelope); final long spanStartTime = NANOSECONDS.toMillis(span.getStartTime()); diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index 7c416397de0..04ef2b56f38 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -8,6 +8,7 @@ import com.rabbitmq.client.GetResponse import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.api.DDSpanTypes +import datadog.trace.api.config.TraceInstrumentationConfig import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.core.DDSpan @@ -35,7 +36,7 @@ class RabbitMQTest extends AgentTestRunner { 'testcontainers' are built for Java 8 and Java 7 cannot load this class. */ @Shared - def rabbbitMQContainer + def rabbitMQContainer @Shared def defaultRabbitMQPort = 5672 @Shared @@ -66,23 +67,23 @@ class RabbitMQTest extends AgentTestRunner { and we use 'testcontainers' for this. */ if ("true" != System.getenv("CI")) { - rabbbitMQContainer = new GenericContainer('rabbitmq:latest') + rabbitMQContainer = new GenericContainer('rabbitmq:latest') .withExposedPorts(defaultRabbitMQPort) .withStartupTimeout(Duration.ofSeconds(120)) // .withLogConsumer { output -> // print output.utf8String // } - rabbbitMQContainer.start() + rabbitMQContainer.start() rabbitmqAddress = new InetSocketAddress( - rabbbitMQContainer.containerIpAddress, - rabbbitMQContainer.getMappedPort(defaultRabbitMQPort) + rabbitMQContainer.containerIpAddress, + rabbitMQContainer.getMappedPort(defaultRabbitMQPort) ) } } def cleanupSpec() { - if (rabbbitMQContainer) { - rabbbitMQContainer.stop() + if (rabbitMQContainer) { + rabbitMQContainer.stop() } } @@ -346,6 +347,122 @@ class RabbitMQTest extends AgentTestRunner { } } + def "test rabbit publish/get with given disabled queue (producer side)"() { + setup: + injectSysConfig(TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_EXCHANGES, config) + + when: + runUnderTrace("parent") { + channel.queueDeclare(queueName, false, true, true, null) + channel.exchangeDeclare(exchangeName, "direct", false) + channel.queueBind(queueName, exchangeName, routingKey) + channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes()) + } + GetResponse response = channel.basicGet(queueName, true) + + then: + new String(response.getBody()) == "Hello, world!" + + and: + assertTraces(2) { + trace(5) { + span { + operationName "parent" + tags { + defaultTags() + } + } + rabbitSpan(it, "basic.publish $exchangeName -> $routingKey", false, span(0)) + rabbitSpan(it, "queue.bind", false, span(0)) + rabbitSpan(it, "exchange.declare", false, span(0)) + rabbitSpan(it, "queue.declare", false, span(0)) + } + if (nullParent) { + trace(1) { + span { + parentId(0 as BigInteger) + resourceName("basic.get $queueName") + serviceName("rabbitmq") + operationName("amqp.command") + spanType(DDSpanTypes.MESSAGE_CONSUMER) + errored(false) + } + } + } else { + trace(1) { + rabbitSpan(it, "basic.get $queueName", true, trace(0)[1]) + } + } + } + + where: + exchangeName | routingKey | queueName | config | nullParent + "some-exchange" | "some-routing-key" | "queueNameTest" | "queueNameTest" | false + "some-exchange" | "some-routing-key" | "queueNameTest" | "some-exchange" | true + "some-exchange" | "some-routing-key" | "queueNameTest" | "" | false + } + + def "test rabbit publish/get with given disabled queue (consumer side)"() { + setup: + injectSysConfig(TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_QUEUES, config) + + channel.exchangeDeclare(exchangeName, "direct", false) + channel.queueDeclare(queueName, false, true, true, null) + channel.queueBind(queueName, exchangeName, routingKey) + + Consumer callback = new DefaultConsumer(channel) + channel.basicConsume(queueName, callback) + channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes()) + + expect: + assertTraces(6) { + trace(1) { + rabbitSpan(it, "exchange.declare", false) + } + trace(1) { + rabbitSpan(it, "queue.declare", false) + } + trace(1) { + rabbitSpan(it, "queue.bind", false) + } + trace(1) { + rabbitSpan(it, "basic.consume") + } + trace(1) { + rabbitSpan(it, "basic.publish $exchangeName -> $routingKey", false) + } + if (nullParent) { + trace(1) { + span { + parentId(0 as BigInteger) + resourceName("basic.deliver $queueName") + serviceName("rabbitmq") + operationName("amqp.command") + spanType(DDSpanTypes.MESSAGE_CONSUMER) + errored(false) + } + } + } else { + trace(1) { + span { + resourceName("basic.deliver $queueName") + serviceName("rabbitmq") + operationName("amqp.command") + spanType(DDSpanTypes.MESSAGE_CONSUMER) + errored(false) + parentId(trace(4)[0].spanId.toString().toBigInteger()) + } + } + } + } + + where: + exchangeName | routingKey | queueName | config | nullParent + "some-exchange" | "some-routing-key" | "queueNameTest" | "queueNameTest" | true + "some-exchange" | "some-routing-key" | "queueNameTest" | "some-exchange" | false + "some-exchange" | "some-routing-key" | "queueNameTest" | "" | false + } + def rabbitSpan( TraceAssert trace, String resource, @@ -407,7 +524,7 @@ class RabbitMQTest extends AgentTestRunner { case "basic.get": "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER "amqp.command" "basic.get" - "amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") } + "amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") || it == "queueNameTest" } "message.size" { it == null || it instanceof Integer } break case "basic.deliver": diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 0c5ed9516ae..03621bf7969 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -76,6 +76,7 @@ public final class ConfigDefaults { static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true; static final boolean DEFAULT_JMS_PROPAGATION_ENABLED = true; + static final boolean DEFAULT_RABBIT_PROPAGATION_ENABLED = true; static final boolean DEFAULT_TRACE_REPORT_HOSTNAME = false; static final String DEFAULT_TRACE_ANNOTATIONS = null; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index 897cf866132..d1ef951e2a1 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -50,6 +50,12 @@ public final class TraceInstrumentationConfig { public static final String JMS_PROPAGATION_DISABLED_TOPICS = "jms.propagation.disabled.topics"; public static final String JMS_PROPAGATION_DISABLED_QUEUES = "jms.propagation.disabled.queues"; + public static final String RABBIT_PROPAGATION_ENABLED = "rabbit.propagation.enabled"; + public static final String RABBIT_PROPAGATION_DISABLED_QUEUES = + "rabbit.propagation.disabled.queues"; + public static final String RABBIT_PROPAGATION_DISABLED_EXCHANGES = + "rabbit.propagation.disabled.exchanges"; + public static final String GRPC_IGNORED_OUTBOUND_METHODS = "trace.grpc.ignored.outbound.methods"; public static final String GRPC_SERVER_TRIM_PACKAGE_RESOURCE = "trace.grpc.server.trim-package-resource"; diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 551a1e363fd..5efc276aea9 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -40,6 +40,7 @@ import static datadog.trace.api.ConfigDefaults.DEFAULT_PROPAGATION_EXTRACT_LOG_HEADER_NAMES_ENABLED; import static datadog.trace.api.ConfigDefaults.DEFAULT_PROPAGATION_STYLE_EXTRACT; import static datadog.trace.api.ConfigDefaults.DEFAULT_PROPAGATION_STYLE_INJECT; +import static datadog.trace.api.ConfigDefaults.DEFAULT_RABBIT_PROPAGATION_ENABLED; import static datadog.trace.api.ConfigDefaults.DEFAULT_RUNTIME_CONTEXT_FIELD_INJECTION; import static datadog.trace.api.ConfigDefaults.DEFAULT_SCOPE_DEPTH_LIMIT; import static datadog.trace.api.ConfigDefaults.DEFAULT_SERIALVERSIONUID_FIELD_INJECTION; @@ -151,6 +152,9 @@ import static datadog.trace.api.config.TraceInstrumentationConfig.LOGS_MDC_TAGS_INJECTION_ENABLED; import static datadog.trace.api.config.TraceInstrumentationConfig.OSGI_SEARCH_DEPTH; import static datadog.trace.api.config.TraceInstrumentationConfig.PLAY_REPORT_HTTP_STATUS; +import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_EXCHANGES; +import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_QUEUES; +import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_ENABLED; import static datadog.trace.api.config.TraceInstrumentationConfig.RESOLVER_USE_LOADCLASS; import static datadog.trace.api.config.TraceInstrumentationConfig.RUNTIME_CONTEXT_FIELD_INJECTION; import static datadog.trace.api.config.TraceInstrumentationConfig.SERIALVERSIONUID_FIELD_INJECTION; @@ -387,6 +391,10 @@ public class Config { private final Set<String> jmsPropagationDisabledTopics; private final Set<String> jmsPropagationDisabledQueues; + private final boolean RabbitPropagationEnabled; + private final Set<String> RabbitPropagationDisabledQueues; + private final Set<String> RabbitPropagationDisabledExchanges; + private final boolean hystrixTagsEnabled; private final boolean hystrixMeasuredEnabled; @@ -811,6 +819,15 @@ && isJavaVersionAtLeast(8) jmsPropagationDisabledQueues = tryMakeImmutableSet(configProvider.getList(JMS_PROPAGATION_DISABLED_QUEUES)); + RabbitPropagationEnabled = + configProvider.getBoolean(RABBIT_PROPAGATION_ENABLED, DEFAULT_RABBIT_PROPAGATION_ENABLED); + + RabbitPropagationDisabledQueues = + tryMakeImmutableSet(configProvider.getList(RABBIT_PROPAGATION_DISABLED_QUEUES)); + + RabbitPropagationDisabledExchanges = + tryMakeImmutableSet(configProvider.getList(RABBIT_PROPAGATION_DISABLED_EXCHANGES)); + grpcIgnoredOutboundMethods = tryMakeImmutableSet(configProvider.getList(GRPC_IGNORED_OUTBOUND_METHODS)); grpcServerTrimPackageResource = @@ -1253,6 +1270,18 @@ public boolean isKafkaClientBase64DecodingEnabled() { return kafkaClientBase64DecodingEnabled; } + public boolean isRabbitPropagationEnabled() { + return RabbitPropagationEnabled; + } + + public Set<String> getRabbitPropagationDisabledQueues() { + return RabbitPropagationDisabledQueues; + } + + public Set<String> getRabbitPropagationDisabledExchanges() { + return RabbitPropagationDisabledExchanges; + } + public boolean isHystrixTagsEnabled() { return hystrixTagsEnabled; } @@ -1992,6 +2021,12 @@ public String toString() { + jmsPropagationDisabledTopics + ", jmsPropagationDisabledQueues=" + jmsPropagationDisabledQueues + + ", RabbitPropagationEnabled=" + + RabbitPropagationEnabled + + ", RabbitPropagationDisabledQueues=" + + RabbitPropagationDisabledQueues + + ", RabbitPropagationDisabledExchanges=" + + RabbitPropagationDisabledExchanges + ", hystrixTagsEnabled=" + hystrixTagsEnabled + ", hystrixMeasuredEnabled="