Skip to content

Commit

Permalink
Run hadoop jobs
Browse files Browse the repository at this point in the history
Renamed job-runner and made Processor accept both popular annotations and
popular keys

Author: @jerryli9876
Fixes #64
URL: #64
  • Loading branch information
jerryli9876 committed Jul 6, 2012
1 parent 1807e47 commit 8eb2005
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 8 deletions.
40 changes: 37 additions & 3 deletions project/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.io.File
object Zipkin extends Build {

lazy val zipkin = Project(id = "zipkin",
base = file(".")) aggregate(hadoop, test, thrift, server, common, scrooge, scribe)
base = file(".")) aggregate(hadoop, hadoopjobrunner, test, thrift, server, common, scrooge, scribe)

val proxyRepo = Option(System.getenv("SBT_PROXY_REPO"))
val travisCi = Option(System.getenv("SBT_TRAVIS_CI")) // for adding travis ci maven repos before others
Expand All @@ -26,8 +26,16 @@ object Zipkin extends Build {
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"com.twitter" % "scalding_2.9.1" % "0.5.3",
"com.twitter.elephantbird" % "elephant-bird-cascading2" % "3.0.0",

/*
FIXME ElephantBird 3.0.0 picks up libthrift 0.7.0, which is currently
incompatible with sbt-thrift so made these intransitive
*/
"com.twitter.elephantbird" % "elephant-bird-cascading2" % "3.0.0" intransitive(),
"com.twitter.elephantbird" % "elephant-bird-core" % "3.0.0" intransitive(),
"org.slf4j" % "slf4j-log4j12" % "1.6.4" % "runtime",
"com.google.protobuf" % "protobuf-java" % "2.3.0",
"org.apache.thrift" % "libthrift" % "0.5.0",
"cascading" % "cascading-hadoop" % "2.0.0-wip-288",
/* Test dependencies */
"org.scala-tools.testing" % "specs_2.9.1" % "1.6.9" % "test"
),
Expand All @@ -37,7 +45,9 @@ object Zipkin extends Build {
"Concurrent Maven Repo" at "http://conjars.org/repo")
case Some(pr) => Seq() // if proxy is set we assume that it has the artifacts we would get from the above repo
}),

mainClass in assembly := Some("com.twitter.scalding.Tool"),

ivyXML := // slim down the jar
<dependencies>
<exclude module="jms"/>
Expand All @@ -54,6 +64,30 @@ object Zipkin extends Build {
}
).dependsOn(thrift)

// Standalone runner module that ships the hadoop jobs' output to the Zipkin
// collector (sources live under zipkin-hadoop-job-runner). Built with
// sbt-assembly into a fat jar so it can be scp'd to, and run on, a remote host
// (see src/scripts/find_data.sh).
lazy val hadoopjobrunner = Project(
id = "zipkinhadoopjobrunner",
base = file("zipkin-hadoop-job-runner"),
settings = Project.defaultSettings ++
StandardProject.newSettings ++
assemblySettings ++
TravisCiRepos.newSettings).settings(

name := "zipkin-hadoop-job-runner",
version := "0.2.0-SNAPSHOT",
// scalding/hadoop-driven tests misbehave when run concurrently
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-log4j12" % "1.6.4" % "runtime",
/* Test dependencies */
"org.scala-tools.testing" % "specs_2.9.1" % "1.6.9" % "test"
),

// Drop META-INF and project.clj duplicates when assembling the fat jar;
// anything else that collides must be a genuine conflict, so fail (deduplicate)
mergeStrategy in assembly := {
case inf if inf.startsWith("META-INF/") || inf.startsWith("project.clj") => MergeStrategy.discard
case _ => MergeStrategy.deduplicate
}
).dependsOn(thrift)


