diff --git a/README.md b/README.md index 7aa75e2d251..eb15affd813 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,8 @@ Once the data is stored and indexed we need a way to extract it. This is where t ### UI Most of our users access the data via our UI. It's a Rails app that uses D3 to visualize the trace data. Note that there is no built in authentication in the UI. +## Modules +![Modules (doc/modules.png)](https://github.com/twitter/zipkin/raw/master/doc/modules.png) ## Installation @@ -122,9 +124,9 @@ Note that the above uses the Twitter version of Scribe with support for using Zo We've developed Zipkin with Scala 2.9.1, SBT 0.11.2, and JDK6. 1. `git clone https://github.com/twitter/zipkin.git` -1. `cd zipkin/zipkin-server` -1. `cp config/collector-dev.scala config/collector-prod.scala` -1. `cp config/query-dev.scala config/query-prod.scala` +1. `cd zipkin` +1. `cp zipkin-scribe/config/collector-dev.scala zipkin-scribe/config/collector-prod.scala` +1. `cp zipkin-server/config/query-dev.scala zipkin-server/config/query-prod.scala` 1. Modify the configs above as needed. Pay particular attention to ZooKeeper and Cassandra server entries. 1. `bin/sbt update package-dist` (This downloads SBT 0.11.2 if it doesn't already exist) 1. `scp dist/zipkin*.zip [server]` @@ -133,6 +135,12 @@ We've developed Zipkin with Scala 1. `mkdir -p /var/log/zipkin` 1. Start the collector and query daemon. +You can also run the collector and query services through SBT. + +To run the Scribe collector service: `bin/sbt 'project zipkin-scribe' 'run -f zipkin-scribe/config/collector-dev.scala'` + +To run the query service: `bin/sbt 'project zipkin-server' 'run -f zipkin-server/config/query-dev.scala'` + ### Zipkin UI The UI is a standard Rails 3 app. 
diff --git a/doc/modules.png b/doc/modules.png new file mode 100644 index 00000000000..5317aacd1f7 Binary files /dev/null and b/doc/modules.png differ diff --git a/zipkin-scribe/config/collector-dev.scala b/zipkin-scribe/config/collector-dev.scala index 0b304aa5255..3a3f82cd8ec 100644 --- a/zipkin-scribe/config/collector-dev.scala +++ b/zipkin-scribe/config/collector-dev.scala @@ -35,7 +35,7 @@ new ScribeZipkinCollectorConfig { ) :: new TimeSeriesCollectorFactory ) - def writeQueueConfig = new WriteQueueConfig { + def writeQueueConfig = new WriteQueueConfig[T] { writeQueueMaxSize = 500 flusherPoolSize = 10 } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/ResilientZKNode.scala b/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ResilientZKNode.scala similarity index 100% rename from zipkin-server/src/main/scala/com/twitter/zipkin/collector/ResilientZKNode.scala rename to zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ResilientZKNode.scala diff --git a/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ScribeCollectorService.scala b/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ScribeCollectorService.scala index 10894585a84..84dfe431023 100644 --- a/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ScribeCollectorService.scala +++ b/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ScribeCollectorService.scala @@ -27,7 +27,7 @@ import org.apache.zookeeper.KeeperException /** * This class implements the log method from the Scribe Thrift interface. 
*/ -class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue, categories: Set[String]) +class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue[Seq[_ <: gen.LogEntry]], categories: Set[String]) extends gen.ZipkinCollector.FutureIface with CollectorService { private val log = Logger.get @@ -81,15 +81,15 @@ class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: Writ return Ok } - val scribeMessages = logEntries.flatMap { + val scribeMessages = logEntries.filter { entry => if (!categories.contains(entry.category.toLowerCase())) { Stats.incr("collector.invalid_category") - None + false } else { - Some(entry.message) + true } - }.toList + } if (scribeMessages.isEmpty) { Ok diff --git a/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilter.scala b/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilter.scala new file mode 100644 index 00000000000..257fac36051 --- /dev/null +++ b/zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilter.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package com.twitter.zipkin.collector.processor + +import com.twitter.logging.Logger +import com.twitter.ostrich.stats.Stats +import com.twitter.scrooge.BinaryThriftStructSerializer +import com.twitter.zipkin.adapter.ThriftAdapter +import com.twitter.zipkin.common.Span +import com.twitter.zipkin.gen + +/** + * Transforms a `Seq[gen.LogEntry]` to `Seq[Span]` for a collector service to consume. + * Assumes: + * - the Scribe struct contains a `message` that is the Base64 encoded Thrift Span struct. + * - the sequence of `LogEntry`s only contains messages we want to pass on (already filtered + * by category) + */ +class ScribeProcessorFilter extends ProcessorFilter[Seq[gen.LogEntry], Seq[Span]] { + + private val log = Logger.get + + val deserializer = new BinaryThriftStructSerializer[gen.Span] { + def codec = gen.Span + } + + def apply(logEntries: Seq[gen.LogEntry]): Seq[Span] = { + logEntries.map { + _.`message` + }.flatMap { + msg => + try { + val span = Stats.time("deserializeSpan") { + deserializer.fromString(msg) + } + log.ifDebug("Processing span: " + span + " from " + msg) + Some(ThriftAdapter(span)) + } catch { + case e: Exception => { + // scribe doesn't have any ResultCode.ERROR or similar + // let's just swallow this invalid msg + log.warning(e, "Invalid msg: %s", msg) + Stats.incr("collector.invalid_msg") + None + } + } + } + } +} diff --git a/zipkin-scribe/src/main/scala/com/twitter/zipkin/config/ScribeZipkinCollectorConfig.scala b/zipkin-scribe/src/main/scala/com/twitter/zipkin/config/ScribeZipkinCollectorConfig.scala index 5259a6a06cd..3a86961e629 100644 --- a/zipkin-scribe/src/main/scala/com/twitter/zipkin/config/ScribeZipkinCollectorConfig.scala +++ b/zipkin-scribe/src/main/scala/com/twitter/zipkin/config/ScribeZipkinCollectorConfig.scala @@ -1,7 +1,28 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package com.twitter.zipkin.config +import com.twitter.zipkin.collector.processor.ScribeProcessorFilter import com.twitter.zipkin.config.collector.CollectorServerConfig +import com.twitter.zipkin.gen trait ScribeZipkinCollectorConfig extends ZipkinCollectorConfig { + type T = Seq[gen.LogEntry] val serverConfig: CollectorServerConfig = new ScribeCollectorServerConfig(this) + + def rawDataFilter = new ScribeProcessorFilter } diff --git a/zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/ScribeCollectorServiceSpec.scala b/zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/ScribeCollectorServiceSpec.scala index 3b90dcba9af..fb9ea0f9edf 100644 --- a/zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/ScribeCollectorServiceSpec.scala +++ b/zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/ScribeCollectorServiceSpec.scala @@ -30,15 +30,14 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo val serializer = new BinaryThriftStructSerializer[gen.Span] { def codec = gen.Span } + val category = "zipkin" val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil) - val validList = List(gen.LogEntry("b3", serializer.toString(ThriftAdapter(validSpan)))) + val validList = List(gen.LogEntry(category, serializer.toString(ThriftAdapter(validSpan)))) val wrongCatList = List(gen.LogEntry("wrongcat", serializer.toString(ThriftAdapter(validSpan)))) - val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA=" - - val queue = 
mock[WriteQueue] + val queue = mock[WriteQueue[Seq[gen.LogEntry]]] val zkSampleRateConfig = mock[AdjustableRateConfig] val config = new ScribeZipkinCollectorConfig { @@ -52,7 +51,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo override lazy val sampleRateConfig = zkSampleRateConfig } - def scribeCollectorService = new ScribeCollectorService(config, config.writeQueue, Set("b3")) { + def scribeCollectorService = new ScribeCollectorService(config, config.writeQueue, Set(category)) { running = true } @@ -61,7 +60,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo val cs = scribeCollectorService expect { - one(queue).add(List(base64)) willReturn(true) + one(queue).add(validList) willReturn(true) } gen.ResultCode.Ok mustEqual cs.log(validList)() @@ -71,7 +70,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo val cs = scribeCollectorService expect { - one(queue).add(List(base64)) willReturn(false) + one(queue).add(validList) willReturn(false) } gen.ResultCode.TryLater mustEqual cs.log(validList)() diff --git a/zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilterSpec.scala b/zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilterSpec.scala new file mode 100644 index 00000000000..81241a77680 --- /dev/null +++ b/zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilterSpec.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.zipkin.collector.processor + +import com.twitter.scrooge.BinaryThriftStructSerializer +import com.twitter.zipkin.adapter.ThriftAdapter +import com.twitter.zipkin.common.{Annotation, Span} +import com.twitter.zipkin.gen +import org.specs.Specification + +class ScribeProcessorFilterSpec extends Specification { + val serializer = new BinaryThriftStructSerializer[gen.Span] { + def codec = gen.Span + } + + "ScribeProcessorFilter" should { + val category = "zipkin" + + val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA=" + + val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil) + val serialized = serializer.toString(ThriftAdapter(validSpan)) + + val base64LogEntries = Seq(gen.LogEntry(category, base64)) + val serializedLogEntries = Seq(gen.LogEntry(category, serialized)) + + val badLogEntries = Seq(gen.LogEntry(category, "garbage!")) + val filter = new ScribeProcessorFilter + + "convert gen.LogEntry to Span" in { + filter.apply(base64LogEntries) mustEqual Seq(validSpan) + } + + "convert serialized thrift to Span" in { + filter.apply(serializedLogEntries) mustEqual Seq(validSpan) + } + + "deal with garbage" in { + filter.apply(badLogEntries) mustEqual Seq.empty[Span] + } + } +} diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/CollectorService.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/CollectorService.scala index 6a9780e0a16..8827c6e18f7 100644 --- 
a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/CollectorService.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/CollectorService.scala @@ -18,8 +18,7 @@ package com.twitter.zipkin.collector import com.twitter.ostrich.admin.Service trait CollectorService extends Service { - - val writeQueue: WriteQueue + val writeQueue: WriteQueue[_ <: AnyRef] @volatile var running = false @@ -29,7 +28,6 @@ trait CollectorService extends Service { def shutdown() { running = false - writeQueue.flushAll() writeQueue.shutdown() } } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueue.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueue.scala index 4464df9a6c5..c1c82c4c6a9 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueue.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueue.scala @@ -17,45 +17,53 @@ package com.twitter.zipkin.collector import com.twitter.ostrich.admin.Service -import java.util.concurrent.{Executors, ArrayBlockingQueue} import com.twitter.ostrich.stats.Stats -import processor.Processor -import sampler.GlobalSampler +import com.twitter.zipkin.collector.processor.Processor +import com.twitter.zipkin.collector.sampler.GlobalSampler +import java.util.concurrent.ArrayBlockingQueue -class WriteQueue(writeQueueMaxSize: Int, +class WriteQueue[T](writeQueueMaxSize: Int, flusherPoolSize: Int, - processors: Seq[Processor], + processor: Processor[T], sampler: GlobalSampler) extends Service { - private val queue = new ArrayBlockingQueue[List[String]](writeQueueMaxSize) + private val queue = new ArrayBlockingQueue[T](writeQueueMaxSize) Stats.addGauge("write_queue_qsize") { queue.size } - private var workers: Seq[WriteQueueWorker] = Seq() + private var workers: Seq[WriteQueueWorker[T]] = Seq() + @volatile var running: Boolean = false def start() { workers = (0 until flusherPoolSize).toSeq map { i: Int => - val worker = new 
WriteQueueWorker(queue, processors, sampler) + val worker = new WriteQueueWorker[T](queue, processor, sampler) worker.start() worker } + running = true } /** * Will block until all entries in queue have been flushed. * Assumes now new entries will be added to queue. */ - def flushAll() { + private def flushAll() { while(!queue.isEmpty) { Thread.sleep(100) } } def shutdown() { + running = false + flushAll() workers foreach { _.stop() } workers foreach { _.shutdown() } - processors.foreach {_.shutdown()} + processor.shutdown() } - def add(messages: List[String]): Boolean = { - queue.offer(messages) + def add(messages: T): Boolean = { + if (running) { + queue.offer(messages) + } else { + false + } } } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala index 9e7f4f47d71..d90ea1820d7 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala @@ -16,69 +16,30 @@ */ package com.twitter.zipkin.collector -import com.twitter.zipkin.gen -import com.twitter.zipkin.common.Span import com.twitter.logging.Logger -import com.twitter.util.Future -import processor.Processor -import sampler.GlobalSampler -import com.twitter.ostrich.stats.Stats -import com.twitter.scrooge.BinaryThriftStructSerializer import com.twitter.ostrich.admin.BackgroundProcess +import com.twitter.scrooge.BinaryThriftStructSerializer +import com.twitter.zipkin.gen +import com.twitter.zipkin.collector.processor.Processor +import com.twitter.zipkin.collector.sampler.GlobalSampler import java.util.concurrent.{TimeUnit, BlockingQueue} -import com.twitter.zipkin.adapter.ThriftAdapter -class WriteQueueWorker(queue: BlockingQueue[List[String]], - processors: Seq[Processor], +class WriteQueueWorker[T](queue: BlockingQueue[T], + processor: Processor[T], sample: GlobalSampler) 
extends BackgroundProcess("WriteQueueWorker", false) { private val log = Logger.get val deserializer = new BinaryThriftStructSerializer[gen.Span] { def codec = gen.Span } - def runLoop() = { + def runLoop() { val item = queue.poll(500, TimeUnit.MILLISECONDS) - if (item ne null) { - item foreach (processScribeMessage(_)) + if (item != null) { + process(item) } } - def processScribeMessage(msg: String) { - try { - val span = Stats.time("deserializeSpan") { deserializer.fromString(msg) } - log.ifDebug("Processing span: " + span + " from " + msg) - processSpan(ThriftAdapter(span)) - } catch { - case e: Exception => { - // scribe doesn't have any ResultCode.ERROR or similar - // let's just swallow this invalid msg - log.warning(e, "Invalid msg: %s", msg) - Stats.incr("collector.invalid_msg") - } - } - } - - def processSpan(span: Span) { - try { - span.serviceNames.foreach { name => Stats.incr("received_" + name) } - - // check if we want to store this particular trace or not - if (sample(span.traceId)) { - Stats.time("processSpan") { - span.serviceNames.foreach { name => Stats.incr("process_" + name) } - Future.join { - processors map { _.processSpan(span) } - } onSuccess { e => - Stats.incr("collector.processSpan_success") - } onFailure { e => - Stats.incr("collector.processSpan_failed") - } - } - } - } catch { - case e: Exception => - log.error(e, "Processing of " + span + " failed %s", e) - Stats.incr("collector.invalid_msg") - } + private[collector] def process(item: T) { + processor.process(item) } } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/FanoutProcessor.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/FanoutProcessor.scala new file mode 100644 index 00000000000..ef721712727 --- /dev/null +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/FanoutProcessor.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2012 Twitter Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.twitter.zipkin.collector.processor + +import com.twitter.util.Future + +/** + * Fans out a single item to a set of `Processor`s + * @param processors + * @tparam T + */ +class FanoutProcessor[T](processors: Seq[Processor[T]]) extends Processor[T] { + def process(item: T): Future[Unit] = { + Future.join { + processors map { _.process(item) } + } + } + + def shutdown() { + processors.foreach { _.shutdown() } + } +} diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/IndexProcessor.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/IndexProcessor.scala index 7359ac2b22b..fef0035abce 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/IndexProcessor.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/IndexProcessor.scala @@ -23,9 +23,9 @@ import com.twitter.util.Future /** * Index the incoming spans. 
*/ -class IndexProcessor(index: Index, indexFilter: IndexingFilter) extends Processor { +class IndexProcessor(index: Index, indexFilter: IndexingFilter) extends Processor[Span] { - def processSpan(span: Span) = { + def process(span: Span) = { if (indexFilter.shouldIndex(span)) { Future.join(Seq { index.indexTraceIdByServiceAndName(span) onFailure failureHandler("indexTraceIdByServiceAndName") diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/OstrichProcessor.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/OstrichProcessor.scala index 653aa51a9f4..80065c37fce 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/OstrichProcessor.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/OstrichProcessor.scala @@ -26,9 +26,9 @@ import com.twitter.util.Future * Adds server side duration data to ostrich, which * in turn can be sent to a monitoring system where it can be queried. */ -class OstrichProcessor(serviceStatsPrefix: String) extends Processor { +class OstrichProcessor(serviceStatsPrefix: String) extends Processor[Span] { - def processSpan(span: Span): Future[Unit] = { + def process(span: Span): Future[Unit] = { for { start <- span.getAnnotation(gen.Constants.SERVER_RECV) end <- span.getAnnotation(gen.Constants.SERVER_SEND) diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/Processor.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/Processor.scala index 9e6bfedc4a0..dbeac8787cd 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/Processor.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/Processor.scala @@ -17,24 +17,23 @@ package com.twitter.zipkin.collector.processor * */ -import com.twitter.zipkin.common.Span import com.twitter.finagle.TooManyWaitersException import com.twitter.logging.Logger import com.twitter.ostrich.stats.Stats import 
com.twitter.util.Future /** - * A processor that takes a span and processes it some way. + * A processor that takes an item (e.g. a Span) and processes it in some way. * Could be: storing it or aggregating the data in some way for example. */ -trait Processor { +trait Processor[T] { private val log = Logger.get /** - * Process the span. + * Process the item. */ - def processSpan(span: Span): Future[Unit] + def process(item: T): Future[Unit] /** * Shut down this processor @@ -48,4 +47,26 @@ trait Processor { log.error(e, method) } } -} \ No newline at end of file +} + +class NullProcessor[T] extends Processor[T] { + def process(item: T): Future[Unit] = Future.Unit + def shutdown() {} +} + +/** + * Processes a sequence of items + * @param processor + * @tparam T + */ +class SequenceProcessor[T](processor: Processor[T]) extends Processor[Seq[T]] { + def process(items: Seq[T]): Future[Unit] = { + Future.join { + items.map { + processor.process(_) + } + } + } + + def shutdown() { processor.shutdown() } +} diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/ProcessorFilter.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/ProcessorFilter.scala new file mode 100644 index 00000000000..eadbf30209f --- /dev/null +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/ProcessorFilter.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package com.twitter.zipkin.collector.processor + +import com.twitter.util.Future + +/** + * `ProcessorFilter`s are filters that can be composed on top of `Processor`s to transform + * items between data types. + * @tparam T input data type + * @tparam U output data type + */ +trait ProcessorFilter[T,U] { + + def andThen[V](next: ProcessorFilter[U,V]): ProcessorFilter[T,V] = + new ProcessorFilter[T,V] { + def apply(item: T): V = { + next.apply { + ProcessorFilter.this.apply(item) + } + } + } + + def andThen(processor: Processor[U]): Processor[T] = new Processor[T] { + def process(item: T): Future[Unit] = { + processor.process { + ProcessorFilter.this.apply(item) + } + } + + def shutdown() { processor.shutdown() } + } + + def apply(item: T): U +} diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/SamplerProcessorFilter.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/SamplerProcessorFilter.scala new file mode 100644 index 00000000000..0d763685e45 --- /dev/null +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/SamplerProcessorFilter.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package com.twitter.zipkin.collector.processor + +import com.twitter.ostrich.stats.Stats +import com.twitter.zipkin.collector.sampler.GlobalSampler +import com.twitter.zipkin.common.Span + +/** + * Filters out `Span`s that do not meet a `GlobalSampler`'s criteria + * @param sampler + */ +class SamplerProcessorFilter(sampler: GlobalSampler) extends ProcessorFilter[Seq[Span], Seq[Span]] { + def apply(spans: Seq[Span]): Seq[Span] = { + spans.flatMap { span => + span.serviceNames.foreach { name => Stats.incr("received_" + name) } + + if (sampler(span.traceId)) { + Some(span) + } else { + None + } + } + } +} diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/StorageProcessor.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/StorageProcessor.scala index e591ebf5d4c..69a559e25fe 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/StorageProcessor.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/StorageProcessor.scala @@ -17,14 +17,13 @@ package com.twitter.zipkin.collector.processor import com.twitter.zipkin.common.Span import com.twitter.zipkin.storage.Storage -import com.twitter.logging.Logger /** * Store the incoming span in the storage system. 
*/ -class StorageProcessor(storage: Storage) extends Processor { +class StorageProcessor(storage: Storage) extends Processor[Span] { - def processSpan(span: Span) = + def process(span: Span) = storage.storeSpan(span) onFailure failureHandler("storeSpan") def shutdown() = storage.close() diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/config/ZipkinCollectorConfig.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/config/ZipkinCollectorConfig.scala index fe950a58798..b8c108bdd6f 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/config/ZipkinCollectorConfig.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/config/ZipkinCollectorConfig.scala @@ -18,7 +18,6 @@ package com.twitter.zipkin.config import com.twitter.zipkin.storage.{Index, Storage} import com.twitter.zipkin.collector.{WriteQueue, ZipkinCollector} import com.twitter.zipkin.collector.filter.{DefaultClientIndexingFilter, IndexingFilter} -import com.twitter.zipkin.collector.processor.{OstrichProcessor, IndexProcessor, StorageProcessor, Processor} import com.twitter.zipkin.collector.sampler.{AdaptiveSampler, ZooKeeperGlobalSampler, GlobalSampler} import com.twitter.zipkin.config.collector.CollectorServerConfig import com.twitter.zipkin.config.sampler._ @@ -32,6 +31,8 @@ import java.net.{InetAddress, InetSocketAddress} import org.apache.zookeeper.ZooDefs.Ids import scala.collection.JavaConverters._ import scala.collection.Set +import com.twitter.zipkin.collector.processor._ +import com.twitter.zipkin.common.Span trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] { @@ -93,14 +94,27 @@ trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] { def globalSampler: GlobalSampler = new ZooKeeperGlobalSampler(sampleRateConfig) - lazy val processors: Seq[Processor] = { - new StorageProcessor(storage) :: - new IndexProcessor(index, indexingFilter) :: - new OstrichProcessor(serviceStatsPrefix) - } - - def writeQueueConfig: WriteQueueConfig - lazy val writeQueue: 
WriteQueue = writeQueueConfig.apply(processors, globalSampler) + /** + * To accommodate a particular input type `T`, define a `rawDataFilter` that + * converts the input data type (ex: Scrooge-generated Thrift) into a + * sequence of `com.twitter.zipkin.common.Span`s + */ + type T + def rawDataFilter: ProcessorFilter[T, Seq[Span]] + + lazy val processor: Processor[T] = + rawDataFilter andThen + new SamplerProcessorFilter(globalSampler) andThen + new SequenceProcessor[Span]( + new FanoutProcessor[Span]({ + new StorageProcessor(storage) :: + new IndexProcessor(index, indexingFilter) :: + new OstrichProcessor(serviceStatsPrefix) + }) + ) + + def writeQueueConfig: WriteQueueConfig[T] + lazy val writeQueue: WriteQueue[T] = writeQueueConfig.apply(processor, globalSampler) lazy val indexingFilter: IndexingFilter = new DefaultClientIndexingFilter @@ -113,20 +127,20 @@ trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] { } } -trait WriteQueueConfig extends Config[WriteQueue] { +trait WriteQueueConfig[T] extends Config[WriteQueue[T]] { var writeQueueMaxSize: Int = 500 var flusherPoolSize: Int = 10 - def apply(processors: Seq[Processor], sampler: GlobalSampler): WriteQueue = { - val wq = new WriteQueue(writeQueueMaxSize, flusherPoolSize, processors, sampler) + def apply(processor: Processor[T], sampler: GlobalSampler): WriteQueue[T] = { + val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, processor, sampler) wq.start() ServiceTracker.register(wq) wq } - def apply(): WriteQueue = { - val wq = new WriteQueue(writeQueueMaxSize, flusherPoolSize, Seq.empty, new GlobalSampler{}) + def apply(): WriteQueue[T] = { + val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, new NullProcessor[T], new GlobalSampler{}) wq.start() ServiceTracker.register(wq) wq diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/collector/WriteQueueWorkerSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/WriteQueueWorkerSpec.scala index 
147e51f34b7..1d758e72845 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/collector/WriteQueueWorkerSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/WriteQueueWorkerSpec.scala @@ -16,44 +16,27 @@ */ package com.twitter.zipkin.collector +import com.twitter.zipkin.collector.processor.Processor +import com.twitter.zipkin.collector.sampler.GlobalSampler +import com.twitter.zipkin.common.{Annotation, Endpoint, Span} import org.specs.Specification import org.specs.mock.{ClassMocker, JMocker} -import processor.Processor -import sampler.GlobalSampler -import scala.collection._ -import com.twitter.zipkin.common.{Annotation, Endpoint, Span} import java.util.concurrent.BlockingQueue class WriteQueueWorkerSpec extends Specification with JMocker with ClassMocker { "WriteQueueWorker" should { - "sample" in { + "hand off to processor" in { + val processor = mock[Processor[Span]] + val queue = mock[BlockingQueue[Span]] val sampler = mock[GlobalSampler] - val processor = mock[Processor] - val queue = mock[BlockingQueue[List[String]]] - val w = new WriteQueueWorker(queue, Seq(processor), sampler) + val w = new WriteQueueWorker[Span](queue, processor, sampler) val span = Span(123, "boo", 456, None, List(Annotation(123, "value", Some(Endpoint(1,2,"service")))), Nil) expect { - one(sampler).apply(123L) willReturn(true) - one(processor).processSpan(span) + one(processor).process(span) } - - w.processSpan(span) - } - - "deserialize garbage" in { - val garbage = "garbage!" 
- val sampler = mock[GlobalSampler] - val processor = mock[Processor] - - val w = new WriteQueueWorker(null, Seq(processor), sampler) - - expect { - never(processor).processSpan(any) - } - - w.processScribeMessage(garbage) + w.process(span) } } } diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/FanoutProcessorSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/FanoutProcessorSpec.scala new file mode 100644 index 00000000000..c6665973f99 --- /dev/null +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/FanoutProcessorSpec.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package com.twitter.zipkin.collector.processor + +import org.specs.Specification +import org.specs.mock.{JMocker, ClassMocker} + +class FanoutProcessorSpec extends Specification with JMocker with ClassMocker { + "FanoutProcessor" should { + "fanout" in { + val proc1 = mock[Processor[Int]] + val proc2 = mock[Processor[Int]] + + val fanout = new FanoutProcessor[Int](Seq(proc1, proc2)) + val item = 1 + + expect { + one(proc1).process(item) + one(proc2).process(item) + } + + fanout.process(item) + } + } +} diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/OstrichProcessorSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/OstrichProcessorSpec.scala index 5be5f63f9ef..18af683722c 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/OstrichProcessorSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/OstrichProcessorSpec.scala @@ -39,7 +39,7 @@ class OstrichProcessorSpec extends Specification { val span = Span(12345, "methodcall", 666, None, List(annotation1, annotation2, annotation3), Nil) - agg.processSpan(span) + agg.process(span) Stats.getMetrics()(prefix + "service") mustEqual distribution @@ -55,7 +55,7 @@ class OstrichProcessorSpec extends Specification { val span = Span(12345, "methodcall", 666, None, List(annotation1, annotation2, annotation3), Nil) - agg.processSpan(span) + agg.process(span) Stats.getMetrics()(prefix + "service") mustNotBe distribution Stats.getMetrics()(prefix + "service.methodcall") mustNotBe distribution diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/ProcessorFilterSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/ProcessorFilterSpec.scala new file mode 100644 index 00000000000..ee13afa927d --- /dev/null +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/ProcessorFilterSpec.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2012 Twitter 
Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.twitter.zipkin.collector.processor + +import org.specs.mock.{JMocker, ClassMocker} +import org.specs.Specification + +class ProcessorFilterSpec extends Specification with JMocker with ClassMocker { + "ProcessorFilter" should { + "compose" in { + val filter1 = new ProcessorFilter[Int, Double] { + def apply(item: Int) = (item + 1).toDouble + } + val filter2 = new ProcessorFilter[Double, Long] { + def apply(item: Double) = (item + 1).toLong + } + + "with other filter" in { + val composed = filter1 andThen filter2 + val item = 1 + val expected = 3L + + val actual = composed.apply(item) + actual mustEqual expected + } + + "with Processor" in { + val proc = mock[Processor[Long]] + val composed = filter1 andThen filter2 andThen proc + + val item = 1 + val procExpected = 3L + + expect { + one(proc).process(procExpected) + } + + composed.process(item) + } + } + } +} diff --git a/zipkin-test/src/test/resources/TestCollector.scala b/zipkin-test/src/test/resources/TestCollector.scala index 0ae21e99b18..26891e26f32 100644 --- a/zipkin-test/src/test/resources/TestCollector.scala +++ b/zipkin-test/src/test/resources/TestCollector.scala @@ -37,7 +37,7 @@ new ScribeZipkinCollectorConfig { ) :: new TimeSeriesCollectorFactory ) - def writeQueueConfig = new WriteQueueConfig { + def writeQueueConfig = new WriteQueueConfig[T] { writeQueueMaxSize = 500 flusherPoolSize = 10 }