Skip to content

Commit

Permalink
Return IndexTraceId from storage.Index
Browse files Browse the repository at this point in the history
* Change `storage.Index` to return a `Seq[IndexTraceId]` rather than a vanilla
list of trace IDs. Each `IndexTraceId` contains a `traceId` and its associated
`timestamp`.

Author: @franklinhu
Fixes #126
URL: #126
  • Loading branch information
Franklin Hu committed Aug 28, 2012
1 parent 7ed569a commit 2cc3378
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
FTrace.recordBinary("limit", limit)
FTrace.recordBinary("order", order)

val traceIds = index.getTraceIdsByName(serviceName, span, endTs, limit)
val traceIds = index.getTraceIdsByName(serviceName, span, endTs, limit).map {
_.map { _.traceId }
}
sortTraceIds(traceIds, limit, order)
}
}
Expand All @@ -113,7 +115,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
FTrace.recordBinary("limit", limit)
FTrace.recordBinary("order", order)

val traceIds = index.getTraceIdsByName(serviceName, None, endTs, limit)
val traceIds = index.getTraceIdsByName(serviceName, None, endTs, limit).map {
_.map { _.traceId }
}
sortTraceIds(traceIds, limit, order)
}
}
Expand All @@ -139,7 +143,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
FTrace.recordBinary("limit", limit)
FTrace.recordBinary("order", order)

val traceIds = index.getTraceIdsByAnnotation(serviceName, annotation, valueOption, endTs, limit)
val traceIds = index.getTraceIdsByAnnotation(serviceName, annotation, valueOption, endTs, limit).map {
_.map { _.traceId }
}
sortTraceIds(traceIds, limit, order)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import com.twitter.zipkin.common.Span
import com.twitter.util.Future
import scala.collection.Set
import java.nio.ByteBuffer
import com.twitter.logging.Logger
import com.twitter.ostrich.stats.Stats

/**
* Duration of the trace in question in microseconds.
*/
case class TraceIdDuration(traceId: Long, duration: Long, startTimestamp: Long)

/* A trace ID and its associated timestamp */
case class IndexedTraceId(traceId: Long, timestamp: Long)

trait Index {

/**
Expand All @@ -39,15 +40,15 @@ trait Index {
* Only return maximum of limit trace ids from before the endTs.
*/
def getTraceIdsByName(serviceName: String, spanName: Option[String],
endTs: Long, limit: Int): Future[Seq[Long]]
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]]

/**
* Get the trace ids for this annotation between the two timestamps. If value is also passed we expect
* both the annotation key and value to be present in index for a match to be returned.
* Only return maximum of limit trace ids from before the endTs.
*/
def getTraceIdsByAnnotation(serviceName: String, annotation: String, value: Option[ByteBuffer],
endTs: Long, limit: Int): Future[Seq[Long]]
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]]

/**
* Fetch the duration or an estimate thereof from the traces.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import java.util.{Map => JMap}
import com.twitter.zipkin.common.{Annotation, Span}
import com.twitter.zipkin.util.Util
import com.twitter.zipkin.storage.{TraceIdDuration, Index}
import com.twitter.zipkin.storage.{IndexedTraceId, TraceIdDuration, Index}
import com.twitter.util.Future
import com.twitter.zipkin.config.CassandraConfig
import com.twitter.zipkin.Constants
Expand Down Expand Up @@ -134,7 +134,7 @@ trait CassandraIndex extends Index with Cassandra {
*/

