From 2cc33784eb2ac296e615869ce3555d5ce732cfac Mon Sep 17 00:00:00 2001 From: Franklin Hu Date: Tue, 28 Aug 2012 10:27:28 -0700 Subject: [PATCH] Return IndexTraceId from storage.Index * Change `storage.Index` to return a `Seq[IndexTraceId]` rather than a vanilla list of trace IDs. Each `IndexTraceId` contains a `traceId` and its associated `timestamp`. Author: @franklinhu Fixes #126 URL: https://github.com/twitter/zipkin/pull/126 --- .../twitter/zipkin/query/QueryService.scala | 12 +++++++--- .../com/twitter/zipkin/storage/Index.scala | 9 +++---- .../storage/cassandra/CassandraIndex.scala | 20 +++++++++------- .../zipkin/query/QueryServiceSpec.scala | 24 +++++++++---------- .../cassandra/CassandraIndexSpec.scala | 20 +++++++++------- 5 files changed, 50 insertions(+), 35 deletions(-) diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala index 4d40fd1f1d0..3d2c3ef81e0 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala @@ -93,7 +93,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus FTrace.recordBinary("limit", limit) FTrace.recordBinary("order", order) - val traceIds = index.getTraceIdsByName(serviceName, span, endTs, limit) + val traceIds = index.getTraceIdsByName(serviceName, span, endTs, limit).map { + _.map { _.traceId } + } sortTraceIds(traceIds, limit, order) } } @@ -113,7 +115,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus FTrace.recordBinary("limit", limit) FTrace.recordBinary("order", order) - val traceIds = index.getTraceIdsByName(serviceName, None, endTs, limit) + val traceIds = index.getTraceIdsByName(serviceName, None, endTs, limit).map { + _.map { _.traceId } + } sortTraceIds(traceIds, limit, order) } } @@ -139,7 +143,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus FTrace.recordBinary("limit", limit) FTrace.recordBinary("order", order) - val traceIds = index.getTraceIdsByAnnotation(serviceName, annotation, valueOption, endTs, limit) + val traceIds = index.getTraceIdsByAnnotation(serviceName, annotation, valueOption, endTs, limit).map { + _.map { _.traceId } + } sortTraceIds(traceIds, limit, order) } } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Index.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Index.scala index 7b430c8b2c1..d22fda84105 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Index.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Index.scala @@ -19,14 +19,15 @@ import com.twitter.zipkin.common.Span import com.twitter.util.Future import scala.collection.Set import java.nio.ByteBuffer -import com.twitter.logging.Logger -import com.twitter.ostrich.stats.Stats /** * Duration of the trace in question in microseconds. */ case class TraceIdDuration(traceId: Long, duration: Long, startTimestamp: Long) +/* A trace ID and its associated timestamp */ +case class IndexedTraceId(traceId: Long, timestamp: Long) + trait Index { /** @@ -39,7 +40,7 @@ trait Index { * Only return maximum of limit trace ids from before the endTs. */ def getTraceIdsByName(serviceName: String, spanName: Option[String], - endTs: Long, limit: Int): Future[Seq[Long]] + endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] /** * Get the trace ids for this annotation between the two timestamps. If value is also passed we expect @@ -47,7 +48,7 @@ trait Index { * Only return maximum of limit trace ids from before the endTs. */ def getTraceIdsByAnnotation(serviceName: String, annotation: String, value: Option[ByteBuffer], - endTs: Long, limit: Int): Future[Seq[Long]] + endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] /** * Fetch the duration or an estimate thereof from the traces. diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala index 271d336c3e2..6a32631ad6c 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala @@ -23,7 +23,7 @@ import java.nio.ByteBuffer import java.util.{Map => JMap} import com.twitter.zipkin.common.{Annotation, Span} import com.twitter.zipkin.util.Util -import com.twitter.zipkin.storage.{TraceIdDuration, Index} +import com.twitter.zipkin.storage.{IndexedTraceId, TraceIdDuration, Index} import com.twitter.util.Future import com.twitter.zipkin.config.CassandraConfig import com.twitter.zipkin.Constants @@ -134,7 +134,7 @@ trait CassandraIndex extends Index with Cassandra { */ def getTraceIdsByName(serviceName: String, spanName: Option[String], - endTs: Long, limit: Int): Future[Seq[Long]] = { + endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = { CASSANDRA_GET_TRACE_IDS_BY_NAME.incr // if we have a span name, look up in the service + span name index // if not, look up by service name only @@ -147,15 +147,17 @@ trait CassandraIndex extends Index with Cassandra { serviceNameIndex.getRowSlice(key, Some(endTs), None, limit, Order.Reversed) } - // Future[Seq[Column[Long, Long]]] => Future[Seq[Long]] - row map {s => - (s map {_.value}).distinct + // Future[Seq[Column[Long, Long]]] => Future[Seq[IndexedTraceId]] + row map { + _.map { column => + IndexedTraceId(traceId = column.value, timestamp = column.name) + } } } def getTraceIdsByAnnotation(service: String, annotation: String, value: Option[ByteBuffer], - endTs: Long, limit: Int): Future[Seq[Long]] = { + endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = { CASSANDRA_GET_TRACE_IDS_BY_ANN.incr val row = value match { case Some(v) => { @@ -167,8 +169,10 @@ trait CassandraIndex extends Index with Cassandra { annotationsIndex.getRowSlice(key, Some(endTs), None, limit, Order.Reversed) } - row map { s => - (s map {_.value}).distinct + row map { + _.map { column => + IndexedTraceId(traceId = column.value, timestamp = column.name) + } } } diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala index c1995404311..27b3d360bec 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala @@ -16,15 +16,15 @@ */ package com.twitter.zipkin.query -import adjusters.{TimeSkewAdjuster, NullAdjuster} -import org.specs.Specification -import org.specs.mock.{ClassMocker, JMocker} -import com.twitter.zipkin.gen -import com.twitter.zipkin.common._ -import java.nio.ByteBuffer import com.twitter.util.Future -import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Storage, Index} import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter} +import com.twitter.zipkin.common._ +import com.twitter.zipkin.gen +import com.twitter.zipkin.query.adjusters.{TimeSkewAdjuster, NullAdjuster} +import com.twitter.zipkin.storage._ +import java.nio.ByteBuffer +import org.specs.Specification +import org.specs.mock.{ClassMocker, JMocker} class QueryServiceSpec extends Specification with JMocker with ClassMocker { val ep1 = Endpoint(123, 123, "service1") @@ -80,20 +80,20 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { } class MockIndex extends Index { - def ids: Seq[Long] = Seq(1, 2, 3) + def ids = Seq(IndexedTraceId(1, 1), IndexedTraceId(2, 2), IndexedTraceId(3, 3)) def mockSpanName: Option[String] = Some("methodcall") def mockValue: Option[ByteBuffer] = None def close() = null def getTraceIdsByName(serviceName: String, spanName: Option[String], - endTs: Long, limit: Int): Future[Seq[Long]] = { + endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = { serviceName mustEqual "service" spanName mustEqual mockSpanName endTs mustEqual 100L Future(ids) } def getTraceIdsByAnnotation(service: String, annotation: String, value: Option[ByteBuffer], endTs: Long, - limit: Int): Future[Seq[Long]] = { + limit: Int): Future[Seq[IndexedTraceId]] = { service mustEqual "service" annotation mustEqual "annotation" value mustEqual mockValue @@ -409,7 +409,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { "fail to find traces by name in index, return empty" in { val storage = mock[Storage] val index = new MockIndex { - override def ids: Seq[Long] = Seq() + override def ids: Seq[IndexedTraceId] = Seq() override def getTracesDuration(traceIds: Seq[Long]): Future[Seq[TraceIdDuration]] = Future(Seq()) } val qs = new QueryService(storage, index, null, Map()) @@ -421,7 +421,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { "fail to find traces by annotation in index, return empty" in { val storage = mock[Storage] val index = new MockIndex { - override def ids: Seq[Long] = Seq() + override def ids: Seq[IndexedTraceId] = Seq() override def getTracesDuration(traceIds: Seq[Long]): Future[Seq[TraceIdDuration]] = Future(Seq()) } val qs = new QueryService(storage, index, null, Map()) diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraIndexSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraIndexSpec.scala index 44932a17467..1e22854724d 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraIndexSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraIndexSpec.scala @@ -120,10 +120,10 @@ class CassandraIndexSpec extends Specification with JMocker with ClassMocker { //cassandra.storeSpan(span1)() cassandraIndex.indexTraceIdByServiceAndName(span1)() cassandraIndex.getTraceIdsByName("service", None, 0, 3)() foreach { - _ mustEqual span1.traceId + _.traceId mustEqual span1.traceId } cassandraIndex.getTraceIdsByName("service", Some("methodname"), 0, 3)() foreach { - _ mustEqual span1.traceId + _.traceId mustEqual span1.traceId } } @@ -188,17 +188,21 @@ class CassandraIndexSpec extends Specification with JMocker with ClassMocker { cassandraIndex.indexSpanByAnnotations(span1)() // fetch by time based annotation, find trace - var seq = cassandraIndex.getTraceIdsByAnnotation("service", "custom", None, 0, 3)() - seq mustEqual Seq(span1.traceId) + val map1 = cassandraIndex.getTraceIdsByAnnotation("service", "custom", None, 0, 3)() + map1.foreach { + _.traceId mustEqual span1.traceId + } // should not find any traces since the core annotation doesn't exist in index - seq = cassandraIndex.getTraceIdsByAnnotation("service", "cs", None, 0, 3)() - seq.isEmpty mustBe true + val map2 = cassandraIndex.getTraceIdsByAnnotation("service", "cs", None, 0, 3)() + map2.isEmpty mustBe true // should find traces by the key and value annotation - seq = cassandraIndex.getTraceIdsByAnnotation("service", "BAH", + val map3 = cassandraIndex.getTraceIdsByAnnotation("service", "BAH", Some(ByteBuffer.wrap("BEH".getBytes)), 0, 3)() - seq mustEqual Seq(span1.traceId) + map3.foreach { + _.traceId mustEqual span1.traceId + } } "not index empty service name" in {