Skip to content

Commit

Permalink
Top annotations
Browse files Browse the repository at this point in the history
Add collector and query support for adding "top annotation" data that may be
computed offline

* New collector endpoints
  * `storeTopAnnotations`
  * `storeTopKeyValueAnnotations`
* New query endpoints
  * `getTopAnnotations`
  * `getTopKeyValueAnnotations`

Author: @franklinhu
Fixes #46
URL: #46
  • Loading branch information
Franklin Hu committed Jun 26, 2012
1 parent 8a8497d commit 3e15fe1
Show file tree
Hide file tree
Showing 21 changed files with 503 additions and 29 deletions.
4 changes: 4 additions & 0 deletions zipkin-scribe/config/collector-dev.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ new ScribeZipkinCollectorConfig {
def cassandraConfig = _cassandraConfig
}

def aggregatesConfig = new CassandraAggregatesConfig {
def cassandraConfig = _cassandraConfig
}

override def adaptiveSamplerConfig = new NullAdaptiveSamplerConfig {}

def zkConfig = new ZooKeeperConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,34 @@ class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: Writ
}
}

def storeTopAnnotations(serviceName: String, annotations: Seq[String]): Future[Unit] = {
Stats.incr("collector.storeTopAnnotations")
log.info("storeTopAnnotations: " + serviceName + "; " + annotations)

Stats.timeFutureMillis("collector.storeTopAnnotations") {
config.aggregates.storeTopAnnotations(serviceName, annotations)
} rescue {
case e: Exception =>
log.error(e, "storeTopAnnotations failed")
Stats.incr("collector.storeTopAnnotations")
Future.exception(gen.AdjustableRateException(e.toString))
}
}

def storeTopKeyValueAnnotations(serviceName: String, annotations: Seq[String]): Future[Unit] = {
Stats.incr("collector.storeTopKeyValueAnnotations")
log.info("storeTopKeyValueAnnotations: " + serviceName + ";" + annotations)

Stats.timeFutureMillis("collector.storeTopKeyValueAnnotations") {
config.aggregates.storeTopKeyValueAnnotations(serviceName, annotations)
} rescue {
case e: Exception =>
log.error(e, "storeTopKeyValueAnnotations failed")
Stats.incr("collector.storeTopKeyValueAnnotations")
Future.exception(gen.AdjustableRateException(e.toString))
}
}

@throws(classOf[gen.AdjustableRateException])
def setSampleRate(sampleRate: Double): Future[Unit] = {
Stats.incr("collector.set_sample_rate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.twitter.zipkin.config.sampler.AdjustableRateConfig
import com.twitter.zipkin.config.ScribeZipkinCollectorConfig
import com.twitter.zipkin.gen
import com.twitter.zipkin.adapter.ThriftAdapter
import com.twitter.zipkin.storage.Aggregates
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}

Expand All @@ -41,16 +42,19 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo

val queue = mock[WriteQueue[Seq[String]]]
val zkSampleRateConfig = mock[AdjustableRateConfig]
val mockAggregates = mock[Aggregates]

val config = new ScribeZipkinCollectorConfig {
def writeQueueConfig = null
def zkConfig = null
def indexConfig = null
def storageConfig = null
def aggregatesConfig = null
def methodConfig = null

override lazy val writeQueue = queue
override lazy val sampleRateConfig = zkSampleRateConfig
override lazy val aggregates = mockAggregates
}

def scribeCollectorService = new ScribeCollectorService(config, config.writeQueue, Set(category)) {
Expand Down Expand Up @@ -114,5 +118,30 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val actual = cs.setSampleRate(sampleRate)
actual() mustEqual expected()
}

"store aggregates" in {
val serviceName = "mockingbird"
val annotations = Seq("a" , "b", "c")

"store top annotations" in {
val cs = scribeCollectorService

expect {
one(mockAggregates).storeTopAnnotations(serviceName, annotations)
}

cs.storeTopAnnotations(serviceName, annotations)
}

"store top key value annotations" in {
val cs = scribeCollectorService

expect {
one(mockAggregates).storeTopKeyValueAnnotations(serviceName, annotations)
}

cs.storeTopKeyValueAnnotations(serviceName, annotations)
}
}
}
}
4 changes: 4 additions & 0 deletions zipkin-server/config/query-dev.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ new ZipkinQueryConfig {
def cassandraConfig = _cassandraConfig
}

