diff --git a/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java b/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java
new file mode 100755
index 0000000000..6fab4db46a
--- /dev/null
+++ b/brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java
@@ -0,0 +1,123 @@
+package com.github.kristofa.brave;
+
+import com.github.kristofa.brave.internal.Nullable;
+import com.twitter.zipkin.gen.Span;
+import com.twitter.zipkin.gen.SpanCodec;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Implement {@link #sendSpans} to transport an encoded list of spans to Zipkin.
+ */
+public abstract class AbstractSpanCollector implements SpanCollector, Flushable, Closeable {
+
+  private final SpanCodec codec;
+  private final SpanCollectorMetricsHandler metrics;
+  private final BlockingQueue<Span> pending = new LinkedBlockingQueue<Span>(1000);
+  @Nullable // for testing
+  private final Flusher flusher;
+
+  /**
+   * @param flushInterval in seconds. 0 implies spans are {@link #flush() flushed} externally.
+   */
+  public AbstractSpanCollector(SpanCodec codec, SpanCollectorMetricsHandler metrics,
+      int flushInterval) {
+    this.codec = codec;
+    this.metrics = metrics;
+    this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null;
+  }
+
+  /**
+   * Queues the span for collection, or drops it if the queue is full.
+   *
+   * @param span Span, should not be null.
+   */
+  @Override
+  public void collect(Span span) {
+    metrics.incrementAcceptedSpans(1);
+    if (!pending.offer(span)) {
+      metrics.incrementDroppedSpans(1);
+    }
+  }
+
+  /**
+   * Calling this will flush any pending spans to the transport on the current thread.
+   */
+  @Override
+  public void flush() {
+    if (pending.isEmpty()) return;
+    List<Span> drained = new ArrayList<Span>(pending.size());
+    pending.drainTo(drained);
+    if (drained.isEmpty()) return;
+
+    // encode the spans for transport
+    int spanCount = drained.size();
+    byte[] encoded;
+    try {
+      encoded = codec.writeSpans(drained);
+    } catch (RuntimeException e) {
+      metrics.incrementDroppedSpans(spanCount);
+      return;
+    }
+
+    // transport the spans
+    try {
+      sendSpans(encoded);
+    } catch (IOException e) {
+      metrics.incrementDroppedSpans(spanCount);
+      return;
+    }
+  }
+
+  /** Calls flush on a fixed interval */
+  static final class Flusher implements Runnable {
+    final Flushable flushable;
+    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+    Flusher(Flushable flushable, int flushInterval) {
+      this.flushable = flushable;
+      this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
+    }
+
+    @Override
+    public void run() {
+      try {
+        flushable.flush();
+      } catch (IOException ignored) {
+      }
+    }
+  }
+
+  /**
+   * Sends an encoded list of spans over the current transport.
+   *
+   * @throws IOException when thrown, drop metrics will increment accordingly
+   */
+  protected abstract void sendSpans(byte[] encoded) throws IOException;
+
+  @Override
+  public void addDefaultAnnotation(String key, String value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Requests a cease of delivery. There will be at most one in-flight send after this call.
+   */
+  @Override
+  public void close() {
+    if (flusher != null) flusher.scheduler.shutdown();
+    // throw any outstanding spans on the floor
+    int dropped = pending.drainTo(new LinkedList<Span>());
+    metrics.incrementDroppedSpans(dropped);
+  }
+}
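To make the new contract concrete, here is a minimal sketch of a subclass. `StreamSpanCollector` and its `OutputStream` target are hypothetical, not part of this change; the constructor signature and `sendSpans` hook are the ones defined above.

```java
import com.github.kristofa.brave.AbstractSpanCollector;
import com.github.kristofa.brave.SpanCollectorMetricsHandler;
import com.twitter.zipkin.gen.SpanCodec;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical subclass: the base class owns the queue, the flush thread and
// the metrics; a transport only has to move the encoded bytes.
public final class StreamSpanCollector extends AbstractSpanCollector {
  private final OutputStream out;

  public StreamSpanCollector(OutputStream out, SpanCollectorMetricsHandler metrics) {
    super(SpanCodec.JSON, metrics, 1); // JSON codec, flush every second
    this.out = out;
  }

  @Override
  protected void sendSpans(byte[] encoded) throws IOException {
    // An IOException thrown here is counted by the base class as dropped spans.
    out.write(encoded);
    out.flush();
  }
}
```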
diff --git a/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java b/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java
index dc87eeb42a..e901b21d19 100755
--- a/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java
+++ b/brave-spancollector-http/src/main/java/com/github/kristofa/brave/http/HttpSpanCollector.java
@@ -1,34 +1,21 @@
 package com.github.kristofa.brave.http;
 
+import com.github.kristofa.brave.AbstractSpanCollector;
 import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
-import com.github.kristofa.brave.SpanCollector;
 import com.github.kristofa.brave.SpanCollectorMetricsHandler;
-import com.github.kristofa.brave.internal.Nullable;
 import com.google.auto.value.AutoValue;
-import com.twitter.zipkin.gen.Span;
 import com.twitter.zipkin.gen.SpanCodec;
 import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.Flushable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.zip.GZIPOutputStream;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * SpanCollector which submits spans to Zipkin, using its {@code POST /spans} endpoint.
  */
-public final class HttpSpanCollector implements SpanCollector, Flushable, Closeable {
+public final class HttpSpanCollector extends AbstractSpanCollector {
 
   @AutoValue
   public static abstract class Config {
@@ -72,17 +59,13 @@ public interface Builder {
 
   private final String url;
   private final Config config;
-  private final SpanCollectorMetricsHandler metrics;
-  private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
-  @Nullable // for testing
-  private final Flusher flusher;
 
   /**
    * Create a new instance with default configuration.
    *
    * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
    * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
-   *                these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   *        these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
    */
   public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandler metrics) {
     return new HttpSpanCollector(baseUrl, Config.builder().build(), metrics);
   }
@@ -90,9 +73,9 @@ public static HttpSpanCollector create(String baseUrl, SpanCollectorMetricsHandl
 
   /**
    * @param baseUrl URL of the zipkin query server instance. Like: http://localhost:9411/
-   * @param config controls flush interval and timeouts
+   * @param config includes flush interval and timeouts
    * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
-   *                these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   *        these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
   */
   public static HttpSpanCollector create(String baseUrl, Config config,
       SpanCollectorMetricsHandler metrics) {
@@ -100,76 +83,14 @@ public static HttpSpanCollector create(String baseUrl, Config config,
   }
 
   // Visible for testing. Ex: when tests need to explicitly control flushing, set interval to 0.
-  HttpSpanCollector(String baseUrl, Config config,
-      SpanCollectorMetricsHandler metrics) {
+  HttpSpanCollector(String baseUrl, Config config, SpanCollectorMetricsHandler metrics) {
+    super(SpanCodec.JSON, metrics, config.flushInterval());
     this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";
-    this.metrics = metrics;
     this.config = config;
-    this.flusher = config.flushInterval() > 0 ? new Flusher(this, config.flushInterval()) : null;
-  }
-
-  /**
-   * Queues the span for collection, or drops it if the queue is full.
-   *
-   * @param span Span, should not be null.
-   */
-  @Override
-  public void collect(Span span) {
-    metrics.incrementAcceptedSpans(1);
-    if (!pending.offer(span)) {
-      metrics.incrementDroppedSpans(1);
-    }
   }
 
-  /**
-   * Calling this will flush any pending spans to the http transport on the current thread.
-   */
   @Override
-  public void flush() {
-    if (pending.isEmpty()) return;
-    List<Span> drained = new ArrayList<>(pending.size());
-    pending.drainTo(drained);
-    if (drained.isEmpty()) return;
-
-    // json-encode the spans for transport
-    int spanCount = drained.size();
-    byte[] json;
-    try {
-      json = SpanCodec.JSON.writeSpans(drained);
-    } catch (RuntimeException e) {
-      metrics.incrementDroppedSpans(spanCount);
-      return;
-    }
-
-    // Send the json to the zipkin endpoint
-    try {
-      postSpans(json);
-    } catch (IOException e) {
-      metrics.incrementDroppedSpans(spanCount);
-      return;
-    }
-  }
-
-  /** Calls flush on a fixed interval */
-  static final class Flusher implements Runnable {
-    final Flushable flushable;
-    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
-    Flusher(Flushable flushable, int flushInterval) {
-      this.flushable = flushable;
-      this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
-    }
-
-    @Override
-    public void run() {
-      try {
-        flushable.flush();
-      } catch (IOException ignored) {
-      }
-    }
-  }
-
-  void postSpans(byte[] json) throws IOException {
+  protected void sendSpans(byte[] json) throws IOException {
     // intentionally not closing the connection, so as to use keep-alives
     HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
     connection.setConnectTimeout(config.connectTimeout());
@@ -199,21 +120,4 @@ void postSpans(byte[] json) throws IOException {
       throw e;
     }
   }
-
-  @Override
-  public void addDefaultAnnotation(String key, String value) {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Requests a cease of delivery. There will be at most one in-flight request processing after this
-   * call returns.
-   */
-  @Override
-  public void close() {
-    if (flusher != null) flusher.scheduler.shutdown();
-    // throw any outstanding spans on the floor
-    int dropped = pending.drainTo(new LinkedList<>());
-    metrics.incrementDroppedSpans(dropped);
-  }
 }
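A usage sketch for the refactored collector, assuming a Zipkin server on localhost:9411; `collect` only queues, and the inherited flusher does the POSTs in the background.

```java
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.http.HttpSpanCollector;

public class HttpCollectorUsage {
  public static void main(String[] args) {
    // The collector normalizes the base URL and appends "api/v1/spans".
    HttpSpanCollector collector = HttpSpanCollector.create(
        "http://localhost:9411/", new EmptySpanCollectorMetricsHandler());
    try {
      // hand the collector to Brave; spans accumulate via collect(span)
    } finally {
      collector.flush(); // drain anything still queued, on the current thread
      collector.close(); // stop the flusher; spans left in the queue are dropped
    }
  }
}
```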
diff --git a/brave-spancollector-kafka/README.md b/brave-spancollector-kafka/README.md
index 39e31ea6ec..166148b680 100644
--- a/brave-spancollector-kafka/README.md
+++ b/brave-spancollector-kafka/README.md
@@ -2,7 +2,7 @@
 
 SpanCollector that is used to submit spans to Kafka.
 
-Spans are sent to a topic named `zipkin` and contain no key or partition only a value which is a TBinaryProtocol encoded Span.
+Spans are sent to a topic named `zipkin` and contain no key or partition, only a value, which is a TBinaryProtocol-encoded list of spans.
 
 ## Monitoring ##
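For consumers of the topic, the payload change means one Kafka message now decodes to a list of spans rather than a single span. A sketch of the read side, using the same `zipkin.Codec` the new unit test uses (and assuming bundling is enabled on the producer side):

```java
import java.util.List;
import zipkin.Codec;
import zipkin.Span;

public class ZipkinTopicDecoder {
  /** Each message value is a TBinaryProtocol-encoded list of spans. */
  public static List<Span> decode(byte[] messageValue) {
    return Codec.THRIFT.readSpans(messageValue);
  }
}
```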
diff --git a/brave-spancollector-kafka/pom.xml b/brave-spancollector-kafka/pom.xml
index c97d293436..2f0bac4a78 100644
--- a/brave-spancollector-kafka/pom.xml
+++ b/brave-spancollector-kafka/pom.xml
@@ -40,6 +40,16 @@
       <artifactId>brave-core</artifactId>
       <version>3.4.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.zipkin.java</groupId>
+      <artifactId>zipkin</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
@@ -59,4 +69,31 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadedArtifactAttached>false</shadedArtifactAttached>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <dependencyReducedPomLocation>${project.build.directory}/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+              <relocations>
+                <relocation>
+                  <pattern>zipkin</pattern>
+                  <shadedPattern>com.github.kristofa.brave.kafka.internal.zipkin</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>
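If the relocation above works as intended, the shaded jar exposes the zipkin model only under the internal package. A throwaway check along these lines (hypothetical, run against the packaged jar) would confirm it:

```java
// Hypothetical sanity check, not part of this change.
public class ShadeCheck {
  public static void main(String[] args) throws Exception {
    // zipkin.Span should have been rewritten to the relocated package
    Class.forName("com.github.kristofa.brave.kafka.internal.zipkin.Span");
    System.out.println("zipkin classes are shaded as expected");
  }
}
```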
diff --git a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java
index 776d751e51..a66da49f17 100644
--- a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java
+++ b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/KafkaSpanCollector.java
@@ -1,107 +1,126 @@
 package com.github.kristofa.brave.kafka;
 
-import com.github.kristofa.brave.SpanCollector;
-
-import java.io.Closeable;
-import java.util.Properties;
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
+import com.github.kristofa.brave.AbstractSpanCollector;
+import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
 import com.github.kristofa.brave.SpanCollectorMetricsHandler;
-import com.twitter.zipkin.gen.Span;
+import com.google.auto.value.AutoValue;
+import com.twitter.zipkin.gen.SpanCodec;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import zipkin.Span;
+import zipkin.internal.ThriftCodec;
 
 /**
- * SpanCollector which submits spans to Kafka using Kafka Producer api.
- * <p/>
- * Spans are sent to kafka as keyed messages: the key is the topic zipkin and the value is a TBinaryProtocol encoded Span.
- * <p/>
+ * SpanCollector which sends a thrift-encoded list of spans to the Kafka topic "zipkin".
+ *
+ * <p>If running zipkin-scala 1.35+, please enable {@link Config#bundlingEnabled() bundling}.
  */
-public class KafkaSpanCollector implements SpanCollector, Closeable {
-
-  private static final Logger LOGGER = Logger.getLogger(KafkaSpanCollector.class.getName());
-  private static final Properties DEFAULT_PROPERTIES = new Properties();
+public final class KafkaSpanCollector extends AbstractSpanCollector {
 
-  static {
-    DEFAULT_PROPERTIES.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-    DEFAULT_PROPERTIES.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+  @AutoValue
+  public static abstract class Config {
+    public static Builder builder() {
+      return new AutoValue_KafkaSpanCollector_Config.Builder()
+          .bundlingEnabled(false)
+          .flushInterval(1);
   }
 
-  private static Properties defaultPropertiesWith(String bootstrapServers) {
-    Properties props = new Properties();
-    for (String name : DEFAULT_PROPERTIES.stringPropertyNames()) {
-      props.setProperty(name, DEFAULT_PROPERTIES.getProperty(name));
-    }
-    props.setProperty("bootstrap.servers", bootstrapServers);
-    return props;
+    public static Builder builder(String bootstrapServers) {
+      Properties props = new Properties();
+      props.put("bootstrap.servers", bootstrapServers);
+      props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+      props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+      return builder().kafkaProperties(props);
   }
 
-  private final Producer<byte[], byte[]> producer;
-  private final ExecutorService executorService;
-  private final SpanProcessingTask spanProcessingTask;
-  private final Future<Integer> future;
-  private final BlockingQueue<Span> queue;
-  private final SpanCollectorMetricsHandler metricsHandler;
-
-  /**
-   * Create a new instance with default configuration.
-   *
-   * @param bootstrapServers A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
-   *                         Like: host1:port1,host2:port2,... Does not to be all the servers part of Kafka cluster.
-   * @param metricsHandler Gets notified when spans are accepted or dropped. If you are not interested in these events you
-   *                       can use {@linkplain EmptySpanCollectorMetricsHandler}
-   */
-  public KafkaSpanCollector(String bootstrapServers, SpanCollectorMetricsHandler metricsHandler) {
-    this(KafkaSpanCollector.defaultPropertiesWith(bootstrapServers), metricsHandler);
-  }
+  abstract Properties kafkaProperties();
 
-  /**
-   * KafkaSpanCollector.
-   *
-   * @param kafkaProperties Configuration for Kafka producer. Essential configuration properties are:
-   *                        bootstrap.servers, key.serializer, value.serializer. For a
-   *                        full list of config options, see http://kafka.apache.org/documentation.html#producerconfigs.
-   * @param metricsHandler Gets notified when spans are accepted or dropped. If you are not interested in these events you
-   *                       can use {@linkplain EmptySpanCollectorMetricsHandler}
-   */
-  public KafkaSpanCollector(Properties kafkaProperties, SpanCollectorMetricsHandler metricsHandler) {
-    producer = new KafkaProducer<>(kafkaProperties);
-    this.metricsHandler = metricsHandler;
-    executorService = Executors.newSingleThreadExecutor();
-    queue = new ArrayBlockingQueue<Span>(1000);
-    spanProcessingTask = new SpanProcessingTask(queue, producer, metricsHandler);
-    future = executorService.submit(spanProcessingTask);
-  }
+  abstract int flushInterval();
 
-  @Override
-  public void collect(com.twitter.zipkin.gen.Span span) {
-    metricsHandler.incrementAcceptedSpans(1);
-    if (!queue.offer(span)) {
-      metricsHandler.incrementDroppedSpans(1);
-      LOGGER.log(Level.WARNING, "Queue rejected span!");
-    }
-  }
+  abstract boolean bundlingEnabled();
 
-  @Override
-  public void addDefaultAnnotation(String key, String value) {
-    throw new UnsupportedOperationException();
+  @AutoValue.Builder
+  public interface Builder {
+    /**
+     * Configuration for Kafka producer. Essential configuration properties are:
+     * bootstrap.servers, key.serializer, value.serializer. For a full list of config options, see
+     * http://kafka.apache.org/documentation.html#producerconfigs.
+     *
+     * <p>Must include the following mappings: bootstrap.servers, key.serializer, value.serializer.
+     */
+    Builder kafkaProperties(Properties kafkaProperties);
+
+    /**
+     * Default false. true implies that spans will be bundled into a list vs sent individually.
+     *
+     * <p>Note: It is recommended to enable bundling. However, it requires zipkin-scala 1.35+.
+     */
+    Builder bundlingEnabled(boolean bundleSpans);
+
+    /** Default 1 second. 0 implies spans are {@link #flush() flushed} externally. */
+    Builder flushInterval(int flushInterval);
+
+    Config build();
+  }
   }
+
+  private final Config config;
+  private final Producer<byte[], byte[]> producer;
+  private final ThriftCodec thriftCodec = new ThriftCodec();
 
-  @Override
-  public void close() {
-    spanProcessingTask.stop();
-    try {
-      Integer nrProcessedSpans = future.get(6000, TimeUnit.MILLISECONDS);
-      LOGGER.info("SpanProcessingTask processed " + nrProcessedSpans + " spans.");
-    } catch (Exception e) {
-      LOGGER.log(Level.WARNING, "Exception when waiting for SpanProcessTask to finish.", e);
-    }
-    executorService.shutdown();
-    producer.close();
-    metricsHandler.incrementDroppedSpans(queue.size());
-    LOGGER.info("KafkaSpanCollector closed.");
+  /**
+   * Create a new instance with default configuration.
+   *
+   * @param bootstrapServers A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
+   *                         Like: host1:port1,host2:port2,... Does not need to be all of the servers in the Kafka cluster.
+   * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
+   *                these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   */
+  public static KafkaSpanCollector create(String bootstrapServers, SpanCollectorMetricsHandler metrics) {
+    return new KafkaSpanCollector(Config.builder(bootstrapServers).build(), metrics);
+  }
+
+  /**
+   * @param config includes flush interval and Kafka properties
+   * @param metrics Gets notified when spans are accepted or dropped. If you are not interested in
+   *                these events you can use {@linkplain EmptySpanCollectorMetricsHandler}
+   */
+  public static KafkaSpanCollector create(Config config, SpanCollectorMetricsHandler metrics) {
+    return new KafkaSpanCollector(config, metrics);
+  }
+
+  // Visible for testing. Ex: when tests need to explicitly control flushing, set interval to 0.
+  KafkaSpanCollector(Config config, SpanCollectorMetricsHandler metrics) {
+    super(SpanCodec.THRIFT, metrics, config.flushInterval());
+    this.config = config;
+    this.producer = new KafkaProducer<>(config.kafkaProperties());
+  }
+
+  @Override
+  protected void sendSpans(byte[] thrift) throws IOException {
+    // Optimal case: when bundling is enabled, send all spans in the same message
+    if (config.bundlingEnabled()) {
+      producer.send(new ProducerRecord<byte[], byte[]>("zipkin", thrift));
+      return;
+    }
-}
\ No newline at end of file
+    // Zipkin <1.35 compatibility case: send spans one at a time.
+    ByteBuffer buffer = ByteBuffer.wrap(thrift);
+    byte elementTypeIgnored = buffer.get(); // skip the TBinaryProtocol list header
+    int spanCount = buffer.getInt();
+    for (int i = 0; i < spanCount; i++) {
+      Span span = thriftCodec.readSpan(buffer);
+      producer.send(new ProducerRecord<byte[], byte[]>("zipkin", thriftCodec.writeSpan(span)));
+    }
+  }
+
+  @Override
+  public void close() {
+    producer.close();
+    super.close();
+  }
+}
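End-to-end, configuration now flows through `Config`; a usage sketch built only from the API above (broker addresses are placeholders):

```java
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.kafka.KafkaSpanCollector;
import com.github.kristofa.brave.kafka.KafkaSpanCollector.Config;

public class KafkaCollectorUsage {
  public static void main(String[] args) {
    Config config = Config.builder("kafka1:9092,kafka2:9092")
        .bundlingEnabled(true) // one message per flush; requires zipkin-scala 1.35+
        .flushInterval(1)      // seconds; 0 means the caller drives flush()
        .build();
    KafkaSpanCollector collector =
        KafkaSpanCollector.create(config, new EmptySpanCollectorMetricsHandler());
    // ... report spans via collect(...) ...
    collector.close(); // closes the Kafka producer, then stops the flusher
  }
}
```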
diff --git a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java b/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java
deleted file mode 100644
index 4dd8a9eae4..0000000000
--- a/brave-spancollector-kafka/src/main/java/com/github/kristofa/brave/kafka/SpanProcessingTask.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.github.kristofa.brave.kafka;
-
-import com.github.kristofa.brave.SpanCollectorMetricsHandler;
-import com.twitter.zipkin.gen.SpanCodec;
-import com.twitter.zipkin.gen.Span;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Processes spans by sending them, one at a time, to the topic `zipkin`, encoded in {@code TBinaryProtocol}.
- * <p/>
- * <p/>
- * Note: this class was written to be used by a single-threaded executor, hence it is not thead-safe.
- */
-class SpanProcessingTask implements Callable<Integer> {
-
-  private static final Logger LOGGER = Logger.getLogger(SpanProcessingTask.class.getName());
-  private final BlockingQueue<Span> queue;
-  private final Producer<byte[], byte[]> producer;
-  private final SpanCollectorMetricsHandler metricsHandler;
-  private volatile boolean stop = false;
-  private int numProcessedSpans = 0;
-
-
-  SpanProcessingTask(BlockingQueue<Span> queue, Producer<byte[], byte[]> producer, SpanCollectorMetricsHandler metricsHandler) {
-    this.queue = queue;
-    this.producer = producer;
-    this.metricsHandler = metricsHandler;
-  }
-
-  public void stop() {
-    stop = true;
-  }
-
-  @Override
-  public Integer call() throws Exception {
-    do {
-      final Span span = queue.poll(5, TimeUnit.SECONDS);
-      if (span == null) {
-        continue;
-      }
-      try {
-        final ProducerRecord<byte[], byte[]> message = new ProducerRecord<>("zipkin", SpanCodec.THRIFT.writeSpan(span));
-        producer.send(message);
-        numProcessedSpans++;
-      } catch (RuntimeException e) {
-        metricsHandler.incrementDroppedSpans(1);
-        LOGGER.log(Level.WARNING, "RuntimeException when writing span.", e);
-      }
-    } while (!stop);
-    return numProcessedSpans;
-  }
-}
diff --git a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java
index f56f4a46d7..50ef49805a 100644
--- a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java
+++ b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/ITKafkaSpanCollector.java
@@ -39,9 +39,10 @@ public synchronized void incrementDroppedSpans(int quantity) {
 
   @Test
   public void submitSingleSpan() throws TimeoutException {
-    KafkaSpanCollector kafkaCollector = new KafkaSpanCollector("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler);
+    KafkaSpanCollector kafkaCollector = KafkaSpanCollector.create("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler);
     Span span = span(1l, "test_kafka_span");
     kafkaCollector.collect(span);
+    kafkaCollector.flush();
     kafkaCollector.close();
     List<Span> spans = getCollectedSpans(kafkaRule.readMessages("zipkin", 1, new DefaultDecoder(kafkaRule.consumerConfig().props())));
@@ -54,7 +55,7 @@ public void submitSingleSpan() throws TimeoutException {
 
   @Test
   public void submitMultipleSpansInParallel() throws InterruptedException, ExecutionException, TimeoutException {
-    KafkaSpanCollector kafkaCollector = new KafkaSpanCollector("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler);
+    KafkaSpanCollector kafkaCollector = KafkaSpanCollector.create("localhost:"+kafkaRule.kafkaBrokerPort(), metricsHandler);
     Callable<Void> spanProducer1 = new Callable<Void>() {
 
       @Override
diff --git a/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java
new file mode 100644
index 0000000000..12fb3ceedc
--- /dev/null
+++ b/brave-spancollector-kafka/src/test/java/com/github/kristofa/brave/kafka/KafkaSpanCollectorTest.java
@@ -0,0 +1,105 @@
+package com.github.kristofa.brave.kafka;
+
+import com.github.charithe.kafka.KafkaJunitRule;
+import com.github.kristofa.brave.SpanCollectorMetricsHandler;
+import com.github.kristofa.brave.kafka.KafkaSpanCollector.Config;
+import com.twitter.zipkin.gen.Span;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import kafka.serializer.DefaultDecoder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import zipkin.Codec;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KafkaSpanCollectorTest {
+
+  @Rule
+  public KafkaJunitRule kafka = new KafkaJunitRule();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  TestMetricsHandler metrics = new TestMetricsHandler();
+
+  Config config = Config.builder("localhost:" + kafka.kafkaBrokerPort())
+      .bundlingEnabled(true) // enable the recommended bundling configuration
+      .flushInterval(0) // set flush interval to 0 so that tests can drive flushing explicitly
+      .build();
+  KafkaSpanCollector collector = new KafkaSpanCollector(config, metrics);
+
+  @Test
+  public void collectDoesntDoIO() throws Exception {
+    thrown.expect(TimeoutException.class);
+    collector.collect(span(1L, "foo"));
+
+    assertThat(readMessages()).isEmpty();
+  }
+
+  @Test
+  public void collectIncrementsAcceptedMetrics() throws Exception {
+    collector.collect(span(1L, "foo"));
+
+    assertThat(metrics.acceptedSpans.get()).isEqualTo(1);
+    assertThat(metrics.droppedSpans.get()).isZero();
+  }
+
+  @Test
+  public void dropsWhenQueueIsFull() throws Exception {
+    for (int i = 0; i < 1001; i++)
+      collector.collect(span(1L, "foo"));
+
+    collector.flush(); // manually flush the spans
+
+    assertThat(Codec.THRIFT.readSpans(readMessages().get(0))).hasSize(1000);
+    assertThat(metrics.droppedSpans.get()).isEqualTo(1);
+  }
+
+  @Test
+  public void sendsSpans() throws Exception {
+    collector.collect(span(1L, "foo"));
+    collector.collect(span(2L, "bar"));
+
+    collector.flush(); // manually flush the spans
+
+    // Ensure only one message was sent
+    List<byte[]> messages = readMessages();
+    assertThat(messages).hasSize(1);
+
+    // Now, let's read back the spans we sent!
+    assertThat(Codec.THRIFT.readSpans(messages.get(0))).containsExactly(
+        zipkinSpan(1L, "foo"),
+        zipkinSpan(2L, "bar")
+    );
+  }
+
+  class TestMetricsHandler implements SpanCollectorMetricsHandler {
+
+    final AtomicInteger acceptedSpans = new AtomicInteger();
+    final AtomicInteger droppedSpans = new AtomicInteger();
+
+    @Override
+    public void incrementAcceptedSpans(int quantity) {
+      acceptedSpans.addAndGet(quantity);
+    }
+
+    @Override
+    public void incrementDroppedSpans(int quantity) {
+      droppedSpans.addAndGet(quantity);
+    }
+  }
+
+  static Span span(long traceId, String spanName) {
+    return new Span().setTrace_id(traceId).setId(traceId).setName(spanName);
+  }
+
+  static zipkin.Span zipkinSpan(long traceId, String spanName) {
+    return new zipkin.Span.Builder().traceId(traceId).id(traceId).name(spanName).build();
+  }
+
+  private List<byte[]> readMessages() throws TimeoutException {
+    return kafka.readMessages("zipkin", 1, new DefaultDecoder(kafka.consumerConfig().props()));
+  }
+}
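Finally, wiring the collector into a tracer; this sketch assumes brave 3.x's `Brave.Builder`, which is outside this diff:

```java
import com.github.kristofa.brave.Brave;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.kafka.KafkaSpanCollector;

public class KafkaCollectorWiring {
  public static void main(String[] args) {
    KafkaSpanCollector collector = KafkaSpanCollector.create(
        "localhost:9092", new EmptySpanCollectorMetricsHandler());
    Brave brave = new Brave.Builder("my-service")
        .spanCollector(collector)
        .build();
    // spans reported through `brave` are queued and flushed to the zipkin topic
  }
}
```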