From 3b98bd7d2cd1b3a32a6c217f12df297897c04f2d Mon Sep 17 00:00:00 2001 From: Jerry Li Date: Mon, 2 Jul 2012 10:09:58 -0700 Subject: [PATCH 1/2] Added ExpensiveEndpoints and test files --- .../zipkin/hadoop/ExpensiveEndpoints.scala | 64 +++++++++++++++ .../twitter/zipkin/hadoop/sources/Util.scala | 2 +- .../hadoop/ExpensiveEndpointsSpec.scala | 77 +++++++++++++++++++ 3 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala create mode 100644 zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala new file mode 100644 index 00000000000..1c765ff6f90 --- /dev/null +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala @@ -0,0 +1,64 @@ +/* +* Copyright 2012 Twitter Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.twitter.zipkin.hadoop + +import com.twitter.zipkin.gen.{Constants, SpanServiceName, Annotation} +import cascading.pipe.joiner.LeftJoin +import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args} +import sources.{PrepTsvSource, Util, PreprocessedSpanSource} + +/** + * Per service call (i.e. pair of services), finds the average run time (in microseconds) of that service call + */ +class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob { + + val spanInfo = PreprocessedSpanSource() + .read + .mapTo(0 -> ('id, 'parent_id, 'cService, 'service, 'annotations)) + { s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name, s.annotations.toList) } + .flatMap('annotations -> 'duration) { + al : List[Annotation] => { + var clientSend : Option[Annotation] = None + var clientReceive : Option[Annotation] = None + var serverReceive : Option[Annotation] = None + var serverSend : Option[Annotation] = None + al.foreach( { + a : Annotation => { + if (a.getHost != null) { + if (Constants.CLIENT_SEND.equals(a.value)) clientSend = Some(a) + else if (Constants.CLIENT_RECV.equals(a.value)) clientReceive = Some(a) + else if (Constants.SERVER_RECV.equals(a.value)) serverReceive = Some(a) + else if (Constants.SERVER_SEND.equals(a.value)) serverSend = Some(a) + } + } + }) + val clientDuration = for (cs <- clientSend; cr <- clientReceive) yield (cr.timestamp - cs.timestamp) + val serverDuration = for (sr <- serverReceive; ss <- serverSend) yield (ss.timestamp - sr.timestamp) + // to deal with the case where there is no server duration + if (clientDuration == None) serverDuration else clientDuration + } + } + + val idName = PrepTsvSource() + .read + /* Join with the original on parent ID to get the parent's service name */ + val spanInfoWithParent = spanInfo + .filter('parent_id){ id : Long => id != 0 } + .joinWithSmaller('parent_id -> 'id_1, idName) + .groupBy('name_1, 'service){ _.average('duration) } + .write(Tsv(args("output"))) +} diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala index 6b013a3f844..7ea8f33ff90 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala @@ -121,7 +121,7 @@ object Util { * second element is the span ID of the copy. */ def repeatSpan(span: gen.Span, count: Int, offset : Int, parentOffset : Int): List[(gen.Span, Int)] = { - ((0 to count).toSeq map { i: Int => span.deepCopy().setId(i + offset).setParent_id(i + parentOffset) -> (i + offset)}).toList + ((0 to count).toSeq map { i: Int => span.deepCopy().setId(i + offset).setParent_id(if (parentOffset == -1) 0 else i + parentOffset) -> (i + offset)}).toList } /** diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala new file mode 100644 index 00000000000..08620647d1d --- /dev/null +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/ExpensiveEndpointsSpec.scala @@ -0,0 +1,77 @@ +/* +* Copyright 2012 Twitter Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.twitter.zipkin.hadoop + +import org.specs.Specification +import com.twitter.zipkin.gen +import com.twitter.scalding._ +import com.twitter.zipkin.gen +import gen.AnnotationType +import scala.collection.JavaConverters._ +import collection.mutable.HashMap +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.DateRange +import com.twitter.scalding.RichDate +import com.twitter.zipkin.gen.AnnotationType +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import sources.{PrepTsvSource, PreprocessedSpanSource, Util} + +/** +* Tests that ExpensiveEndpointSpec finds the average run time of each service +*/ + +class ExpensiveEndpointsSpec extends Specification with TupleConversions { + noDetailedDiffs() + + implicit val dateRange = DateRange(RichDate(123), RichDate(321)) + + val endpoint = new gen.Endpoint(123, 666, "service") + val endpoint1 = new gen.Endpoint(123, 666, "service1") + val endpoint2 = new gen.Endpoint(123, 666, "service2") + val span = new gen.SpanServiceName(12345, "methodcall", 666, + List(new gen.Annotation(2000, "sr").setHost(endpoint), new gen.Annotation(3000, "ss").setHost(endpoint)).asJava, + List[gen.BinaryAnnotation]().asJava, "service", "service") + val span1 = new gen.SpanServiceName(123456, "methodcall", 666, + List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(1500, "sr").setHost(endpoint2), new gen.Annotation(4500, "ss").setHost(endpoint2), new gen.Annotation(5000, "cr").setHost(endpoint2)).asJava, + List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2", "service2") + + val spans = Util.repeatSpan(span, 30, 40, -1) ++ Util.repeatSpan(span1, 30, 100, 40) + + "ExpensiveEndpoints" should { + "Return the most common service calls" in { + JobTest("com.twitter.zipkin.hadoop.ExpensiveEndpoints"). + arg("input", "inputFile"). + arg("output", "outputFile"). + arg("date", "2012-01-01T01:00"). + source(PreprocessedSpanSource(), spans). + source(PrepTsvSource(), Util.getSpanIDtoNames(spans)). + sink[(String, String, Long)](Tsv("outputFile")) { + val result = new HashMap[String, Long]() + result("service, service2") = 0 + outputBuffer => outputBuffer foreach { e => + println(e) + result(e._1 + ", " + e._2) = e._3 + } +// result("Unknown Service Name") mustEqual 3000 +// result("service") mustEqual 2000 +// result("service2") mustEqual 3000 + result("service, service2") mustEqual 4000 + } + }.run.finish + } +} From da0963847ad396c260115f586315af97e199d57d Mon Sep 17 00:00:00 2001 From: Jerry Li Date: Mon, 2 Jul 2012 13:25:34 -0700 Subject: [PATCH 2/2] Minor changes to ExpensiveEndpoints.scala --- .../com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala index 1c765ff6f90..04f5213046d 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala @@ -28,16 +28,15 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob val spanInfo = PreprocessedSpanSource() .read + .filter(0) { s : SpanServiceName => s.isSetParent_id() } .mapTo(0 -> ('id, 'parent_id, 'cService, 'service, 'annotations)) { s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name, s.annotations.toList) } - .flatMap('annotations -> 'duration) { - al : List[Annotation] => { + .flatMap('annotations -> 'duration) { al : List[Annotation] => { var clientSend : Option[Annotation] = None var clientReceive : Option[Annotation] = None var serverReceive : Option[Annotation] = None var serverSend : Option[Annotation] = None - al.foreach( { - a : Annotation => { + al.foreach( { a : Annotation => { if (a.getHost != null) { if (Constants.CLIENT_SEND.equals(a.value)) clientSend = Some(a) else if (Constants.CLIENT_RECV.equals(a.value)) clientReceive = Some(a) @@ -57,7 +56,6 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob .read /* Join with the original on parent ID to get the parent's service name */ val spanInfoWithParent = spanInfo - .filter('parent_id){ id : Long => id != 0 } .joinWithSmaller('parent_id -> 'id_1, idName) .groupBy('name_1, 'service){ _.average('duration) } .write(Tsv(args("output")))