Skip to content

Commit

Permalink
add flag to not share trace on consumption (#1033)
Browse files Browse the repository at this point in the history
* initial draft on adding a flag to not share a trace on consumption

* add comment docs for flag

* add comment on impl line

* add unit test

* add newBuilder and creator with messaging api

* add streams tests

* rephrase docs

* try out new naming

* update names based on recomendation

* fix reorder of imports

* add doc section for flag

* add diagram

* remove single header from docs as default

* add code annotation on docs
  • Loading branch information
jeqo authored Nov 24, 2019
1 parent 96ca366 commit 75be01f
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 9 deletions.
32 changes: 31 additions & 1 deletion instrumentation/kafka-clients/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ Add decorators for Kafka producer and consumer to enable tracing.
First, setup the generic Kafka component like this:
```java
kafkaTracing = KafkaTracing.newBuilder(messagingTracing)
.writeB3SingleFormat(true) // for more efficient propagation
.remoteServiceName("my-broker")
.build();
```
Expand Down Expand Up @@ -103,6 +102,37 @@ to trace manually or you can do similar via automatic instrumentation like Aspec
}
```

## Single Root Span on Consumer

When a Tracing Kafka Consumer is processing records that do not have trace-context (i.e. Producer is not tracing)
it will reuse the same root span `poll` to group all processing of records returned.

```
trace 1:
poll
|- processing1
|- processing2
...
+- processing N
```

If this is not the desired behavior, users can customize it by setting `singleRootSpanOnReceiveBatch` to `false`.
This will create a root span `poll` for each record received.

```
trace 1:
poll
+- processing1
trace 2:
poll
+- processing2
...
trace N:
poll
+- processing N
```

## Notes
* This tracer is only compatible with Kafka versions including headers support ( > 0.11.0).
* More information about "Message Tracing" [here](https://github.com/apache/incubator-zipkin-website/blob/master/pages/instrumenting.md#message-tracing)
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,27 @@ public static Builder newBuilder(MessagingTracing messagingTracing) {
return new Builder(messagingTracing);
}

/** @since 5.10 **/
public Builder toBuilder() {
return new Builder(this);
}

public static final class Builder {
final MessagingTracing messagingTracing;
String remoteServiceName = "kafka";
boolean singleRootSpanOnReceiveBatch = true;

Builder(MessagingTracing messagingTracing) {
if (messagingTracing == null) throw new NullPointerException("messagingTracing == null");
this.messagingTracing = messagingTracing;
}

Builder(KafkaTracing kafkaTracing) {
this.messagingTracing = kafkaTracing.messagingTracing;
this.remoteServiceName = kafkaTracing.remoteServiceName;
this.singleRootSpanOnReceiveBatch = kafkaTracing.singleRootSpanOnReceiveBatch;
}

/**
* The remote service name that describes the broker in the dependency graph. Defaults to
* "kafka"
Expand All @@ -72,6 +84,20 @@ public Builder remoteServiceName(String remoteServiceName) {
return this;
}

/**
* Controls the sharing of a {@code poll} span for incoming spans with no trace context.
*
* <b/>If true, all the spans received in a {@code poll} batch that do not have trace-context
* will be added to a single new poll root span. Otherwise, a {@code poll} span will be created
* for each such message.
*
* @since 5.10
*/
public Builder singleRootSpanOnReceiveBatch(boolean singleRootSpanOnReceiveBatch) {
this.singleRootSpanOnReceiveBatch = singleRootSpanOnReceiveBatch;
return this;
}

/**
* @deprecated as of v5.9, this is ignored because single format is default for messaging. Use
* {@link B3Propagation#newFactoryBuilder()} to change the default.
Expand All @@ -95,6 +121,7 @@ public KafkaTracing build() {
final SamplerFunction<MessagingRequest> producerSampler, consumerSampler;
final Set<String> propagationKeys;
final String remoteServiceName;
final boolean singleRootSpanOnReceiveBatch;

KafkaTracing(Builder builder) { // intentionally hidden constructor
this.messagingTracing = builder.messagingTracing;
Expand All @@ -109,6 +136,7 @@ public KafkaTracing build() {
this.consumerSampler = messagingTracing.consumerSampler();
this.propagationKeys = new LinkedHashSet<>(propagation.keys());
this.remoteServiceName = builder.remoteServiceName;
this.singleRootSpanOnReceiveBatch = builder.singleRootSpanOnReceiveBatch;
}

/** @since 5.9 exposed for Kafka Streams tracing. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ final class TracingConsumer<K, V> implements Consumer<K, V> {
final SamplerFunction<MessagingRequest> sampler;
final Injector<KafkaConsumerRequest> injector;
final String remoteServiceName;
final boolean singleRootSpanOnReceiveBatch;
// replicate org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener behaviour
static final ConsumerRebalanceListener NO_OP_CONSUMER_REBALANCE_LISTENER =
new ConsumerRebalanceListener() {
Expand All @@ -71,6 +72,7 @@ final class TracingConsumer<K, V> implements Consumer<K, V> {
this.sampler = kafkaTracing.consumerSampler;
this.injector = kafkaTracing.consumerInjector;
this.remoteServiceName = kafkaTracing.remoteServiceName;
this.singleRootSpanOnReceiveBatch = kafkaTracing.singleRootSpanOnReceiveBatch;
}

// Do not use @Override annotation to avoid compatibility issue version < 2.0
Expand All @@ -95,8 +97,8 @@ public ConsumerRecords<K, V> poll(long timeout) {
kafkaTracing.extractAndClearHeaders(extractor, request, record.headers());

// If we extracted neither a trace context, nor request-scoped data (extra),
// make or reuse a span for this topic
if (extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
// and sharing trace is enabled make or reuse a span for this topic
if (extracted.equals(TraceContextOrSamplingFlags.EMPTY) && singleRootSpanOnReceiveBatch) {
Span span = consumerSpansForTopic.get(topic);
if (span == null) {
span = kafkaTracing.nextMessagingSpan(sampler, request, extracted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void should_createChildOfTraceHeaders() throws Exception {
}

@Test
public void should_create_only_one_consumer_span_per_topic() {
public void should_create_only_one_consumer_span_per_topic_whenSharingEnabled() {
Map<TopicPartition, Long> offsets = new HashMap<>();
// 2 partitions in the same topic
offsets.put(new TopicPartition(TEST_TOPIC, 0), 0L);
Expand All @@ -148,4 +148,32 @@ public void should_create_only_one_consumer_span_per_topic() {
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(entry("kafka.topic", "myTopic"));
}

@Test
public void should_create_individual_span_per_topic_whenSharingDisabled() {
kafkaTracing = kafkaTracing.toBuilder().singleRootSpanOnReceiveBatch(false).build();

Map<TopicPartition, Long> offsets = new HashMap<>();
// 2 partitions in the same topic
offsets.put(new TopicPartition(TEST_TOPIC, 0), 0L);
offsets.put(new TopicPartition(TEST_TOPIC, 1), 0L);

consumer.updateBeginningOffsets(offsets);
consumer.assign(offsets.keySet());

// create 500 messages
for (int i = 0; i < 250; i++) {
consumer.addRecord(new ConsumerRecord<>(TEST_TOPIC, 0, i, TEST_KEY, TEST_VALUE));
consumer.addRecord(new ConsumerRecord<>(TEST_TOPIC, 1, i, TEST_KEY, TEST_VALUE));
}

Consumer<String, String> tracingConsumer = kafkaTracing.consumer(consumer);
tracingConsumer.poll(10);

// only one consumer span reported
assertThat(spans)
.hasSize(500)
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(entry("kafka.topic", "myTopic"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import brave.Tracer;
import brave.Tracing;
import brave.kafka.clients.KafkaTracing;
import brave.messaging.MessagingTracing;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
Expand Down Expand Up @@ -55,7 +56,9 @@ public final class KafkaStreamsTracing {
final TraceContext.Injector<Headers> injector;

KafkaStreamsTracing(Builder builder) { // intentionally hidden constructor
this.kafkaTracing = builder.kafkaTracing;
this.kafkaTracing = builder.kafkaTracing.toBuilder()
.singleRootSpanOnReceiveBatch(builder.singleRootSpanOnReceiveBatch)
.build();
this.tracer = kafkaTracing.messagingTracing().tracing().tracer();
Propagation<String> propagation = kafkaTracing.messagingTracing().tracing().propagation();
this.propagationKeys = new LinkedHashSet<>(propagation.keys());
Expand All @@ -67,11 +70,26 @@ public static KafkaStreamsTracing create(Tracing tracing) {
return create(KafkaTracing.create(tracing));
}

/** @since 5.10 */
public static KafkaStreamsTracing create(MessagingTracing messagingTracing) {
return new Builder(KafkaTracing.create(messagingTracing)).build();
}

/** @since 5.9 */
public static KafkaStreamsTracing create(KafkaTracing kafkaTracing) {
return new Builder(kafkaTracing).build();
}

/** @since 5.10 */
public static Builder newBuilder(Tracing tracing) {
return new Builder(KafkaTracing.create(tracing));
}

/** @since 5.10 */
public static Builder newBuilder(MessagingTracing messagingTracing) {
return new Builder(KafkaTracing.create(messagingTracing));
}

/**
* Provides a {@link KafkaClientSupplier} with tracing enabled, hence Producer and Consumer
* operations will be traced.
Expand Down Expand Up @@ -453,12 +471,27 @@ void clearHeaders(Headers headers) {

public static final class Builder {
final KafkaTracing kafkaTracing;
boolean singleRootSpanOnReceiveBatch = false;

Builder(KafkaTracing kafkaTracing) {
if (kafkaTracing == null) throw new NullPointerException("kafkaTracing == null");
this.kafkaTracing = kafkaTracing;
}

/**
* Controls the sharing of a {@code poll} span for incoming spans with no trace context.
*
* <b/>If true, all the spans received in a {@code poll} batch that do not have trace-context
* will be added to a single new poll root span. Otherwise, a {@code poll} span will be created
* for each such message.
*
* @since 5.10
*/
public Builder singleRootSpanOnReceiveBatch(boolean singleRootSpanOnReceiveBatch) {
this.singleRootSpanOnReceiveBatch = singleRootSpanOnReceiveBatch;
return this;
}

public KafkaStreamsTracing build() {
return new KafkaStreamsTracing(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package brave.kafka.streams;

import brave.Tracing;
import brave.messaging.MessagingTracing;
import brave.propagation.StrictScopeDecorator;
import brave.propagation.ThreadLocalCurrentTraceContext;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -61,7 +62,8 @@ abstract class BaseTracingTest {
.build())
.spanReporter(spans::add)
.build();
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);
MessagingTracing messagingTracing = MessagingTracing.create(tracing);
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(messagingTracing);

ProcessorSupplier<String, String> fakeProcessorSupplier =
kafkaStreamsTracing.processor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import brave.Tracing;
import brave.kafka.clients.KafkaTracing;
import brave.messaging.MessagingTracing;
import brave.propagation.B3Propagation;
import brave.propagation.ExtraFieldPropagation;
import brave.propagation.StrictScopeDecorator;
Expand Down Expand Up @@ -62,7 +63,6 @@
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

import zipkin2.Annotation;
import zipkin2.Span;

Expand Down Expand Up @@ -133,9 +133,84 @@ public void should_create_span_from_stream_input_topic() throws Exception {
streams.cleanUp();
}

@Test
public void should_create_multiple_span_from_stream_input_topic_whenSharingDisabled() throws Exception {
String inputTopic = testName.getMethodName() + "-input";

StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic).foreach((k, v) -> {
});
Topology topology = builder.build();

Tracing tracing = Tracing.newBuilder()
.localServiceName("streams-app")
.currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder()
.addScopeDecorator(StrictScopeDecorator.create())
.build())
.spanReporter(spans::add)
.build();
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.newBuilder(tracing)
.singleRootSpanOnReceiveBatch(false)
.build();
KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(topology, streamsProperties());

producer = createProducer();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();

waitForStreamToRun(streams);

Span first = takeSpan(), second = takeSpan(), third = takeSpan();

assertThat(first.tags()).containsEntry("kafka.topic", inputTopic);
assertThat(second.tags()).containsEntry("kafka.topic", inputTopic);
assertThat(third.tags()).containsEntry("kafka.topic", inputTopic);

streams.close();
streams.cleanUp();
}

@Test
public void should_create_one_span_from_stream_input_topic_whenSharingEnabled() throws Exception {
String inputTopic = testName.getMethodName() + "-input";

StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic).foreach((k, v) -> {
});
Topology topology = builder.build();

Tracing tracing = Tracing.newBuilder()
.localServiceName("streams-app")
.currentTraceContext(ThreadLocalCurrentTraceContext.newBuilder()
.addScopeDecorator(StrictScopeDecorator.create())
.build())
.spanReporter(spans::add)
.build();
MessagingTracing messagingTracing = MessagingTracing.create(tracing);
KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.newBuilder(messagingTracing)
.singleRootSpanOnReceiveBatch(true)
.build();
KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(topology, streamsProperties());

producer = createProducer();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();
producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get();

waitForStreamToRun(streams);

Span first = takeSpan();

assertThat(first.tags()).containsEntry("kafka.topic", inputTopic);

streams.close();
streams.cleanUp();
}

@Test
public void should_create_span_from_stream_input_topic_using_kafka_client_supplier()
throws Exception {
throws Exception {
String inputTopic = testName.getMethodName() + "-input";

StreamsBuilder builder = new StreamsBuilder();
Expand Down Expand Up @@ -1350,7 +1425,7 @@ Properties streamsProperties() {
kafka.helper().consumerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
properties.put(StreamsConfig.STATE_DIR_CONFIG, "target/kafka-streams");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, testName.getMethodName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
return properties;
}
Expand Down

0 comments on commit 75be01f

Please sign in to comment.