diff --git a/project/Project.scala b/project/Project.scala index 7973d7af8dd..01f264f0d72 100644 --- a/project/Project.scala +++ b/project/Project.scala @@ -8,7 +8,7 @@ import java.io.File object Zipkin extends Build { lazy val zipkin = Project(id = "zipkin", - base = file(".")) aggregate(hadoop, test, thrift, server, common, scrooge, scribe) + base = file(".")) aggregate(hadoop, hadoopjobrunner, test, thrift, server, common, scrooge, scribe) val proxyRepo = Option(System.getenv("SBT_PROXY_REPO")) val travisCi = Option(System.getenv("SBT_TRAVIS_CI")) // for adding travis ci maven repos before others @@ -26,8 +26,16 @@ object Zipkin extends Build { parallelExecution in Test := false, libraryDependencies ++= Seq( "com.twitter" % "scalding_2.9.1" % "0.5.3", - "com.twitter.elephantbird" % "elephant-bird-cascading2" % "3.0.0", - + /* + FIXME ElephantBird 3.0.0 picks up libthrift 0.7.0, which is currently + incompatible with sbt-thrift so made these intransitive + */ + "com.twitter.elephantbird" % "elephant-bird-cascading2" % "3.0.0" intransitive(), + "com.twitter.elephantbird" % "elephant-bird-core" % "3.0.0" intransitive(), + "org.slf4j" % "slf4j-log4j12" % "1.6.4" % "runtime", + "com.google.protobuf" % "protobuf-java" % "2.3.0", + "org.apache.thrift" % "libthrift" % "0.5.0", + "cascading" % "cascading-hadoop" % "2.0.0-wip-288", /* Test dependencies */ "org.scala-tools.testing" % "specs_2.9.1" % "1.6.9" % "test" ), @@ -37,7 +45,9 @@ object Zipkin extends Build { "Concurrent Maven Repo" at "http://conjars.org/repo") case Some(pr) => Seq() // if proxy is set we assume that it has the artifacts we would get from the above repo }), + mainClass in assembly := Some("com.twitter.scalding.Tool"), + ivyXML := // slim down the jar @@ -54,6 +64,30 @@ object Zipkin extends Build { } ).dependsOn(thrift) + lazy val hadoopjobrunner = Project( + id = "zipkinhadoopjobrunner", + base = file("zipkin-hadoop-job-runner"), + settings = Project.defaultSettings ++ + StandardProject.newSettings ++ + assemblySettings ++ + TravisCiRepos.newSettings).settings( + + name := "zipkin-hadoop-job-runner", + version := "0.2.0-SNAPSHOT", + parallelExecution in Test := false, + libraryDependencies ++= Seq( + "org.slf4j" % "slf4j-log4j12" % "1.6.4" % "runtime", + /* Test dependencies */ + "org.scala-tools.testing" % "specs_2.9.1" % "1.6.9" % "test" + ), + + mergeStrategy in assembly := { + case inf if inf.startsWith("META-INF/") || inf.startsWith("project.clj") => MergeStrategy.discard + case _ => MergeStrategy.deduplicate + } + ).dependsOn(thrift) + + lazy val test = Project( id = "zipkin-test", base = file("zipkin-test"), diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala new file mode 100644 index 00000000000..a2c8ad14dd3 --- /dev/null +++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala @@ -0,0 +1,105 @@ +/* +* Copyright 2012 Twitter Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.twitter.zipkin.hadoop + +import org.apache.thrift.TException +import org.apache.thrift.protocol.TBinaryProtocol +import scala.collection.JavaConverters._ +import org.apache.thrift.transport.{TFramedTransport, TSocket, TTransport, TTransportException} +import java.io.{FileNotFoundException, File} +import com.twitter.zipkin.gen +import java.net.SocketException +import java.util.{Arrays, Scanner} + +/** + * Runs the PopularKeysClient on the input + */ +object ProcessPopularKeys { + def main(args : Array[String]) { + val c = new PopularKeysClient() + val portNumber = augmentString(args(2)).toInt + val isKeyData = augmentString(args(3)).toBoolean + c.start(args(0), args(1), portNumber, isKeyData) + } +} + +/** + * Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the + * typeahead functionality for annotations + */ +class PopularKeysClient { + /** + * Given a file name, the server name and port number, connects to the server, then writes to it + * the top 100 key values per service name given the data in filename + * @param filename + * @param serverName + * @param portNumber + */ + def start(filename : String, serverName : String, portNumber : Int, isKeyData : Boolean) { + var transport : TTransport = null + try { + // establish connection to the server + transport = new TFramedTransport(new TSocket(serverName, portNumber)) + val protocol = new TBinaryProtocol(transport) + val client = new gen.ZipkinCollector.Client(protocol) + transport.open() + // Read file + val s = new Scanner(new File(filename)) + var line : Scanner = new Scanner(s.nextLine()) + if (!s.hasNextLine()) return + var oldService : String = line.next() + var keys : List[String] = List(line.next()) + while (s.hasNextLine()) { + line = new Scanner(s.nextLine()) + val currentString = line.next() + var value = "" + while (line.hasNext()) { + value += " " + line.next() + } + // Keep adding the keys to the current service's list until we are done with that service + if (oldService != currentString) { + // when we are, write that list to the server + if (isKeyData) + client.storeTopKeyValueAnnotations(oldService, keys.asJava) + else + client.storeTopAnnotations(oldService, keys.asJava) + println("Writing " + keys.toString + " to service " + oldService) + // and start processing the new one + keys = List(value) + oldService = currentString + } else { + keys = keys ::: List(value) + } + } + // Write the last service in the file and its keys as well + if (isKeyData) + client.storeTopKeyValueAnnotations(oldService, keys.asJava) + else + client.storeTopAnnotations(oldService, keys.asJava) + println("Writing " + keys.toString + " to service " + oldService) + } catch { + case se: SocketException => se.printStackTrace() + case tte : TTransportException => tte.printStackTrace() + case te : TException => te.printStackTrace() + case e : Exception => e.printStackTrace() + } finally { + if (transport != null) + transport.close() + } + } +} + diff --git a/zipkin-hadoop-job-runner/src/scripts/find_data.sh b/zipkin-hadoop-job-runner/src/scripts/find_data.sh new file mode 100755 index 00000000000..2d169ab4472 --- /dev/null +++ b/zipkin-hadoop-job-runner/src/scripts/find_data.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# Copyright 2012 Twitter Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#Usage : find_data.sh hostname input_file_name server_name port_number is_key_data + +HOSTNAME=$1 +HDFSFILENAME=$2 +SERVERNAME=$3 +PORTNUMBER=$4 +ISKEYDATA=$5 + +#Get the file location for the jar file with all the dependencies and moves it to where the job is being run +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +DIR="$(dirname "$DIR")" +DIR="$(dirname "$DIR")" +scp $DIR/target/zipkin-hadoop-job-runner-assembly-0.2.0-SNAPSHOT.jar $HOSTNAME:. + +#Reads the input into the server +ssh -C $HOSTNAME "java -cp zipkin-hadoop-job-runner-assembly-0.2.0-SNAPSHOT.jar com.twitter.zipkin.hadoop.ProcessPopularKeys "$HDFSFILENAME" "$SERVERNAME" "$PORTNUMBER" "$ISKEYDATA \ No newline at end of file diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularAnnotations.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularAnnotations.scala new file mode 100644 index 00000000000..fbdd9a027fa --- /dev/null +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularAnnotations.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.zipkin.hadoop + + +import com.twitter.scalding._ +import sources.PreprocessedSpanSource +import com.twitter.zipkin.gen.{SpanServiceName, Annotation} + +/** + * Per service, find the 100 most common annotations used to annotate spans involving that service + */ +class PopularAnnotations(args : Args) extends Job(args) with DefaultDateRangeJob { + + val preprocessed = PreprocessedSpanSource() + .read + .mapTo(0 -> ('service, 'annotations)) + { s: SpanServiceName => (s.service_name, s.annotations.toList) } + + + val result = preprocessed + .filter('annotations){ al : List[Annotation] => (al != null) && (al.size > 0) } + .flatMap('annotations -> 'value) { ba : List[Annotation] => ba.map{b: Annotation => b.value} } + .groupBy('service, 'value){ _.size('keyCount) } + // TODO Kinda hacky + .filter('keyCount) { count : Int => count > 1 } + .groupBy('service) { _.sortBy('keyCount).reverse.take(100) } + .discard('keyCount) + .write(Tsv(args("output"))) + +} diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala index 117426bfea6..b982112eaa2 100644 --- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala +++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/PopularKeys.scala @@ -37,6 +37,7 @@ class PopularKeys(args : Args) extends Job(args) with DefaultDateRangeJob { .flatMap('binary_annotations -> 'key) { ba : List[BinaryAnnotation] => ba.map{b: BinaryAnnotation => b.key} } .groupBy('service, 'key){ _.size('keyCount) } .groupBy('service) { _.sortBy('keyCount).reverse.take(100) } + .discard('keyCount) .write(Tsv(args("output"))) } diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularAnnotationsSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularAnnotationsSpec.scala new file mode 100644 index 00000000000..2ee1c15fa1b --- /dev/null +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularAnnotationsSpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.twitter.zipkin.hadoop + +import org.specs.Specification +import com.twitter.zipkin.gen +import com.twitter.scalding._ +import gen.AnnotationType +import sources.{PreprocessedSpanSource, Util} +import scala.collection.JavaConverters._ +import scala.collection.mutable._ + +/** + * Tests that PopularKeys finds the most popular keys per service + */ + +class PopularAnnotationsSpec extends Specification with TupleConversions { + + noDetailedDiffs() + + implicit val dateRange = DateRange(RichDate(123), RichDate(321)) + + val endpoint = new gen.Endpoint(123, 666, "service") + val endpoint1 = new gen.Endpoint(123, 666, "service1") + val span = new gen.SpanServiceName(12345, "methodcall", 666, + List(new gen.Annotation(1000, "sr").setHost(endpoint)).asJava, + List(new gen.BinaryAnnotation("hi", null, AnnotationType.BOOL)).asJava, "service") + val span1 = new gen.SpanServiceName(12345, "methodcall", 666, + List(new gen.Annotation(1000, "sr").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava, + List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1") + val span2 = new gen.SpanServiceName(12345, "methodcall", 666, + List(new gen.Annotation(2000, "cr").setHost(endpoint)).asJava, + List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1") + + + "PopularAnnotations" should { + "return a map with correct entries for each service" in { + JobTest("com.twitter.zipkin.hadoop.PopularAnnotations"). + arg("input", "inputFile"). + arg("output", "outputFile"). + arg("date", "2012-01-01T01:00"). + source(PreprocessedSpanSource(), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0) ::: Util.repeatSpan(span2, 10, 500, 0)). + sink[(String, String)](Tsv("outputFile")) { + val map = new HashMap[String, List[String]]() + map("service") = Nil + map("service1") = Nil + outputBuffer => outputBuffer foreach { e => + map(e._1) ::= e._2 + } + map("service") mustEqual List("sr") + map("service1") mustEqual List("sr", "cr") + }.run.finish + } + } +} + diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala index 42d049cf35f..04a325e2e69 100644 --- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala +++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/PopularKeysSpec.scala @@ -50,13 +50,13 @@ class PopularKeysSpec extends Specification with TupleConversions { arg("output", "outputFile"). arg("date", "2012-01-01T01:00"). source(PreprocessedSpanSource(), Util.repeatSpan(span, 101, 0, 0) ::: Util.repeatSpan(span1, 50, 200, 0)). - sink[(String, String, Int)](Tsv("outputFile")) { - val map = new HashMap[String, Int]() + sink[(String, String)](Tsv("outputFile")) { + val map = new HashMap[String, List[String]]() + map("service") = Nil outputBuffer => outputBuffer foreach { e => - map(e._1 + e._2) = e._3 + map(e._1) ::= e._2 } - map("servicebye") mustEqual 51 - map("servicehi") mustEqual 102 + map("service") mustEqual List("bye", "hi") }.run.finish }