Skip to content

Commit

Permalink
Pull out msg from Thrift struct in ScribeCollectorService
Browse files Browse the repository at this point in the history
Move code that pulls out the base64 encoded Span from the Thrift struct back to
the CollectorService since putting the entire Thrift struct in the write queue
causes major GC regression.

Author: @franklinhu
Pull Request: #34
URL: #34
  • Loading branch information
Franklin Hu committed Jun 15, 2012
1 parent e54562e commit 5feb416
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.zookeeper.KeeperException
/**
* This class implements the log method from the Scribe Thrift interface.
*/
class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue[Seq[_ <: gen.LogEntry]], categories: Set[String])
class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue[Seq[_ <: String]], categories: Set[String])
extends gen.ZipkinCollector.FutureIface with CollectorService {
private val log = Logger.get

Expand Down Expand Up @@ -81,13 +81,13 @@ class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: Writ
return Ok
}

val scribeMessages = logEntries.filter {
val scribeMessages = logEntries.flatMap {
entry =>
if (!categories.contains(entry.category.toLowerCase())) {
Stats.incr("collector.invalid_category")
false
None
} else {
true
Some(entry.`message`)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,16 @@ import com.twitter.zipkin.gen
* - the sequence of `LogEntry`s only contains messages we want to pass on (already filtered
* by category)
*/
class ScribeProcessorFilter extends ProcessorFilter[Seq[gen.LogEntry], Seq[Span]] {
class ScribeProcessorFilter extends ProcessorFilter[Seq[String], Seq[Span]] {

private val log = Logger.get

val deserializer = new BinaryThriftStructSerializer[gen.Span] {
def codec = gen.Span
}

def apply(logEntries: Seq[gen.LogEntry]): Seq[Span] = {
logEntries.map {
_.`message`
}.flatMap {
def apply(logEntries: Seq[String]): Seq[Span] = {
logEntries.flatMap {
msg =>
try {
val span = Stats.time("deserializeSpan") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.twitter.zipkin.config.collector.CollectorServerConfig
import com.twitter.zipkin.gen

trait ScribeZipkinCollectorConfig extends ZipkinCollectorConfig {
type T = Seq[gen.LogEntry]
type T = Seq[String]
val serverConfig: CollectorServerConfig = new ScribeCollectorServerConfig(this)

def rawDataFilter = new ScribeProcessorFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo

val wrongCatList = List(gen.LogEntry("wrongcat", serializer.toString(ThriftAdapter(validSpan))))

val queue = mock[WriteQueue[Seq[gen.LogEntry]]]
val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA="

val queue = mock[WriteQueue[Seq[String]]]
val zkSampleRateConfig = mock[AdjustableRateConfig]

val config = new ScribeZipkinCollectorConfig {
Expand All @@ -60,7 +62,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val cs = scribeCollectorService

expect {
one(queue).add(validList) willReturn(true)
one(queue).add(List(base64)) willReturn(true)
}

gen.ResultCode.Ok mustEqual cs.log(validList)()
Expand All @@ -70,7 +72,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val cs = scribeCollectorService

expect {
one(queue).add(validList) willReturn(false)
one(queue).add(List(base64)) willReturn(false)
}

gen.ResultCode.TryLater mustEqual cs.log(validList)()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,24 @@ class ScribeProcessorFilterSpec extends Specification {
"ScribeProcessorFilter" should {
val category = "zipkin"

val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA="
val base64 = Seq("CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA=")

val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil)
val serialized = serializer.toString(ThriftAdapter(validSpan))
val serialized = Seq(serializer.toString(ThriftAdapter(validSpan)))
val bad = Seq("garbage!")

val base64LogEntries = Seq(gen.LogEntry(category, base64))
val serializedLogEntries = Seq(gen.LogEntry(category, serialized))

val badLogEntries = Seq(gen.LogEntry(category, "garbage!"))
val filter = new ScribeProcessorFilter

"convert gen.LogEntry to Span" in {
filter.apply(base64LogEntries) mustEqual Seq(validSpan)
filter.apply(base64) mustEqual Seq(validSpan)
}

"convert serialized thrift to Span" in {
filter.apply(serializedLogEntries) mustEqual Seq(validSpan)
filter.apply(serialized) mustEqual Seq(validSpan)
}

"deal with garbage" in {
filter.apply(badLogEntries) mustEqual Seq.empty[Span]
filter.apply(bad) mustEqual Seq.empty[Span]
}
}
}

0 comments on commit 5feb416

Please sign in to comment.