-
Notifications
You must be signed in to change notification settings - Fork 714
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactors such that Kafka bundles spans and shares more code
This rewrites KafkaSpanCollector to bundle multiple spans into the same Kafka message. Basically, as many messages that come in one second will send in the same message. This change resulted in several times higher throughput in Yelp's kafka+cassandra Zipkin architecture. Flushing can be controlled via KafkaSpanCollector.Config.flushInterval Incidentally, this shares a lot of code with HttpSpanCollector, which should reduce the amount of bugs and maintenance around queue-based collection.
- Loading branch information
Adrian Cole
committed
Mar 4, 2016
1 parent
003cd69
commit ac5ca34
Showing
10 changed files
with
390 additions
and
383 deletions.
There are no files selected for viewing
123 changes: 123 additions & 0 deletions
123
brave-core/src/main/java/com/github/kristofa/brave/AbstractSpanCollector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package com.github.kristofa.brave; | ||
|
||
import com.github.kristofa.brave.internal.Nullable; | ||
import com.twitter.zipkin.gen.Span; | ||
import com.twitter.zipkin.gen.SpanCodec; | ||
import java.io.Closeable; | ||
import java.io.Flushable; | ||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
|
||
import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
||
/** | ||
* Implemented {@link #sendSpans} to transport a encoded list of spans to Zipkin. | ||
*/ | ||
public abstract class AbstractSpanCollector implements SpanCollector, Flushable, Closeable { | ||
|
||
private final SpanCodec codec; | ||
private final SpanCollectorMetricsHandler metrics; | ||
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<Span>(1000); | ||
@Nullable // for testing | ||
private final Flusher flusher; | ||
|
||
/** | ||
* @param flushInterval in seconds. 0 implies spans are {@link #flush() flushed externally. | ||
*/ | ||
public AbstractSpanCollector(SpanCodec codec, SpanCollectorMetricsHandler metrics, | ||
int flushInterval) { | ||
this.codec = codec; | ||
this.metrics = metrics; | ||
this.flusher = flushInterval > 0 ? new Flusher(this, flushInterval) : null; | ||
} | ||
|
||
/** | ||
* Queues the span for collection, or drops it if the queue is full. | ||
* | ||
* @param span Span, should not be <code>null</code>. | ||
*/ | ||
@Override | ||
public void collect(Span span) { | ||
metrics.incrementAcceptedSpans(1); | ||
if (!pending.offer(span)) { | ||
metrics.incrementDroppedSpans(1); | ||
} | ||
} | ||
|
||
/** | ||
* Calling this will flush any pending spans to the transport on the current thread. | ||
*/ | ||
@Override | ||
public void flush() { | ||
if (pending.isEmpty()) return; | ||
List<Span> drained = new ArrayList<Span>(pending.size()); | ||
pending.drainTo(drained); | ||
if (drained.isEmpty()) return; | ||
|
||
// encode the spans for transport | ||
int spanCount = drained.size(); | ||
byte[] encoded; | ||
try { | ||
encoded = codec.writeSpans(drained); | ||
} catch (RuntimeException e) { | ||
metrics.incrementDroppedSpans(spanCount); | ||
return; | ||
} | ||
|
||
// transport the spans | ||
try { | ||
sendSpans(encoded); | ||
} catch (IOException e) { | ||
metrics.incrementDroppedSpans(spanCount); | ||
return; | ||
} | ||
} | ||
|
||
/** Calls flush on a fixed interval */ | ||
static final class Flusher implements Runnable { | ||
final Flushable flushable; | ||
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); | ||
|
||
Flusher(Flushable flushable, int flushInterval) { | ||
this.flushable = flushable; | ||
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS); | ||
} | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
flushable.flush(); | ||
} catch (IOException ignored) { | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Sends a encoded list of spans over the current transport. | ||
* | ||
* @throws IOException when thrown, drop metrics will increment accordingly | ||
*/ | ||
protected abstract void sendSpans(byte[] encoded) throws IOException; | ||
|
||
@Override | ||
public void addDefaultAnnotation(String key, String value) { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Requests a cease of delivery. There will be at most one in-flight send after this call. | ||
*/ | ||
@Override | ||
public void close() { | ||
if (flusher != null) flusher.scheduler.shutdown(); | ||
// throw any outstanding spans on the floor | ||
int dropped = pending.drainTo(new LinkedList<Span>()); | ||
metrics.incrementDroppedSpans(dropped); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,10 @@ | ||
# brave-spancollector-http # | ||
|
||
SpanCollector that is used to submit spans to Zipkins Http endpoint `/spans`. | ||
SpanCollector that encodes spans into a json list, POSTed to `/api/v1/spans`. | ||
|
||
## Configuration ## | ||
|
||
By default... | ||
|
||
* spans are flushed to a POST request every second. Configure with `HttpSpanCollector.Config.flushInterval`. | ||
* the POST body is not compressed. Configure with `HttpSpanCollector.Config.compressionEnabled`. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.