Skip to content

Commit

Permalink
Add config option to disable context propagation for specified rabbit…
Browse files Browse the repository at this point in the history
… queues and exchanges (#2955)

* rabbitmq queue context propagation disabled progress

* consumer test

* comment changes

* add link to TODO
  • Loading branch information
lizapressman authored Jul 28, 2021
1 parent 9e4d566 commit 09a790c
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand Down Expand Up @@ -162,7 +163,10 @@ public static void setResourceNameAddHeaders(
Map<String, Object> headers = props.getHeaders();
headers = (headers == null) ? new HashMap<String, Object>() : new HashMap<>(headers);

propagate().inject(span, headers, SETTER);
if (Config.get().isRabbitPropagationEnabled()
&& !Config.get().getRabbitPropagationDisabledExchanges().contains(exchange)) {
propagate().inject(span, headers, SETTER);
}

props =
new AMQP.BasicProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand All @@ -35,10 +36,12 @@ public class TracedDelegatingConsumer implements Consumer {
private final String queue;
private final Consumer delegate;
private final boolean traceStartTimeEnabled;
private final boolean propagate;

public TracedDelegatingConsumer(
final String queue, final Consumer delegate, boolean traceStartTimeEnabled) {
this.queue = queue;
this.propagate = !Config.get().getRabbitPropagationDisabledQueues().contains(queue);
this.delegate = delegate;
this.traceStartTimeEnabled = traceStartTimeEnabled;
}
Expand Down Expand Up @@ -79,12 +82,17 @@ public void handleDelivery(
try {
final Map<String, Object> headers = properties.getHeaders();
final Context context =
headers == null ? null : propagate().extract(headers, ContextVisitors.objectValuesMap());
(headers == null || !propagate)
? null
: propagate().extract(headers, ContextVisitors.objectValuesMap());

// TODO: check dynamically bound queues -
// https://github.com/DataDog/dd-trace-java/pull/2955#discussion_r677787875
final AgentSpan span =
startSpan(AMQP_COMMAND, context)
.setTag(MESSAGE_SIZE, body == null ? 0 : body.length)
.setMeasured(true);

CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onDeliver(span, queue, envelope);
final long spanStartTime = NANOSECONDS.toMillis(span.getStartTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.rabbitmq.client.GetResponse
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.config.TraceInstrumentationConfig
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.core.DDSpan
Expand Down Expand Up @@ -35,7 +36,7 @@ class RabbitMQTest extends AgentTestRunner {
'testcontainers' are built for Java 8 and Java 7 cannot load this class.
*/
@Shared
def rabbbitMQContainer
def rabbitMQContainer
@Shared
def defaultRabbitMQPort = 5672
@Shared
Expand Down Expand Up @@ -66,23 +67,23 @@ class RabbitMQTest extends AgentTestRunner {
and we use 'testcontainers' for this.
*/
if ("true" != System.getenv("CI")) {
rabbbitMQContainer = new GenericContainer('rabbitmq:latest')
rabbitMQContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(defaultRabbitMQPort)
.withStartupTimeout(Duration.ofSeconds(120))
// .withLogConsumer { output ->
// print output.utf8String
// }
rabbbitMQContainer.start()
rabbitMQContainer.start()
rabbitmqAddress = new InetSocketAddress(
rabbbitMQContainer.containerIpAddress,
rabbbitMQContainer.getMappedPort(defaultRabbitMQPort)
rabbitMQContainer.containerIpAddress,
rabbitMQContainer.getMappedPort(defaultRabbitMQPort)
)
}
}

def cleanupSpec() {
if (rabbbitMQContainer) {
rabbbitMQContainer.stop()
if (rabbitMQContainer) {
rabbitMQContainer.stop()
}
}

Expand Down Expand Up @@ -346,6 +347,122 @@ class RabbitMQTest extends AgentTestRunner {
}
}
def "test rabbit publish/get with given disabled queue (producer side)"() {
setup:
injectSysConfig(TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_EXCHANGES, config)
when:
runUnderTrace("parent") {
channel.queueDeclare(queueName, false, true, true, null)
channel.exchangeDeclare(exchangeName, "direct", false)
channel.queueBind(queueName, exchangeName, routingKey)
channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes())
}
GetResponse response = channel.basicGet(queueName, true)
then:
new String(response.getBody()) == "Hello, world!"
and:
assertTraces(2) {
trace(5) {
span {
operationName "parent"
tags {
defaultTags()
}
}
rabbitSpan(it, "basic.publish $exchangeName -> $routingKey", false, span(0))
rabbitSpan(it, "queue.bind", false, span(0))
rabbitSpan(it, "exchange.declare", false, span(0))
rabbitSpan(it, "queue.declare", false, span(0))
}
if (nullParent) {
trace(1) {
span {
parentId(0 as BigInteger)
resourceName("basic.get $queueName")
serviceName("rabbitmq")
operationName("amqp.command")
spanType(DDSpanTypes.MESSAGE_CONSUMER)
errored(false)
}
}
} else {
trace(1) {
rabbitSpan(it, "basic.get $queueName", true, trace(0)[1])
}
}
}
where:
exchangeName | routingKey | queueName | config | nullParent
"some-exchange" | "some-routing-key" | "queueNameTest" | "queueNameTest" | false
"some-exchange" | "some-routing-key" | "queueNameTest" | "some-exchange" | true
"some-exchange" | "some-routing-key" | "queueNameTest" | "" | false
}
def "test rabbit publish/get with given disabled queue (consumer side)"() {
setup:
injectSysConfig(TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_QUEUES, config)
channel.exchangeDeclare(exchangeName, "direct", false)
channel.queueDeclare(queueName, false, true, true, null)
channel.queueBind(queueName, exchangeName, routingKey)
Consumer callback = new DefaultConsumer(channel)
channel.basicConsume(queueName, callback)
channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes())
expect:
assertTraces(6) {
trace(1) {
rabbitSpan(it, "exchange.declare", false)
}
trace(1) {
rabbitSpan(it, "queue.declare", false)
}
trace(1) {
rabbitSpan(it, "queue.bind", false)
}
trace(1) {
rabbitSpan(it, "basic.consume")
}
trace(1) {
rabbitSpan(it, "basic.publish $exchangeName -> $routingKey", false)
}
if (nullParent) {
trace(1) {
span {
parentId(0 as BigInteger)
resourceName("basic.deliver $queueName")
serviceName("rabbitmq")
operationName("amqp.command")
spanType(DDSpanTypes.MESSAGE_CONSUMER)
errored(false)
}
}
} else {
trace(1) {
span {
resourceName("basic.deliver $queueName")
serviceName("rabbitmq")
operationName("amqp.command")
spanType(DDSpanTypes.MESSAGE_CONSUMER)
errored(false)
parentId(trace(4)[0].spanId.toString().toBigInteger())
}
}
}
}
where:
exchangeName | routingKey | queueName | config | nullParent
"some-exchange" | "some-routing-key" | "queueNameTest" | "queueNameTest" | true
"some-exchange" | "some-routing-key" | "queueNameTest" | "some-exchange" | false
"some-exchange" | "some-routing-key" | "queueNameTest" | "" | false
}
def rabbitSpan(
TraceAssert trace,
String resource,
Expand Down Expand Up @@ -407,7 +524,7 @@ class RabbitMQTest extends AgentTestRunner {
case "basic.get":
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"amqp.command" "basic.get"
"amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") }
"amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") || it == "queueNameTest" }
"message.size" { it == null || it instanceof Integer }
break
case "basic.deliver":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public final class ConfigDefaults {

static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true;
static final boolean DEFAULT_JMS_PROPAGATION_ENABLED = true;
static final boolean DEFAULT_RABBIT_PROPAGATION_ENABLED = true;

static final boolean DEFAULT_TRACE_REPORT_HOSTNAME = false;
static final String DEFAULT_TRACE_ANNOTATIONS = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public final class TraceInstrumentationConfig {
public static final String JMS_PROPAGATION_DISABLED_TOPICS = "jms.propagation.disabled.topics";
public static final String JMS_PROPAGATION_DISABLED_QUEUES = "jms.propagation.disabled.queues";

public static final String RABBIT_PROPAGATION_ENABLED = "rabbit.propagation.enabled";
public static final String RABBIT_PROPAGATION_DISABLED_QUEUES =
"rabbit.propagation.disabled.queues";
public static final String RABBIT_PROPAGATION_DISABLED_EXCHANGES =
"rabbit.propagation.disabled.exchanges";

public static final String GRPC_IGNORED_OUTBOUND_METHODS = "trace.grpc.ignored.outbound.methods";
public static final String GRPC_SERVER_TRIM_PACKAGE_RESOURCE =
"trace.grpc.server.trim-package-resource";
Expand Down
35 changes: 35 additions & 0 deletions internal-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static datadog.trace.api.ConfigDefaults.DEFAULT_PROPAGATION_EXTRACT_LOG_HEADER_NAMES_ENABLED;
import static datadog.trace.api.ConfigDefaults.DEFAULT_PROPAGATION_STYLE_EXTRACT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_PROPAGATION_STYLE_INJECT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_RABBIT_PROPAGATION_ENABLED;
import static datadog.trace.api.ConfigDefaults.DEFAULT_RUNTIME_CONTEXT_FIELD_INJECTION;
import static datadog.trace.api.ConfigDefaults.DEFAULT_SCOPE_DEPTH_LIMIT;
import static datadog.trace.api.ConfigDefaults.DEFAULT_SERIALVERSIONUID_FIELD_INJECTION;
Expand Down Expand Up @@ -151,6 +152,9 @@
import static datadog.trace.api.config.TraceInstrumentationConfig.LOGS_MDC_TAGS_INJECTION_ENABLED;
import static datadog.trace.api.config.TraceInstrumentationConfig.OSGI_SEARCH_DEPTH;
import static datadog.trace.api.config.TraceInstrumentationConfig.PLAY_REPORT_HTTP_STATUS;
import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_EXCHANGES;
import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_DISABLED_QUEUES;
import static datadog.trace.api.config.TraceInstrumentationConfig.RABBIT_PROPAGATION_ENABLED;
import static datadog.trace.api.config.TraceInstrumentationConfig.RESOLVER_USE_LOADCLASS;
import static datadog.trace.api.config.TraceInstrumentationConfig.RUNTIME_CONTEXT_FIELD_INJECTION;
import static datadog.trace.api.config.TraceInstrumentationConfig.SERIALVERSIONUID_FIELD_INJECTION;
Expand Down Expand Up @@ -387,6 +391,10 @@ public class Config {
private final Set<String> jmsPropagationDisabledTopics;
private final Set<String> jmsPropagationDisabledQueues;

private final boolean RabbitPropagationEnabled;
private final Set<String> RabbitPropagationDisabledQueues;
private final Set<String> RabbitPropagationDisabledExchanges;

private final boolean hystrixTagsEnabled;
private final boolean hystrixMeasuredEnabled;

Expand Down Expand Up @@ -811,6 +819,15 @@ && isJavaVersionAtLeast(8)
jmsPropagationDisabledQueues =
tryMakeImmutableSet(configProvider.getList(JMS_PROPAGATION_DISABLED_QUEUES));

RabbitPropagationEnabled =
configProvider.getBoolean(RABBIT_PROPAGATION_ENABLED, DEFAULT_RABBIT_PROPAGATION_ENABLED);

RabbitPropagationDisabledQueues =
tryMakeImmutableSet(configProvider.getList(RABBIT_PROPAGATION_DISABLED_QUEUES));

RabbitPropagationDisabledExchanges =
tryMakeImmutableSet(configProvider.getList(RABBIT_PROPAGATION_DISABLED_EXCHANGES));

grpcIgnoredOutboundMethods =
tryMakeImmutableSet(configProvider.getList(GRPC_IGNORED_OUTBOUND_METHODS));
grpcServerTrimPackageResource =
Expand Down Expand Up @@ -1253,6 +1270,18 @@ public boolean isKafkaClientBase64DecodingEnabled() {
return kafkaClientBase64DecodingEnabled;
}

public boolean isRabbitPropagationEnabled() {
return RabbitPropagationEnabled;
}

public Set<String> getRabbitPropagationDisabledQueues() {
return RabbitPropagationDisabledQueues;
}

public Set<String> getRabbitPropagationDisabledExchanges() {
return RabbitPropagationDisabledExchanges;
}

public boolean isHystrixTagsEnabled() {
return hystrixTagsEnabled;
}
Expand Down Expand Up @@ -1992,6 +2021,12 @@ public String toString() {
+ jmsPropagationDisabledTopics
+ ", jmsPropagationDisabledQueues="
+ jmsPropagationDisabledQueues
+ ", RabbitPropagationEnabled="
+ RabbitPropagationEnabled
+ ", RabbitPropagationDisabledQueues="
+ RabbitPropagationDisabledQueues
+ ", RabbitPropagationDisabledExchanges="
+ RabbitPropagationDisabledExchanges
+ ", hystrixTagsEnabled="
+ hystrixTagsEnabled
+ ", hystrixMeasuredEnabled="
Expand Down

0 comments on commit 09a790c

Please sign in to comment.