From a3327f0b606e31059159b0efa8d328fa08247357 Mon Sep 17 00:00:00 2001 From: Franklin Hu Date: Mon, 9 Jul 2012 10:01:32 -0700 Subject: [PATCH] Move trace (part 2) * Move Trace to zipkin-common * Pull out thrift dependencies into zipkin-scrooge Author: @franklinhu Fixes #67 URL: https://github.com/twitter/zipkin/pull/67 --- .../twitter/zipkin/adapter/QueryAdapter.scala | 14 ++- .../com/twitter/zipkin/common/Trace.scala | 99 +++++++------------ .../twitter/zipkin/common/TraceSummary.scala | 13 +++ .../twitter/zipkin/query}/SpanTreeEntry.scala | 14 +-- .../com/twitter/zipkin/query/TraceCombo.scala | 30 ++++++ .../twitter/zipkin/query/TraceTimeline.scala | 40 +++++++- .../zipkin/adapter/ThriftQueryAdapter.scala | 33 ++++++- .../twitter/zipkin/query/QueryService.scala | 63 +++++++----- .../adjusters/AdjusterSpanTreeEntry.scala | 3 +- .../query/adjusters/TimeSkewAdjuster.scala | 1 + .../query/conversions/TraceToTimeline.scala | 60 ----------- .../com/twitter/zipkin/common/TraceSpec.scala | 9 +- .../zipkin/query/QueryServiceSpec.scala | 8 +- ...lineSpec.scala => TraceTimelineSpec.scala} | 8 +- 14 files changed, 219 insertions(+), 176 deletions(-) rename {zipkin-server => zipkin-common}/src/main/scala/com/twitter/zipkin/common/Trace.scala (71%) rename {zipkin-server/src/main/scala/com/twitter/zipkin/common => zipkin-common/src/main/scala/com/twitter/zipkin/query}/SpanTreeEntry.scala (86%) create mode 100644 zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceCombo.scala delete mode 100644 zipkin-server/src/main/scala/com/twitter/zipkin/query/conversions/TraceToTimeline.scala rename zipkin-server/src/test/scala/com/twitter/zipkin/query/conversions/{TraceToTimelineSpec.scala => TraceTimelineSpec.scala} (96%) diff --git a/zipkin-common/src/main/scala/com/twitter/zipkin/adapter/QueryAdapter.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/adapter/QueryAdapter.scala index 268215b358f..e3b9643b4c8 100644 --- a/zipkin-common/src/main/scala/com/twitter/zipkin/adapter/QueryAdapter.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/adapter/QueryAdapter.scala @@ -15,15 +15,27 @@ */ package com.twitter.zipkin.adapter -import com.twitter.zipkin.query.{TraceTimeline, TimelineAnnotation} +import com.twitter.zipkin.common.Trace +import com.twitter.zipkin.query.{TraceCombo, TraceTimeline, TimelineAnnotation} +/** + * Adapter for query related structs + */ trait QueryAdapter { type timelineAnnotationType /* corresponds to com.twitter.zipkin.query.TimelineAnnotation */ type traceTimelineType /* corresponds to com.twitter.zipkin.query.TraceTimeline */ + type traceComboType /* corresponds to com.twitter.zipkin.query.TraceCombo */ + type traceType /* corresponds to com.twitter.zipkin.query.Trace */ def apply(t: timelineAnnotationType): TimelineAnnotation def apply(t: TimelineAnnotation): timelineAnnotationType def apply(t: traceTimelineType): TraceTimeline def apply(t: TraceTimeline): traceTimelineType + + def apply(t: traceComboType): TraceCombo + def apply(t: TraceCombo): traceComboType + + def apply(t: traceType): Trace + def apply(t: Trace): traceType } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/common/Trace.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/common/Trace.scala similarity index 71% rename from zipkin-server/src/main/scala/com/twitter/zipkin/common/Trace.scala rename to zipkin-common/src/main/scala/com/twitter/zipkin/common/Trace.scala index cdf696c0882..d0dc50e57c5 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/common/Trace.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/common/Trace.scala @@ -1,51 +1,38 @@ /* * Copyright 2012 Twitter Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.twitter.zipkin.common -import com.twitter.zipkin.gen -import collection.mutable -import mutable.HashMap import com.twitter.logging.Logger import java.nio.ByteBuffer +import scala.collection.mutable import com.twitter.finagle.tracing.{Trace => FTrace} -import com.twitter.zipkin.query.conversions.TraceToTimeline -import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter} -import com.twitter.zipkin.query.TraceTimeline +import com.twitter.zipkin.query.SpanTreeEntry + +/** + * A chunk of time, between a start and an end. + */ +case class Timespan(start: Long, end: Long) /** * Represents a trace, a bundle of spans. */ object Trace { - def apply(spanTree: SpanTreeEntry): Trace = Trace(spanTree.toList) - - def fromThrift(trace: gen.Trace): Trace = { - new Trace(trace.spans.map(ThriftAdapter(_)).toList) - } - } - -/** - * A chunk of time, between a start and an end. - */ -case class Timespan(start: Long, end: Long) - - case class Trace(spans: Seq[Span]) { val log = Logger.get(getClass.getName) @@ -61,7 +48,9 @@ case class Trace(spans: Seq[Span]) { /** * Find the root span of this trace and return */ - def getRootSpan: Option[Span] = spans.find { s => s.parentId == None } + def getRootSpan: Option[Span] = spans.find { + s => s.parentId == None + } /** * In some cases we don't care if it's the actual root span or just the span @@ -72,8 +61,9 @@ case class Trace(spans: Seq[Span]) { lazy val getRootMostSpan: Option[Span] = { getRootSpan.orElse { val idSpan = getIdToSpanMap - spans.headOption.map { s => - recursiveGetRootMostSpan(idSpan, s) + spans.headOption.map { + s => + recursiveGetRootMostSpan(idSpan, s) } } } @@ -95,8 +85,8 @@ case class Trace(spans: Seq[Span]) { a => a.timestamp } } match { - case Nil => None // No annotations - case s @ _ => Some(Timespan(s.min, s.max)) + case Nil => None // No annotations + case s@_ => Some(Timespan(s.min, s.max)) } } @@ -133,31 +123,6 @@ case class Trace(spans: Seq[Span]) { } } - def toThrift: gen.Trace = { - FTrace.record("toThrift") - gen.Trace(spans.map { ThriftAdapter(_) }) - } - - /** - * Return a summary of this trace or none if we - * cannot construct a trace summary. Could be that we have no spans. - */ - def toTraceSummary: Option[TraceSummary] = { - FTrace.record("toTraceSummary") - for (traceId <- id; startEnd <- getStartAndEndTimestamp) - yield TraceSummary(traceId, startEnd.start, startEnd.end, (startEnd.end - startEnd.start).toInt, - serviceCounts, endpoints.toList) - } - - def toTimeline: Option[TraceTimeline] = { - FTrace.record("toTimeline") - TraceToTimeline(this) - } - - def toTraceCombo: gen.TraceCombo = { - gen.TraceCombo(toThrift, toTraceSummary.map(ThriftAdapter(_)), toTimeline.map(ThriftQueryAdapter(_)), toSpanDepths) - } - /** * Figures out the "span depth". This is used in the ui * to figure out how to lay out the spans in the visualization. @@ -179,7 +144,7 @@ case class Trace(spans: Seq[Span]) { */ def getBinaryAnnotationsByKey(key: String): Seq[ByteBuffer] = { spans.flatMap(_.binaryAnnotations.collect { - case gen.BinaryAnnotation(bKey, bValue, _, _) if (bKey == key) => bValue + case BinaryAnnotation(bKey, bValue, _, _) if (bKey == key) => bValue }.toSeq) } @@ -202,13 +167,13 @@ case class Trace(spans: Seq[Span]) { new Trace(mergeBySpanId(spans).toList) } - /** + /** * Merge all the spans objects with the same span ids into one per id. * We store parts of spans in different columns in order to make writes * faster and simpler. This means we have to merge them correctly on read. */ - private def mergeBySpanId(spans: Iterable[Span]) : Iterable[Span] = { - val spanMap = new HashMap[Long, Span] + private def mergeBySpanId(spans: Iterable[Span]): Iterable[Span] = { + val spanMap = new mutable.HashMap[Long, Span] spans.foreach(s => { val oldSpan = spanMap.get(s.id) oldSpan match { @@ -239,7 +204,9 @@ case class Trace(spans: Seq[Span]) { /* * Turn the Trace into a map of Span Id -> Span */ - def getIdToSpanMap: Map[Long, Span] = spans.map{ s => (s.id, s)}.toMap + def getIdToSpanMap: Map[Long, Span] = spans.map { + s => (s.id, s) + }.toMap /** * Get the spans of this trace in a tree form. SpanTreeEntry wraps a Span and it's children. @@ -262,10 +229,11 @@ case class Trace(spans: Seq[Span]) { */ def sortedByTimestamp: Trace = { Trace { - spans.sortWith{(a, b) => - val aTimestamp = a.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue) - val bTimestamp = b.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue) - aTimestamp < bTimestamp + spans.sortWith { + (a, b) => + val aTimestamp = a.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue) + val bTimestamp = b.firstAnnotation.map(_.timestamp).getOrElse(Long.MaxValue) + aTimestamp < bTimestamp } } } @@ -279,5 +247,4 @@ case class Trace(spans: Seq[Span]) { case None => println("No root node found") } } - } diff --git a/zipkin-common/src/main/scala/com/twitter/zipkin/common/TraceSummary.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/common/TraceSummary.scala index 2a1fb2e916c..2ed791a831a 100644 --- a/zipkin-common/src/main/scala/com/twitter/zipkin/common/TraceSummary.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/common/TraceSummary.scala @@ -18,6 +18,19 @@ package com.twitter.zipkin.common import scala.collection.Map +object TraceSummary { + + /** + * Return a summary of this trace or none if we + * cannot construct a trace summary. Could be that we have no spans. + */ + def apply(trace: Trace): Option[TraceSummary] = { + for (traceId <- trace.id; startEnd <- trace.getStartAndEndTimestamp) + yield TraceSummary(traceId, startEnd.start, startEnd.end, (startEnd.end - startEnd.start).toInt, + trace.serviceCounts, trace.endpoints.toList) + } +} + /** * @param traceId id of this trace * @param startTimestamp when did the trace start? diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/common/SpanTreeEntry.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/query/SpanTreeEntry.scala similarity index 86% rename from zipkin-server/src/main/scala/com/twitter/zipkin/common/SpanTreeEntry.scala rename to zipkin-common/src/main/scala/com/twitter/zipkin/query/SpanTreeEntry.scala index 83d11fb6945..06a23e0fd29 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/common/SpanTreeEntry.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/query/SpanTreeEntry.scala @@ -1,6 +1,6 @@ /* * Copyright 2012 Twitter Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -14,18 +14,20 @@ * limitations under the License. * */ -package com.twitter.zipkin.common +package com.twitter.zipkin.query + +import com.twitter.zipkin.common.Span /** * This represents a tree version of a Trace. */ case class SpanTreeEntry(span: Span, children: List[SpanTreeEntry]) { - def toList : List[Span] = { + def toList: List[Span] = { childrenToList(this) } - private def childrenToList(span: SpanTreeEntry) : List[Span] = { + private def childrenToList(span: SpanTreeEntry): List[Span] = { if (span.children.isEmpty) { List[Span](span.span) } else { @@ -43,7 +45,7 @@ case class SpanTreeEntry(span: Span, children: List[SpanTreeEntry]) { // start out with this span's depth (at startDepth) // fold in the childrens depth (increase the current one by 1) children.foldLeft(Map(span.id -> startDepth))((prevMap, child) => - prevMap ++ child.depths(startDepth+1) + prevMap ++ child.depths(startDepth + 1) ) } @@ -53,7 +55,7 @@ case class SpanTreeEntry(span: Span, children: List[SpanTreeEntry]) { */ def printTree(indent: Int) { println("%s%s".format(" " * indent, span.toString)) - children foreach(s => { + children foreach (s => { s.printTree(indent + 2) }) } diff --git a/zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceCombo.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceCombo.scala new file mode 100644 index 00000000000..71ee349e74d --- /dev/null +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceCombo.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.zipkin.query + +import com.twitter.zipkin.common.{Trace, TraceSummary} + +object TraceCombo { + def apply(trace: Trace): TraceCombo = { + TraceCombo(trace, TraceSummary(trace), TraceTimeline(trace), trace.toSpanDepths) + } +} + +/** + * Combined trace, summary, timeline + */ +case class TraceCombo(trace: Trace, traceSummary: Option[TraceSummary], traceTimeline: Option[TraceTimeline], + spanDepths: Option[Map[Long, Int]]) diff --git a/zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceTimeline.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceTimeline.scala index d9a9bf2ea48..72fda597f97 100644 --- a/zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceTimeline.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/query/TraceTimeline.scala @@ -15,7 +15,45 @@ */ package com.twitter.zipkin.query -import com.twitter.zipkin.common.BinaryAnnotation +import com.twitter.zipkin.common.{Trace, Endpoint, BinaryAnnotation} + +object TraceTimeline { + def apply(trace: Trace): Option[TraceTimeline] = { + if (trace.spans.isEmpty) { + return None + } + + // convert span and annotation to timeline annotation + val annotations = trace.spans.flatMap(s => + s.annotations.map{ a => + TimelineAnnotation( + a.timestamp, + a.value, + a.host match { + case Some(s) => s + case None => Endpoint.Unknown + }, + s.id, + s.parentId, + a.host match { + case Some(s) => s.serviceName + case None => "Unknown" + }, + s.name) + } + ).sortWith((a, b) => { + a.timestamp < b.timestamp + + // TODO also sort so that events that must have happened first (cs before sr for example) + // end up in the right order + }) + + val rootSpanId = trace.getRootMostSpan.getOrElse(return None).id + val id = trace.id.getOrElse(return None) + + Some(TraceTimeline(id, rootSpanId, annotations, trace.getBinaryAnnotations)) + } +} /** * Query side struct that contains diff --git a/zipkin-scrooge/src/main/scala/com/twitter/zipkin/adapter/ThriftQueryAdapter.scala b/zipkin-scrooge/src/main/scala/com/twitter/zipkin/adapter/ThriftQueryAdapter.scala index 5ecd9fef676..4b6dac1178a 100644 --- a/zipkin-scrooge/src/main/scala/com/twitter/zipkin/adapter/ThriftQueryAdapter.scala +++ b/zipkin-scrooge/src/main/scala/com/twitter/zipkin/adapter/ThriftQueryAdapter.scala @@ -16,11 +16,14 @@ package com.twitter.zipkin.adapter import com.twitter.zipkin.gen -import com.twitter.zipkin.query.{TraceTimeline, TimelineAnnotation} +import com.twitter.zipkin.query.{TraceCombo, TraceTimeline, TimelineAnnotation} +import com.twitter.zipkin.common.Trace object ThriftQueryAdapter extends QueryAdapter { type timelineAnnotationType = gen.TimelineAnnotation type traceTimelineType = gen.TraceTimeline + type traceComboType = gen.TraceCombo + type traceType = gen.Trace /* TimelineAnnotation from Thrift */ def apply(t: timelineAnnotationType): TimelineAnnotation = { @@ -63,4 +66,32 @@ object ThriftQueryAdapter extends QueryAdapter { t.annotations.map { ThriftQueryAdapter(_) }, t.binaryAnnotations.map { ThriftAdapter(_) }) } + + /* TraceCombo from Thrift */ + def apply(t: traceComboType): TraceCombo = { + TraceCombo( + ThriftQueryAdapter(t.`trace`), + t.`summary`.map(ThriftAdapter(_)), + t.`timeline`.map(ThriftQueryAdapter(_)), + t.`spanDepths`.map(_.toMap)) + } + + /* TraceCombo to Thrift */ + def apply(t: TraceCombo): traceComboType = { + gen.TraceCombo( + ThriftQueryAdapter(t.trace), + t.traceSummary.map(ThriftAdapter(_)), + t.traceTimeline.map(ThriftQueryAdapter(_)), + t.spanDepths) + } + + /* Trace from Thrift */ + def apply(t: traceType): Trace = { + Trace(t.`spans`.map(ThriftAdapter(_))) + } + + /* Trace to Thrift */ + def apply(t: Trace): traceType = { + gen.Trace(t.spans.map(ThriftAdapter(_))) + } } diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala index 8a6bb5deb25..7483d435b1c 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala @@ -20,7 +20,7 @@ import com.twitter.conversions.time._ import com.twitter.logging.Logger import com.twitter.ostrich.stats.Stats import com.twitter.ostrich.admin.Service -import com.twitter.finagle.tracing.Trace +import com.twitter.finagle.tracing.{Trace => FTrace} import com.twitter.util.Future import com.twitter.zipkin.gen import com.twitter.zipkin.query.adjusters.Adjuster @@ -28,6 +28,7 @@ import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Index, Storage} import java.nio.ByteBuffer import org.apache.thrift.TException import scala.collection.Set +import com.twitter.zipkin.common.TraceSummary import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter} /** @@ -82,11 +83,11 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus // do we have a valid span name to query indexes by? val span = convertToOption(spanName) - Trace.recordBinary("serviceName", serviceName) - Trace.recordBinary("spanName", spanName) - Trace.recordBinary("endTs", endTs) - Trace.recordBinary("limit", limit) - Trace.recordBinary("order", order) + FTrace.recordBinary("serviceName", serviceName) + FTrace.recordBinary("spanName", spanName) + FTrace.recordBinary("endTs", endTs) + FTrace.recordBinary("limit", limit) + FTrace.recordBinary("order", order) Stats.timeFutureMillis("query.getTraceIdsByName") { { @@ -115,10 +116,10 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus return Future.exception(gen.QueryException("No service name provided, we need one")) } - Trace.recordBinary("serviceName", serviceName) - Trace.recordBinary("endTs", endTs) - Trace.recordBinary("limit", limit) - Trace.recordBinary("order", order) + FTrace.recordBinary("serviceName", serviceName) + FTrace.recordBinary("endTs", endTs) + FTrace.recordBinary("limit", limit) + FTrace.recordBinary("order", order) Stats.timeFutureMillis("query.getTraceIdsByServiceName") { { @@ -150,11 +151,11 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus // do we have a valid annotation value to query indexes by? val valueOption = convertToOption(value) - Trace.recordBinary("serviceName", serviceName) - Trace.recordBinary("annotation", annotation) - Trace.recordBinary("endTs", endTs) - Trace.recordBinary("limit", limit) - Trace.recordBinary("order", order) + FTrace.recordBinary("serviceName", serviceName) + FTrace.recordBinary("annotation", annotation) + FTrace.recordBinary("endTs", endTs) + FTrace.recordBinary("limit", limit) + FTrace.recordBinary("order", order) Stats.timeFutureMillis("query.getTraceIdsByAnnotation") { { @@ -176,11 +177,13 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) - Trace.recordBinary("numIds", traceIds.length) + FTrace.recordBinary("numIds", traceIds.length) Stats.timeFutureMillis("query.getTracesByIds") { - storage.getTracesByIds(traceIds).map { id => - id.map(adjusters.foldLeft(_)((trace, adjuster) => adjuster.adjust(trace)).toThrift) + storage.getTracesByIds(traceIds).map { traces => + traces.map { trace => + ThriftQueryAdapter(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))) + } } rescue { case e: Exception => log.error(e, "getTracesByIds query failed") @@ -198,11 +201,13 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) - Trace.recordBinary("numIds", traceIds.length) + FTrace.recordBinary("numIds", traceIds.length) Stats.timeFutureMillis("query.getTraceTimelinesByIds") { - storage.getTracesByIds(traceIds).map { id => - id.flatMap(adjusters.foldLeft(_)((trace, adjuster) => adjuster.adjust(trace)).toTimeline.map(ThriftQueryAdapter(_))) + storage.getTracesByIds(traceIds).map { traces => + traces.flatMap { trace => + TraceTimeline(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftQueryAdapter(_)) + } } rescue { case e: Exception => log.error(e, "getTraceTimelinesByIds query failed") @@ -220,11 +225,13 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) - Trace.recordBinary("numIds", traceIds.length) + FTrace.recordBinary("numIds", traceIds.length) Stats.timeFutureMillis("query.getTraceSummariesByIds") { - storage.getTracesByIds(traceIds.toList).map { id => - id.flatMap(adjusters.foldLeft(_)((trace, adjuster) => adjuster.adjust(trace)).toTraceSummary.map(ThriftAdapter(_))) + storage.getTracesByIds(traceIds.toList).map { traces => + traces.flatMap { trace => + TraceSummary(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftAdapter(_)) + } } rescue { case e: Exception => log.error(e, "getTraceSummariesByIds query failed") @@ -241,11 +248,13 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus val adjusters = getAdjusters(adjust) - Trace.recordBinary("numIds", traceIds.length) + FTrace.recordBinary("numIds", traceIds.length) Stats.timeFutureMillis("query.getTraceComboByIds") { - storage.getTracesByIds(traceIds).map { id => - id.map(adjusters.foldLeft(_)((trace, adjuster) => adjuster.adjust(trace)).toTraceCombo) + storage.getTracesByIds(traceIds).map { traces => + traces.map { trace => + ThriftQueryAdapter(TraceCombo(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t)))) + } } rescue { case e: Exception => log.error(e, "getTraceCombosByIds query failed") diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/AdjusterSpanTreeEntry.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/AdjusterSpanTreeEntry.scala index 4c59293b72c..fded485b755 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/AdjusterSpanTreeEntry.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/AdjusterSpanTreeEntry.scala @@ -15,7 +15,8 @@ */ package com.twitter.zipkin.query.adjusters -import com.twitter.zipkin.common.{Span, SpanTreeEntry} +import com.twitter.zipkin.query.SpanTreeEntry +import com.twitter.zipkin.common.Span trait AdjusterMessage case class AdjusterWarning(msg: String) extends AdjusterMessage diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala index 248dc076a7e..44510deaf10 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala @@ -20,6 +20,7 @@ import com.twitter.zipkin.gen import scala.collection.Map import com.twitter.zipkin.common._ import com.twitter.finagle.tracing.{Trace => FTrace} +import com.twitter.zipkin.query.SpanTreeEntry class TimeSkewAdjuster extends Adjuster { diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/conversions/TraceToTimeline.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/conversions/TraceToTimeline.scala deleted file mode 100644 index df86cb5c21c..00000000000 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/conversions/TraceToTimeline.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2012 Twitter Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.zipkin.query.conversions - -import com.twitter.zipkin.common._ -import com.twitter.zipkin.query.{TraceTimeline, TimelineAnnotation} - -object TraceToTimeline { - - def apply(trace: Trace): Option[TraceTimeline] = { - - if (trace.spans.isEmpty) { - return None - } - - // convert span and annotation to timeline annotation - val annotations = trace.spans.flatMap(s => - s.annotations.map{ a => - TimelineAnnotation( - a.timestamp, - a.value, - a.host match { - case Some(s) => s - case None => Endpoint.Unknown - }, - s.id, - s.parentId, - a.host match { - case Some(s) => s.serviceName - case None => "Unknown" - }, - s.name) - } - ).sortWith((a, b) => { - a.timestamp < b.timestamp - - // TODO also sort so that events that must have happened first (cs before sr for example) - // end up in the right order - }) - - val rootSpanId = trace.getRootMostSpan.getOrElse(return None).id - val id = trace.id.getOrElse(return None) - - Some(TraceTimeline(id, rootSpanId, annotations, trace.getBinaryAnnotations)) - } - -} diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/common/TraceSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/common/TraceSpec.scala index 3fc5fd2287d..9b488dd1458 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/common/TraceSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/common/TraceSpec.scala @@ -20,7 +20,8 @@ import org.specs.Specification import com.twitter.zipkin.gen import collection.mutable import java.nio.ByteBuffer -import com.twitter.zipkin.adapter.ThriftAdapter +import com.twitter.zipkin.query.SpanTreeEntry +import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter} class TraceSpec extends Specification { @@ -52,8 +53,8 @@ class TraceSpec extends Specification { val span = Span(12345, "methodcall", 666, None, List(Annotation(1, "boaoo", None)), Nil) val expectedTrace = Trace(List[Span](span)) - val thriftTrace = expectedTrace.toThrift - val actualTrace = Trace.fromThrift(thriftTrace) + val thriftTrace = ThriftQueryAdapter(expectedTrace) + val actualTrace = ThriftQueryAdapter(thriftTrace) expectedTrace mustEqual actualTrace } @@ -90,7 +91,7 @@ class TraceSpec extends Specification { val span2 = Span(123, "method_2", 200, Some(100), ann2, Nil) val trace = new Trace(Seq(span1, span2)) - val duration = trace.toTraceSummary.get.durationMicro + val duration = TraceSummary(trace).get.durationMicro duration mustEqual 12 } diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala index 0a291cbfe80..4ef8fb3e8cd 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/query/QueryServiceSpec.scala @@ -19,12 +19,10 @@ package com.twitter.zipkin.query import adjusters.{TimeSkewAdjuster, NullAdjuster} import org.specs.Specification import org.specs.mock.{ClassMocker, JMocker} -import scala.collection.immutable.Set import com.twitter.zipkin.gen import com.twitter.zipkin.common._ import java.nio.ByteBuffer import com.twitter.util.Future -import com.twitter.scrooge.BinaryThriftStructSerializer import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Storage, Index} import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter} @@ -179,9 +177,9 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { expect { one(storage).getTracesByIds(List(traceId)) willReturn Future(List(trace1)) } - val trace = trace1.toThrift + val trace = ThriftQueryAdapter(trace1) val summary = ThriftAdapter(TraceSummary(1, 100, 150, 50, Map("service1" -> 1), List(ep1))) - val timeline = trace1.toTimeline.map(ThriftQueryAdapter(_)) + val timeline = TraceTimeline(trace1).map(ThriftQueryAdapter(_)) val combo = gen.TraceCombo(trace, Some(summary), timeline, Some(Map(666L -> 1))) Seq(combo) mustEqual qs.getTraceCombosByIds(List(traceId), List())() } @@ -237,7 +235,7 @@ class QueryServiceSpec extends Specification with JMocker with ClassMocker { 1.of(storage).getTracesByIds(List(1L)) willReturn Future(List(trace1)) } - val expected = List(trace1.toThrift) + val expected = List(ThriftQueryAdapter(trace1)) val actual = qs.getTracesByIds(List(1L), List())() expected mustEqual actual } diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/query/conversions/TraceToTimelineSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/query/conversions/TraceTimelineSpec.scala similarity index 96% rename from zipkin-server/src/test/scala/com/twitter/zipkin/query/conversions/TraceToTimelineSpec.scala rename to zipkin-server/src/test/scala/com/twitter/zipkin/query/conversions/TraceTimelineSpec.scala index d4251d24fa5..126f03f81b4 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/query/conversions/TraceToTimelineSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/query/conversions/TraceTimelineSpec.scala @@ -25,7 +25,7 @@ import com.twitter.zipkin.adapter.ThriftAdapter import com.twitter.zipkin.query.{TimelineAnnotation, TraceTimeline} import com.twitter.zipkin.common._ -class TraceToTimelineSpec extends Specification with JMocker with ClassMocker { +class TraceTimelineSpec extends Specification with JMocker with ClassMocker { //T = 0 koalabird-cuckoo ValuesFromSource Server receive 10.34.238.111 ():9149 @@ -103,14 +103,14 @@ class TraceToTimelineSpec extends Specification with JMocker with ClassMocker { val expectedTimeline = TraceTimeline(1, 2209720933601260005L, List(tAnn3, tAnn1, tAnn2, tAnn5, tAnn4, tAnn6), List(ba1)) - "TraceToTimelineSpec" should { + "TraceTimelineSpec" should { "convert to timeline with correct annotations ordering" in { - val actualTimeline = TraceToTimeline(trace) + val actualTimeline = TraceTimeline(trace) Some(expectedTimeline) mustEqual actualTimeline } "return none if empty trace" in { - val actualTimeline = TraceToTimeline(new Trace(List())) + val actualTimeline = TraceTimeline(new Trace(List())) None mustEqual actualTimeline } }