From 9914ff485dacaeb49486a5122b9a27bae2b3c83f Mon Sep 17 00:00:00 2001 From: Franklin Hu Date: Thu, 2 Aug 2012 11:06:48 -0700 Subject: [PATCH] Fix time skew adjustment for client sends with 127.0.0.1 as host IP Author: @franklinhu Fixes #94 URL: https://github.com/twitter/zipkin/pull/94 --- .../scala/com/twitter/zipkin/Constants.scala | 3 ++ .../com/twitter/zipkin/query/Trace.scala | 5 ++ .../query/adjusters/TimeSkewAdjuster.scala | 5 +- .../adjusters/TimeSkewAdjusterSpec.scala | 48 +++++++++++++++++++ 4 files changed, 59 insertions(+), 2 deletions(-) diff --git a/zipkin-common/src/main/scala/com/twitter/zipkin/Constants.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/Constants.scala index 4cdca144f84..8587471f40a 100644 --- a/zipkin-common/src/main/scala/com/twitter/zipkin/Constants.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/Constants.scala @@ -26,4 +26,7 @@ object Constants { val CoreServer: Seq[String] = Seq(ServerRecv, ServerSend) val CoreAnnotations: Seq[String] = CoreClient ++ CoreServer + + /* 127.0.0.1 */ + val LocalhostLoopBackIP = (127 << 24) | 1 } \ No newline at end of file diff --git a/zipkin-common/src/main/scala/com/twitter/zipkin/query/Trace.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/query/Trace.scala index 3c506e82a29..6c23f3e4294 100644 --- a/zipkin-common/src/main/scala/com/twitter/zipkin/query/Trace.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/query/Trace.scala @@ -59,6 +59,11 @@ case class Trace(private val s: Seq[Span]) { s => s.parentId == None } + /** + * Find a span by the id. Note that this iterates through all the spans. + */ + def getSpanById(spanId: Long): Option[Span] = spans.find { s => s.id == spanId } + /** * In some cases we don't care if it's the actual root span or just the span * that is closes to the root. For example it could be that we don't yet log spans diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala index e59f4c43123..80e3ba22f98 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjuster.scala @@ -18,7 +18,7 @@ package com.twitter.zipkin.query.adjusters import com.twitter.finagle.tracing.{Trace => FTrace} import com.twitter.zipkin.common._ -import com.twitter.zipkin.gen +import com.twitter.zipkin.{Constants, gen} import com.twitter.zipkin.query.{Trace, SpanTreeEntry} import scala.collection.Map @@ -264,7 +264,8 @@ class TimeSkewAdjuster extends Adjuster { val annotations = span.annotations.map { a => a.host match { case Some(host) => - if (clockSkew.endpoint.ipv4 == host.ipv4) { + if (clockSkew.endpoint.ipv4 == host.ipv4 || + (a.value == Constants.ClientRecv || a.value == Constants.ClientSend) && Constants.LocalhostLoopBackIP == host.ipv4) { // found our host, adjust timestamp and add Annotation(a.timestamp - clockSkew.skew, a.value, Some(host)) } else { diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjusterSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjusterSpec.scala index 9aa2caeac22..d87439d5a7f 100644 --- a/zipkin-server/src/test/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjusterSpec.scala +++ b/zipkin-server/src/test/scala/com/twitter/zipkin/query/adjusters/TimeSkewAdjusterSpec.scala @@ -273,5 +273,53 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker { val trace1 = new Trace(Seq(span)) adjuster.adjust(trace1) mustEqual trace1 } + + "adjust even if we only have client send" in { + val tfeService = Endpoint(123, 9455, "api.twitter.com-ssl") + + val tfe = Span(142224153997690008L, "GET", 142224153997690008L, None, List( + Annotation(60498165L, gen.Constants.SERVER_RECV, Some(tfeService)), + Annotation(61031100L, gen.Constants.SERVER_SEND, Some(tfeService)) + ), Nil) + + val monorailService = Endpoint(456, 8000, "monorail") + val clusterTwitterweb = Endpoint(123, -13145, "cluster_twitterweb_unicorn") + + val monorail = Span(142224153997690008L, "following/index", 7899774722699781565L, Some(142224153997690008L), List( + Annotation(59501663L, gen.Constants.SERVER_RECV, Some(monorailService)), + Annotation(59934508L, gen.Constants.SERVER_SEND, Some(monorailService)), + Annotation(60499730L, gen.Constants.CLIENT_SEND, Some(clusterTwitterweb)), + Annotation(61030844L, gen.Constants.CLIENT_RECV, Some(clusterTwitterweb)) + ), Nil) + + val tflockService = Endpoint(456, -14238, "tflock") + val flockdbEdgesService = Endpoint(789, 6915, "flockdb_edges") + + val tflock = Span(142224153997690008L, "select", 6924056367845423617L, Some(7899774722699781565L), List( + Annotation(59541848L, gen.Constants.CLIENT_SEND, Some(tflockService)), + Annotation(59544889L, gen.Constants.CLIENT_RECV, Some(tflockService)), + Annotation(59541031L, gen.Constants.SERVER_RECV, Some(flockdbEdgesService)), + Annotation(59542894L, gen.Constants.SERVER_SEND, Some(flockdbEdgesService)) + ), Nil) + + val flockService = Endpoint(2130706433, 0, "flock") + + val flock = Span(142224153997690008L, "select", 7330066031642813936L, Some(6924056367845423617L), List( + Annotation(59541299L, gen.Constants.CLIENT_SEND, Some(flockService)), + Annotation(59542778L, gen.Constants.CLIENT_RECV, Some(flockService)) + ), Nil) + + val trace = new Trace(Seq(monorail, tflock, tfe, flock)) + val adjusted = adjuster.adjust(trace) + + // let's see how we did + val adjustedFlock = adjusted.getSpanById(7330066031642813936L).get + val adjustedTflock = adjusted.getSpanById(6924056367845423617L).get + val flockCs = adjustedFlock.getAnnotation(gen.Constants.CLIENT_SEND).get + val tflockSr = adjustedTflock.getAnnotation(gen.Constants.SERVER_RECV).get + + // tflock must receive the request before it send a request to flock + flockCs.timestamp must be_>(tflockSr.timestamp) + } } }