Skip to content

Commit

Permalink
Added tracesExist support
Browse files Browse the repository at this point in the history
traceExist queries the database to find which traces from a list are actually in the database

Author: @jerryli9876
Fixes #100
URL: #100
  • Loading branch information
jerryli9876 committed Aug 6, 2012
1 parent b0f50d4 commit fae7e61
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
}
}

def tracesExist(traceIds: Seq[Long]): Future[Set[Long]] = {
log.debug("tracesExist. " + traceIds)
call("tracesExist") {
FTrace.recordBinary("numIds", traceIds.length)

storage.tracesExist(traceIds)
}
}

def getTracesByIds(traceIds: Seq[Long], adjust: Seq[gen.Adjust]): Future[Seq[gen.Trace]] = {
log.debug("getTracesByIds. " + traceIds + " adjust " + adjust)
call("getTracesByIds") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ trait Storage {
*/
def getTimeToLive(traceId: Long): Future[Duration]

def tracesExist(traceIds: Seq[Long]): Future[Set[Long]]

/**
* Get the available trace information from the storage system.
* Spans in trace should be sorted by the first annotation timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ trait CassandraStorage extends Storage with Cassandra {
// read the trace
private val CASSANDRA_GET_TRACE = Stats.getCounter("cassandra_gettrace")

// trace exist call
private val CASSANDRA_TRACE_EXISTS = Stats.getCounter("cassandra_traceexists")

// trace is too big!
private val CASSANDRA_GET_TRACE_TOO_BIG = Stats.getCounter("cassandra_gettrace_too_big")

Expand Down Expand Up @@ -89,6 +92,35 @@ trait CassandraStorage extends Storage with Cassandra {
}
}

/**
* Finds traces that have been stored from a list of trace IDs
*
* @param traceIds a List of trace IDs
* @return a Set of those trace IDs from the list which are stored
*/

def tracesExist(traceIds: Seq[Long]): Future[Set[Long]] = {
CASSANDRA_TRACE_EXISTS.incr
Future.collect {
traceIds.grouped(storageConfig.traceFetchBatchSize).toSeq.map { ids =>
traces.multigetRows(ids.toSet.asJava, None, None, Order.Normal, 1).map { rowSet =>
ids.flatMap { id =>
val spans = rowSet.asScala(id).asScala.map {
case (colName, col) => ThriftAdapter(col.value)
}
if (spans.isEmpty) {
None
} else {
Some(spans.head.traceId)
}
}.toSet
}
}
}.map {
_.reduce { (left, right) => left ++ right }
}
}

/**
* Fetches traces from the underlying storage. Note that there might be multiple
* entries per span.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ class ZipkinSpec extends Specification with JMocker with ClassMocker {
// let's check that the trace we just sent has been stored and indexed properly
val queryClient = new gen.ZipkinQuery.FinagledClient(queryTransport, protocol)
val traces = queryClient.getTracesByIds(Seq(123), Seq())()
val existSet = queryClient.tracesExist(Seq(123, 5))()

traces.isEmpty mustEqual false
traces(0).spans.isEmpty mustEqual false
traces(0).spans(0).traceId mustEqual 123

existSet.contains(123) mustEqual true
existSet.contains(5) mustEqual false
}

}
Expand Down
6 changes: 6 additions & 0 deletions zipkin-thrift/src/main/thrift/zipkinQuery.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ service ZipkinQuery {

//************** Fetch traces from id **************

/**
* Get the traces that are in the database from the given list of trace ids.
*/

set<i64> tracesExist(1: list<i64> trace_ids) throws (1: QueryException qe);

/**
* Get the full traces associated with the given trace ids.
*
Expand Down

0 comments on commit fae7e61

Please sign in to comment.