Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace: merge spans and sort by timestamp by default #91

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 8 additions & 25 deletions zipkin-common/src/main/scala/com/twitter/zipkin/query/Trace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ object Trace {
def apply(spanTree: SpanTreeEntry): Trace = Trace(spanTree.toList)
}

case class Trace(spans: Seq[Span]) {
case class Trace(private val s: Seq[Span]) {

val log = Logger.get(getClass.getName)

lazy val spans = mergeBySpanId(s).toSeq.sortWith {
(a, b) =>
val aTimestamp = a.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
val bTimestamp = b.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
aTimestamp < bTimestamp
}

/**
* Find the trace id for this trace.
* Returns none if we have no spans to look up id by
Expand Down Expand Up @@ -157,16 +164,6 @@ case class Trace(spans: Seq[Span]) {
}.flatten
}

/**
* Incoming data can have multiple entries for the same Span, for example
* data sent from client as one span and data from the server as one span.
*
* This method merges them by span id into one object per id.
*/
def mergeSpans: Trace = {
new Trace(mergeBySpanId(spans).toList)
}

/**
* Merge all the spans objects with the same span ids into one per id.
* We store parts of spans in different columns in order to make writes
Expand Down Expand Up @@ -224,20 +221,6 @@ case class Trace(spans: Seq[Span]) {
}
}

/**
* Return a Trace sorted by the first annotation in each span.
*/
def sortedByTimestamp: Trace = {
Trace {
spans.sortWith {
(a, b) =>
val aTimestamp = a.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
val bTimestamp = b.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue)
aTimestamp < bTimestamp
}
}
}

/**
* Print the trace tree to give the user an overview.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object JsonQueryAdapter extends QueryAdapter {
val startAndEnd = t.getStartAndEndTimestamp.get
JsonTrace(
t.id.map(_.toString).getOrElse(""),
t.mergeSpans.spans.map(JsonAdapter(_)),
t.spans.map(JsonAdapter(_)),
startAndEnd.start,
startAndEnd.end,
startAndEnd.end - startAndEnd.start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.twitter.finagle.tracing.{Trace => FTrace}
import com.twitter.logging.Logger
import com.twitter.ostrich.admin.Service
import com.twitter.util.Future
import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter}
import com.twitter.zipkin.adapter.ThriftQueryAdapter
import com.twitter.zipkin.gen
import com.twitter.zipkin.query.adjusters.Adjuster
import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Index, Storage}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class TimeSkewAdjuster extends Adjuster {
case None => return trace // no root span found, returning as is
case Some(s) => {
val spans = adjust(trace.getSpanTree(s, trace.getIdToChildrenMap), None)
Trace(spans).sortedByTimestamp
Trace(spans)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ trait CassandraStorage extends Storage with Cassandra {
CASSANDRA_GET_TRACE_TOO_BIG.incr()
None
} else {
Some(Trace(spans.toSeq).mergeSpans.sortedByTimestamp)
Some(Trace(spans.toSeq))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ class TraceSpec extends Specification {
"sort spans by first annotation timestamp" in {
val inputSpans = List[Span](span4, span3, span5, span1, span2)
val expectedTrace = Trace(List[Span](span1, span2, span3, span4, span5))
val actualTrace = Trace(inputSpans).sortedByTimestamp
val actualTrace = Trace(inputSpans)

expectedTrace mustEqual actualTrace
expectedTrace.spans mustEqual actualTrace.spans
}

"merge spans" in {
Expand All @@ -175,7 +175,7 @@ class TraceSpec extends Specification {
val spanToMerge2 = Span(12345, "methodcall2", span2Id, Some(span1Id), ann2, Nil)
val spanMerged = Span(12345, "methodcall2", span2Id, Some(span1Id), annMerged, Nil)

Trace(List(spanMerged)) mustEqual Trace(List(spanToMerge1, spanToMerge2)).mergeSpans
Trace(List(spanMerged)).spans mustEqual Trace(List(spanToMerge1, spanToMerge2)).spans
}

"get rootmost span from full trace" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
qs.start()

expect {
1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace4.mergeSpans))
1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace4))
}

val ann1 = gen.TimelineAnnotation(100, gen.Constants.CLIENT_SEND,
Expand Down Expand Up @@ -295,7 +295,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val realTrace = Trace(List(rs1, rs2))

expect {
1.of(storage).getTracesByIds(List(4488677265848750007L)) willReturn Future(List(realTrace.mergeSpans))
1.of(storage).getTracesByIds(List(4488677265848750007L)) willReturn Future(List(realTrace))
}

val actual = qs.getTraceTimelinesByIds(List(4488677265848750007L), List(gen.Adjust.TimeSkew))()
Expand Down Expand Up @@ -357,7 +357,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker {
val realTrace = Trace(List(rs1, rs2))

expect {
1.of(storage).getTracesByIds(List(-6120267009876080004L)) willReturn Future(List(realTrace.mergeSpans))
1.of(storage).getTracesByIds(List(-6120267009876080004L)) willReturn Future(List(realTrace))
}

val actual = qs.getTraceTimelinesByIds(List(-6120267009876080004L), List(gen.Adjust.TimeSkew))()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val span2 = Span(1, "multiget_slice", -855543208864892776L, Some(2209720933601260005L),
List(ann2, ann5), Nil)

val realTrace = new Trace(List(span1a, span1b, span2)).mergeSpans
val expectedRealTrace = new Trace(List(span1aFixed, span1b, span2)).mergeSpans
val realTrace = new Trace(List(span1a, span1b, span2))
val expectedRealTrace = new Trace(List(span1aFixed, span1b, span2))

val adjuster = new TimeSkewAdjuster

Expand Down Expand Up @@ -167,7 +167,7 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val monorailSs = Annotation(3L, gen.Constants.SERVER_SEND, epMonorail)
val unicornCr = Annotation(4L, gen.Constants.CLIENT_RECV, epTfe)
val goodSpan = Span(1, "friendships/create", 12345L, None, List(unicornCs, monorailSr, monorailSs, unicornCr), Nil)
val goodTrace = new Trace(Seq(goodSpan)).mergeSpans
val goodTrace = new Trace(Seq(goodSpan))

val actualTrace = adjuster.adjust(goodTrace)
goodTrace mustEqual actualTrace
Expand All @@ -191,8 +191,8 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val adjustedMonorailSs = Annotation(1330539327145012L, gen.Constants.SERVER_SEND, epMonorail)
val spanAdjustedMonorail = Span(1, "friendships/create", 6379677665629798877L, Some(7264365917420400007L), List(unicornCs, adjustedMonorailSr, adjustedMonorailSs, unicornCr), Nil)

val realTrace = new Trace(Seq(spanTfe, spanMonorailUnicorn)).mergeSpans
val expectedAdjustedTrace = new Trace(Seq(spanTfe, spanAdjustedMonorail)).mergeSpans
val realTrace = new Trace(Seq(spanTfe, spanMonorailUnicorn))
val expectedAdjustedTrace = new Trace(Seq(spanTfe, spanAdjustedMonorail))

val adjusted = adjuster.adjust(realTrace)

Expand Down Expand Up @@ -236,7 +236,7 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val spanAdjustedGizmoduck = Span(1, "get_by_auth_token", 119310086840195752L, Some(7625434200987291951L), List(passbirdCs, passbirdCr, createdGizmoduckSr, createdGizmoduckSs), Nil)
val spanAdjustedMemcache = Span(1, "Get", 3983355768376203472L, Some(119310086840195752L), List(adjustedGizmoduckCs, adjustedGizmoduckCr), Nil)

val realTrace = new Trace(Seq(spanTfe, spanPassbird, spanGizmoduck, spanMemcache)).mergeSpans
val realTrace = new Trace(Seq(spanTfe, spanPassbird, spanGizmoduck, spanMemcache))
val adjustedTrace = new Trace(Seq(spanTfe, spanPassbird, spanAdjustedGizmoduck, spanAdjustedMemcache))

adjuster.adjust(realTrace) mustEqual adjustedTrace
Expand Down