Skip to content

Commit

Permalink
Fix time skew adjustment for client sends with 127.0.0.1 as host IP
Browse files Browse the repository at this point in the history
Author: @franklinhu
Fixes #94
URL: #94
  • Loading branch information
Franklin Hu committed Aug 2, 2012
1 parent cd65332 commit 9914ff4
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ object Constants {
val CoreServer: Seq[String] = Seq(ServerRecv, ServerSend)

val CoreAnnotations: Seq[String] = CoreClient ++ CoreServer

/* 127.0.0.1 */
val LocalhostLoopBackIP = (127 << 24) | 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ case class Trace(private val s: Seq[Span]) {
s => s.parentId == None
}

/**
* Find a span by the id. Note that this iterates through all the spans.
*/
def getSpanById(spanId: Long): Option[Span] = spans.find { s => s.id == spanId }

/**
* In some cases we don't care if it's the actual root span or just the span
* that is closes to the root. For example it could be that we don't yet log spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.zipkin.query.adjusters

import com.twitter.finagle.tracing.{Trace => FTrace}
import com.twitter.zipkin.common._
import com.twitter.zipkin.gen
import com.twitter.zipkin.{Constants, gen}
import com.twitter.zipkin.query.{Trace, SpanTreeEntry}
import scala.collection.Map

Expand Down Expand Up @@ -264,7 +264,8 @@ class TimeSkewAdjuster extends Adjuster {
val annotations = span.annotations.map { a =>
a.host match {
case Some(host) =>
if (clockSkew.endpoint.ipv4 == host.ipv4) {
if (clockSkew.endpoint.ipv4 == host.ipv4 ||
(a.value == Constants.ClientRecv || a.value == Constants.ClientSend) && Constants.LocalhostLoopBackIP == host.ipv4) {
// found our host, adjust timestamp and add
Annotation(a.timestamp - clockSkew.skew, a.value, Some(host))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,5 +273,53 @@ class TimeSkewAdjusterSpec extends Specification with JMocker with ClassMocker {
val trace1 = new Trace(Seq(span))
adjuster.adjust(trace1) mustEqual trace1
}

"adjust even if we only have client send" in {
val tfeService = Endpoint(123, 9455, "api.twitter.com-ssl")

val tfe = Span(142224153997690008L, "GET", 142224153997690008L, None, List(
Annotation(60498165L, gen.Constants.SERVER_RECV, Some(tfeService)),
Annotation(61031100L, gen.Constants.SERVER_SEND, Some(tfeService))
), Nil)

val monorailService = Endpoint(456, 8000, "monorail")
val clusterTwitterweb = Endpoint(123, -13145, "cluster_twitterweb_unicorn")

val monorail = Span(142224153997690008L, "following/index", 7899774722699781565L, Some(142224153997690008L), List(
Annotation(59501663L, gen.Constants.SERVER_RECV, Some(monorailService)),
Annotation(59934508L, gen.Constants.SERVER_SEND, Some(monorailService)),
Annotation(60499730L, gen.Constants.CLIENT_SEND, Some(clusterTwitterweb)),
Annotation(61030844L, gen.Constants.CLIENT_RECV, Some(clusterTwitterweb))
), Nil)

val tflockService = Endpoint(456, -14238, "tflock")
val flockdbEdgesService = Endpoint(789, 6915, "flockdb_edges")

val tflock = Span(142224153997690008L, "select", 6924056367845423617L, Some(7899774722699781565L), List(
Annotation(59541848L, gen.Constants.CLIENT_SEND, Some(tflockService)),
Annotation(59544889L, gen.Constants.CLIENT_RECV, Some(tflockService)),
Annotation(59541031L, gen.Constants.SERVER_RECV, Some(flockdbEdgesService)),
Annotation(59542894L, gen.Constants.SERVER_SEND, Some(flockdbEdgesService))
), Nil)

val flockService = Endpoint(2130706433, 0, "flock")

val flock = Span(142224153997690008L, "select", 7330066031642813936L, Some(6924056367845423617L), List(
Annotation(59541299L, gen.Constants.CLIENT_SEND, Some(flockService)),
Annotation(59542778L, gen.Constants.CLIENT_RECV, Some(flockService))
), Nil)

val trace = new Trace(Seq(monorail, tflock, tfe, flock))
val adjusted = adjuster.adjust(trace)

// let's see how we did
val adjustedFlock = adjusted.getSpanById(7330066031642813936L).get
val adjustedTflock = adjusted.getSpanById(6924056367845423617L).get
val flockCs = adjustedFlock.getAnnotation(gen.Constants.CLIENT_SEND).get
val tflockSr = adjustedTflock.getAnnotation(gen.Constants.SERVER_RECV).get

// tflock must receive the request before it send a request to flock
flockCs.timestamp must be_>(tflockSr.timestamp)
}
}
}

0 comments on commit 9914ff4

Please sign in to comment.