From cd4849801dba929effec8aebcb73dcf273951536 Mon Sep 17 00:00:00 2001 From: Franklin Hu Date: Thu, 2 Aug 2012 16:23:36 -0700 Subject: [PATCH] Remove Trace dependency from Storage Rather than returning a Trace from Storage, return a sequence of Spans and let the QueryService deal Author: @franklinhu Fixes #98 URL: https://github.com/twitter/zipkin/pull/98 --- .../twitter/zipkin/query/QueryService.scala | 20 ++++++----- .../com/twitter/zipkin/storage/Storage.scala | 10 ++---- .../storage/cassandra/CassandraStorage.scala | 26 +++++++------- .../zipkin/query/QueryServiceSpec.scala | 32 ++++++++++------- .../cassandra/CassandraStorageSpec.scala | 36 +++++++++++-------- 5 files changed, 68 insertions(+), 56 deletions(-) diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala index e1515e40ce0..9654e3cb6a3 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala @@ -150,8 +150,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) FTrace.recordBinary("numIds", traceIds.length) - storage.getTracesByIds(traceIds).map { traces => - traces.map { trace => + storage.getSpansByTraceIds(traceIds).map { traces => + traces.map { spans => + val trace = Trace(spans) ThriftQueryAdapter(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))) } } @@ -165,8 +166,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) FTrace.recordBinary("numIds", traceIds.length) - storage.getTracesByIds(traceIds).map { traces => - traces.flatMap { trace => + storage.getSpansByTraceIds(traceIds).map { traces => + traces.flatMap { spans => + val trace = Trace(spans) TraceTimeline(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftQueryAdapter(_)) } } @@ -180,8 +182,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) FTrace.recordBinary("numIds", traceIds.length) - storage.getTracesByIds(traceIds.toList).map { traces => - traces.flatMap { trace => + storage.getSpansByTraceIds(traceIds.toList).map { traces => + traces.flatMap { spans => + val trace = Trace(spans) TraceSummary(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftQueryAdapter(_)) } } @@ -194,8 +197,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) FTrace.recordBinary("numIds", traceIds.length) - storage.getTracesByIds(traceIds).map { traces => - traces.map { trace => + storage.getSpansByTraceIds(traceIds).map { traces => + traces.map { spans => + val trace = Trace(spans) ThriftQueryAdapter(TraceCombo(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t)))) } } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala index 3ce69f7cc9b..e515beccbdb 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Storage.scala @@ -18,7 +18,6 @@ package com.twitter.zipkin.storage import com.twitter.util.{Duration, Future} import com.twitter.zipkin.common.Span -import com.twitter.zipkin.query.Trace trait Storage { @@ -50,13 +49,8 @@ trait Storage { * Spans in trace should be sorted by the first annotation timestamp * in that span. First event should be first in the spans list. */ - def getTraceById(traceId: Long) : Future[Trace] - - /** - * Fetch multiple traces. - */ - def getTracesByIds(traceIds : Seq[Long]) : Future[Seq[Trace]] - + def getSpansByTraceIds(traceIds: Seq[Long]): Future[Seq[Seq[Span]]] + def getSpansByTraceId(traceId: Long): Future[Seq[Span]] /** * How long do we store the data before we delete it? In seconds. */ diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala index 53ef582c191..398ac01c938 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraStorage.scala @@ -25,7 +25,6 @@ import com.twitter.conversions.time._ import scala.collection.JavaConverters._ import com.twitter.zipkin.config.{CassandraConfig, CassandraStorageConfig} import com.twitter.zipkin.adapter.ThriftAdapter -import com.twitter.zipkin.query.Trace trait CassandraStorage extends Storage with Cassandra { @@ -94,13 +93,13 @@ trait CassandraStorage extends Storage with Cassandra { * Fetches traces from the underlying storage. Note that there might be multiple * entries per span. */ - def getTraceById(traceId: Long): Future[Trace] = { - getTracesByIds(Seq(traceId)).map { + def getSpansByTraceId(traceId: Long): Future[Seq[Span]] = { + getSpansByTraceIds(Seq(traceId)).map { _.head } } - def getTracesByIds(traceIds: Seq[Long]): Future[Seq[Trace]] = { + def getSpansByTraceIds(traceIds: Seq[Long]): Future[Seq[Seq[Span]]] = { CASSANDRA_GET_TRACE.incr Future.collect { traceIds.grouped(storageConfig.traceFetchBatchSize).toSeq.map { ids => @@ -110,14 +109,17 @@ trait CassandraStorage extends Storage with Cassandra { case (colName, col) => ThriftAdapter(col.value) } - if (spans.isEmpty) { - None - } else if (spans.size >= TRACE_MAX_COLS) { - log.error("Could not fetch the whole trace: " + id + " due to it being too big. Should not happen!") - CASSANDRA_GET_TRACE_TOO_BIG.incr() - None - } else { - Some(Trace(spans.toSeq)) + spans.toSeq match { + case Nil => { + None + } + case s if s.length > TRACE_MAX_COLS => { + CASSANDRA_GET_TRACE_TOO_BIG.incr() + None + } + case s => { + Some(s) + } } } } diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala index e86c6d2f7e5..c1995404311 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala @@ -32,25 +32,29 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { val ep3 = Endpoint(345, 345, "service3") val ann1 = Annotation(100, gen.Constants.CLIENT_SEND, Some(ep1)) val ann2 = Annotation(150, gen.Constants.CLIENT_RECV, Some(ep1)) - val trace1 = Trace(List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil))) + val spans1 = List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil)) + val trace1 = Trace(spans1) // duration 50 val ann3 = Annotation(101, gen.Constants.CLIENT_SEND, Some(ep2)) val ann4 = Annotation(501, gen.Constants.CLIENT_RECV, Some(ep2)) - val trace2 = Trace(List(Span(2, "methodcall", 667, None, List(ann3, ann4), Nil))) + val spans2 = List(Span(2, "methodcall", 667, None, List(ann3, ann4), Nil)) + val trace2 = Trace(spans2) // duration 400 val ann5 = Annotation(99, gen.Constants.CLIENT_SEND, Some(ep3)) val ann6 = Annotation(199, gen.Constants.CLIENT_RECV, Some(ep3)) - val trace3 = Trace(List(Span(3, "methodcall", 668, None, List(ann5, ann6), Nil))) + val spans3 = List(Span(3, "methodcall", 668, None, List(ann5, ann6), Nil)) + val trace3 = Trace(spans3) // duration 100 // get some server action going on val ann7 = Annotation(110, gen.Constants.SERVER_RECV, Some(ep2)) val ann8 = Annotation(140, gen.Constants.SERVER_SEND, Some(ep2)) - val trace4 = Trace(List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil), - Span(1, "methodcall", 666, None, List(ann7, ann8), Nil))) + val spans4 = List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil), + Span(1, "methodcall", 666, None, List(ann7, ann8), Nil)) + val trace4 = Trace(spans4) // no spans val trace5 = Trace(List()) @@ -159,7 +163,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { val traceId = 123L expect { - one(storage).getTracesByIds(List(traceId)) willReturn Future(List(trace1)) + one(storage).getSpansByTraceIds(List(traceId)) willReturn Future(List(spans1)) } val ts = List(ThriftQueryAdapter(TraceSummary(1, 100, 150, 50, Map("service1" -> 1), List(ep1)))) @@ -175,7 +179,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { val traceId = 123L expect { - one(storage).getTracesByIds(List(traceId)) willReturn Future(List(trace1)) + one(storage).getSpansByTraceIds(List(traceId)) willReturn Future(List(spans1)) } val trace = ThriftQueryAdapter(trace1) val summary = ThriftQueryAdapter(TraceSummary(1, 100, 150, 50, Map("service1" -> 1), List(ep1))) @@ -232,7 +236,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { qs.start() expect { - 1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace1)) + 1.of(storage).getSpansByTraceIds(List(1L)) willReturn Future(List(spans1)) } val expected = List(ThriftQueryAdapter(trace1)) @@ -248,7 +252,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { qs.start() expect { - 1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace4)) + 1.of(storage).getSpansByTraceIds(List(1L)) willReturn Future(List(spans4)) } val ann1 = gen.TimelineAnnotation(100, gen.Constants.CLIENT_SEND, @@ -292,10 +296,11 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { Annotation(6871825L, "cr", epCuckooCassie) ), Nil) - val realTrace = Trace(List(rs1, rs2)) + val realSpans = List(rs1, rs2) + val realTrace = Trace(realSpans) expect { - 1.of(storage).getTracesByIds(List(4488677265848750007L)) willReturn Future(List(realTrace)) + 1.of(storage).getSpansByTraceIds(List(4488677265848750007L)) willReturn Future(List(realSpans)) } val actual = qs.getTraceTimelinesByIds(List(4488677265848750007L), List(gen.Adjust.TimeSkew))() @@ -354,10 +359,11 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { Annotation(832872L, "cr", epCuckooCassie) ), Nil) - val realTrace = Trace(List(rs1, rs2)) + val realSpans = List(rs1, rs2) + val realTrace = Trace(realSpans) expect { - 1.of(storage).getTracesByIds(List(-6120267009876080004L)) willReturn Future(List(realTrace)) + 1.of(storage).getSpansByTraceIds(List(-6120267009876080004L)) willReturn Future(List(realSpans)) } val actual = qs.getTraceTimelinesByIds(List(-6120267009876080004L), List(gen.Adjust.TimeSkew))() diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraStorageSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraStorageSpec.scala index 1410ed86fd8..7c6a6896995 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraStorageSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraStorageSpec.scala @@ -27,6 +27,7 @@ import org.specs.Specification import com.twitter.io.TempFile import com.twitter.zipkin.adapter.ThriftAdapter import com.twitter.zipkin.common.{BinaryAnnotation, Endpoint, Annotation, Span} +import com.twitter.zipkin.query.Trace class CassandraStorageSpec extends Specification with JMocker with ClassMocker { object FakeServer extends FakeCassandra @@ -62,33 +63,38 @@ class CassandraStorageSpec extends Specification with JMocker with ClassMocker { FakeServer.stop() } - "getTraceById" in { + "getSpansByTraceId" in { cassandraStorage.storeSpan(span1)() - val trace = cassandraStorage.getTraceById(span1.traceId)() - trace.spans.isEmpty mustEqual false - trace.spans(0) mustEqual span1 + val spans = cassandraStorage.getSpansByTraceId(span1.traceId)() + spans.isEmpty mustEqual false + spans(0) mustEqual span1 } - "getTracesByIds" in { + "getSpansByTraceIds" in { cassandraStorage.storeSpan(span1)() - val actual1 = cassandraStorage.getTracesByIds(List(span1.traceId))() + val actual1 = cassandraStorage.getSpansByTraceIds(List(span1.traceId))() actual1.isEmpty mustEqual false - actual1(0).spans.isEmpty mustEqual false - actual1(0).spans(0) mustEqual span1 + + val trace1 = Trace(actual1(0)) + trace1.spans.isEmpty mustEqual false + trace1.spans(0) mustEqual span1 val span2 = Span(666, "methodcall2", spanId, None, List(ann2), List(binaryAnnotation("BAH2", "BEH2"))) cassandraStorage.storeSpan(span2)() - val actual2 = cassandraStorage.getTracesByIds(List(span1.traceId, span2.traceId))() + val actual2 = cassandraStorage.getSpansByTraceIds(List(span1.traceId, span2.traceId))() actual2.isEmpty mustEqual false - actual2(0).spans.isEmpty mustEqual false - actual2(0).spans(0) mustEqual span1 - actual2(1).spans.isEmpty mustEqual false - actual2(1).spans(0) mustEqual span2 + + val trace2 = Trace(actual2(0)) + val trace3 = Trace(actual2(1)) + trace2.spans.isEmpty mustEqual false + trace2.spans(0) mustEqual span1 + trace3.spans.isEmpty mustEqual false + trace3.spans(0) mustEqual span2 } - "getTracesByIds should return empty list if no trace exists" in { - val actual1 = cassandraStorage.getTracesByIds(List(span1.traceId))() + "getSpansByTraceIds should return empty list if no trace exists" in { + val actual1 = cassandraStorage.getSpansByTraceIds(List(span1.traceId))() actual1.isEmpty mustEqual true }