Skip to content

Commit

Permalink
Added WhaleReport and spec
Browse files Browse the repository at this point in the history
WhaleReport finds all traces w/ 500 Internal Server Errors and finds all spans
in those traces with retries and/or timeouts

Author: @jerryli9876
Fixes #71
URL: #71
  • Loading branch information
jerryli9876 committed Jul 19, 2012
1 parent 857bf2e commit 2031a53
Show file tree
Hide file tree
Showing 21 changed files with 126 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package com.twitter.zipkin.hadoop
import com.twitter.scalding._
import cascading.pipe.joiner._
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotation}
import sources.{PrepTsvSource, PreprocessedSpanSourceTest, PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSourceTest, PreprocessedSpanSource, Util}

/**
* Find out how often services call each other throughout the entire system
*/

class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = PreprocessedSpanSource()
.read
.filter(0) { s : SpanServiceName => s.isSetParent_id() }
Expand All @@ -40,5 +39,6 @@ class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
.groupBy('service, 'name_1){ _.size('count) }
.groupBy('service){ _.sortBy('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.twitter.zipkin.hadoop
import com.twitter.zipkin.gen.{Constants, SpanServiceName, Annotation}
import cascading.pipe.joiner.LeftJoin
import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
import sources.{PrepTsvSource, Util, PreprocessedSpanSource}
import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, Util, PreprocessedSpanSource}

/**
* Per service call (i.e. pair of services), finds the average run time (in microseconds) of that service call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.twitter.scalding._
import java.nio.ByteBuffer
import java.util.Arrays
import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Constants, Annotation}
import sources.{PrepNoNamesSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PrepNoNamesSpanSource, Util}

/**
* Find out how often each service does memcache accesses
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import sources.PreprocessedSpanSource
import com.twitter.zipkin.hadoop.sources.PreprocessedSpanSource
import com.twitter.zipkin.gen.{SpanServiceName, Annotation}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import sources.SpanSource
import com.twitter.zipkin.hadoop.sources.SpanSource
import com.twitter.zipkin.gen.{Span, Constants, Annotation}
import scala.collection.JavaConverters._
import java.nio.ByteBuffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.twitter.zipkin.hadoop
import com.twitter.scalding._
import cascading.pipe.joiner.LeftJoin
import com.twitter.zipkin.gen.{SpanServiceName, Annotation}
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* Find which services timeout the most
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.zipkin.gen.{BinaryAnnotation, Constants, SpanServiceName, Annotation}
import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
import com.twitter.zipkin.hadoop.sources.{Util, PreprocessedSpanSource}
import java.nio.ByteBuffer


/**
 * Finds traces that carry a "500 Internal Server Error" binary annotation and, within those
 * traces, the spans that have a finagle.timeout or finagle.retry annotation.
 *
 * The job writes one row per trace id, together with the list of service names of the
 * matching spans, as TSV to the path given by the "output" argument.
 */
class WhaleReport(args: Args) extends Job(args) with DefaultDateRangeJob {

  // Annotation values that mark a span as having timed out or been retried.
  val ERRORS = List("finagle.timeout", "finagle.retry")

  // One tuple per preprocessed span, unpacked into the fields the later stages need.
  val spanInfo = PreprocessedSpanSource()
    .read
    .mapTo(0 -> ('trace_id, 'id, 'service, 'annotations, 'binary_annotations)) {
      s: SpanServiceName =>
        (s.trace_id, s.id, s.service_name, s.annotations.toList, s.binary_annotations.toList)
    }

  // Trace ids of spans having at least one binary annotation whose cleaned value equals
  // WhaleReport.ERROR_MESSAGE. The field is renamed so it does not clash in the join below.
  // NOTE(review): a trace with several such spans appears to yield several rows here, which
  // would repeat services in the final list -- confirm whether de-duplication is wanted.
  val errorTraces = spanInfo
    .project('trace_id, 'binary_annotations)
    .filter('binary_annotations) { bal: List[BinaryAnnotation] =>
      bal.exists { ba: BinaryAnnotation =>
        // Null annotations and null values are skipped rather than decoded.
        ba != null && ba.value != null && cleanString(ba.value) == WhaleReport.ERROR_MESSAGE
      }
    }
    .project('trace_id)
    .rename('trace_id -> 'trace_id_1)

  // Keep only spans with a timeout/retry annotation (find yields an Option, so spans without
  // a match emit nothing), restrict them to the error traces, then collect the service names
  // per trace and write the result.
  val filtered = spanInfo
    .flatMap('annotations -> 'error) { al: List[Annotation] =>
      al.find { a: Annotation => ERRORS.contains(a.value) }
    }
    .joinWithSmaller('trace_id -> 'trace_id_1, errorTraces)
    .discard('trace_id_1)
    .groupBy('trace_id) { _.toList[String]('service -> 'serviceList) }
    .write(Tsv(args("output")))

  /**
   * Decodes the buffer's bytes into a String and drops every NUL character.
   *
   * When converting from ByteBuffer to String some null values seem to be passed along,
   * so they are removed before the text is compared.
   *
   * @param bb buffer holding the annotation value
   * @return the decoded text without any '\0' characters
   */
  private def cleanString(bb: ByteBuffer): String = {
    // Decoding uses the platform default charset, exactly as before.
    val decoded = new String(Util.getArrayFromBuffer(bb))
    // A single filter pass replaces the previous char-by-char `result += char` loop.
    decoded.filter { c => c.toInt != 0 }
  }
}

/**
 * Constants used by the WhaleReport job and referenced by its spec.
 */
object WhaleReport {
  // Binary annotation text that marks a trace as having failed with an HTTP 500.
  val ERROR_MESSAGE: String = "500 Internal Server Error"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import com.twitter.zipkin.gen.{Span, Constants, Annotation}
import sources.{PrepNoNamesSpanSource}
import com.twitter.zipkin.hadoop.sources.{PrepNoNamesSpanSource}

/**
* Obtain the IDs and the durations of the one hundred service calls which take the longest per service
Expand Down
1 change: 0 additions & 1 deletion zipkin-hadoop/src/scripts/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,5 @@ $DIR/run_job.sh -j PopularKeys -d $ENDTIME -o $OUTPUT/PopularKeys &
$DIR/run_job.sh -j PopularAnnotations -d $ENDTIME -o $OUTPUT/PopularAnnotations &
$DIR/run_job.sh -j FindIDtoName -p -d $ENDTIME
$DIR/run_job.sh -j DependencyTree -d $ENDTIME -o $OUTPUT/DependencyTree &
$DIR/run_job.sh -j MostCommonCalls -d $ENDTIME -o $OUTPUT/MostCommonCalls &
$DIR/run_job.sh -j Timeouts -s "--error_type finagle.timeout" -o $OUTPUT/Timeouts -d $ENDTIME &
$DIR/run_job.sh -j Timeouts -s "--error_type finagle.retry" -o $OUTPUT/Retries -d $ENDTIME &
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.twitter.scalding._
import gen.AnnotationType
import scala.collection.JavaConverters._
import collection.mutable.HashMap
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* Tests that DependencyTree finds all service calls and how often per pair
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.twitter.scalding.RichDate
import com.twitter.zipkin.gen.AnnotationType
import com.twitter.scalding.JobTest
import com.twitter.scalding.Tsv
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* Tests that ExpensiveEndpointSpec finds the average run time of each service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import gen.AnnotationType
import sources.{PrepNoNamesSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PrepNoNamesSpanSource, Util}
import scala.collection.JavaConverters._
import collection.mutable.HashMap
import java.nio.ByteBuffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import gen.AnnotationType
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, Util}
import scala.collection.JavaConverters._
import scala.collection.mutable._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import gen.AnnotationType
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PreprocessedSpanSource, Util}
import scala.collection.JavaConverters._
import scala.collection.mutable._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ package com.twitter.zipkin.hadoop
import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import gen.AnnotationType
import sources.{SpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{SpanSource, Util}
import scala.collection.JavaConverters._

class ServerResponsetimeSpec extends Specification with TupleConversions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.twitter.scalding._
import gen.AnnotationType
import scala.collection.JavaConverters._
import scala.collection.mutable._
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}
import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* Tests that Timeouts finds the service calls where timeouts occur and how often
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.twitter.zipkin.hadoop

import org.specs.Specification
import sources.Util
import com.twitter.zipkin.hadoop.sources.Util
import com.twitter.zipkin.gen
import gen.{AnnotationType, Annotation}
import com.twitter.zipkin.gen.{AnnotationType, Annotation}
import scala.collection.JavaConverters._

class UtilSpec extends Specification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,63 @@ import com.twitter.zipkin.gen
import com.twitter.scalding._
import gen.AnnotationType
import scala.collection.JavaConverters._
import collection.mutable.HashMap
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}
import collection.mutable.{HashMap, HashSet}
import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}
import java.nio.ByteBuffer
import java.util.Arrays

/**
* Tests that MostCommonCalls finds the most commonly called services per service
*/
* Tests that WhaleReport finds traces with 500 Internal Server Errors and finds the spans in those traces with finagle.retry or finagle.timeout annotations.
*/

class CommonServiceCallsSpec extends Specification with TupleConversions {
class WhaleReportSpec extends Specification with TupleConversions {
noDetailedDiffs()

implicit val dateRange = DateRange(RichDate(123), RichDate(321))

val buf = ByteBuffer.allocate(100);

// Create a character ByteBuffer
val cbuf = buf.asCharBuffer();

// Write a string
cbuf.put(WhaleReport.ERROR_MESSAGE);

val endpoint = new gen.Endpoint(123, 666, "service")
val endpoint1 = new gen.Endpoint(123, 666, "service1")
val endpoint2 = new gen.Endpoint(123, 666, "service2")
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "sr").setHost(endpoint), new gen.Annotation(3000, "ss").setHost(endpoint), new gen.Annotation(4000, "cr").setHost(endpoint)).asJava,
List[gen.BinaryAnnotation]().asJava, "service")
val span1 = new gen.SpanServiceName(123456, "methodcall", 666,
List(new gen.Annotation(1000, "finagle.timeout").setHost(endpoint), new gen.Annotation(1001, "sr").setHost(endpoint), new gen.Annotation(1002, "ss").setHost(endpoint), new gen.Annotation(1003, "cr").setHost(endpoint)).asJava,
List[gen.BinaryAnnotation]( new gen.BinaryAnnotation("http.responsecode", buf, AnnotationType.BOOL ) ).asJava, "service")
val span1 = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(2000, "sr").setHost(endpoint2), new gen.Annotation(4000, "ss").setHost(endpoint2), new gen.Annotation(5000, "cr").setHost(endpoint2)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")
val span2 = new gen.SpanServiceName(1234567, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
val span2 = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "finagle.retry").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")

val spans = (Util.repeatSpan(span, 30, 32, 1) ++ Util.repeatSpan(span1, 50, 100, 32))
val spans = (Util.repeatSpan(span, 0, 32, 1) ++ Util.repeatSpan(span1, 0, 100, 32) ++ Util.repeatSpan(span2, 0, 200, 100))

"MostCommonCalls" should {
"Return the most common service calls" in {
JobTest("com.twitter.zipkin.hadoop.MostCommonCalls").
"WhaleReport" should {
"Return fail whales!" in {
JobTest("com.twitter.zipkin.hadoop.WhaleReport").
arg("input", "inputFile").
arg("output", "outputFile").
arg("date", "2012-01-01T01:00").
source(PreprocessedSpanSource(), spans).
source(PrepTsvSource(), Util.getSpanIDtoNames(spans)).
sink[(String, String, Long)](Tsv("outputFile")) {
val result = new HashMap[String, Long]()
result("service, null") = 0
result("service2, null") = 0
result("service2, service1") = 0
sink[(Long, List[String])](Tsv("outputFile")) {
var result = new HashSet[String]()
var actual = new HashSet[String]()
result += "service"
result += "service2"
outputBuffer => outputBuffer foreach { e =>
result(e._1 + ", " + e._2) = e._3
e._1 mustEqual 12345
for (name <- e._2)
actual += name
}
result("service, null") mustEqual 31
result("service2, null") mustEqual 20
result("service2, service") mustEqual 31
}
actual mustEqual result
}.run.finish
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.twitter.scalding._
import scala.collection.JavaConverters._
import collection.mutable.HashMap
import com.twitter.zipkin.gen.AnnotationType
import sources.{Util, PrepNoNamesSpanSource}
import com.twitter.zipkin.hadoop.sources.{Util, PrepNoNamesSpanSource}

/**
* Tests that WorstRuntimes finds the spans which take the longest to run
Expand Down

0 comments on commit 2031a53

Please sign in to comment.