diff --git a/project/Project.scala b/project/Project.scala
index ade587cbebe..233384b0c0f 100644
--- a/project/Project.scala
+++ b/project/Project.scala
@@ -83,7 +83,7 @@ object Zipkin extends Build {
).dependsOn(thrift)
lazy val hadoopjobrunner = Project(
- id = "zipkinhadoopjobrunner",
+ id = "zipkin-hadoop-job-runner",
base = file("zipkin-hadoop-job-runner"),
settings = Project.defaultSettings ++
StandardProject.newSettings ++
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/HadoopJobClient.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/HadoopJobClient.scala
new file mode 100644
index 00000000000..52df45976d3
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/HadoopJobClient.scala
@@ -0,0 +1,90 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import collection.mutable.HashMap
+import com.twitter.zipkin.gen
+import java.util.Scanner
+
+/**
+ * Basic client for postprocessing Hadoop jobs
+ * @param combineSimilarNames whether we should combine similar service names into one
+ */
+
+abstract class HadoopJobClient(val combineSimilarNames: Boolean) {
+
+ protected val DELIMITER = ":"
+ val serviceNameList = if (combineSimilarNames) new StandardizedServiceNameList else new ServiceNameList
+
+ /**
+ * Populate the name list
+ * @param s Scanner representing the name list
+ */
+
+ def populateServiceNameList(s: Scanner)
+
+ /**
+ * Returns the key value of a line
+ * @param lineScanner Scanner for a single line
+   * @return the key for this line
+ */
+ def getKeyValue(lineScanner: Scanner): String
+
+ /**
+   * Process a key and its values
+   * @param s the key
+   * @param value the values associated with the key
+ */
+ def processKey(s: String, value: List[String])
+
+ /**
+ * Starts the postprocessing for the client
+ * @param filename the input filename
+   * @param output the output destination (an output directory or a server name, depending on the client)
+ */
+ def start(filename : String, output : String)
+
+ /**
+ * Processes a single file, expected in TSV format, with key being the first value on each row
+ * @param s a Scanner representing a file
+ */
+ def processFile(s: Scanner) {
+ println("Started processFile")
+ var serviceToValues = new HashMap[String, List[String]]()
+ while (s.hasNextLine()) {
+ val line = new Scanner(s.nextLine())
+ val currentString = getKeyValue(line).trim()
+ var value = ""
+ if (line.hasNext()) value = line.next()
+ while (line.hasNext()) {
+ value += " " + line.next()
+ }
+ val serviceName = serviceNameList.getName(currentString)
+ if (serviceToValues.contains(serviceName)) {
+ serviceToValues(serviceName) ::= value
+ } else {
+ serviceToValues += serviceName -> List(value)
+ }
+ }
+ println(serviceToValues)
+ for (t <- serviceToValues) {
+ val (service, values) = t
+ println(service + ", " + values)
+ processKey(service, values)
+ }
+ }
+}
\ No newline at end of file
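
For reference, a minimal concrete subclass might look like the following sketch (hypothetical, not part of this change): it wires the abstract hooks together so that processFile can group a TSV input by its first column.

    import java.io.File
    import java.util.Scanner

    // Hypothetical example subclass of HadoopJobClient: treats the first
    // column of each TSV row as the key and prints a count per key.
    class CountingClient extends HadoopJobClient(false) {
      def populateServiceNameList(s: Scanner) {}  // no name merging needed
      def getKeyValue(lineScanner: Scanner) = lineScanner.next()
      def processKey(s: String, values: List[String]) {
        println(s + " -> " + values.size + " values")
      }
      def start(filename: String, output: String) {
        processFile(new Scanner(new File(filename)))
      }
    }
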
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServiceClient.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServiceClient.scala
new file mode 100644
index 00000000000..8160e895846
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServiceClient.scala
@@ -0,0 +1,69 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import java.util.Scanner
+import com.twitter.zipkin.gen
+import scala.collection.JavaConverters._
+
+/**
+ * Client which writes per-service data to a server
+ * @param combineSimilarNames whether we should combine similar service names into one
+ * @param portNumber the port number to connect to
+ */
+
+abstract class PerServiceClient(combineSimilarNames: Boolean, portNumber: Int) extends
+ WriteToServerClient(combineSimilarNames, portNumber) {
+
+ def populateServiceNameList(s: Scanner) {
+ if (!combineSimilarNames) return
+ while (s.hasNextLine()) {
+ val line = new Scanner(s.nextLine())
+ serviceNameList.add(line.next())
+ }
+ }
+
+ def getKeyValue(lineScanner: Scanner) = {
+ lineScanner.next()
+ }
+}
+
+/**
+ * Connects to the Zipkin Collector, then processes data from PopularAnnotations and sends it there. This powers the
+ * typeahead functionality for annotations
+ */
+
+class PopularAnnotationsClient(portNumber: Int) extends PerServiceClient(false, portNumber) {
+
+ def processKey(service: String, values: List[String]) {
+ println("Writing " + values.mkString(", ") + " to " + service)
+ client.storeTopAnnotations(service, values.asJava)
+ }
+
+}
+
+/**
+ * Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the
+ * typeahead functionality for key-value annotations
+ */
+
+class PopularKeyValuesClient(portNumber: Int) extends PerServiceClient(false, portNumber) {
+
+ def processKey(service: String, values: List[String]) {
+ client.storeTopKeyValueAnnotations(service, values.asJava)
+ }
+
+}
\ No newline at end of file
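
For these per-service clients the key is the first column of each row and every remaining column becomes part of that row's value. A hedged sketch of the resulting collector call (the rows and service name are hypothetical):

    import scala.collection.JavaConverters._

    // Two hypothetical rows, "web finagle.retry" and "web finagle.timeout",
    // are grouped under the key "web" by processFile; the client then issues
    // one Thrift call per service with the collected values:
    val values = List("finagle.retry", "finagle.timeout")
    val javaValues = values.asJava  // what storeTopAnnotations("web", ...) receives
    println("web -> " + javaValues)
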
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServicePairClient.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServicePairClient.scala
new file mode 100644
index 00000000000..56800945676
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/PerServicePairClient.scala
@@ -0,0 +1,41 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import java.util.Scanner
+import com.twitter.zipkin.gen
+
+/**
+ * Client which writes per-service-pair data to a server
+ * @param combineSimilarNames whether we should combine similar service names into one
+ * @param portNumber the port number to connect to
+ */
+abstract class PerServicePairClient(combineSimilarNames: Boolean, portNumber: Int) extends
+ WriteToServerClient(combineSimilarNames, portNumber) {
+
+ def populateServiceNameList(s: Scanner) {
+ if (!combineSimilarNames) return
+ while (s.hasNextLine()) {
+ val line = new Scanner(s.nextLine())
+ serviceNameList.add(line.next() + DELIMITER + line.next())
+ }
+ }
+
+ def getKeyValue(lineScanner: Scanner) = {
+ lineScanner.next() + DELIMITER + lineScanner.next()
+ }
+}
\ No newline at end of file
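
A quick sketch of the pair-key format this client produces (the input row is hypothetical): the first two columns of a line are joined with the ":" delimiter, so processKey implementations receive one key per (parent, child) service pair.

    import java.util.Scanner

    // Hypothetical row from a per-service-pair TSV: parent, child, value.
    val line = new Scanner("web cache 42")
    val pairKey = line.next() + ":" + line.next()
    assert(pairKey == "web:cache")  // ":" matches DELIMITER in HadoopJobClient
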
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/Postprocess.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/Postprocess.scala
new file mode 100644
index 00000000000..370eea421e8
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/Postprocess.scala
@@ -0,0 +1,85 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+//TODO: Replace (or supplement) this with one main method that runs all jobs
+
+/**
+ * Runs the PopularKeyValuesClient on the input
+ */
+object ProcessPopularKeys {
+ def main(args : Array[String]) {
+ val portNumber = augmentString(args(2)).toInt
+ val c = new PopularKeyValuesClient(portNumber)
+ c.start(args(0), args(1))
+ }
+}
+
+/**
+ * Runs the PopularAnnotationsClient on the input
+ */
+
+object ProcessPopularAnnotations {
+ def main(args : Array[String]) {
+ val portNumber = augmentString(args(2)).toInt
+ println("Arguments: " + args.mkString(", "))
+ val c = new PopularAnnotationsClient(portNumber)
+ c.start(args(0), args(1))
+ }
+}
+
+
+/**
+ * Runs the MemcacheRequestClient on the input
+ */
+
+object ProcessMemcacheRequest {
+ def main(args : Array[String]) {
+ val c = new MemcacheRequestClient()
+ c.start(args(0), args(1))
+ WriteToFileClient.closeAllWriters()
+ }
+}
+
+
+/**
+ * Runs the TimeoutsClient on the input
+ */
+
+object ProcessTimeouts {
+ def main(args : Array[String]) {
+ val c = new TimeoutsClient()
+ c.start(args(0), args(1))
+ WriteToFileClient.closeAllWriters()
+ }
+}
+
+
+/**
+ * Runs the ExpensiveEndpointsClient on the input
+ */
+
+object ProcessExpensiveEndpoints {
+
+ def main(args: Array[String]) {
+ val c = new ExpensiveEndpointsClient()
+ c.start(args(0), args(1))
+ WriteToFileClient.closeAllWriters()
+ }
+
+}
+
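
Each of these runners takes its input location as args(0) and its output destination as args(1); the server-backed runners additionally read a collector port from args(2). A hedged usage sketch (host, port, and paths are hypothetical):

    // Server-backed runner: input TSV, collector host, collector port.
    ProcessPopularAnnotations.main(Array("/tmp/PopularAnnotations", "collector.example.com", "9410"))
    // File-backed runner: input TSV, output directory.
    ProcessTimeouts.main(Array("/tmp/Timeouts", "/tmp/reports"))
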
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala
deleted file mode 100644
index 194c73a7342..00000000000
--- a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ProcessPopularKeys.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
-* Copyright 2012 Twitter Inc.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package com.twitter.zipkin.hadoop
-
-import org.apache.thrift.TException
-import org.apache.thrift.protocol.TBinaryProtocol
-import scala.collection.JavaConverters._
-import org.apache.thrift.transport.{TFramedTransport, TSocket, TTransport, TTransportException}
-import java.io.{FileNotFoundException, File}
-import com.twitter.zipkin.gen
-import java.net.SocketException
-import java.util.{Arrays, Scanner}
-
-/**
- * Runs the PopularKeysClient on the input
- */
-object ProcessPopularKeys {
- def main(args : Array[String]) {
- val c = new PopularKeysClient()
- val portNumber = augmentString(args(2)).toInt
- val isKeyData = augmentString(args(3)).toBoolean
- c.start(args(0), args(1), portNumber, isKeyData)
- }
-}
-
-/**
- * Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the
- * typeahead functionality for annotations
- */
-class PopularKeysClient {
- /**
- * Given a file name, the server name and port number, connects to the server, then writes to it
- * the top 100 key values per service name given the data in filename
- * @param filename
- * @param serverName
- * @param portNumber
- */
- def start(filename : String, serverName : String, portNumber : Int, isKeyData : Boolean) {
- var transport : TTransport = null
- try {
- // establish connection to the server
- transport = new TFramedTransport(new TSocket(serverName, portNumber))
- val protocol = new TBinaryProtocol(transport)
- val client = new gen.ZipkinCollector.Client(protocol)
- transport.open()
- // Read file
- val s = new Scanner(new File(filename))
- var line : Scanner = new Scanner(s.nextLine())
- if (!s.hasNextLine()) return
- var oldService : String = line.next()
- var keys : List[String] = List(line.next())
- while (s.hasNextLine()) {
- line = new Scanner(s.nextLine())
- val currentString = line.next()
- var value = ""
- if (line.hasNext()) value = line.next()
- while (line.hasNext()) {
- value += " " + line.next()
- }
- // Keep adding the keys to the current service's list until we are done with that service
- if (oldService != currentString) {
- // when we are, write that list to the server
- if (isKeyData)
- client.storeTopKeyValueAnnotations(oldService, keys.asJava)
- else
- client.storeTopAnnotations(oldService, keys.asJava)
- println("Writing " + keys.toString + " to service " + oldService)
- // and start processing the new one
- keys = List(value)
- oldService = currentString
- } else {
- keys = keys ::: List(value)
- }
- }
- // Write the last service in the file and its keys as well
- if (isKeyData)
- client.storeTopKeyValueAnnotations(oldService, keys.asJava)
- else
- client.storeTopAnnotations(oldService, keys.asJava)
- } catch {
- case se: SocketException => se.printStackTrace()
- case tte : TTransportException => tte.printStackTrace()
- case te : TException => te.printStackTrace()
- case e : Exception => e.printStackTrace()
- } finally {
- if (transport != null)
- transport.close()
- }
- }
-}
-
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ServiceNameList.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ServiceNameList.scala
new file mode 100644
index 00000000000..b3f6fc82b95
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/ServiceNameList.scala
@@ -0,0 +1,84 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import collection.mutable.{HashMap, HashSet}
+import java.util.{ArrayList}
+import scala.math.Ordering.String
+import org.apache.commons.lang.StringUtils
+
+/**
+ * A basic data structure used to store service names
+ */
+
+class ServiceNameList() {
+
+ protected var serviceNames = new ArrayList[String]()
+ protected var serviceNamesSet = new HashSet[String]()
+
+ def add(name: String) {
+ if (!serviceNamesSet.contains(name)) {
+ serviceNames.add(name)
+ serviceNamesSet += name
+ }
+ }
+
+ def getName(s: String) : String = s
+
+ def getAllNames() = new ArrayList[String](serviceNames)
+
+}
+
+/**
+ * A data structure used to store service names which detects similar names and groups them together
+ */
+
+class StandardizedServiceNameList() extends ServiceNameList {
+
+ val DISTANCE = 2
+
+ val originalToStandardizedIndex = new HashMap[String, Int]()
+
+ override def add(name: String) {
+ val trimmed = name.trim()
+ if (originalToStandardizedIndex.contains(trimmed)) return
+ var foundMatch = false
+ for (index <- 0 until serviceNames.size()) {
+ val key = serviceNames.get(index)
+ // If these are actually just near spellings of the same service
+ if (StringUtils.getLevenshteinDistance(key.toLowerCase, trimmed.toLowerCase) <= DISTANCE) {
+ foundMatch = true
+ if (String.lteq(trimmed, key)) {
+ serviceNames.set(index, trimmed)
+ }
+ originalToStandardizedIndex += trimmed -> index
+ }
+ }
+ if (!foundMatch) {
+ serviceNames.add(trimmed)
+ originalToStandardizedIndex += trimmed -> (serviceNames.size() - 1)
+ }
+ }
+
+ override def getName(s: String) = {
+ if (!originalToStandardizedIndex.contains(s)) {
+ throw new IllegalArgumentException("Input not in map: " + s)
+ }
+ serviceNames.get(originalToStandardizedIndex(s))
+ }
+
+}
\ No newline at end of file
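
The standardization rule is easiest to see by example: a name within Levenshtein distance 2 of an existing entry is merged with it, and the lexicographically smaller spelling wins. A small sketch (the names are hypothetical):

    val names = new StandardizedServiceNameList()
    names.add("aloha")
    names.add("alpha")               // distance 1 from "aloha", so merged
    println(names.getName("alpha"))  // prints "aloha"
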
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToFileClient.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToFileClient.scala
new file mode 100644
index 00000000000..b7dfaf06ec5
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToFileClient.scala
@@ -0,0 +1,198 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import java.io._
+import java.util.Scanner
+import com.twitter.zipkin.gen
+import collection.mutable.HashMap
+
+/**
+ * A client which writes its results to files, intended mainly for formatting emails
+ * @param combineSimilarNames whether we should combine similar service names into one
+ * @param jobname the name of the Hadoop job whose output is being postprocessed
+ */
+
+abstract class WriteToFileClient(combineSimilarNames: Boolean, jobname: String) extends HadoopJobClient(combineSimilarNames) {
+
+ protected var outputDir = ""
+
+ def populateServiceNameList(s: Scanner) {
+ if (!combineSimilarNames) return
+ while (s.hasNextLine()) {
+ val line = new Scanner(s.nextLine())
+ serviceNameList.add(getKeyValue(line))
+ }
+ }
+
+ def getKeyValue(lineScanner: Scanner) = {
+ lineScanner.next().replace('/', '.')
+ }
+
+ def start(input: String, outputDir: String) {
+ this.outputDir = outputDir
+ populateServiceNameList(new Scanner(new File(input)))
+ processFile(new Scanner(new File(input)))
+ }
+}
+
+/**
+ * A companion object to WriteToFileClient which ensures that only one writer is ever open per output file
+ */
+
+object WriteToFileClient {
+
+ protected var pws : HashMap[String, PrintWriter] = new HashMap[String, PrintWriter]()
+
+ def getWriter(s: String) = {
+ if (pws.contains(s)) pws(s)
+ else {
+ val pw = new PrintWriter((new FileOutputStream(s, true)))
+ pws += s -> pw
+ pw
+ }
+ }
+
+ def closeAllWriters() {
+ for (s <- pws.keys) {
+ pws(s).close()
+ }
+ }
+
+}
+
+/**
+ * A client which writes MemcacheRequest data to the file specified
+ */
+
+class MemcacheRequestClient extends WriteToFileClient(true, "MemcacheRequest") {
+
+ def processKey(service: String, values: List[String]) {
+ val numberMemcacheRequests = {
+ val valuesToInt = values map ({ s: String => augmentString(s).toInt })
+ valuesToInt.foldLeft(0) ((left: Int, right: Int) => left + right )
+ }
+ val pw = WriteToFileClient.getWriter(outputDir + "/" + service)
+ pw.println(service + " made " + numberMemcacheRequests + " redundant memcache requests")
+ pw.flush()
+ }
+
+}
+
+/**
+ * A client which writes to a file per service pair
+ */
+
+abstract class WriteToFilePerServicePairClient(jobname: String) extends WriteToFileClient(false, jobname) {
+
+ def writeHeader(service: String, pw: PrintWriter)
+
+ def writeValue(value: String, pw: PrintWriter)
+
+ def processKey(service: String, values: List[String]) {
+ val pw = WriteToFileClient.getWriter(outputDir + "/" + service)
+ writeHeader(service, pw)
+ values.foreach {value: String => writeValue(value, pw)}
+ pw.flush()
+ }
+}
+
+
+/**
+ * A client which writes Timeouts data to the file specified
+ */
+
+class TimeoutsClient extends WriteToFilePerServicePairClient("Timeouts") {
+
+ def writeHeader(service: String, pw: PrintWriter) {
+ pw.println(service + " timed out in calls to the following services:")
+ }
+
+ def writeValue(value: String, pw: PrintWriter) {
+ pw.println(value)
+ }
+
+}
+
+
+/**
+ * A client which writes Retries data to the file specified
+ */
+
+class RetriesClient extends WriteToFilePerServicePairClient("Retries") {
+
+ def writeHeader(service: String, pw: PrintWriter) {
+ pw.println(service + " retried in calls to the following services:")
+ }
+
+ def writeValue(value: String, pw: PrintWriter) {
+ pw.println(value)
+ }
+
+}
+
+
+/**
+ * A client which writes WorstRuntimes data to the file specified
+ */
+
+class WorstRuntimesClient extends WriteToFilePerServicePairClient("WorstRuntimes") {
+
+ def writeHeader(service: String, pw: PrintWriter) {
+ pw.println("Service " + service + " took the longest for these spans:")
+ }
+
+ def writeValue(value: String, pw: PrintWriter) {
+ pw.println(value)
+ }
+
+}
+
+
+/**
+ * A client which writes WorstRuntimesPerTrace data to the file specified, formatting each value as an HTML URL
+ */
+
+class WorstRuntimesPerTraceClient(zipkinUrl: String) extends WriteToFilePerServicePairClient("WorstRuntimesPerTrace") {
+
+ def writeHeader(service: String, pw: PrintWriter) {
+ pw.println("Service " + service + " took the longest for these traces:")
+ }
+
+ // TODO: Use script to determine which traces are sampled, then wrap in pretty HTML
+ def writeValue(value: String, pw: PrintWriter) {
+ pw.println("" + value + "")
+ }
+
+}
+
+
+/**
+ * A client which writes ExpensiveEndpoints data to the file specified
+ */
+
+class ExpensiveEndpointsClient extends WriteToFilePerServicePairClient("ExpensiveEndpoints") {
+
+ def writeHeader(service: String, pw: PrintWriter) {
+ pw.println("The most expensive calls for " + service + " were:")
+ }
+
+ def writeValue(value: String, pw: PrintWriter) {
+ pw.println(value)
+ }
+
+}
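
Taken together, a file-writing client is driven in three steps: construct it, start it with an input file and an output directory, then close the shared writers. A hedged sketch (the paths are hypothetical):

    // Assuming the Timeouts job wrote its TSV output to /tmp/Timeouts:
    val timeouts = new TimeoutsClient()
    timeouts.start("/tmp/Timeouts", "/tmp/reports")  // one file per service under /tmp/reports
    WriteToFileClient.closeAllWriters()              // flush and close every PrintWriter
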
diff --git a/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToServerClient.scala b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToServerClient.scala
new file mode 100644
index 00000000000..ba5d09d753a
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/main/scala/com/twitter/zipkin/hadoop/WriteToServerClient.scala
@@ -0,0 +1,52 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import org.apache.thrift.protocol.TBinaryProtocol
+import com.twitter.zipkin.gen
+import java.util.Scanner
+import java.io.File
+import java.net.SocketException
+import org.apache.thrift.transport.{TTransportException, TSocket, TFramedTransport, TTransport}
+import org.apache.thrift.TException
+
+abstract class WriteToServerClient(combineSimilarNames: Boolean, portNumber: Int) extends HadoopJobClient(combineSimilarNames) {
+
+ protected var client : gen.ZipkinCollector.Client = null
+
+ def start(filename: String, serverName: String) {
+ var transport : TTransport = null
+ try {
+ // establish connection to the server
+ transport = new TFramedTransport(new TSocket(serverName, portNumber))
+ val protocol = new TBinaryProtocol(transport)
+ client = new gen.ZipkinCollector.Client(protocol)
+ transport.open()
+ // Read file
+ println("Connected to server...populating ssnm")
+ populateServiceNameList(new Scanner(new File(filename)))
+ println("Finished populating ssnm...beginning processFile")
+ processFile(new Scanner(new File(filename)))
+ } catch {
+ // TODO: Investigate using logging
+ case t: Throwable => t.printStackTrace()
+ } finally {
+ if (transport != null)
+ transport.close()
+ }
+ }
+}
diff --git a/zipkin-hadoop-job-runner/src/test/scala/com/twitter/zipkin/hadoop/StandardizedServiceNameListSpec.scala b/zipkin-hadoop-job-runner/src/test/scala/com/twitter/zipkin/hadoop/StandardizedServiceNameListSpec.scala
new file mode 100644
index 00000000000..9934edf3559
--- /dev/null
+++ b/zipkin-hadoop-job-runner/src/test/scala/com/twitter/zipkin/hadoop/StandardizedServiceNameListSpec.scala
@@ -0,0 +1,37 @@
+package com.twitter.zipkin.hadoop
+
+import org.specs.Specification
+
+class StandardizedServiceNameListSpec extends Specification {
+
+ "StandardizedServiceNameList" should {
+ "Add correctly if there are no similar names" in {
+ var ssnm = new StandardizedServiceNameList()
+ ssnm.add("hello")
+ ssnm.add("aloha")
+ ssnm.getName("hello") mustEqual "hello"
+ ssnm.getName("aloha") mustEqual "aloha"
+ }
+ "Add correctly if there are exactly duplicated names" in {
+ var ssnm = new StandardizedServiceNameList()
+ ssnm.add("hello")
+ ssnm.add("hello")
+ ssnm.add("aloha")
+ ssnm.getName("hello") mustEqual "hello"
+ ssnm.getName("aloha") mustEqual "aloha"
+ }
+ "Add correctly if there are near duplicate names" in {
+ var ssnm = new StandardizedServiceNameList()
+ ssnm.add("hello")
+ ssnm.add("alpha")
+ ssnm.add("aloha")
+ ssnm.add("aloha1")
+ ssnm.add("hello123")
+ ssnm.getName("hello") mustEqual "hello"
+ ssnm.getName("hello123") mustEqual "hello123"
+ ssnm.getName("aloha") mustEqual "aloha"
+ ssnm.getName("alpha") mustEqual "aloha"
+ ssnm.getName("aloha1") mustEqual "aloha"
+ }
+ }
+}
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala
index 48d3e105798..df4af830dff 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala
@@ -38,6 +38,7 @@ class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
+ .map('name_1 -> 'name_1){ s: String => if (s == null) Util.UNKNOWN_SERVICE_NAME else s }
.groupBy('service, 'name_1){ _.size('count) }
.groupBy('service){ _.sortBy('count) }
.write(Tsv(args("output")))
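
The added map guards the LeftJoin: when a span's parent ID has no match, 'name_1 comes back null, so it is replaced with a sentinel before grouping. In isolation the transformation is just the following (Util here is com.twitter.zipkin.hadoop.sources.Util):

    // Sketch of the null-handling mapper added above.
    def fillUnknown(name: String): String =
      if (name == null) Util.UNKNOWN_SERVICE_NAME else name
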
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala
index 261cf3e9fd4..19669014b36 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/ExpensiveEndpoints.scala
@@ -57,6 +57,8 @@ class ExpensiveEndpoints(args : Args) extends Job(args) with DefaultDateRangeJob
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id_1, idName)
+ .map('name_1 -> 'name_1){ s: String => if (s == null) Util.UNKNOWN_SERVICE_NAME else s }
.groupBy('name_1, 'service){ _.average('duration) }
+ .groupBy('name_1, 'service){ _.sortBy('duration).reverse}
.write(Tsv(args("output")))
}
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala
new file mode 100644
index 00000000000..52b3c04dd91
--- /dev/null
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/FindDuplicateTraces.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twitter.zipkin.hadoop
+
+import com.twitter.zipkin.gen.{BinaryAnnotation, Constants, SpanServiceName, Annotation}
+import com.twitter.scalding.{Tsv, DefaultDateRangeJob, Job, Args}
+import com.twitter.zipkin.hadoop.sources.{Util, PreprocessedSpanSource}
+import java.nio.ByteBuffer
+
+
+/**
+ * Finds traces whose apparent duration exceeds a given maximum; such implausibly long traces are likely the result of duplicate (reused) trace IDs
+ */
+
+class FindDuplicateTraces(args: Args) extends Job(args) with DefaultDateRangeJob {
+
+ val maxDuration = augmentString(args.required("maximum_duration")).toInt
+
+ val result = PreprocessedSpanSource()
+ .read
+ .mapTo(0 ->('trace_id, 'annotations)) { s: SpanServiceName =>
+ (s.trace_id, s.annotations.toList)
+ }.flatMap('annotations -> 'first_and_last_timestamps ) {al : List[Annotation] =>
+ var first : Long = if (al.length > 0) al(0).timestamp else Int.MaxValue
+ var last : Long = if (al.length > 0) al(0).timestamp else -1
+ al.foreach { a : Annotation =>
+ val timestamp = a.timestamp
+ if (timestamp < first) first = timestamp
+ else if (timestamp > last) last = timestamp
+ }
+ if (first < Int.MaxValue && last > -1) Some(List(first, last)) else None
+ }.groupBy('trace_id){ _.reduce('first_and_last_timestamps -> 'first_and_last_timestamps) { (left : List[Long], right : List[Long]) =>
+ val first = if (left(0) > right(0)) right(0) else left(0)
+ val last = if (left(1) > right(1)) left(1) else right(1)
+ List(first, last)
+ }
+ }
+ .filter('first_and_last_timestamps) { timestamps : List[Long] =>
+ val durationInSeconds = (timestamps(1) - timestamps(0)) / 1000000
+ durationInSeconds >= maxDuration
+ }.project('trace_id)
+ .write(Tsv(args("output")))
+}
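
The running first/last bookkeeping above is equivalent in effect to taking the minimum and maximum annotation timestamp of a trace and converting microseconds to seconds; a compact restatement:

    // Equivalent duration computation (timestamps are in microseconds;
    // assumes a non-empty list, as guaranteed by the flatMap above).
    def durationInSeconds(timestamps: List[Long]): Long =
      (timestamps.max - timestamps.min) / 1000000
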
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala
index 5d3d79d4090..b95352c22f2 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/MemcacheRequest.scala
@@ -19,8 +19,8 @@ package com.twitter.zipkin.hadoop
import com.twitter.scalding._
import java.nio.ByteBuffer
import java.util.Arrays
-import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Constants, Annotation}
-import com.twitter.zipkin.hadoop.sources.{PrepNoNamesSpanSource, Util}
+import sources.{PreprocessedSpanSource, PrepNoNamesSpanSource, Util}
+import com.twitter.zipkin.gen._
/**
* Find out how often each service does memcache accesses
@@ -29,26 +29,24 @@ class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob {
val preprocessed = PrepNoNamesSpanSource()
.read
- .mapTo(0 -> ('annotations, 'binary_annotations))
- { s: Span => (s.annotations.toList, s.binary_annotations.toList) }
+ .mapTo(0 -> ('parent_id, 'binary_annotations))
+ { s: Span => (s.parent_id, s.binary_annotations.toList) }
- val result = preprocessed
- // from the annotations, find the service name
- .flatMap(('annotations, 'binary_annotations) -> ('service, 'memcacheNames)){ abl : (List[Annotation], List[BinaryAnnotation]) =>
- var clientSent: Option[Annotation] = None
- abl match { case (al, bl) =>
- al.foreach { a : Annotation =>
- if (Constants.CLIENT_SEND.equals(a.getValue)) clientSent = Some(a)
- }
+ val memcacheNames = preprocessed
+ .flatMap('binary_annotations -> 'memcacheNames){ bal : List[BinaryAnnotation] =>
// from the binary annotations, find the value of the memcache visits if there are any
- var memcachedKeys : Option[BinaryAnnotation] = None
- bl.foreach { ba : BinaryAnnotation => if (ba.key == "memcached.keys") memcachedKeys = Some(ba) }
- for (cs <- clientSent; key <- memcachedKeys)
- yield (cs.getHost.service_name, new String(Util.getArrayFromBuffer(key.value)))
- }
+ bal.find { ba : BinaryAnnotation => ba.key == "memcached.keys" }
}
- .project('service, 'memcacheNames)
- .groupBy('service, 'memcacheNames){ _.size('count) }
+ .project('parent_id, 'memcacheNames)
+
+ val memcacheRequesters = PreprocessedSpanSource()
+ .read
+ .mapTo(0 -> ('trace_id, 'id, 'service))
+ { s: SpanServiceName => (s.trace_id, s.id, s.service_name)}
+ .joinWithSmaller('id -> 'parent_id, memcacheNames)
+ .groupBy('trace_id, 'service, 'memcacheNames){ _.size('count) }
+ .filter('count) { count: Int => count > 1 }
+ .groupBy('service){ _.size('count) }
.write(Tsv(args("output")))
}
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala
index a447568414f..fc5f01d7e3c 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/Timeouts.scala
@@ -50,6 +50,7 @@ class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {
.filter('annotations){annotations : List[Annotation] => annotations.exists({a : Annotation => a.value == input})}
.project('id, 'parent_id, 'service)
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
+ .map('name_1 -> 'name_1){ s: String => if (s == null) Util.UNKNOWN_SERVICE_NAME else s }
.project('service, 'name_1)
.groupBy('service, 'name_1){ _.size('numTimeouts) }
.write(Tsv(args("output")))
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala
index 81ed127f27d..d8755003af0 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimes.scala
@@ -17,8 +17,8 @@
package com.twitter.zipkin.hadoop
import com.twitter.scalding._
-import com.twitter.zipkin.gen.{Span, Constants, Annotation}
-import com.twitter.zipkin.hadoop.sources.{PrepNoNamesSpanSource}
+import sources.{PreprocessedSpanSource, PrepNoNamesSpanSource}
+import com.twitter.zipkin.gen.{SpanServiceName, Span, Constants, Annotation}
/**
* Obtain the IDs and the durations of the one hundred service calls which take the longest per service
@@ -28,16 +28,15 @@ class WorstRuntimes(args: Args) extends Job(args) with DefaultDateRangeJob {
val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND)
- val preprocessed = PrepNoNamesSpanSource()
+ val preprocessed = PreprocessedSpanSource()
.read
- .mapTo(0 -> ('id, 'annotations)) {
- s : Span => (s.id, s.annotations.toList)
+ .mapTo(0 -> ('service, 'id, 'annotations)) {
+ s : SpanServiceName => (s.service_name, s.id, s.annotations.toList)
}
val result = preprocessed
- .project('id, 'annotations)
// let's find those client annotations and convert into service name and duration
- .flatMap('annotations -> ('service, 'duration)) { annotations: List[Annotation] =>
+ .flatMap('annotations -> 'duration) { annotations: List[Annotation] =>
var clientSend: Option[Annotation] = None
var clientReceived: Option[Annotation] = None
annotations.foreach { a =>
@@ -46,7 +45,7 @@ class WorstRuntimes(args: Args) extends Job(args) with DefaultDateRangeJob {
}
// only return a value if we have both annotations
for (cs <- clientSend; cr <- clientReceived)
- yield (cs.getHost.service_name, (cr.timestamp - cs.timestamp) / 1000)
+ yield (cr.timestamp - cs.timestamp) / 1000
}.discard('annotations)
//sort by duration, find the 100 largest
.groupBy('service) { _.sortBy('duration).reverse.take(100)}
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala
new file mode 100644
index 00000000000..d8ef0786022
--- /dev/null
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTrace.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.twitter.zipkin.hadoop
+
+import com.twitter.scalding._
+import sources.{PreprocessedSpanSource, PrepNoNamesSpanSource}
+import com.twitter.zipkin.gen.{SpanServiceName, Span, Constants, Annotation}
+
+/**
+ * Per service, obtain the one hundred traces in which that service spent the most total time
+ */
+
+class WorstRuntimesPerTrace(args: Args) extends Job(args) with DefaultDateRangeJob {
+
+ val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND)
+
+ val preprocessed = PreprocessedSpanSource()
+ .read
+ .mapTo(0 -> ('service, 'trace_id, 'annotations)) {
+ s : SpanServiceName => (s.service_name, s.trace_id, s.annotations.toList)
+ }
+
+ val result = preprocessed
+ // let's find those client annotations and convert into service name and duration
+ .flatMap('annotations -> 'duration) { annotations: List[Annotation] =>
+ var clientSend: Option[Annotation] = None
+ var clientReceived: Option[Annotation] = None
+ annotations.foreach { a =>
+ if (Constants.CLIENT_SEND.equals(a.getValue)) clientSend = Some(a)
+ if (Constants.CLIENT_RECV.equals(a.getValue)) clientReceived = Some(a)
+ }
+ // only return a value if we have both annotations
+ for (cs <- clientSend; cr <- clientReceived)
+ yield (cr.timestamp - cs.timestamp) / 1000
+ }.discard('annotations)
+ //sort by duration, find the 100 largest
+ .groupBy('service, 'trace_id) { _.sum('duration) }
+ .groupBy('service) { _.sortBy('duration).reverse.take(100)}
+ .write(Tsv(args("output")))
+
+}
\ No newline at end of file
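
Per span, the duration is client-receive minus client-send, converted from microseconds to milliseconds; the per-trace figure is then the sum over a service's spans in that trace. In isolation (the timestamps are hypothetical):

    val csTimestamp = 1000L  // "cs" annotation timestamp, microseconds
    val crTimestamp = 3000L  // "cr" annotation timestamp, microseconds
    val durationMillis = (crTimestamp - csTimestamp) / 1000  // = 2
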
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala
index 120337cb4da..6aa1beee1d2 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Util.scala
@@ -131,33 +131,4 @@ object Util {
}
}
}
-
- /**
- * Given two strings, finds the minimum edit distance between them given only substitutions, deletions, and additions with the
- * Levenshtein algorithm.
- * @param s1 a String
- * @param s2 another String
- * @return the edit distance between them after they're converted to lower case.
- */
- def getLevenshteinDistance(s1 : String, s2 : String) : Int = {
- val shorter = if (s1.length() > s2.length() ) s2.toLowerCase else s1.toLowerCase
- val longer = if (s1.length() > s2.length() ) s1.toLowerCase else s2.toLowerCase
-
- var lastAndCurrentRow = ((0 to shorter.length() ).toArray, new Array[Int](shorter.length() + 1))
- for (i <- 1 to longer.length()) {
- val (lastRow, currentRow) = lastAndCurrentRow
- currentRow(0) = i
- for (j <- 1 to shorter.length()) {
- if (longer.charAt(i - 1) == shorter.charAt(j - 1)) {
- // they match; edit distance stays the same
- currentRow(j) = lastRow(j - 1)
- } else {
- // they differ; edit distance is the min of substitution, deletion, and addition
- currentRow(j) = math.min(math.min( lastRow(j), currentRow(j - 1) ), lastRow(j - 1)) + 1
- }
- }
- lastAndCurrentRow = lastAndCurrentRow.swap
- }
- return lastAndCurrentRow._1(shorter.length())
- }
}
\ No newline at end of file
diff --git a/zipkin-hadoop/src/scripts/run_all_jobs.rb b/zipkin-hadoop/src/scripts/run_all_jobs.rb
index b659b721b11..9b67c264579 100755
--- a/zipkin-hadoop/src/scripts/run_all_jobs.rb
+++ b/zipkin-hadoop/src/scripts/run_all_jobs.rb
@@ -53,13 +53,14 @@ def self.parse(args)
options = OptparseAllJobArguments.parse(ARGV)
-date_cmd = options.dates.join(" ")
+date_cmd_UTC = options.dates.join(" ") + " -t UTC"
end_date_cmd = options.dates.length > 1 ? options.dates.at(1) : options.dates.at(0)
+end_date_cmd_UTC = end_date_cmd + " -t UTC"
config_string = options.uses_hadoop_config ? " --config " + options.hadoop_config : ""
run_job_cmd = "ruby " + File.dirname(__FILE__) + "/run_job.rb" + config_string
puts "Run_job_command = " + run_job_cmd
-puts "Date = " + date_cmd
+puts "Date = " + date_cmd_UTC
def run_jobs_in_parallel(prep, jobs)
threads = []
@@ -73,22 +74,25 @@ def run_jobs_in_parallel(prep, jobs)
end
#Run the commands
-system(run_job_cmd + " -j Preprocessed -p -d " + date_cmd)
+system(run_job_cmd + " -j Preprocessed -p -d " + date_cmd_UTC)
-jobs_set_1 = Array[ run_job_cmd + " -j WorstRuntimes -o " + options.output + "/WorstRuntimes -d " + end_date_cmd,
- run_job_cmd + " -j MemcacheRequest -o " + options.output + "/MemcacheRequest -d " + end_date_cmd]
+jobs_set_1 = Array[ ]
-jobs_set_2 = Array[ run_job_cmd + " -j PopularKeys -o " + options.output + "/PopularKeys -d " + end_date_cmd,
- run_job_cmd + " -j PopularAnnotations -o " + options.output + "/PopularAnnotations -d " + end_date_cmd
-# run_job_cmd + " -j WhaleReport -o " + options.output + "/WhaleReport -d " + end_date_cmd,
+jobs_set_2 = Array[ run_job_cmd + " -j PopularKeys -o " + options.output + "/PopularKeys -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j PopularAnnotations -o " + options.output + "/PopularAnnotations -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j MemcacheRequest -o " + options.output + "/MemcacheRequest -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j WorstRuntimes -o " + options.output + "/WorstRuntimes -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j WorstRuntimesPerTrace -o " + options.output + "/WorstRuntimesPerTrace -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j WhaleReport -o " + options.output + "/WhaleReport -d " + end_date_cmd_UTC
]
-jobs_set_3 = Array[ run_job_cmd + " -j DependencyTree -o " + options.output + "/DependencyTree -d " + end_date_cmd,
- run_job_cmd + " -j Timeouts -s \" --error_type finagle.timeout \" -o " + options.output + "/Timeouts -d " + end_date_cmd,
- run_job_cmd + " -j Timeouts -s \" --error_type finagle.retry \" -o " + options.output + "/Retries -d " + end_date_cmd ]
+jobs_set_3 = Array[ run_job_cmd + " -j DependencyTree -o " + options.output + "/DependencyTree -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j ExpensiveEndpoints -o " + options.output + "/ExpensiveEndpoints -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j Timeouts -s \" --error_type finagle.timeout \" -o " + options.output + "/Timeouts -d " + end_date_cmd_UTC,
+ run_job_cmd + " -j Timeouts -s \" --error_type finagle.retry \" -o " + options.output + "/Retries -d " + end_date_cmd_UTC ]
-jobs = run_jobs_in_parallel(run_job_cmd + " -j FindNames -p -d " + end_date_cmd, jobs_set_1)
-jobs += run_jobs_in_parallel(run_job_cmd + " -j FindIDtoName -p -d " + end_date_cmd, jobs_set_2)
+jobs = run_jobs_in_parallel(run_job_cmd + " -j FindNames -p -d " + end_date_cmd_UTC, jobs_set_1)
+jobs += run_jobs_in_parallel(run_job_cmd + " -j FindIDtoName -p -d " + end_date_cmd_UTC, jobs_set_2)
jobs += run_jobs_in_parallel("", jobs_set_3)
jobs.each { |aThread| aThread.join }
diff --git a/zipkin-hadoop/src/scripts/run_job.rb b/zipkin-hadoop/src/scripts/run_job.rb
index f6a90d1cf61..b28acaea198 100755
--- a/zipkin-hadoop/src/scripts/run_job.rb
+++ b/zipkin-hadoop/src/scripts/run_job.rb
@@ -20,12 +20,13 @@ def self.parse(args)
options.job = nil
options.uses_settings = false
options.uses_hadoop_config = false
+ options.set_timezone = false
options.dates = []
options.output = ""
options.preprocessor = false
opts = OptionParser.new do |opts|
- opts.banner = "Usage: run_job.rb -j JOB -d DATE -o OUTPUT -p -s SETTINGS -c CONFIG"
+ opts.banner = "Usage: run_job.rb -j JOB -d DATE -o OUTPUT -p -t TIMEZONE -s SETTINGS -c CONFIG"
opts.separator ""
opts.separator "Specific options:"
@@ -49,6 +50,11 @@ def self.parse(args)
options.preprocessor = true
end
+ opts.on("-t", "--tz [TIMEZONE]", "Specify timezone for job. Default is local time") do |timezone|
+ options.set_timezone = true
+ options.timezone = timezone || ''
+ end
+
opts.on("-s", "--settings [SETTINGS]", "Optional settings for the job") do |settings|
options.uses_settings = true
options.settings = settings || ''
@@ -110,7 +116,8 @@ def date_to_cmd(date)
cmd_head = File.dirname(__FILE__) + "/scald.rb --hdfs com.twitter.zipkin.hadoop."
settings_string = options.uses_settings ? " " + options.settings : ""
cmd_date = date_to_cmd(start_date) + " " + date_to_cmd(end_date)
-cmd_args = options.job + settings_string + " --date " + cmd_date + " --tz UTC"
+timezone_cmd = options.set_timezone ? " --tz " + options.timezone : ""
+cmd_args = options.job + settings_string + " --date " + cmd_date + timezone_cmd
if options.preprocessor
if not remote_file_exists?(time_to_remote_file(end_date, options.job + "/") + "/_SUCCESS", options)
diff --git a/zipkin-hadoop/src/scripts/scald.rb b/zipkin-hadoop/src/scripts/scald.rb
index 603b185876e..d34265f456c 100755
--- a/zipkin-hadoop/src/scripts/scald.rb
+++ b/zipkin-hadoop/src/scripts/scald.rb
@@ -15,7 +15,7 @@
require 'fileutils'
require 'thread'
-JAR_VERSION="0.2.0-SNAPSHOT"
+JAR_VERSION="0.3.0-SNAPSHOT"
#Usage : scald.rb [--hdfs|--local|--print] job
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala
index 1eac6801361..745a4c1622a 100644
--- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala
+++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/DependencyTreeSpec.scala
@@ -59,15 +59,15 @@ class DependencyTreeSpec extends Specification with TupleConversions {
.source(PrepTsvSource(), Util.getSpanIDtoNames(spans))
.sink[(String, String, Long)](Tsv("outputFile")) {
val map = new HashMap[String, Long]()
- map("service, null") = 0
+ map("service, " + Util.UNKNOWN_SERVICE_NAME) = 0
map("service1, service") = 0
- map("service1, null") = 0
+ map("service1, " + Util.UNKNOWN_SERVICE_NAME) = 0
outputBuffer => outputBuffer foreach { e =>
map(e._1 + ", " + e._2) = e._3
}
- map("service, null") mustEqual 31
+ map("service, " + Util.UNKNOWN_SERVICE_NAME) mustEqual 31
map("service1, service") mustEqual 31
- map("service1, null") mustEqual 20
+ map("service1, " + Util.UNKNOWN_SERVICE_NAME) mustEqual 20
}.run.finish
}
}
diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala
new file mode 100644
index 00000000000..e26beb96f49
--- /dev/null
+++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/FindDuplicateTracesSpec.scala
@@ -0,0 +1,67 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import org.specs.Specification
+import com.twitter.zipkin.gen
+import com.twitter.scalding._
+import gen.AnnotationType
+import scala.collection.JavaConverters._
+import collection.mutable.HashMap
+import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}
+
+/**
+ * Tests that FindDuplicateTraces finds exactly the traces whose duration
+ * exceeds the specified maximum
+ */
+
+class FindDuplicateTracesSpec extends Specification with TupleConversions {
+ noDetailedDiffs()
+
+ implicit val dateRange = DateRange(RichDate(123), RichDate(321))
+
+ val endpoint = new gen.Endpoint(123, 666, "service")
+ val endpoint1 = new gen.Endpoint(123, 666, "service1")
+ val endpoint2 = new gen.Endpoint(123, 666, "service2")
+ val span = new gen.SpanServiceName(12345, "methodcall", 666,
+ List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "sr").setHost(endpoint)).asJava,
+ List[gen.BinaryAnnotation]().asJava, "service")
+ val span1 = new gen.SpanServiceName(12345, "methodcall", 444,
+ List(new gen.Annotation(2735959595959L, "cs").setHost(endpoint1), new gen.Annotation(4000, "sr").setHost(endpoint1)).asJava,
+ List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")
+ val span2 = new gen.SpanServiceName(1234567, "methodcall", 666,
+ List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
+ List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")
+
+ val spans = Util.repeatSpan(span, 2, 40, 1) ++ Util.repeatSpan(span1, 2, 200, 40) ++ Util.repeatSpan(span2, 1, 40, 1)
+
+ "FindDuplicateTraces" should {
+ "Find exactly the traces that take longer than 10 minutes to run" in {
+ JobTest("com.twitter.zipkin.hadoop.FindDuplicateTraces")
+ .arg("input", "inputFile")
+ .arg("output", "outputFile")
+ .arg("date", "2012-01-01T01:00")
+ .arg("maximum_duration", "600")
+ .source(PreprocessedSpanSource(), spans)
+ .sink[Long](Tsv("outputFile")) {
+ outputBuffer => outputBuffer foreach { e =>
+ e mustEqual 12345
+ }
+ }.run.finish
+ }
+ }
+}
\ No newline at end of file
diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala
index 5c8fcb6e3d2..62fffef2cd1 100644
--- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala
+++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/MemcacheRequestSpec.scala
@@ -20,10 +20,10 @@ import org.specs.Specification
import com.twitter.zipkin.gen
import com.twitter.scalding._
import gen.AnnotationType
-import com.twitter.zipkin.hadoop.sources.{PrepNoNamesSpanSource, Util}
import scala.collection.JavaConverters._
import collection.mutable.HashMap
import java.nio.ByteBuffer
+import sources.{PreprocessedSpanSource, PrepNoNamesSpanSource, Util}
/**
* Tests that MemcacheRequest finds, per service and memcache request type, the number
@@ -39,15 +39,22 @@ class MemcacheRequestSpec extends Specification with TupleConversions {
val endpoint = new gen.Endpoint(123, 666, "service")
val endpoint1 = new gen.Endpoint(123, 666, "service1")
val endpoint2 = new gen.Endpoint(123, 666, "service2")
- val span = new gen.Span(12345, "methodcall", 666,
- List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
- List[gen.BinaryAnnotation]().asJava)
- val span1 = new gen.Span(123456, "methodcall", 666,
+ val span = new gen.SpanServiceName(12345, "methodcall", 666,
+ List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "sr").setHost(endpoint)).asJava,
+ List[gen.BinaryAnnotation]().asJava, "service")
+ val span1 = new gen.Span(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint1), new gen.Annotation(4000, "cr").setHost(endpoint1)).asJava,
List(new gen.BinaryAnnotation("memcached.keys", ByteBuffer.allocate(4).putInt(0, 10), AnnotationType.BOOL)).asJava)
- val span2 = new gen.Span(1234567, "methodcall", 666,
- List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
+ val span2 = new gen.SpanServiceName(12346, "methodcall", 666,
+ List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "sr").setHost(endpoint)).asJava,
+ List[gen.BinaryAnnotation]().asJava, "service")
+ val span3 = new gen.Span(123456, "methodcall", 666,
+ List(new gen.Annotation(1000, "cs").setHost(endpoint1), new gen.Annotation(4000, "cr").setHost(endpoint1)).asJava,
List(new gen.BinaryAnnotation("memcached.keys", ByteBuffer.allocate(4).putInt(0, 10), AnnotationType.BOOL)).asJava)
+ val span4 = new gen.Span(123456, "methodcall", 666,
+ List(new gen.Annotation(1000, "cs").setHost(endpoint1), new gen.Annotation(4000, "cr").setHost(endpoint1)).asJava,
+ List(new gen.BinaryAnnotation("foobar", ByteBuffer.allocate(4).putInt(0, 10), AnnotationType.BOOL)).asJava)
+
"MemcacheRequest" should {
@@ -56,18 +63,18 @@ class MemcacheRequestSpec extends Specification with TupleConversions {
arg("input", "inputFile").
arg("output", "outputFile").
arg("date", "2012-01-01T01:00").
- source(PrepNoNamesSpanSource(), (Util.repeatSpan(span, 100, 1000, 0) ++ Util.repeatSpan(span2, 10, 0, 0) ++ Util.repeatSpan(span1, 20, 100, 0))).
- sink[(String, String, Long)](Tsv("outputFile")) {
+ source(PrepNoNamesSpanSource(), Util.repeatSpan(span1, 10, 100, 0) ++ Util.repeatSpan(span3, 2, 200, 300) ++ Util.repeatSpan(span3, 0, 1000, 500) ++ Util.repeatSpan(span4, 2, 1000, 11) ).
+ source(PreprocessedSpanSource(), Util.repeatSpan(span, 12, 0, 20) ++ Util.repeatSpan(span2, 2, 300, 400) ++ Util.repeatSpan(span2, 0, 500, 100000)).
+ sink[(String, Long)](Tsv("outputFile")) {
val counts = new HashMap[String, Long]()
counts("service") = 0
counts("service1") = 0
- counts("service2") = 0
outputBuffer => outputBuffer foreach { e =>
- counts(e._1) += e._3
+ counts(e._1) += e._2
+ println(e)
}
- counts("service") mustEqual 0
- counts("service1") mustEqual 21
- counts("service2") mustEqual 11
+ counts("service") mustEqual 2
+ counts("service1") mustEqual 0
}.run.finish
}
}
diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala
index 5c1f66e1761..2cc7d289384 100644
--- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala
+++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/TimeoutsSpec.scala
@@ -66,15 +66,15 @@ class TimeoutsSpec extends Specification with TupleConversions {
source(PrepTsvSource(), Util.getSpanIDtoNames(spans)).
sink[(String, String, Long)](Tsv("outputFile")) {
val map = new HashMap[String, Long]()
- map("service, null") = 0
+ map("service, " + Util.UNKNOWN_SERVICE_NAME) = 0
map("service2, service1") = 0
- map("service2, null") = 0
+ map("service2, " + Util.UNKNOWN_SERVICE_NAME) = 0
outputBuffer => outputBuffer foreach { e =>
map(e._1 + ", " + e._2) = e._3
}
- map("service, null") mustEqual 102
+ map("service, " + Util.UNKNOWN_SERVICE_NAME) mustEqual 102
map("service2, service1") mustEqual 21
- map("service2, null") mustEqual 10
+ map("service2, " + Util.UNKNOWN_SERVICE_NAME) mustEqual 10
}.run.finish
}
}
diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/UtilSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/UtilSpec.scala
index 2bd27e488fd..2de5c73abc9 100644
--- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/UtilSpec.scala
+++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/UtilSpec.scala
@@ -74,26 +74,4 @@ class UtilSpec extends Specification {
}
}
- "Util.getLevenshteinDistance" should {
- "get correct edit distance when a string is empty" in {
- Util.getLevenshteinDistance("whee", "") must be_==(4)
- Util.getLevenshteinDistance("", "pie") must be_==(3)
- }
- "get correct edit distance when deletions are necessary" in {
- Util.getLevenshteinDistance("hi", "h") must be_==(1)
- Util.getLevenshteinDistance("hihi", "hh") must be_==(2)
- }
- "get correct edit distance when substitutions are necessary" in {
- Util.getLevenshteinDistance("hi", "ho") must be_==(1)
- Util.getLevenshteinDistance("hihi", "heha") must be_==(2)
- }
- "get correct edit distance when additions are necessary" in {
- Util.getLevenshteinDistance("h", "ho") must be_==(1)
- Util.getLevenshteinDistance("hh", "heha") must be_==(2)
- }
- "get correct edit distance in general case" in {
- Util.getLevenshteinDistance("aloha", "BoCa") must be_==(3)
- Util.getLevenshteinDistance("all", "lK") must be_==(2)
- }
- }
}
diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala
new file mode 100644
index 00000000000..00519c98328
--- /dev/null
+++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesPerTraceSpec.scala
@@ -0,0 +1,71 @@
+/*
+* Copyright 2012 Twitter Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.twitter.zipkin.hadoop
+
+import org.specs.Specification
+import com.twitter.zipkin.gen
+import com.twitter.scalding._
+import gen.AnnotationType
+import scala.collection.JavaConverters._
+import collection.mutable.HashMap
+import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}
+
+/**
+ * Tests that WorstRuntimesPerTrace finds, for each service, the traces in
+ * which it spent the most total time
+ */
+
+class WorstRuntimesPerTraceSpec extends Specification with TupleConversions {
+ noDetailedDiffs()
+
+ implicit val dateRange = DateRange(RichDate(123), RichDate(321))
+
+ val endpoint = new gen.Endpoint(123, 666, "service")
+ val endpoint1 = new gen.Endpoint(123, 666, "service1")
+ val endpoint2 = new gen.Endpoint(123, 666, "service1")
+ val span = new gen.SpanServiceName(12345, "methodcall", 666,
+ List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
+ List[gen.BinaryAnnotation]().asJava, "service")
+ val span1 = new gen.SpanServiceName(12345, "methodcall", 444,
+ List(new gen.Annotation(2000, "cs").setHost(endpoint1), new gen.Annotation(4000, "cr").setHost(endpoint1)).asJava,
+ List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")
+ val span2 = new gen.SpanServiceName(12345, "methodcall", 666,
+ List(new gen.Annotation(10000, "cs").setHost(endpoint2), new gen.Annotation(13000, "cr").setHost(endpoint2)).asJava,
+ List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")
+
+ val spans = Util.repeatSpan(span, 1, 40, 1) ++ Util.repeatSpan(span1, 1, 200, 40) ++ Util.repeatSpan(span2, 0, 40, 1)
+
+ "WorstRuntimesPerTrace" should {
+ "Find services which took the longest time per trace" in {
+ JobTest("com.twitter.zipkin.hadoop.WorstRuntimesPerTrace")
+ .arg("input", "inputFile")
+ .arg("output", "outputFile")
+ .arg("date", "2012-01-01T01:00")
+ .source(PreprocessedSpanSource(), spans)
+ .sink[(String, Long, Int)](Tsv("outputFile")) {
+ val map = new HashMap[String, Int]()
+ outputBuffer => outputBuffer foreach { e =>
+ map(e._1 + ", " + e._2) = e._3
+ }
+ map("service, 12345") mustEqual 2
+ map("service1, 12345") mustEqual 7
+ }.run.finish
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala
index e87a4196cfb..fa949006a8a 100644
--- a/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala
+++ b/zipkin-hadoop/src/test/scala/com/twitter/zipkin/hadoop/WorstRuntimesSpec.scala
@@ -22,7 +22,7 @@ import com.twitter.scalding._
import scala.collection.JavaConverters._
import collection.mutable.HashMap
import com.twitter.zipkin.gen.AnnotationType
-import com.twitter.zipkin.hadoop.sources.{Util, PrepNoNamesSpanSource}
+import sources.{PreprocessedSpanSource, Util, PrepNoNamesSpanSource}
/**
* Tests that WorstRuntimes finds the spans which take the longest to run
@@ -37,15 +37,15 @@ class WorstRuntimesSpec extends Specification with TupleConversions {
val endpoint = new gen.Endpoint(123, 666, "service")
val endpoint1 = new gen.Endpoint(123, 666, "service1")
val endpoint2 = new gen.Endpoint(123, 666, "service2")
- val span = new gen.Span(12345, "methodcall", 666,
+ val span = new gen.SpanServiceName(12345, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint), new gen.Annotation(2000, "cr").setHost(endpoint)).asJava,
- List[gen.BinaryAnnotation]().asJava)
- val span1 = new gen.Span(123456, "methodcall", 666,
+ List[gen.BinaryAnnotation]().asJava, "service")
+ val span1 = new gen.SpanServiceName(123456, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint1), new gen.Annotation(4000, "cr").setHost(endpoint1)).asJava,
- List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava)
- val span2 = new gen.Span(1234567, "methodcall", 666,
+ List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service1")
+ val span2 = new gen.SpanServiceName(1234567, "methodcall", 666,
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
- List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava)
+ List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2")
"WorstRuntimes" should {
@@ -54,19 +54,19 @@ class WorstRuntimesSpec extends Specification with TupleConversions {
.arg("input", "inputFile")
.arg("output", "outputFile")
.arg("date", "2012-01-01T01:00")
- .source(PrepNoNamesSpanSource(), (Util.repeatSpan(span, 20, 0, 0) ++ Util.repeatSpan(span1, 20, 100, 0)))
- .sink[(Long, String, Long)](Tsv("outputFile")) {
+ .source(PreprocessedSpanSource(), (Util.repeatSpan(span, 20, 0, 0) ++ Util.repeatSpan(span1, 20, 100, 0)))
+ .sink[(String, Long, Long)](Tsv("outputFile")) {
var counts = new HashMap[String, Long]()
counts += ("service" -> 0)
counts += ("service1" -> 0)
counts += ("service2" -> 0)
outputBuffer => outputBuffer foreach { e =>
- if (e._2 == "service") {
+ if (e._1 == "service") {
e._3 mustEqual 1
- } else if (e._2 == "service1") {
+ } else if (e._1 == "service1") {
e._3 mustEqual 3
}
- counts(e._2) += 1
+ counts(e._1) += 1
}
counts("service") mustEqual 21
counts("service1") mustEqual 21