
add flag to not share trace on consumption #1033

Merged
merged 14 commits on Nov 24, 2019
32 changes: 31 additions & 1 deletion instrumentation/kafka-clients/README.md
@@ -8,7 +8,6 @@ Add decorators for Kafka producer and consumer to enable tracing.
First, setup the generic Kafka component like this:
```java
kafkaTracing = KafkaTracing.newBuilder(messagingTracing)
.writeB3SingleFormat(true) // for more efficient propagation
.remoteServiceName("my-broker")
.build();
```
@@ -103,6 +102,37 @@ to trace manually or you can do similar via automatic instrumentation like AspectJ
}
```

## Single Root Span on Consumer

When a tracing Kafka consumer processes records that have no trace context (i.e. the producer is not tracing),
it reuses a single root `poll` span to group the processing of all records returned by that poll.

```
trace 1:
poll
|- processing1
|- processing2
...
+- processing N
```

If this is not the desired behavior, set `singleRootSpanOnReceiveBatch` to `false` (see the builder example after the diagram below).
This creates a separate root `poll` span for each such record.

```
trace 1:
poll
+- processing1

trace 2:
poll
+- processing2
...
trace N:
poll
+- processing N
```
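
For example, to disable the shared `poll` span, configure the builder shown earlier (a minimal sketch; `messagingTracing` is an existing `MessagingTracing` instance):

```java
kafkaTracing = KafkaTracing.newBuilder(messagingTracing)
    .remoteServiceName("my-broker")
    .singleRootSpanOnReceiveBatch(false) // one root "poll" span per record without incoming trace context
    .build();
```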

## Notes
* This tracer is only compatible with Kafka versions that include headers support (> 0.11.0).
* More information about "Message Tracing" [here](https://github.com/apache/incubator-zipkin-website/blob/master/pages/instrumenting.md#message-tracing)
@@ -54,15 +54,27 @@ public static Builder newBuilder(MessagingTracing messagingTracing) {
return new Builder(messagingTracing);
}

/** @since 5.10 */
public Builder toBuilder() {
return new Builder(this);
}

public static final class Builder {
final MessagingTracing messagingTracing;
String remoteServiceName = "kafka";
boolean singleRootSpanOnReceiveBatch = true;

Builder(MessagingTracing messagingTracing) {
if (messagingTracing == null) throw new NullPointerException("messagingTracing == null");
this.messagingTracing = messagingTracing;
}

Builder(KafkaTracing kafkaTracing) {
this.messagingTracing = kafkaTracing.messagingTracing;
this.remoteServiceName = kafkaTracing.remoteServiceName;
this.singleRootSpanOnReceiveBatch = kafkaTracing.singleRootSpanOnReceiveBatch;
}

/**
* The remote service name that describes the broker in the dependency graph. Defaults to
* "kafka"
@@ -72,6 +84,19 @@ public Builder remoteServiceName(String remoteServiceName) {
return this;
}

/**
* Controls the sharing of a {@code poll} span for incoming records with no trace context.
*
* <p>If true, all records received in a poll batch that have no trace context are added
* to a single new poll root span. Otherwise, a poll span will be created for each such message.
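*
* <p>Illustrative usage (a sketch; assumes an existing {@code messagingTracing}):
* <pre>{@code
* kafkaTracing = KafkaTracing.newBuilder(messagingTracing)
*     .singleRootSpanOnReceiveBatch(false)
*     .build();
* }</pre>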
*
* @since 5.10
*/
public Builder singleRootSpanOnReceiveBatch(boolean singleRootSpanOnReceiveBatch) {
this.singleRootSpanOnReceiveBatch = singleRootSpanOnReceiveBatch;
return this;
}

/**
* @deprecated as of v5.9, this is ignored because single format is default for messaging. Use
* {@link B3Propagation#newFactoryBuilder()} to change the default.
@@ -95,6 +120,7 @@ public KafkaTracing build() {
final SamplerFunction<MessagingRequest> producerSampler, consumerSampler;
final Set<String> propagationKeys;
final String remoteServiceName;
final boolean singleRootSpanOnReceiveBatch;

KafkaTracing(Builder builder) { // intentionally hidden constructor
this.messagingTracing = builder.messagingTracing;
@@ -109,6 +135,7 @@ public KafkaTracing build() {
this.consumerSampler = messagingTracing.consumerSampler();
this.propagationKeys = new LinkedHashSet<>(propagation.keys());
this.remoteServiceName = builder.remoteServiceName;
this.singleRootSpanOnReceiveBatch = builder.singleRootSpanOnReceiveBatch;
}

/** @since 5.9 exposed for Kafka Streams tracing. */
@@ -53,6 +53,7 @@ final class TracingConsumer<K, V> implements Consumer<K, V> {
final SamplerFunction<MessagingRequest> sampler;
final Injector<KafkaConsumerRequest> injector;
final String remoteServiceName;
final boolean singleRootSpanOnReceiveBatch;
// replicate org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener behaviour
static final ConsumerRebalanceListener NO_OP_CONSUMER_REBALANCE_LISTENER =
new ConsumerRebalanceListener() {
@@ -71,6 +72,7 @@ final class TracingConsumer<K, V> implements Consumer<K, V> {
this.sampler = kafkaTracing.consumerSampler;
this.injector = kafkaTracing.consumerInjector;
this.remoteServiceName = kafkaTracing.remoteServiceName;
this.singleRootSpanOnReceiveBatch = kafkaTracing.singleRootSpanOnReceiveBatch;
}

// Do not use @Override annotation to avoid compatibility issue version < 2.0
@@ -95,8 +97,8 @@ public ConsumerRecords<K, V> poll(long timeout) {
kafkaTracing.extractAndClearHeaders(extractor, request, record.headers());

// If we extracted neither a trace context, nor request-scoped data (extra),
// make or reuse a span for this topic
if (extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
// and batch span sharing is enabled, make or reuse a span for this topic
if (extracted.equals(TraceContextOrSamplingFlags.EMPTY) && singleRootSpanOnReceiveBatch) {
Span span = consumerSpansForTopic.get(topic);
if (span == null) {
span = kafkaTracing.nextMessagingSpan(sampler, request, extracted);
@@ -124,7 +124,7 @@ public void should_createChildOfTraceHeaders() throws Exception {
}

@Test
public void should_create_only_one_consumer_span_per_topic() {
public void should_create_only_one_consumer_span_per_topic_whenSharingEnabled() {
Map<TopicPartition, Long> offsets = new HashMap<>();
// 2 partitions in the same topic
offsets.put(new TopicPartition(TEST_TOPIC, 0), 0L);
@@ -148,4 +148,32 @@ public void should_create_only_one_consumer_span_per_topic() {
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(entry("kafka.topic", "myTopic"));
}

@Test
public void should_create_individual_span_per_topic_whenSharingDisabled() {
kafkaTracing = kafkaTracing.toBuilder().singleRootSpanOnReceiveBatch(false).build();

Map<TopicPartition, Long> offsets = new HashMap<>();
// 2 partitions in the same topic
offsets.put(new TopicPartition(TEST_TOPIC, 0), 0L);
offsets.put(new TopicPartition(TEST_TOPIC, 1), 0L);

consumer.updateBeginningOffsets(offsets);
consumer.assign(offsets.keySet());

// create 500 messages
for (int i = 0; i < 250; i++) {
consumer.addRecord(new ConsumerRecord<>(TEST_TOPIC, 0, i, TEST_KEY, TEST_VALUE));
consumer.addRecord(new ConsumerRecord<>(TEST_TOPIC, 1, i, TEST_KEY, TEST_VALUE));
}

Consumer<String, String> tracingConsumer = kafkaTracing.consumer(consumer);
tracingConsumer.poll(10);

// one consumer span reported per record (500 in total)
assertThat(spans)
.hasSize(500)
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(entry("kafka.topic", "myTopic"));
}
}
@@ -18,6 +18,7 @@
import brave.Tracer;
import brave.Tracing;
import brave.kafka.clients.KafkaTracing;
import brave.messaging.MessagingTracing;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
@@ -55,7 +56,9 @@ public final class KafkaStreamsTracing {
final TraceContext.Injector<Headers> injector;

KafkaStreamsTracing(Builder builder) { // intentionally hidden constructor
this.kafkaTracing = builder.kafkaTracing;
this.kafkaTracing = builder.kafkaTracing.toBuilder()
.singleRootSpanOnReceiveBatch(builder.singleRootSpanOnReceiveBatch)
.build();
this.tracer = kafkaTracing.messagingTracing().tracing().tracer();
Propagation<String> propagation = kafkaTracing.messagingTracing().tracing().propagation();
this.propagationKeys = new LinkedHashSet<>(propagation.keys());
@@ -67,11 +70,26 @@ public static KafkaStreamsTracing create(Tracing tracing) {
return create(KafkaTracing.create(tracing));
}

/** @since 5.10 */
public static KafkaStreamsTracing create(MessagingTracing messagingTracing) {
return new Builder(KafkaTracing.create(messagingTracing)).build();
}

/** @since 5.9 */
public static KafkaStreamsTracing create(KafkaTracing kafkaTracing) {
return new Builder(kafkaTracing).build();
}

/** @since 5.10 */
public static Builder newBuilder(Tracing tracing) {
return new Builder(KafkaTracing.create(tracing));
}

/** @since 5.10 */
public static Builder newBuilder(MessagingTracing messagingTracing) {
return new Builder(KafkaTracing.create(messagingTracing));
}

/**
* Provides a {@link KafkaClientSupplier} with tracing enabled, hence Producer and Consumer
* operations will be traced.
@@ -431,12 +449,26 @@ void clearHeaders(Headers headers) {

public static final class Builder {
final KafkaTracing kafkaTracing;
boolean singleRootSpanOnReceiveBatch = false;

Builder(KafkaTracing kafkaTracing) {
if (kafkaTracing == null) throw new NullPointerException("kafkaTracing == null");
this.kafkaTracing = kafkaTracing;
}

/**
* Controls the sharing of a {@code poll} span for incoming records with no trace context.
*
* <p>If true, all records received in a poll batch that have no trace context are added
* to a single new poll root span. Otherwise, a poll span will be created for each such message.
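*
* <p>Illustrative usage (a sketch; assumes an existing {@code tracing} instance):
* <pre>{@code
* kafkaStreamsTracing = KafkaStreamsTracing.newBuilder(tracing)
*     .singleRootSpanOnReceiveBatch(true) // share one "poll" span across each received batch
*     .build();
* }</pre>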
*
* @since 5.10
*/
public Builder singleRootSpanOnReceiveBatch(boolean singleRootSpanOnReceiveBatch) {
this.singleRootSpanOnReceiveBatch = singleRootSpanOnReceiveBatch;
return this;
}

public KafkaStreamsTracing build() {
return new KafkaStreamsTracing(this);
}
@@ -14,6 +14,7 @@
package brave.kafka.streams;

import brave.Tracing;
import brave.messaging.MessagingTracing;
import brave.propagation.StrictScopeDecorator;
import brave.propagation.ThreadLocalCurrentTraceContext;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -61,7 +62,8 @@ abstract class BaseTracingTest {
.build())
.spanReporter(spans::add)
.build();
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);
MessagingTracing messagingTracing = MessagingTracing.create(tracing);
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(messagingTracing);

ProcessorSupplier<String, String> fakeProcessorSupplier =
kafkaStreamsTracing.processor(
@@ -15,6 +15,7 @@

import brave.Tracing;
import brave.kafka.clients.KafkaTracing;
import brave.messaging.MessagingTracing;
import brave.propagation.B3Propagation;
import brave.propagation.ExtraFieldPropagation;
import brave.propagation.StrictScopeDecorator;
@@ -62,7 +63,6 @@
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

import zipkin2.Annotation;
import zipkin2.Span;

@@ -133,9 +133,84 @@ public void should_create_span_from_stream_input_topic() throws Exception {
streams.cleanUp();
}

@Test
public void should_create_multiple_span_from_stream_input_topic_whenSharingDisabled() throws Exception {
String inputTopic = testName.getMethodName() + "-input";

StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic).foreach((k, v) -> {
});
Topology topology = builder.build();

Tracing tracing = Tracing.newBuilder()
.localServiceName("streams-app")
.currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder()
.addScopeDecorator(StrictScopeDecorator.create())
.build())
.spanReporter(spans::add)
.build();
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.newBuilder(tracing)
.singleRootSpanOnReceiveBatch(false)
.build();
KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(topology, streamsProperties());

producer = createProducer();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();

waitForStreamToRun(streams);

Span first = takeSpan(), second = takeSpan(), third = takeSpan();

assertThat(first.tags()).containsEntry("kafka.topic", inputTopic);
assertThat(second.tags()).containsEntry("kafka.topic", inputTopic);
assertThat(third.tags()).containsEntry("kafka.topic", inputTopic);

streams.close();
streams.cleanUp();
}

@Test
public void should_create_one_span_from_stream_input_topic_whenSharingEnabled() throws Exception {
String inputTopic = testName.getMethodName() + "-input";

StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic).foreach((k, v) -> {
});
Topology topology = builder.build();

Tracing tracing = Tracing.newBuilder()
.localServiceName("streams-app")
.currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder()
.addScopeDecorator(StrictScopeDecorator.create())
.build())
.spanReporter(spans::add)
.build();
MessagingTracing messagingTracing = MessagingTracing.create(tracing);
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.newBuilder(messagingTracing)
.singleRootSpanOnReceiveBatch(true)
.build();
KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(topology, streamsProperties());

producer = createProducer();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();

waitForStreamToRun(streams);

Span first = takeSpan();

assertThat(first.tags()).containsEntry("kafka.topic", inputTopic);

streams.close();
streams.cleanUp();
}

@Test
public void should_create_span_from_stream_input_topic_using_kafka_client_supplier()
throws Exception {
throws Exception {
String inputTopic = testName.getMethodName() + "-input";

StreamsBuilder builder = new StreamsBuilder();
@@ -1310,7 +1385,7 @@ Properties streamsProperties() {
kafka.helper().consumerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
properties.put(StreamsConfig.STATE_DIR_CONFIG, "target/kafka-streams");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, testName.getMethodName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
return properties;
}