Skip to content

Commit

Permalink
Ensure that we can receive Scribe messages with endline at the end
Browse files Browse the repository at this point in the history
Some codestyle changes and removed a few lines of unused code in my quest to
find the right code to test :)

Author: @johanoskarsson
Fixes #117
URL: #117
  • Loading branch information
johanoskarsson committed Aug 23, 2012
1 parent 03247bb commit f6fbc6a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,22 @@ class ScribeProcessorFilter extends ProcessorFilter[Seq[String], Seq[Span]] {
}

def apply(logEntries: Seq[String]): Seq[Span] = {
logEntries.flatMap {
msg =>
try {
val span = Stats.time("deserializeSpan") {
deserializer.fromString(msg)
}
log.ifDebug("Processing span: " + span + " from " + msg)
Some(ThriftAdapter(span))
} catch {
case e: Exception => {
// scribe doesn't have any ResultCode.ERROR or similar
// let's just swallow this invalid msg
log.warning(e, "Invalid msg: %s", msg)
Stats.incr("collector.invalid_msg")
None
}
logEntries.flatMap { msg =>
try {
val span = Stats.time("deserializeSpan") {
deserializer.fromString(msg)
}
log.ifDebug("Processing span: " + span + " from " + msg)
Some(ThriftAdapter(span))
} catch {
case e: Exception => {
// scribe doesn't have any ResultCode.ERROR or similar
// let's just swallow this invalid msg
log.warning(e, "Invalid msg: %s", msg)
Stats.incr("collector.invalid_msg")
None
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ScribeProcessorFilterSpec extends Specification {
val category = "zipkin"

val base64 = Seq("CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA=")
val endline = Seq("CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA=\n")

val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil)
val serialized = Seq(serializer.toString(ThriftAdapter(validSpan)))
Expand All @@ -41,6 +42,10 @@ class ScribeProcessorFilterSpec extends Specification {
filter.apply(base64) mustEqual Seq(validSpan)
}

"convert gen.LogEntry with endline to Span" in {
filter.apply(endline) mustEqual Seq(validSpan)
}

"convert serialized thrift to Span" in {
filter.apply(serialized) mustEqual Seq(validSpan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,13 @@
*/
package com.twitter.zipkin.collector

import com.twitter.logging.Logger
import com.twitter.ostrich.admin.BackgroundProcess
import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.zipkin.gen
import com.twitter.zipkin.collector.processor.Processor
import java.util.concurrent.{TimeUnit, BlockingQueue}

class WriteQueueWorker[T](queue: BlockingQueue[T],
processor: Processor[T]) extends BackgroundProcess("WriteQueueWorker", false) {

private val log = Logger.get

val deserializer = new BinaryThriftStructSerializer[gen.Span] { def codec = gen.Span }

def runLoop() {
val item = queue.poll(500, TimeUnit.MILLISECONDS)
if (item != null) {
Expand Down

0 comments on commit f6fbc6a

Please sign in to comment.