Skip to content

Commit

Permalink
Removed client name buisness from code
Browse files Browse the repository at this point in the history
Simplified stuff to remove the need to store client names in SpanServiceName

Author: @jerryli9876
Fixes #60
URL: #60
  • Loading branch information
jerryli9876 committed Jul 3, 2012
1 parent b8314e4 commit 8399069
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name ) }
.filter(0) { s : SpanServiceName => s.isSetParent_id() }
.mapTo(0 -> ('id, 'parent_id, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.service_name ) }

// TODO: account for possible differences between sent and received service names
val idName = PrepTsvSource()
.read
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'name_1) -> 'name_1){ Util.getBestClientSideName }
.groupBy('service, 'name_1){ _.size('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob
val spanInfo = PreprocessedSpanSource()
.read
.filter(0) { s : SpanServiceName => s.isSetParent_id() }
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service, 'annotations))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name, s.annotations.toList) }
.mapTo(0 -> ('id, 'parent_id, 'service, 'annotations))
{ s: SpanServiceName => (s.id, s.parent_id, s.service_name, s.annotations.toList) }
.flatMap('annotations -> 'duration) { al : List[Annotation] => {
var clientSend : Option[Annotation] = None
var clientReceive : Option[Annotation] = None
Expand All @@ -47,7 +47,7 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob
})
val clientDuration = for (cs <- clientSend; cr <- clientReceive) yield (cr.timestamp - cs.timestamp)
val serverDuration = for (sr <- serverReceive; ss <- serverSend) yield (ss.timestamp - sr.timestamp)
// to deal with the case where there is no server duration
// to deal with the case where there is no client duration
if (clientDuration == None) serverDuration else clientDuration
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import sources.{PrepTsvSource, PreprocessedSpanSource, Util}
class MostCommonCalls(args : Args) extends Job(args) with DefaultDateRangeJob {
val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name) }
.mapTo(0 -> ('id, 'parent_id, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.service_name) }

val idName = PrepTsvSource()
.read

val result = spanInfo
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'name_1) -> 'name_1){ Util.getBestClientSideName }
.groupBy('service, 'name_1){ _.size('count) }
.groupBy('service){ _.sortBy('count) }
.write(Tsv(args("output")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import sources.{PrepTsvSource, PreprocessedSpanSource, Util}

class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {

// TODO: Support retry as well in a way that doesn't involve messing with the code
val ERROR_TYPE = List("finagle.timeout", "finagle.retry")

val input = args.required("error_type")
Expand All @@ -38,20 +37,19 @@ class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {
// Preprocess the data into (trace_id, id, parent_id, annotations, client service name, service name)
val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'annotations, 'cService, 'service) )
{ s: SpanServiceName => (s.id, s.parent_id, s.annotations.toList, s.client_service, s.service_name) }
.mapTo(0 -> ('id, 'parent_id, 'annotations, 'service) )
{ s: SpanServiceName => (s.id, s.parent_id, s.annotations.toList, s.service_name) }


// Project to (id, service name)
// Project to (id, service name)
val idName = PrepTsvSource()
.read

// Left join with idName to find the parent's service name, if applicable
val result = spanInfo
.filter('annotations){annotations : List[Annotation] => annotations.exists({a : Annotation => a.value == input})}
.project('id, 'parent_id, 'cService, 'service)
.project('id, 'parent_id, 'service)
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'name_1) -> 'name_1){ Util.getBestClientSideName }
.project('service, 'name_1)
.groupBy('service, 'name_1){ _.size('numTimeouts) }
.write(Tsv(args("output")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.twitter.zipkin.hadoop.sources

import com.twitter.scalding._
import com.twitter.zipkin.gen.{Annotation, BinaryAnnotation, Span, SpanServiceName}
import com.twitter.zipkin.gen._
import scala.collection.JavaConverters._

/**
Expand All @@ -33,12 +33,12 @@ class FindNames(args : Args) extends Job(args) with DefaultDateRangeJob {
}

val findNames = preprocessed
.flatMap('annotations -> ('cService, 'service)) { Util.getClientAndServiceName }
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'cService, 'service) -> 'spanWithServiceNames) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String, String) =>
.flatMap('annotations -> 'service) { Util.getServiceName }
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'service) -> 'spanWithServiceNames) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String) =>
a match {
case (tid, name, id, pid, annotations, binary_annotations, cService, service) =>
new SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, cService, service).setParent_id(pid)
case (tid, name, id, pid, annotations, binary_annotations, service) =>
new SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, service).setParent_id(pid)
}
}.write(PreprocessedSpanSource())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import scala.collection.JavaConverters._
*/
class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
val preprocessed = SpanSource()
// val preprocessed = FixedSpanSource("file.lzo")
.read
.mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,17 @@ object Util {
}

/**
* Given a list of annotations, finds the client's name if present and the best possible service name by the
* same semantics as in getServiceName.
* Given a list of annotations, finds the best possible service name
* @param annotations a list of Annotations
* @return Some(client's service name, service name) if a service name exists, None otherwise
* @return Some(service name) if a service name exists, None otherwise
*/
def getClientAndServiceName(annotations : List[Annotation]) : Option[(String, String)] = {
def getServiceName(annotations : List[Annotation]) : Option[String] = {
var service: Option[Annotation] = None
var clientSend : Annotation = null
var hasServerRecv = false
annotations.foreach { a : Annotation =>
if ((Constants.CLIENT_SEND.equals(a.getValue) || Constants.CLIENT_RECV.equals(a.getValue))) {
if (a.getHost != null) {
if (!hasServerRecv) {
service = Some(a)
}
clientSend = a
if ((a.getHost != null) && !hasServerRecv) {
service = Some(a)
}
}
if ((Constants.SERVER_RECV.equals(a.getValue) || Constants.SERVER_SEND.equals(a.getValue))) {
Expand All @@ -78,7 +73,7 @@ object Util {
}
}
for (s <- service)
yield if (clientSend == null) (null, s.getHost.service_name) else (clientSend.getHost.service_name, s.getHost.service_name)
yield s.getHost.service_name
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ class CommonServiceCallsSpec extends Specification with TupleConversions {
val endpoint2 = new gen.Endpoint(123, 666, "service2")
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "sr").setHost(endpoint), new gen.Annotation(3000, "ss").setHost(endpoint), new gen.Annotation(4000, "cr").setHost(endpoint)).asJava,
List[gen.BinaryAnnotation]().asJava, "service", "service")
List[gen.BinaryAnnotation]().asJava, "service")
val span1 = new gen.SpanServiceName(123456, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(2000, "sr").setHost(endpoint2), new gen.Annotation(4000, "ss").setHost(endpoint2), new gen.Annotation(5000, "cr").setHost(endpoint2)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2", "service2")
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")
val span2 = new gen.SpanServiceName(1234567, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2", "service2")
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")

val spans = (Util.repeatSpan(span, 30, 32, 1) ++ Util.repeatSpan(span1, 50, 100, 32))

Expand All @@ -58,14 +58,14 @@ class CommonServiceCallsSpec extends Specification with TupleConversions {
source(PrepTsvSource(), Util.getSpanIDtoNames(spans)).
sink[(String, String, Long)](Tsv("outputFile")) {
val result = new HashMap[String, Long]()
result("service, Unknown Service Name") = 0
result("service2, Unknown Service Name") = 0
result("service, null") = 0
result("service2, null") = 0
result("service2, service1") = 0
outputBuffer => outputBuffer foreach { e =>
result(e._1 + ", " + e._2) = e._3
}
result("service, Unknown Service Name") mustEqual 31
result("service2, Unknown Service Name") mustEqual 20
result("service, null") mustEqual 31
result("service2, null") mustEqual 20
result("service2, service") mustEqual 31
}
}.run.finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ class DependencyTreeSpec extends Specification with TupleConversions {
val endpoint2 = new gen.Endpoint(123, 666, "service2")
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "sr").setHost(endpoint)).asJava,
List[gen.BinaryAnnotation]().asJava, "service", "service")
List[gen.BinaryAnnotation]().asJava, "service")
val span1 = new gen.SpanServiceName(123456, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint1), new gen.Annotation(4000, "sr").setHost(endpoint1)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1", "service1")
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")
val span2 = new gen.SpanServiceName(1234567, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2", "service2")
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")

val spans = Util.repeatSpan(span, 30, 40, 1) ++ Util.repeatSpan(span1, 50, 200, 40)

Expand All @@ -59,15 +59,15 @@ class DependencyTreeSpec extends Specification with TupleConversions {
.source(PrepTsvSource(), Util.getSpanIDtoNames(spans))
.sink[(String, String, Long)](Tsv("outputFile")) {
val map = new HashMap[String, Long]()
map("service, Unknown Service Name") = 0
map("service, null") = 0
map("service1, service") = 0
map("service1, Unknown Service Name") = 0
map("service1, null") = 0
outputBuffer => outputBuffer foreach { e =>
map(e._1 + ", " + e._2) = e._3
}
map("service, Unknown Service Name") mustEqual 31
map("service, null") mustEqual 31
map("service1, service") mustEqual 31
map("service1, Unknown Service Name") mustEqual 20
map("service1, null") mustEqual 20
}.run.finish
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ class ExpensiveEndpointsSpec extends Specification with TupleConversions {
val endpoint2 = new gen.Endpoint(123, 666, "service2")
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(2000, "sr").setHost(endpoint), new gen.Annotation(3000, "ss").setHost(endpoint)).asJava,
List[gen.BinaryAnnotation]().asJava, "service", "service")
List[gen.BinaryAnnotation]().asJava, "service")
val span1 = new gen.SpanServiceName(123456, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(1500, "sr").setHost(endpoint2), new gen.Annotation(4500, "ss").setHost(endpoint2), new gen.Annotation(5000, "cr").setHost(endpoint2)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2", "service2")
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")

val spans = Util.repeatSpan(span, 30, 40, -1) ++ Util.repeatSpan(span1, 30, 100, 40)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class PopularKeysSpec extends Specification with TupleConversions {
val endpoint = new gen.Endpoint(123, 666, "service")
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service", "service")
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service")
val span1 = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service", "service")
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service")


"PopularKeys" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ class TimeoutsSpec extends Specification with TupleConversions {
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "sr").setHost(endpoint),
new gen.Annotation(2001, "finagle.timeout")).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service", "service")
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service")

val span1 = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint1), new gen.Annotation(2000, "sr").setHost(endpoint1)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1", "service1")
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")

val span2 = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(2000, "sr").setHost(endpoint2),
new gen.Annotation(2001, "finagle.timeout")).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service2", "service2")
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service2")

val spans = Util.repeatSpan(span, 101, 120, 1) ::: Util.repeatSpan(span1, 20, 300, 102) ::: Util.repeatSpan(span2, 30, 400, 300)

Expand All @@ -66,15 +66,15 @@ class TimeoutsSpec extends Specification with TupleConversions {
source(PrepTsvSource(), Util.getSpanIDtoNames(spans)).
sink[(String, String, Long)](Tsv("outputFile")) {
val map = new HashMap[String, Long]()
map("service, Unknown Service Name") = 0
map("service, null") = 0
map("service2, service1") = 0
map("service2, Unknown Service Name") = 0
map("service2, null") = 0
outputBuffer => outputBuffer foreach { e =>
map(e._1 + ", " + e._2) = e._3
}
map("service, Unknown Service Name") mustEqual 102
map("service, null") mustEqual 102
map("service2, service1") mustEqual 21
map("service2, Unknown Service Name") mustEqual 10
map("service2, null") mustEqual 10
}.run.finish
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,16 @@ import scala.collection.JavaConverters._

class UtilSpec extends Specification {

"Util.getClientAndServiceName" should {
"Util.getServiceName" should {
"yield None if the list is empty" in {
val l : List[Annotation] = List()
Util.getClientAndServiceName(l) must be_==(None)
Util.getServiceName(l) must be_==(None)
}
"yield (null, service name) if there is no client side name" in {
val endpoint = new gen.Endpoint(123, 666, "service")
val l : List[Annotation] = List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "ss").setHost(endpoint))
Util.getClientAndServiceName(l) must be_==(Some(null, "service"))

}
"yield (client name, service name) if both are present" in {
"yield Some(service name) if present" in {
val endpoint = new gen.Endpoint(123, 666, "service")
val endpoint1 = new gen.Endpoint(123, 666, "service1")
val l : List[Annotation] = List(new gen.Annotation(1000, "cr").setHost(endpoint), new gen.Annotation(2000, "ss").setHost(endpoint1))
Util.getClientAndServiceName(l) must be_==(Some("service", "service1"))
Util.getServiceName(l) must be_==(Some("service1"))
}
}

Expand All @@ -46,10 +40,10 @@ class UtilSpec extends Specification {
val endpoint = new gen.Endpoint(123, 666, "service")
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service", "service").setParent_id(0)
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service").setParent_id(0)
val span1 = new gen.SpanServiceName(12345, "methodcall", 667,
List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service", "service").setParent_id(1)
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service").setParent_id(1)

Util.repeatSpan(span, 1, 666, 0) must beEqualTo(List((span, 666),(span1, 667)))
}
Expand All @@ -72,10 +66,10 @@ class UtilSpec extends Specification {
val endpoint1 = new gen.Endpoint(123, 666, "service1")
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service", "service").setParent_id(0)
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service").setParent_id(0)
val span1 = new gen.SpanServiceName(12345, "methodcall", 667,
List(new gen.Annotation(1000, "sr").setHost(endpoint1), new gen.Annotation(2000, "cr").setHost(endpoint1)).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service1", "service1").setParent_id(1)
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service1").setParent_id(1)
Util.getSpanIDtoNames(List((span, 666), (span1, 667))) must beEqualTo(List((666, "service"), (667, "service1")))
}
}
Expand Down
Loading

0 comments on commit 8399069

Please sign in to comment.