def getTraceIdsByName(serviceName: String, spanName: Option[String],
endTs: Long, limit: Int): Future[Seq[Long]] = {
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = {
CASSANDRA_GET_TRACE_IDS_BY_NAME.incr
// if we have a span name, look up in the service + span name index
// if not, look up by service name only
Expand All @@ -147,15 +147,17 @@ trait CassandraIndex extends Index with Cassandra {
serviceNameIndex.getRowSlice(key, Some(endTs), None, limit, Order.Reversed)
}

// Future[Seq[Column[Long, Long]]] => Future[Seq[Long]]
row map {s =>
(s map {_.value}).distinct
// Future[Seq[Column[Long, Long]]] => Future[Seq[IndexedTraceId]]
row map {
_.map { column =>
IndexedTraceId(traceId = column.value, timestamp = column.name)
}
}
}


def getTraceIdsByAnnotation(service: String, annotation: String, value: Option[ByteBuffer],
endTs: Long, limit: Int): Future[Seq[Long]] = {
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = {
CASSANDRA_GET_TRACE_IDS_BY_ANN.incr
val row = value match {
case Some(v) => {
Expand All @@ -167,8 +169,10 @@ trait CassandraIndex extends Index with Cassandra {
annotationsIndex.getRowSlice(key, Some(endTs), None, limit, Order.Reversed)
}

row map { s =>
(s map {_.value}).distinct
row map {
_.map { column =>
IndexedTraceId(traceId = column.value, timestamp = column.name)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*/
package com.twitter.zipkin.query

import adjusters.{TimeSkewAdjuster, NullAdjuster}
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
import com.twitter.zipkin.gen
import com.twitter.zipkin.common._
import java.nio.ByteBuffer
import com.twitter.util.Future
import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Storage, Index}
import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter}
import com.twitter.zipkin.common._
import com.twitter.zipkin.gen
import com.twitter.zipkin.query.adjusters.{TimeSkewAdjuster, NullAdjuster}
import com.twitter.zipkin.storage._
import java.nio.ByteBuffer
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}

class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val ep1 = Endpoint(123, 123, "service1")
Expand Down Expand Up @@ -80,20 +80,20 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
}

class MockIndex extends Index {
def ids: Seq[Long] = Seq(1, 2, 3)
def ids = Seq(IndexedTraceId(1, 1), IndexedTraceId(2, 2), IndexedTraceId(3, 3))
def mockSpanName: Option[String] = Some("methodcall")
def mockValue: Option[ByteBuffer] = None

def close() = null
def getTraceIdsByName(serviceName: String, spanName: Option[String],
endTs: Long, limit: Int): Future[Seq[Long]] = {
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = {
serviceName mustEqual "service"
spanName mustEqual mockSpanName
endTs mustEqual 100L
Future(ids)
}
def getTraceIdsByAnnotation(service: String, annotation: String, value: Option[ByteBuffer], endTs: Long,
limit: Int): Future[Seq[Long]] = {
limit: Int): Future[Seq[IndexedTraceId]] = {
service mustEqual "service"
annotation mustEqual "annotation"
value mustEqual mockValue
Expand Down Expand Up @@ -409,7 +409,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
"fail to find traces by name in index, return empty" in {
val storage = mock[Storage]
val index = new MockIndex {
override def ids: Seq[Long] = Seq()
override def ids: Seq[IndexedTraceId] = Seq()
override def getTracesDuration(traceIds: Seq[Long]): Future[Seq[TraceIdDuration]] = Future(Seq())
}
val qs = new QueryService(storage, index, null, Map())
Expand All @@ -421,7 +421,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
"fail to find traces by annotation in index, return empty" in {
val storage = mock[Storage]
val index = new MockIndex {
override def ids: Seq[Long] = Seq()
override def ids: Seq[IndexedTraceId] = Seq()
override def getTracesDuration(traceIds: Seq[Long]): Future[Seq[TraceIdDuration]] = Future(Seq())
}
val qs = new QueryService(storage, index, null, Map())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ class CassandraIndexSpec extends Specification with JMocker with ClassMocker {
//cassandra.storeSpan(span1)()
cassandraIndex.indexTraceIdByServiceAndName(span1)()
cassandraIndex.getTraceIdsByName("service", None, 0, 3)() foreach {
_ mustEqual span1.traceId
_.traceId mustEqual span1.traceId
}
cassandraIndex.getTraceIdsByName("service", Some("methodname"), 0, 3)() foreach {
_ mustEqual span1.traceId
_.traceId mustEqual span1.traceId
}
}

Expand Down Expand Up @@ -188,17 +188,21 @@ class CassandraIndexSpec extends Specification with JMocker with ClassMocker {
cassandraIndex.indexSpanByAnnotations(span1)()

// fetch by time based annotation, find trace
var seq = cassandraIndex.getTraceIdsByAnnotation("service", "custom", None, 0, 3)()
seq mustEqual Seq(span1.traceId)
val map1 = cassandraIndex.getTraceIdsByAnnotation("service", "custom", None, 0, 3)()
map1.foreach {
_.traceId mustEqual span1.traceId
}

// should not find any traces since the core annotation doesn't exist in index
seq = cassandraIndex.getTraceIdsByAnnotation("service", "cs", None, 0, 3)()
seq.isEmpty mustBe true
val map2 = cassandraIndex.getTraceIdsByAnnotation("service", "cs", None, 0, 3)()
map2.isEmpty mustBe true

// should find traces by the key and value annotation
seq = cassandraIndex.getTraceIdsByAnnotation("service", "BAH",
val map3 = cassandraIndex.getTraceIdsByAnnotation("service", "BAH",
Some(ByteBuffer.wrap("BEH".getBytes)), 0, 3)()
seq mustEqual Seq(span1.traceId)
map3.foreach {
_.traceId mustEqual span1.traceId
}
}

"not index empty service name" in {
Expand Down

0 comments on commit 2cc3378

Please sign in to comment.