lazy val test = Project(
id = "zipkin-test",
base = file("zipkin-test"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import org.apache.thrift.TException
import org.apache.thrift.protocol.TBinaryProtocol
import scala.collection.JavaConverters._
import org.apache.thrift.transport.{TFramedTransport, TSocket, TTransport, TTransportException}
import java.io.{FileNotFoundException, File}
import com.twitter.zipkin.gen
import java.net.SocketException
import java.util.{Arrays, Scanner}

/**
* Runs the PopularKeysClient on the input
*/
/**
 * Command-line entry point: reads hadoop job output from a local file and
 * ships the per-service top keys/annotations to a Zipkin collector.
 *
 * Expected arguments: input_file server_name port_number is_key_data
 */
object ProcessPopularKeys {
  def main(args : Array[String]) {
    // Fail fast with a usage message instead of an opaque
    // ArrayIndexOutOfBoundsException when arguments are missing
    if (args.length < 4) {
      System.err.println("Usage: ProcessPopularKeys input_file server_name port_number is_key_data")
      sys.exit(1)
    }
    val client = new PopularKeysClient()
    // toInt/toBoolean already go through the implicit augmentString
    // conversion; spelling out augmentString(...) as the original did is redundant
    val portNumber = args(2).toInt
    val isKeyData = args(3).toBoolean
    client.start(args(0), args(1), portNumber, isKeyData)
  }
}

/**
* Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the
* typeahead functionality for annotations
*/
/**
 * Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the
 * typeahead functionality for annotations.
 */
class PopularKeysClient {
  /**
   * Given a file name, the server name and port number, connects to the server, then writes to it
   * the top key values per service name given the data in filename.
   *
   * Each input line holds a service name followed by a value (one or more
   * tokens); consecutive lines for the same service are batched into a single
   * store call. NOTE(review): assumes the input is grouped by service name, as
   * produced by the hadoop jobs — confirm against the job output format.
   *
   * @param filename   path of the local file holding the job output
   * @param serverName host of the Zipkin collector
   * @param portNumber port of the Zipkin collector
   * @param isKeyData  true to store key/value annotation data, false for plain annotations
   */
  def start(filename : String, serverName : String, portNumber : Int, isKeyData : Boolean) {
    var transport : TTransport = null
    var scanner : Scanner = null
    try {
      // establish connection to the server
      transport = new TFramedTransport(new TSocket(serverName, portNumber))
      val protocol = new TBinaryProtocol(transport)
      val client = new gen.ZipkinCollector.Client(protocol)
      transport.open()

      scanner = new Scanner(new File(filename))
      // FIX: the original called nextLine() before checking hasNextLine(), so an
      // empty file threw NoSuchElementException instead of returning cleanly
      if (!scanner.hasNextLine()) return

      // Flushes one service's accumulated values to the collector
      def write(service : String, values : List[String]) {
        if (isKeyData)
          client.storeTopKeyValueAnnotations(service, values.asJava)
        else
          client.storeTopAnnotations(service, values.asJava)
        println("Writing " + values.toString + " to service " + service)
      }

      // Splits a line into (service name, value), the value being all remaining
      // tokens joined by single spaces. FIX: the original parsed the first line
      // differently from the rest (kept only one token of the value) and gave
      // every subsequent value a spurious leading space.
      def parse(rawLine : String) : (String, String) = {
        val line = new Scanner(rawLine)
        val service = line.next()
        val value = new StringBuilder
        while (line.hasNext()) {
          if (value.length > 0) value.append(" ")
          value.append(line.next())
        }
        (service, value.toString)
      }

      val (firstService, firstValue) = parse(scanner.nextLine())
      var currentService : String = firstService
      var values : List[String] = List(firstValue)
      while (scanner.hasNextLine()) {
        val (service, value) = parse(scanner.nextLine())
        // Keep adding values to the current service's list until we are done
        // with that service
        if (service != currentService) {
          // when we are, write that list to the server and start the next service
          write(currentService, values)
          values = List(value)
          currentService = service
        } else {
          values = values ::: List(value)
        }
      }
      // Write the last service in the file and its values as well.
      // FIX: the original returned before this point for a one-line file, so a
      // file with a single entry wrote nothing at all.
      write(currentService, values)
    } catch {
      case se : SocketException => se.printStackTrace()
      case tte : TTransportException => tte.printStackTrace()
      case te : TException => te.printStackTrace()
      // NOTE(review): broad catch kept from the original so a malformed file is
      // logged rather than crashing the runner
      case e : Exception => e.printStackTrace()
    } finally {
      // FIX: the original never closed the Scanner, leaking the file handle
      if (scanner != null)
        scanner.close()
      if (transport != null)
        transport.close()
    }
  }
}

31 changes: 31 additions & 0 deletions zipkin-hadoop-job-runner/src/scripts/find_data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/bash
# Copyright 2012 Twitter Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Copies the job-runner fat jar to a remote host and runs ProcessPopularKeys
# there against the given input file, streaming results to the collector.
#
# Usage: find_data.sh hostname input_file_name server_name port_number is_key_data

# FIX: fail fast with usage instead of running scp/ssh with empty arguments
if [ $# -ne 5 ]; then
  echo "Usage: $0 hostname input_file_name server_name port_number is_key_data" >&2
  exit 1
fi

HOSTNAME=$1
HDFSFILENAME=$2
SERVERNAME=$3
PORTNUMBER=$4
ISKEYDATA=$5

# Name the jar once so the scp and ssh steps cannot drift apart
JARNAME=zipkin-hadoop-job-runner-assembly-0.2.0-SNAPSHOT.jar

# Resolve the repository root (two levels above this script's directory) so the
# assembled jar can be found regardless of the caller's working directory
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
DIR="$(dirname "$DIR")"
DIR="$(dirname "$DIR")"

# Ship the fat jar to the remote host's home directory.
# FIX: all expansions are quoted; the original's unquoted variables broke on
# paths or hostnames containing spaces/globs.
scp "$DIR/target/$JARNAME" "$HOSTNAME:."

# Run the job runner remotely, feeding the input into the collector.
# FIX: the original's alternating quotes left the last argument outside any
# quoting and relied on accidental concatenation.
ssh -C "$HOSTNAME" "java -cp '$JARNAME' com.twitter.zipkin.hadoop.ProcessPopularKeys '$HDFSFILENAME' '$SERVERNAME' '$PORTNUMBER' '$ISKEYDATA'"
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import sources.PreprocessedSpanSource
import com.twitter.zipkin.gen.{SpanServiceName, Annotation}

/**
* Per service, find the 100 most common annotations used to annotate spans involving that service
*/
/**
 * Per service, find the 100 most common annotations used to annotate spans involving that service.
 *
 * Writes (service, annotation value) rows, most common first, as TSV to the
 * location given by the "output" argument.
 */
class PopularAnnotations(args : Args) extends Job(args) with DefaultDateRangeJob {

  // Project each preprocessed span down to its service name and annotation list
  val preprocessed = PreprocessedSpanSource()
    .read
    .mapTo(0 -> ('service, 'annotations))
      { s: SpanServiceName => (s.service_name, s.annotations.toList) }

  val result = preprocessed
    // Drop spans with no annotations; nonEmpty is the idiomatic form of size > 0
    .filter('annotations){ al : List[Annotation] => al != null && al.nonEmpty }
    // One row per annotation value, then count occurrences per (service, value)
    .flatMap('annotations -> 'value) { ba : List[Annotation] => ba.map{b: Annotation => b.value} }
    .groupBy('service, 'value){ _.size('keyCount) }
    // TODO Kinda hacky: drop values seen only once to cut noise
    .filter('keyCount) { count : Int => count > 1 }
    // Keep the 100 most frequent values per service, most frequent first
    .groupBy('service) { _.sortBy('keyCount).reverse.take(100) }
    .discard('keyCount)
    .write(Tsv(args("output")))

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class PopularKeys(args : Args) extends Job(args) with DefaultDateRangeJob {
.flatMap('binary_annotations -> 'key) { ba : List[BinaryAnnotation] => ba.map{b: BinaryAnnotation => b.key} }
.groupBy('service, 'key){ _.size('keyCount) }
.groupBy('service) { _.sortBy('keyCount).reverse.take(100) }
.discard('keyCount)
.write(Tsv(args("output")))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import gen.AnnotationType
import sources.{PreprocessedSpanSource, Util}
import scala.collection.JavaConverters._
import scala.collection.mutable._

/**
* Tests that PopularKeys finds the most popular keys per service
*/

// Spec for the PopularAnnotations scalding job: feeds repeated spans through
// JobTest and checks the per-service annotation values emitted to the sink.
class PopularAnnotationsSpec extends Specification with TupleConversions {

noDetailedDiffs()

// Date range consumed implicitly by DefaultDateRangeJob in the job under test
implicit val dateRange = DateRange(RichDate(123), RichDate(321))

val endpoint = new gen.Endpoint(123, 666, "service")
val endpoint1 = new gen.Endpoint(123, 666, "service1")
// span: one "sr" annotation for "service"
val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "sr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service")
// span1: "sr" and "cr" annotations for "service1"
val span1 = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")
// span2: a single "cr" annotation for "service1"
val span2 = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")


"PopularAnnotations" should {
"return a map with correct entries for each service" in {
JobTest("com.twitter.zipkin.hadoop.PopularAnnotations").
arg("input", "inputFile").
arg("output", "outputFile").
arg("date", "2012-01-01T01:00").
source(PreprocessedSpanSource(), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0) ::: Util.repeatSpan(span2, 10, 500, 0)).
sink[(String, String)](Tsv("outputFile")) {
// NOTE: the map is created by the statements preceding the lambda arrow —
// this whole block is one function literal whose body starts at
// "outputBuffer =>"; the vals run once when the closure is built
val map = new HashMap[String, List[String]]()
map("service") = Nil
map("service1") = Nil
outputBuffer => outputBuffer foreach { e =>
// prepend, so later (less frequent) sink rows end up deeper in the list
map(e._1) ::= e._2
}
// "service" saw only "sr"; "service1" should rank "sr" above "cr"
// NOTE(review): expected order depends on sink row order from the job —
// prepending reverses it; confirm against the job's sort if this flakes
map("service") mustEqual List("sr")
map("service1") mustEqual List("sr", "cr")
}.run.finish
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ class PopularKeysSpec extends Specification with TupleConversions {
arg("output", "outputFile").
arg("date", "2012-01-01T01:00").
source(PreprocessedSpanSource(), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0)).
sink[(String, String, Int)](Tsv("outputFile")) {
val map = new HashMap[String, Int]()
sink[(String, String)](Tsv("outputFile")) {
val map = new HashMap[String, List[String]]()
map("service") = Nil
outputBuffer => outputBuffer foreach { e =>
map(e._1 + e._2) = e._3
map(e._1) ::= e._2
}
map("servicebye") mustEqual 51
map("servicehi") mustEqual 102
map("service") mustEqual List("bye", "hi")
}.run.finish
}

Expand Down

0 comments on commit 8eb2005

Please sign in to comment.