From 6449ab7e0ee801f7ba04378e75a5ac4851998a02 Mon Sep 17 00:00:00 2001 From: Jerry Li Date: Mon, 6 Aug 2012 11:33:33 -0700 Subject: [PATCH] Added tracesExist support tracesExist queries the database to find which traces from a list are actually in the database Author: @jerryli9876 Fixes #100 URL: https://github.com/twitter/zipkin/pull/100 --- .../twitter/zipkin/query/QueryService.scala | 9 ++++++ .../com/twitter/zipkin/storage/Storage.scala | 2 ++ .../storage/cassandra/CassandraStorage.scala | 32 +++++++++++++++++++ .../scala/com/twitter/zipkin/ZipkinSpec.scala | 4 +++ .../src/main/thrift/zipkinQuery.thrift | 6 ++++ 5 files changed, 53 insertions(+) diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala index 9654e3cb6a3..0bf96083a2c 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala @@ -144,6 +144,15 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus } } + def tracesExist(traceIds: Seq[Long]): Future[Set[Long]] = { + log.debug("tracesExist. " + traceIds) + call("tracesExist") { + FTrace.recordBinary("numIds", traceIds.length) + + storage.tracesExist(traceIds) + } + } + def getTracesByIds(traceIds: Seq[Long], adjust: Seq[gen.Adjust]): Future[Seq[gen.Trace]] = { log.debug("getTracesByIds. 
" + traceIds + " adjust " + adjust) call("getTracesByIds") { diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala index e515beccbdb..6a6e450d49b 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala @@ -44,6 +44,8 @@ trait Storage { */ def getTimeToLive(traceId: Long): Future[Duration] + def tracesExist(traceIds: Seq[Long]): Future[Set[Long]] + /** * Get the available trace information from the storage system. * Spans in trace should be sorted by the first annotation timestamp diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala index 398ac01c938..87906d3523f 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala @@ -40,6 +40,9 @@ trait CassandraStorage extends Storage with Cassandra { // read the trace private val CASSANDRA_GET_TRACE = Stats.getCounter("cassandra_gettrace") + // trace exist call + private val CASSANDRA_TRACE_EXISTS = Stats.getCounter("cassandra_traceexists") + // trace is too big! 
private val CASSANDRA_GET_TRACE_TOO_BIG = Stats.getCounter("cassandra_gettrace_too_big") @@ -89,6 +92,35 @@ trait CassandraStorage extends Storage with Cassandra { } } + /** + * Finds which of the given trace IDs have stored span data. + * An empty traceIds yields an empty Set rather than a failed Future. + * @param traceIds the trace IDs to look up, fetched in batches of storageConfig.traceFetchBatchSize + * @return a Set of those trace IDs from the list which are stored + */ + + def tracesExist(traceIds: Seq[Long]): Future[Set[Long]] = { + CASSANDRA_TRACE_EXISTS.incr + Future.collect { + traceIds.grouped(storageConfig.traceFetchBatchSize).toSeq.map { ids => + traces.multigetRows(ids.toSet.asJava, None, None, Order.Normal, 1).map { rowSet => + ids.flatMap { id => + val spans = rowSet.asScala(id).asScala.map { + case (colName, col) => ThriftAdapter(col.value) + } + if (spans.isEmpty) { + None + } else { + Some(spans.head.traceId) + } + }.toSet + } + } + }.map { + _.foldLeft(Set.empty[Long]) { (left, right) => left ++ right } + } + } + /** * Fetches traces from the underlying storage. Note that there might be multiple * entries per span. 
diff --git a/zipkin-test/src/test/scala/com/twitter/zipkin/ZipkinSpec.scala b/zipkin-test/src/test/scala/com/twitter/zipkin/ZipkinSpec.scala index f1221ac6c9d..dcc5047bce3 100644 --- a/zipkin-test/src/test/scala/com/twitter/zipkin/ZipkinSpec.scala +++ b/zipkin-test/src/test/scala/com/twitter/zipkin/ZipkinSpec.scala @@ -130,10 +130,14 @@ class ZipkinSpec extends Specification with JMocker with ClassMocker { // let's check that the trace we just sent has been stored and indexed properly val queryClient = new gen.ZipkinQuery.FinagledClient(queryTransport, protocol) val traces = queryClient.getTracesByIds(Seq(123), Seq())() + val existSet = queryClient.tracesExist(Seq(123, 5))() traces.isEmpty mustEqual false traces(0).spans.isEmpty mustEqual false traces(0).spans(0).traceId mustEqual 123 + + existSet.contains(123) mustEqual true + existSet.contains(5) mustEqual false } } diff --git a/zipkin-thrift/src/main/thrift/zipkinQuery.thrift b/zipkin-thrift/src/main/thrift/zipkinQuery.thrift index 60d89138415..01c391cae83 100644 --- a/zipkin-thrift/src/main/thrift/zipkinQuery.thrift +++ b/zipkin-thrift/src/main/thrift/zipkinQuery.thrift @@ -120,6 +120,12 @@ service ZipkinQuery { //************** Fetch traces from id ************** + /** + * Get the subset of the given trace ids for which a trace is stored in the database. + */ + + set<i64> tracesExist(1: list<i64> trace_ids) throws (1: QueryException qe); + /** * Get the full traces associated with the given trace ids. *