Skip to content

Commit

Permalink
Misc fixes
Browse files Browse the repository at this point in the history
* Cleaner summing of scribe message size
* Remove unnecessary `GlobalSampler` dependency from `WriteQueue`,
  `WriteQueueWorker`

Author: @franklinhu
Fixes #40
URL: #40
  • Loading branch information
Franklin Hu committed Jun 26, 2012
1 parent 580c639 commit 8a8497d
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: Writ
return TryLater
}

Stats.addMetric("scribe_size", logEntries.map(_.message).foldLeft(0)((size,str) => size + str.size))
Stats.addMetric("scribe_size", logEntries.map(_.message.size).sum)

if (logEntries.isEmpty) {
Stats.incr("collector.empty_logentry")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ package com.twitter.zipkin.collector
import com.twitter.ostrich.admin.Service
import com.twitter.ostrich.stats.Stats
import com.twitter.zipkin.collector.processor.Processor
import com.twitter.zipkin.collector.sampler.GlobalSampler
import java.util.concurrent.ArrayBlockingQueue

class WriteQueue[T](writeQueueMaxSize: Int,
flusherPoolSize: Int,
processor: Processor[T],
sampler: GlobalSampler) extends Service {
processor: Processor[T]) extends Service {

private val queue = new ArrayBlockingQueue[T](writeQueueMaxSize)
Stats.addGauge("write_queue_qsize") { queue.size }
Expand All @@ -34,7 +32,7 @@ class WriteQueue[T](writeQueueMaxSize: Int,

def start() {
workers = (0 until flusherPoolSize).toSeq map { i: Int =>
val worker = new WriteQueueWorker[T](queue, processor, sampler)
val worker = new WriteQueueWorker[T](queue, processor)
worker.start()
worker
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ import com.twitter.ostrich.admin.BackgroundProcess
import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.zipkin.gen
import com.twitter.zipkin.collector.processor.Processor
import com.twitter.zipkin.collector.sampler.GlobalSampler
import java.util.concurrent.{TimeUnit, BlockingQueue}

class WriteQueueWorker[T](queue: BlockingQueue[T],
processor: Processor[T],
sample: GlobalSampler) extends BackgroundProcess("WriteQueueWorker", false) {
processor: Processor[T]) extends BackgroundProcess("WriteQueueWorker", false) {

private val log = Logger.get

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] {
)

def writeQueueConfig: WriteQueueConfig[T]
lazy val writeQueue: WriteQueue[T] = writeQueueConfig.apply(processor, globalSampler)
lazy val writeQueue: WriteQueue[T] = writeQueueConfig.apply(processor)

lazy val indexingFilter: IndexingFilter = new DefaultClientIndexingFilter

Expand All @@ -131,15 +131,15 @@ trait WriteQueueConfig[T] extends Config[WriteQueue[T]] {
var writeQueueMaxSize: Int = 500
var flusherPoolSize: Int = 10

def apply(processor: Processor[T], sampler: GlobalSampler): WriteQueue[T] = {
val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, processor, sampler)
def apply(processor: Processor[T]): WriteQueue[T] = {
val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, processor)
wq.start()
ServiceTracker.register(wq)
wq
}

def apply(): WriteQueue[T] = {
val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, new NullProcessor[T], new GlobalSampler{})
val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, new NullProcessor[T])
wq.start()
ServiceTracker.register(wq)
wq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.twitter.zipkin.collector

import com.twitter.zipkin.collector.processor.Processor
import com.twitter.zipkin.collector.sampler.GlobalSampler
import com.twitter.zipkin.common.{Annotation, Endpoint, Span}
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
Expand All @@ -28,9 +27,8 @@ class WriteQueueWorkerSpec extends Specification with JMocker with ClassMocker {
"hand off to processor" in {
val processor = mock[Processor[Span]]
val queue = mock[BlockingQueue[Span]]
val sampler = mock[GlobalSampler]

val w = new WriteQueueWorker[Span](queue, processor, sampler)
val w = new WriteQueueWorker[Span](queue, processor)
val span = Span(123, "boo", 456, None, List(Annotation(123, "value", Some(Endpoint(1,2,"service")))), Nil)

expect {
Expand Down

0 comments on commit 8a8497d

Please sign in to comment.