Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added getTracesExist support #100

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
}
}

def tracesExist(traceIds: Seq[Long]): Future[Set[Long]] = {
log.debug("tracesExist. " + traceIds)
call("tracesExist") {
FTrace.recordBinary("numIds", traceIds.length)

storage.tracesExist(traceIds)
}
}

def getTracesByIds(traceIds: Seq[Long], adjust: Seq[gen.Adjust]): Future[Seq[gen.Trace]] = {
log.debug("getTracesByIds. " + traceIds + " adjust " + adjust)
call("getTracesByIds") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ trait Storage {
*/
def getTimeToLive(traceId: Long): Future[Duration]

def tracesExist(traceIds: Seq[Long]): Future[Set[Long]]

/**
* Get the available trace information from the storage system.
* Spans in trace should be sorted by the first annotation timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ trait CassandraStorage extends Storage with Cassandra {
// read the trace
private val CASSANDRA_GET_TRACE = Stats.getCounter("cassandra_gettrace")

// trace exist call
private val CASSANDRA_TRACE_EXISTS = Stats.getCounter("cassandra_traceexists")

// trace is too big!
private val CASSANDRA_GET_TRACE_TOO_BIG = Stats.getCounter("cassandra_gettrace_too_big")

Expand Down Expand Up @@ -89,6 +92,35 @@ trait CassandraStorage extends Storage with Cassandra {
}
}

/**
* Finds traces that have been stored from a list of trace IDs
*
* @param traceIds a List of trace IDs
* @return a Set of those trace IDs from the list which are stored
*/

def tracesExist(traceIds: Seq[Long]): Future[Set[Long]] = {
CASSANDRA_TRACE_EXISTS.incr
Future.collect {
traceIds.grouped(storageConfig.traceFetchBatchSize).toSeq.map { ids =>
traces.multigetRows(ids.toSet.asJava, None, None, Order.Normal, 1).map { rowSet =>
ids.flatMap { id =>
val spans = rowSet.asScala(id).asScala.map {
case (colName, col) => ThriftAdapter(col.value)
}
if (spans.isEmpty) {
None
} else {
Some(spans.head.traceId)
}
}.toSet
}
}
}.map {
_.reduce { (left, right) => left ++ right }
}
}

/**
* Fetches traces from the underlying storage. Note that there might be multiple
* entries per span.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ class ZipkinSpec extends Specification with JMocker with ClassMocker {
// let's check that the trace we just sent has been stored and indexed properly
val queryClient = new gen.ZipkinQuery.FinagledClient(queryTransport, protocol)
val traces = queryClient.getTracesByIds(Seq(123), Seq())()
val existSet = queryClient.tracesExist(Seq(123, 5))()

traces.isEmpty mustEqual false
traces(0).spans.isEmpty mustEqual false
traces(0).spans(0).traceId mustEqual 123

existSet.contains(123) mustEqual true
existSet.contains(5) mustEqual false
}

}
Expand Down
6 changes: 6 additions & 0 deletions zipkin-thrift/src/main/thrift/zipkinQuery.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ service ZipkinQuery {

//************** Fetch traces from id **************

/**
* Get the traces that are in the database from the given list of trace ids.
*/

set<i64> tracesExist(1: list<i64> trace_ids) throws (1: QueryException qe);

/**
* Get the full traces associated with the given trace ids.
*
Expand Down