Skip to content

Commit

Permalink
Two simple tests for the ServerResponsetime Hadoop jobs
Browse files Browse the repository at this point in the history
Author: @johanoskarsson
Pull Request: #28
URL: #28
  • Loading branch information
johanoskarsson committed Jun 14, 2012
1 parent 73105fd commit 741c3c9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import cascading.scheme.local.{TextLine => CLTextLine}
import com.twitter.elephantbird.cascading2.scheme.LzoThriftScheme
import cascading.scheme.Scheme
import cascading.flow.FlowProcess
import com.twitter.scalding.{DateOps, TimePathedSource, DateRange, Mappable}
import com.twitter.zipkin.gen.Span
import org.apache.hadoop.mapred.{JobConf, RecordReader, OutputCollector}
import com.twitter.scalding._

// Scala is pickier than Java about type parameters, and Cascading's Scheme
// declaration leaves some type parameters underspecified. Fill in the type
Expand All @@ -43,9 +43,8 @@ abstract class HourlySuffixSource(prefixTemplate : String, dateRange : DateRange
trait LzoThrift[T <: TBase[_, _]] extends Mappable[T] {
def column: Class[_]

override def localScheme = {
println("This does not work yet"); new CLTextLine
}
// TODO this won't actually work locally, but we need something here for the tests
override def localScheme = new CLTextLine()

override def hdfsScheme = HadoopSchemeInstance(new LzoThriftScheme[T](column))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.twitter.zipkin.hadoop

import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import sources.SpanSource
import scala.collection.JavaConverters._

class ServerResponsetimeSpec extends Specification with TupleConversions {
noDetailedDiffs()

implicit val dateRange = DateRange(RichDate(123), RichDate(321))

val endpoint = new gen.Endpoint(123, 666, "service")
val span = new gen.Span(12345, "methodcall", 666,
List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "ss").setHost(endpoint)).asJava,
List[gen.BinaryAnnotation]().asJava)


"ServerResponsetime" should {
"have no output if input is < 100 entries" in {
JobTest("com.twitter.zipkin.hadoop.ServerResponsetime").
arg("input", "inputFile").
arg("output", "outputFile").
arg("date", "2012-01-01T01:00").
source(SpanSource(), List(span -> 0)).
sink[(String, Int)](Tsv("outputFile")) {
outputBuffer => outputBuffer.toMap mustEqual Map()
}.run.finish
}
"return one entry with avg 1 ms" in {
JobTest("com.twitter.zipkin.hadoop.ServerResponsetime").
arg("input", "inputFile").
arg("output", "outputFile").
arg("date", "2012-01-01T01:00").
source(SpanSource(), repeatSpan(span, 101)).
sink[(String, String, Double, Double, Double)](Tsv("outputFile")) {
outputBuffer => outputBuffer foreach { e =>
e mustEqual ("0.0.0.123", "service", 102d, 1d, 0d)
}
}.run.finish
}

}

def repeatSpan(span: gen.Span, count: Int): List[(gen.Span, Int)] = {
((0 to count).toSeq map { i: Int => span.deepCopy().setId(i) -> i }).toList
}
}

0 comments on commit 741c3c9

Please sign in to comment.