def aggregatesConfig = new CassandraAggregatesConfig {
def cassandraConfig = _cassandraConfig
}

def zkConfig = new ZooKeeperConfig {
servers = List("localhost:2181")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.zipkin.config

import com.twitter.util.Config
import com.twitter.zipkin.storage.{NullAggregates, Aggregates}

trait AggregatesConfig extends Config[Aggregates]

class NullAggregatesConfig extends AggregatesConfig {
def apply(): Aggregates = new NullAggregates
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.zipkin.config

import com.twitter.zipkin.storage.cassandra.CassandraAggregates
import com.twitter.cassie.codecs.{LongCodec, Utf8Codec}
import com.twitter.cassie.{ColumnFamily, ReadConsistency, WriteConsistency}

trait CassandraAggregatesConfig extends AggregatesConfig { self =>

def cassandraConfig: CassandraConfig
var topAnnotationsCf: String = "TopAnnotations"

def apply(): CassandraAggregates = {
val _topAnnotations = cassandraConfig.keyspace.columnFamily[String, Long, String](
topAnnotationsCf,Utf8Codec, LongCodec, Utf8Codec
).consistency(WriteConsistency.One).consistency(ReadConsistency.One)

new CassandraAggregates {
val topAnnotations: ColumnFamily[String, Long, String] = _topAnnotations
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.twitter.zipkin.config

import com.twitter.zipkin.storage.{Index, Storage}
import com.twitter.zipkin.storage.{Aggregates, Index, Storage}
import com.twitter.zipkin.collector.{WriteQueue, ZipkinCollector}
import com.twitter.zipkin.collector.filter.{DefaultClientIndexingFilter, IndexingFilter}
import com.twitter.zipkin.collector.sampler.{AdaptiveSampler, ZooKeeperGlobalSampler, GlobalSampler}
Expand Down Expand Up @@ -62,6 +62,10 @@ trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] {
def indexConfig: IndexConfig
lazy val index: Index = indexConfig.apply()

/* Aggregates */
def aggregatesConfig: AggregatesConfig
lazy val aggregates: Aggregates = aggregatesConfig.apply()

/* ZooKeeper */
def zkConfig: ZooKeeperConfig

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.twitter.zipkin.config

import com.twitter.zipkin.query.ZipkinQuery
import com.twitter.zipkin.gen
import com.twitter.zipkin.storage.{Index, Storage}
import com.twitter.zipkin.storage.{Aggregates, Index, Storage}
import com.twitter.common.zookeeper.{ServerSetImpl, ZooKeeperClient}
import com.twitter.finagle.zipkin.thrift.ZipkinTracer
import com.twitter.ostrich.admin.RuntimeEnvironment
Expand All @@ -42,6 +42,9 @@ trait ZipkinQueryConfig extends ZipkinConfig[ZipkinQuery] {
def indexConfig: IndexConfig
lazy val index: Index = indexConfig.apply()

def aggregatesConfig: AggregatesConfig
lazy val aggregates: Aggregates = aggregatesConfig.apply()

def zkConfig: ZooKeeperConfig

def zkClientConfig = new ZooKeeperClientConfig {
Expand All @@ -55,6 +58,6 @@ trait ZipkinQueryConfig extends ZipkinConfig[ZipkinQuery] {
ZipkinTracer(statsReceiver = statsReceiver, sampleRate = 1f)

def apply(runtime: RuntimeEnvironment) = {
new ZipkinQuery(this, serverSet, storage, index)
new ZipkinQuery(this, serverSet, storage, index, aggregates)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.twitter.finagle.tracing.Trace
import com.twitter.util.Future
import com.twitter.zipkin.gen
import com.twitter.zipkin.query.adjusters.Adjuster
import com.twitter.zipkin.storage.{TraceIdDuration, Index, Storage}
import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Index, Storage}
import java.nio.ByteBuffer
import org.apache.thrift.TException
import scala.collection.Set
Expand All @@ -34,7 +34,7 @@ import scala.collection.Set
* by lookup the information in the index and then fetch the required trace data
* from the storage.
*/
class QueryService(storage: Storage, index: Index, adjusterMap: Map[gen.Adjust, Adjuster])
class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjusterMap: Map[gen.Adjust, Adjuster])
extends gen.ZipkinQuery.FutureIface with Service {
private val log = Logger.get
private var running = false
Expand Down Expand Up @@ -339,6 +339,34 @@ class QueryService(storage: Storage, index: Index, adjusterMap: Map[gen.Adjust,
}
}

def getTopAnnotations(serviceName: String): Future[Seq[String]] = {
checkIfRunning()
Stats.getCounter("query.get_top_annotations").incr()
log.debug("getTopAnnotations: " + serviceName)

Stats.timeFutureMillis("query.getTopAnnotations") {
aggregates.getTopAnnotations(serviceName) onFailure { e =>
log.error(e, "getTopAnnotations failed")
Stats.getCounter("query.error_get_top_annotations").incr()
Future.exception(gen.QueryException(e.toString))
}
}
}

def getTopKeyValueAnnotations(serviceName: String): Future[Seq[String]] = {
checkIfRunning()
Stats.getCounter("query.get_top_key_value_annotations").incr()
log.debug("getTopKeyValueAnnotations: " + serviceName)

Stats.timeFutureMillis("query.getTopKeyValueAnnotations") {
aggregates.getTopKeyValueAnnotations(serviceName) onFailure { e =>
log.error(e, "getTopKeyValueAnnotations failed")
Stats.getCounter("query.error_get_top_key_value_annotations").incr()
Future.exception(gen.QueryException(e.toString))
}
}
}

private def checkIfRunning() = {
if (!running) {
log.warning("Server not running, throwing exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.zipkin.query
*/
import com.twitter.logging.Logger
import org.apache.thrift.protocol.TBinaryProtocol
import com.twitter.zipkin.storage.{Index, Storage}
import com.twitter.zipkin.storage.{Aggregates, Index, Storage}
import com.twitter.zipkin.gen
import com.twitter.finagle.thrift.ThriftServerFramedCodec
import com.twitter.finagle.zookeeper.ZookeeperServerSetCluster
Expand All @@ -29,7 +29,7 @@ import com.twitter.zipkin.config.ZipkinQueryConfig
import com.twitter.common.zookeeper.ServerSet

class ZipkinQuery(
config: ZipkinQueryConfig, serverSet: ServerSet, storage: Storage, index: Index
config: ZipkinQueryConfig, serverSet: ServerSet, storage: Storage, index: Index, aggregates: Aggregates
) extends Service {

val log = Logger.get(getClass.getName)
Expand All @@ -41,7 +41,7 @@ class ZipkinQuery(
log.info("Starting query thrift service on addr " + serverAddr)
val cluster = new ZookeeperServerSetCluster(serverSet)

val queryService = new QueryService(storage, index, config.adjusterMap)
val queryService = new QueryService(storage, index, aggregates, config.adjusterMap)
queryService.start()
ServiceTracker.register(queryService)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.zipkin.storage

import com.twitter.util.Future

/**
* Storage and retrieval interface for aggregates that may be computed offline and reloaded into
* online storage
*/
trait Aggregates {
def getTopAnnotations(serviceName: String): Future[Seq[String]]
def getTopKeyValueAnnotations(serviceName: String): Future[Seq[String]]

def storeTopAnnotations(serviceName: String, a: Seq[String]): Future[Unit]
def storeTopKeyValueAnnotations(serviceName: String, a: Seq[String]): Future[Unit]
}

class NullAggregates extends Aggregates {
def getTopAnnotations(serviceName: String) = Future(Seq.empty[String])
def getTopKeyValueAnnotations(serviceName: String) = Future(Seq.empty[String])

def storeTopAnnotations(serviceName: String, a: Seq[String]): Future[Unit] = Future.Unit
def storeTopKeyValueAnnotations(serviceName: String, a: Seq[String]): Future[Unit] = Future.Unit
}
Loading

0 comments on commit 3e15fe1

Please sign in to comment.