Skip to content

Commit

Permalink
Supports reading multiple spans per Kafka message
Browse files Browse the repository at this point in the history
Kafka messages have binary payloads and no key. The binary contents are
serialized TBinaryProtocol thrift messages. This change peeks at thei
first bytes to see if it is a List of structs or not, reading
accordingly.

This approach would need a revision if we ever add a Struct field to
Span. However, that is unlikely. At the point we change the structure of
Span, we'd likely change other aspects which would make it a different
struct completely (see #939). In such case, we'd add a key to the kafka
message of the span version, and not hit the code affected in this
change.

Fixes #979
  • Loading branch information
Adrian Cole committed Feb 23, 2016
1 parent 95c017e commit 6f5a378
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ trait KafkaSpanReceiverFactory { self: App =>
process: Seq[ThriftSpan] => Future[Unit],
stats: StatsReceiver = DefaultStatsReceiver.scope("KafkaSpanReceiver"),
keyDecoder: Decoder[T] = KafkaProcessor.defaultKeyDecoder,
valueDecoder: KafkaProcessor.KafkaDecoder = new SpanCodec()
valueDecoder: KafkaProcessor.KafkaDecoder = new SpanDecoder()
): SpanReceiver = new SpanReceiver {


Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.twitter.zipkin.receiver.kafka

import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.zipkin.conversions.thrift
import com.twitter.zipkin.thriftscala.{Span => ThriftSpan}
import org.apache.thrift.protocol.TType

class SpanDecoder extends KafkaProcessor.KafkaDecoder {
val deserializer = new BinaryThriftStructSerializer[ThriftSpan] {
def codec = ThriftSpan
}

// Given the thrift encoding is TBinaryProtocol..
// .. When serializing a Span (Struct), the first byte will be the type of a field
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT
// Span has no STRUCT fields: we assume that if the first byte is TType.STRUCT is a list.
def fromBytes(bytes: Array[Byte]) =
if (bytes(0) == TType.STRUCT) {
thrift.thriftListToThriftSpans(bytes)
} else {
List(deserializer.fromBytes(bytes))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package com.twitter.zipkin.receiver.kafka
import java.util.concurrent.LinkedBlockingQueue

import com.github.charithe.kafka.KafkaJunitRule
import com.twitter.io.Buf
import com.twitter.util.{Await, Future, Promise}
import com.twitter.zipkin.common._
import com.twitter.zipkin.conversions.thrift._
import com.twitter.zipkin.thriftscala.{Span => ThriftSpan}
import kafka.producer._
import org.apache.thrift.protocol.{TType, TList, TBinaryProtocol}
import org.apache.thrift.transport.TMemoryBuffer
import org.junit.{ClassRule, Test}
import org.scalatest.junit.JUnitSuite

Expand All @@ -26,7 +29,7 @@ class KafkaProcessorSpec extends JUnitSuite {
val annotation = Annotation(1, "value", Some(Endpoint(1, 2, "service")))
// Intentionally leaving timestamp and duration unset, as legacy instrumentation don't set this.
val span = Span(1234, "methodname", 4567, annotations = List(annotation))
val codec = new SpanCodec()
val codec = new SpanDecoder()

@Test def messageWithSingleSpan() {
val topic = "single_span"
Expand All @@ -38,14 +41,33 @@ class KafkaProcessorSpec extends JUnitSuite {
}, codec, codec)

val producer = new Producer[Array[Byte], Array[Byte]](kafkaRule.producerConfigWithDefaultEncoder())
producer.send(new KeyedMessage(topic, codec.encode(span)))
producer.send(new KeyedMessage(topic, encode(span)))
producer.close()

assert(Await.result(recvdSpan) == Seq(span.toThrift))

Await.result(service.close())
}

@Test def messageWithMultipleSpans() {
val topic = "multiple_spans"
val recvdSpan = new Promise[Seq[ThriftSpan]]

val service = KafkaProcessor(Map(topic -> 1), kafkaRule.consumerConfig(), { s =>
recvdSpan.setValue(s)
Future.value(true)
}, codec, codec)

val producer = new Producer[Array[Byte], Array[Byte]](kafkaRule.producerConfigWithDefaultEncoder())
producer.send(new KeyedMessage(topic, encode(Seq(span, span)))) // 2 spans in one message
producer.close()

// make sure we decoded both spans from the same message
assert(Await.result(recvdSpan) == Seq(span.toThrift, span.toThrift))

Await.result(service.close())
}

@Test def skipsMalformedData() {
val topic = "malformed"
val recvdSpans = new LinkedBlockingQueue[Seq[ThriftSpan]](3)
Expand All @@ -57,14 +79,32 @@ class KafkaProcessorSpec extends JUnitSuite {

val producer = new Producer[Array[Byte], Array[Byte]](kafkaRule.producerConfigWithDefaultEncoder())

producer.send(new KeyedMessage(topic, codec.encode(span)))
producer.send(new KeyedMessage(topic, encode(span)))
producer.send(new KeyedMessage(topic, "malformed".getBytes()))
producer.send(new KeyedMessage(topic, codec.encode(span)))
producer.send(new KeyedMessage(topic, encode(span)))
producer.close()

for (elem <- 1 until 2)
assert(recvdSpans.take() == Seq(span.toThrift))

Await.result(service.close())
}

def encode(span: Span) = {
val transport = new TMemoryBuffer(0)
val oproto = new TBinaryProtocol(transport)
val tspan = spanToThriftSpan(span)
tspan.toThrift.write(oproto)
transport.getArray()
}

def encode(spans: Seq[Span]) = {
// serialize all spans as a thrift list
val transport = new TMemoryBuffer(0)
val oproto = new TBinaryProtocol(transport)
oproto.writeListBegin(new TList(TType.STRUCT, spans.size))
spans.map(spanToThriftSpan).foreach(_.toThrift.write(oproto))
oproto.writeListEnd()
transport.getArray()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,23 @@ object thrift {
implicit def spanToThriftSpan(s: Span) = new ThriftSpan(s)
implicit def thriftSpanToSpan(s: thriftscala.Span) = new WrappedSpan(s)

def thriftListToSpans(bytes: Array[Byte]) = {
def thriftListToThriftSpans(bytes: Array[Byte]) = {
val proto = new TBinaryProtocol(TArrayByteTransport(bytes))
val _list = proto.readListBegin()
if (_list.size > 10000) {
throw new IllegalArgumentException(_list.size + " > 10000: possibly malformed thrift")
}
val result = new ArrayBuffer[Span](_list.size)
val result = new ArrayBuffer[thriftscala.Span](_list.size)
for (i <- 1 to _list.size) {
val thrift = thriftscala.Span.decode(proto)
result += thriftSpanToSpan(thrift).toSpan
result += thriftscala.Span.decode(proto)
}
proto.readListEnd()
result.toList
}

def thriftListToSpans(bytes: Array[Byte]) =
thriftListToThriftSpans(bytes).map(thriftSpanToSpan(_).toSpan)

class WrappedDependencyLink(dl: DependencyLink) {
lazy val toThrift = thriftscala.DependencyLink(dl.parent, dl.child, dl.callCount)
}
Expand Down

0 comments on commit 6f5a378

Please sign in to comment.