From 741c3c9a2499b19378e937f3de5e8f521c866fe7 Mon Sep 17 00:00:00 2001
From: Johan Oskarsson
Date: Thu, 14 Jun 2012 13:34:11 -0700
Subject: [PATCH] Two simple tests for the ServerResponsetime Hadoop jobs

Author: @johanoskarsson
Pull Request: #28
URL: https://github.com/twitter/zipkin/pull/28
---
 .../zipkin/hadoop/sources/SpanSource.scala    |  7 ++-
 .../hadoop/ServerResponsetimeSpec.scala       | 49 +++++++++++++++++++
 2 files changed, 52 insertions(+), 4 deletions(-)
 create mode 100644 zipkin-hadoop/src/main/test/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala

diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala
index 692af575156..606516319b5 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/SpanSource.scala
@@ -20,9 +20,9 @@ import cascading.scheme.local.{TextLine => CLTextLine}
 import com.twitter.elephantbird.cascading2.scheme.LzoThriftScheme
 import cascading.scheme.Scheme
 import cascading.flow.FlowProcess
-import com.twitter.scalding.{DateOps, TimePathedSource, DateRange, Mappable}
 import com.twitter.zipkin.gen.Span
 import org.apache.hadoop.mapred.{JobConf, RecordReader, OutputCollector}
+import com.twitter.scalding._
 
 // Scala is pickier than Java about type parameters, and Cascading's Scheme
 // declaration leaves some type parameters underspecified. Fill in the type
@@ -43,9 +43,8 @@ abstract class HourlySuffixSource(prefixTemplate : String, dateRange : DateRange
 trait LzoThrift[T <: TBase[_, _]] extends Mappable[T] {
   def column: Class[_]
 
-  override def localScheme = {
-    println("This does not work yet"); new CLTextLine
-  }
+  // TODO this won't actually work locally, but we need something here for the tests
+  override def localScheme = new CLTextLine()
   override def hdfsScheme = HadoopSchemeInstance(new LzoThriftScheme[T](column))
 }
 
diff --git a/zipkin-hadoop/src/main/test/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala b/zipkin-hadoop/src/main/test/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala
new file mode 100644
index 00000000000..02e673ca6d3
--- /dev/null
+++ b/zipkin-hadoop/src/main/test/com/twitter/zipkin/hadoop/ServerResponsetimeSpec.scala
@@ -0,0 +1,49 @@
+package com.twitter.zipkin.hadoop
+
+import org.specs.Specification
+import com.twitter.zipkin.gen
+import com.twitter.scalding._
+import sources.SpanSource
+import scala.collection.JavaConverters._
+
+class ServerResponsetimeSpec extends Specification with TupleConversions {
+  noDetailedDiffs()
+
+  implicit val dateRange = DateRange(RichDate(123), RichDate(321))
+
+  val endpoint = new gen.Endpoint(123, 666, "service")
+  val span = new gen.Span(12345, "methodcall", 666,
+    List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "ss").setHost(endpoint)).asJava,
+    List[gen.BinaryAnnotation]().asJava)
+
+
+  "ServerResponsetime" should {
+    "have no output if input is < 100 entries" in {
+      JobTest("com.twitter.zipkin.hadoop.ServerResponsetime").
+        arg("input", "inputFile").
+        arg("output", "outputFile").
+        arg("date", "2012-01-01T01:00").
+        source(SpanSource(), List(span -> 0)).
+        sink[(String, Int)](Tsv("outputFile")) {
+        outputBuffer => outputBuffer.toMap mustEqual Map()
+      }.run.finish
+    }
+    "return one entry with avg 1 ms" in {
+      JobTest("com.twitter.zipkin.hadoop.ServerResponsetime").
+        arg("input", "inputFile").
+        arg("output", "outputFile").
+        arg("date", "2012-01-01T01:00").
+        source(SpanSource(), repeatSpan(span, 101)).
+        sink[(String, String, Double, Double, Double)](Tsv("outputFile")) {
+        outputBuffer => outputBuffer foreach { e =>
+          e mustEqual ("0.0.0.123", "service", 102d, 1d, 0d)
+        }
+      }.run.finish
+    }
+
+  }
+
+  def repeatSpan(span: gen.Span, count: Int): List[(gen.Span, Int)] = {
+    ((0 to count).toSeq map { i: Int => span.deepCopy().setId(i) -> i }).toList
+  }
+}