Skip to content

Commit

Permalink
Remove Trace dependency from Storage
Browse files Browse the repository at this point in the history
Rather than returning a Trace from Storage, return a sequence of Spans and let
the QueryService deal

Author: @franklinhu
Fixes #98
URL: #98
  • Loading branch information
Franklin Hu committed Aug 2, 2012
1 parent 4025721 commit cd48498
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
val adjusters = getAdjusters(adjust)
FTrace.recordBinary("numIds", traceIds.length)

storage.getTracesByIds(traceIds).map { traces =>
traces.map { trace =>
storage.getSpansByTraceIds(traceIds).map { traces =>
traces.map { spans =>
val trace = Trace(spans)
ThriftQueryAdapter(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t)))
}
}
Expand All @@ -165,8 +166,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
val adjusters = getAdjusters(adjust)
FTrace.recordBinary("numIds", traceIds.length)

storage.getTracesByIds(traceIds).map { traces =>
traces.flatMap { trace =>
storage.getSpansByTraceIds(traceIds).map { traces =>
traces.flatMap { spans =>
val trace = Trace(spans)
TraceTimeline(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftQueryAdapter(_))
}
}
Expand All @@ -180,8 +182,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
val adjusters = getAdjusters(adjust)
FTrace.recordBinary("numIds", traceIds.length)

storage.getTracesByIds(traceIds.toList).map { traces =>
traces.flatMap { trace =>
storage.getSpansByTraceIds(traceIds.toList).map { traces =>
traces.flatMap { spans =>
val trace = Trace(spans)
TraceSummary(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftQueryAdapter(_))
}
}
Expand All @@ -194,8 +197,9 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
val adjusters = getAdjusters(adjust)
FTrace.recordBinary("numIds", traceIds.length)

storage.getTracesByIds(traceIds).map { traces =>
traces.map { trace =>
storage.getSpansByTraceIds(traceIds).map { traces =>
traces.map { spans =>
val trace = Trace(spans)
ThriftQueryAdapter(TraceCombo(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package com.twitter.zipkin.storage

import com.twitter.util.{Duration, Future}
import com.twitter.zipkin.common.Span
import com.twitter.zipkin.query.Trace

trait Storage {

Expand Down Expand Up @@ -50,13 +49,8 @@ trait Storage {
* Spans in trace should be sorted by the first annotation timestamp
* in that span. First event should be first in the spans list.
*/
def getTraceById(traceId: Long) : Future[Trace]

/**
* Fetch multiple traces.
*/
def getTracesByIds(traceIds : Seq[Long]) : Future[Seq[Trace]]

def getSpansByTraceIds(traceIds: Seq[Long]): Future[Seq[Seq[Span]]]
def getSpansByTraceId(traceId: Long): Future[Seq[Span]]
/**
* How long do we store the data before we delete it? In seconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.twitter.conversions.time._
import scala.collection.JavaConverters._
import com.twitter.zipkin.config.{CassandraConfig, CassandraStorageConfig}
import com.twitter.zipkin.adapter.ThriftAdapter
import com.twitter.zipkin.query.Trace

trait CassandraStorage extends Storage with Cassandra {

Expand Down Expand Up @@ -94,13 +93,13 @@ trait CassandraStorage extends Storage with Cassandra {
* Fetches traces from the underlying storage. Note that there might be multiple
* entries per span.
*/
def getTraceById(traceId: Long): Future[Trace] = {
getTracesByIds(Seq(traceId)).map {
def getSpansByTraceId(traceId: Long): Future[Seq[Span]] = {
getSpansByTraceIds(Seq(traceId)).map {
_.head
}
}

def getTracesByIds(traceIds: Seq[Long]): Future[Seq[Trace]] = {
def getSpansByTraceIds(traceIds: Seq[Long]): Future[Seq[Seq[Span]]] = {
CASSANDRA_GET_TRACE.incr
Future.collect {
traceIds.grouped(storageConfig.traceFetchBatchSize).toSeq.map { ids =>
Expand All @@ -110,14 +109,17 @@ trait CassandraStorage extends Storage with Cassandra {
case (colName, col) => ThriftAdapter(col.value)
}

if (spans.isEmpty) {
None
} else if (spans.size >= TRACE_MAX_COLS) {
log.error("Could not fetch the whole trace: " + id + " due to it being too big. Should not happen!")
CASSANDRA_GET_TRACE_TOO_BIG.incr()
None
} else {
Some(Trace(spans.toSeq))
spans.toSeq match {
case Nil => {
None
}
case s if s.length > TRACE_MAX_COLS => {
CASSANDRA_GET_TRACE_TOO_BIG.incr()
None
}
case s => {
Some(s)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,29 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val ep3 = Endpoint(345, 345, "service3")
val ann1 = Annotation(100, gen.Constants.CLIENT_SEND, Some(ep1))
val ann2 = Annotation(150, gen.Constants.CLIENT_RECV, Some(ep1))
val trace1 = Trace(List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil)))
val spans1 = List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil))
val trace1 = Trace(spans1)
// duration 50

val ann3 = Annotation(101, gen.Constants.CLIENT_SEND, Some(ep2))
val ann4 = Annotation(501, gen.Constants.CLIENT_RECV, Some(ep2))
val trace2 = Trace(List(Span(2, "methodcall", 667, None, List(ann3, ann4), Nil)))
val spans2 = List(Span(2, "methodcall", 667, None, List(ann3, ann4), Nil))
val trace2 = Trace(spans2)
// duration 400

val ann5 = Annotation(99, gen.Constants.CLIENT_SEND, Some(ep3))
val ann6 = Annotation(199, gen.Constants.CLIENT_RECV, Some(ep3))
val trace3 = Trace(List(Span(3, "methodcall", 668, None, List(ann5, ann6), Nil)))
val spans3 = List(Span(3, "methodcall", 668, None, List(ann5, ann6), Nil))
val trace3 = Trace(spans3)
// duration 100

// get some server action going on
val ann7 = Annotation(110, gen.Constants.SERVER_RECV, Some(ep2))
val ann8 = Annotation(140, gen.Constants.SERVER_SEND, Some(ep2))

val trace4 = Trace(List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil),
Span(1, "methodcall", 666, None, List(ann7, ann8), Nil)))
val spans4 = List(Span(1, "methodcall", 666, None, List(ann1, ann2), Nil),
Span(1, "methodcall", 666, None, List(ann7, ann8), Nil))
val trace4 = Trace(spans4)

// no spans
val trace5 = Trace(List())
Expand Down Expand Up @@ -159,7 +163,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val traceId = 123L

expect {
one(storage).getTracesByIds(List(traceId)) willReturn Future(List(trace1))
one(storage).getSpansByTraceIds(List(traceId)) willReturn Future(List(spans1))
}

val ts = List(ThriftQueryAdapter(TraceSummary(1, 100, 150, 50, Map("service1" -> 1), List(ep1))))
Expand All @@ -175,7 +179,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val traceId = 123L

expect {
one(storage).getTracesByIds(List(traceId)) willReturn Future(List(trace1))
one(storage).getSpansByTraceIds(List(traceId)) willReturn Future(List(spans1))
}
val trace = ThriftQueryAdapter(trace1)
val summary = ThriftQueryAdapter(TraceSummary(1, 100, 150, 50, Map("service1" -> 1), List(ep1)))
Expand Down Expand Up @@ -232,7 +236,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
qs.start()

expect {
1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace1))
1.of(storage).getSpansByTraceIds(List(1L)) willReturn Future(List(spans1))
}

val expected = List(ThriftQueryAdapter(trace1))
Expand All @@ -248,7 +252,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
qs.start()

expect {
1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace4))
1.of(storage).getSpansByTraceIds(List(1L)) willReturn Future(List(spans4))
}

val ann1 = gen.TimelineAnnotation(100, gen.Constants.CLIENT_SEND,
Expand Down Expand Up @@ -292,10 +296,11 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
Annotation(6871825L, "cr", epCuckooCassie)
), Nil)

val realTrace = Trace(List(rs1, rs2))
val realSpans = List(rs1, rs2)
val realTrace = Trace(realSpans)

expect {
1.of(storage).getTracesByIds(List(4488677265848750007L)) willReturn Future(List(realTrace))
1.of(storage).getSpansByTraceIds(List(4488677265848750007L)) willReturn Future(List(realSpans))
}

val actual = qs.getTraceTimelinesByIds(List(4488677265848750007L), List(gen.Adjust.TimeSkew))()
Expand Down Expand Up @@ -354,10 +359,11 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
Annotation(832872L, "cr", epCuckooCassie)
), Nil)

val realTrace = Trace(List(rs1, rs2))
val realSpans = List(rs1, rs2)
val realTrace = Trace(realSpans)

expect {
1.of(storage).getTracesByIds(List(-6120267009876080004L)) willReturn Future(List(realTrace))
1.of(storage).getSpansByTraceIds(List(-6120267009876080004L)) willReturn Future(List(realSpans))
}

val actual = qs.getTraceTimelinesByIds(List(-6120267009876080004L), List(gen.Adjust.TimeSkew))()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.specs.Specification
import com.twitter.io.TempFile
import com.twitter.zipkin.adapter.ThriftAdapter
import com.twitter.zipkin.common.{BinaryAnnotation, Endpoint, Annotation, Span}
import com.twitter.zipkin.query.Trace

class CassandraStorageSpec extends Specification with JMocker with ClassMocker {
object FakeServer extends FakeCassandra
Expand Down Expand Up @@ -62,33 +63,38 @@ class CassandraStorageSpec extends Specification with JMocker with ClassMocker {
FakeServer.stop()
}

"getTraceById" in {
"getSpansByTraceId" in {
cassandraStorage.storeSpan(span1)()
val trace = cassandraStorage.getTraceById(span1.traceId)()
trace.spans.isEmpty mustEqual false
trace.spans(0) mustEqual span1
val spans = cassandraStorage.getSpansByTraceId(span1.traceId)()
spans.isEmpty mustEqual false
spans(0) mustEqual span1
}

"getTracesByIds" in {
"getSpansByTraceIds" in {
cassandraStorage.storeSpan(span1)()
val actual1 = cassandraStorage.getTracesByIds(List(span1.traceId))()
val actual1 = cassandraStorage.getSpansByTraceIds(List(span1.traceId))()
actual1.isEmpty mustEqual false
actual1(0).spans.isEmpty mustEqual false
actual1(0).spans(0) mustEqual span1

val trace1 = Trace(actual1(0))
trace1.spans.isEmpty mustEqual false
trace1.spans(0) mustEqual span1

val span2 = Span(666, "methodcall2", spanId, None, List(ann2),
List(binaryAnnotation("BAH2", "BEH2")))
cassandraStorage.storeSpan(span2)()
val actual2 = cassandraStorage.getTracesByIds(List(span1.traceId, span2.traceId))()
val actual2 = cassandraStorage.getSpansByTraceIds(List(span1.traceId, span2.traceId))()
actual2.isEmpty mustEqual false
actual2(0).spans.isEmpty mustEqual false
actual2(0).spans(0) mustEqual span1
actual2(1).spans.isEmpty mustEqual false
actual2(1).spans(0) mustEqual span2

val trace2 = Trace(actual2(0))
val trace3 = Trace(actual2(1))
trace2.spans.isEmpty mustEqual false
trace2.spans(0) mustEqual span1
trace3.spans.isEmpty mustEqual false
trace3.spans(0) mustEqual span2
}

"getTracesByIds should return empty list if no trace exists" in {
val actual1 = cassandraStorage.getTracesByIds(List(span1.traceId))()
"getSpansByTraceIds should return empty list if no trace exists" in {
val actual1 = cassandraStorage.getSpansByTraceIds(List(span1.traceId))()
actual1.isEmpty mustEqual true
}

Expand Down

0 comments on commit cd48498

Please sign in to comment.