Skip to content

Commit

Permalink
Add endpoints. Made clients to postprocess hadoop jobs
Browse files Browse the repository at this point in the history
Made postprocessing framework, so that it is straightforward to add new
endpoints. Made postprocessors which write to servers and files.

Author: @jerryli9876
Fixes #92
URL: #92
  • Loading branch information
jerryli9876 committed Aug 3, 2012
1 parent cd48498 commit bc52d61
Show file tree
Hide file tree
Showing 28 changed files with 1,003 additions and 234 deletions.
2 changes: 1 addition & 1 deletion project/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object Zipkin extends Build {
).dependsOn(thrift)

lazy val hadoopjobrunner = Project(
id = "zipkinhadoopjobrunner",
id = "zipkin-hadoop-job-runner",
base = file("zipkin-hadoop-job-runner"),
settings = Project.defaultSettings ++
StandardProject.newSettings ++
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import collection.mutable.HashMap
import com.twitter.zipkin.gen
import java.util.Scanner

/**
* Basic client for postprocessing hadoop jobs
* @param combineSimilarNames whether similar service names should be combined (a standardized name list is used) instead of being kept distinct
*/

abstract class HadoopJobClient(val combineSimilarNames: Boolean) {

  /** Delimiter available to subclasses when they build keys out of several tokens. */
  protected val DELIMITER = ":"

  /** Name list used to resolve keys; the standardized variant is chosen when similar names are combined. */
  val serviceNameList = if (combineSimilarNames) new StandardizedServiceNameList else new ServiceNameList

  /**
   * Populate the name list
   * @param s Scanner representing the name list
   */
  def populateServiceNameList(s: Scanner): Unit

  /**
   * Returns the key value of a line
   * @param lineScanner Scanner for a single line
   * @return the key value returned
   */
  def getKeyValue(lineScanner: Scanner): String

  /**
   * Process a key and its value
   * @param s the key passed
   * @param value values associated with the key
   */
  def processKey(s: String, value: List[String]): Unit

  /**
   * Starts the postprocessing for the client
   * @param filename the input filename
   * @param output the output filename
   */
  def start(filename: String, output: String): Unit

  /**
   * Processes a single file, expected in TSV format, with key being the first value on each row.
   *
   * Every non-blank line is split into a key (read by getKeyValue) and a value (the remaining
   * whitespace-separated tokens joined by single spaces). Values are grouped by the name that
   * serviceNameList resolves the key to, and processKey is then invoked once per group.
   * Lines without any token are skipped, since no key can be read from them.
   *
   * @param s a Scanner representing a file
   */
  def processFile(s: Scanner): Unit = {
    println("Started processFile")
    // The map itself is mutable, so the reference never needs to be reassigned.
    val serviceToValues = new HashMap[String, List[String]]()
    while (s.hasNextLine()) {
      val line = new Scanner(s.nextLine())
      // A blank line has no key token; asking a Scanner for a missing token throws
      // NoSuchElementException, so such lines are ignored instead of aborting the run.
      if (line.hasNext()) {
        val key = getKeyValue(line).trim()
        // Whatever getKeyValue left unread forms the value ("" when nothing is left).
        val value = Iterator.continually(line).takeWhile(_.hasNext()).map(_.next()).mkString(" ")
        val serviceName = serviceNameList.getName(key)
        // Prepend, so each group lists its values in reverse order of appearance.
        serviceToValues(serviceName) = value :: serviceToValues.getOrElse(serviceName, Nil)
      }
    }
    println(serviceToValues)
    for ((service, values) <- serviceToValues) {
      println(service + ", " + values)
      processKey(service, values)
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import java.util.Scanner
import com.twitter.zipkin.gen
import scala.collection.JavaConverters._

/**
* Client which writes to the server, and which writes per service
* @param combineSimilarNames whether similar service names should be combined
* @param portNumber the port number to write to
*/

abstract class PerServiceClient(combineSimilarNames: Boolean, portNumber: Int) extends
  WriteToServerClient(combineSimilarNames, portNumber) {

  /**
   * Adds the first token of every non-blank line to the service name list.
   * Does nothing when similar names are not being combined.
   * @param s Scanner over the name list, one entry per line
   */
  def populateServiceNameList(s: Scanner): Unit = {
    if (combineSimilarNames) {
      while (s.hasNextLine()) {
        val line = new Scanner(s.nextLine())
        // Blank lines have no token to read; skip them rather than fail with
        // NoSuchElementException.
        if (line.hasNext()) serviceNameList.add(line.next())
      }
    }
  }

  /**
   * Reads the key of a line, which is its first token.
   * @param lineScanner Scanner for a single line
   * @return the first token of the line
   */
  def getKeyValue(lineScanner: Scanner): String = lineScanner.next()
}

/**
* Connects to the Zipkin Collector, then processes data from PopularAnnotations and sends it there. This powers the
* typeahead functionality for annotations
*/

class PopularAnnotationsClient(portNumber: Int) extends PerServiceClient(false, portNumber) {

  /**
   * Logs the annotations gathered for a service and stores them through the client.
   * @param service the service the annotations belong to
   * @param values the annotations collected for that service
   */
  def processKey(service: String, values: List[String]): Unit = {
    val joined = values.mkString(", ")
    println("Writing " + joined + " to " + service)
    client.storeTopAnnotations(service, values.asJava)
  }
}

/**
* Connects to the Zipkin Collector, then processes data from PopularKeys and sends it there. This powers the
* typeahead functionality for key-value annotations
*/

class PopularKeyValuesClient(portNumber: Int) extends PerServiceClient(false, portNumber) {

  /**
   * Stores the values gathered for a service through the client.
   * @param service the service the values belong to
   * @param values the values collected for that service
   */
  def processKey(service: String, values: List[String]): Unit = {
    // The client call takes a Java list, so convert before handing the values over.
    val javaValues = values.asJava
    client.storeTopKeyValueAnnotations(service, javaValues)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import java.util.Scanner
import com.twitter.zipkin.gen

/**
* Client which writes to a server, and per service pair
* @param combineSimilarNames whether similar service names should be combined
* @param portNumber the port number to write to
*/
abstract class PerServicePairClient(combineSimilarNames: Boolean, portNumber: Int) extends
  WriteToServerClient(combineSimilarNames, portNumber) {

  /**
   * Adds one entry per non-blank line to the service name list, built from the line's first
   * two tokens joined by DELIMITER. Does nothing when similar names are not being combined.
   * A non-blank line holding a single token is still treated as an error (the Scanner throws).
   * @param s Scanner over the name list, one pair per line
   */
  def populateServiceNameList(s: Scanner): Unit = {
    if (combineSimilarNames) {
      while (s.hasNextLine()) {
        val line = new Scanner(s.nextLine())
        // Blank lines have no tokens at all; skip them rather than fail with
        // NoSuchElementException.
        if (line.hasNext()) {
          serviceNameList.add(line.next() + DELIMITER + line.next())
        }
      }
    }
  }

  /**
   * Reads the key of a line: its first two tokens joined by DELIMITER.
   * @param lineScanner Scanner for a single line
   * @return the composite key of the line
   */
  def getKeyValue(lineScanner: Scanner): String = {
    val first = lineScanner.next()
    val second = lineScanner.next()
    first + DELIMITER + second
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

//TODO: Replace (or supplement) this with one main method that runs all jobs

/**
* Runs the PopularKeyValuesClient on the input
*/
object ProcessPopularKeys {
  /**
   * Entry point.
   * @param args args(0) and args(1) are forwarded to the client's start method (input and
   *             output filenames); args(2) is the port number
   */
  def main(args: Array[String]): Unit = {
    // Parse the port first so a bad port fails before the client is created.
    val port = args(2).toInt
    new PopularKeyValuesClient(port).start(args(0), args(1))
  }
}

/**
* Runs the PopularAnnotationsClient on the input
*/

object ProcessPopularAnnotations {
  /**
   * Entry point.
   * @param args args(0) and args(1) are forwarded to the client's start method (input and
   *             output filenames); args(2) is the port number
   */
  def main(args: Array[String]): Unit = {
    // Parse the port first so a bad port fails before anything is printed.
    val port = args(2).toInt
    println("Arguments: " + args.mkString(", "))
    new PopularAnnotationsClient(port).start(args(0), args(1))
  }
}


/**
* Runs the MemcacheRequestClient on the input
*/

object ProcessMemcacheRequest {
  /**
   * Entry point.
   * @param args args(0) and args(1) are forwarded to the client's start method (input and
   *             output filenames)
   */
  def main(args: Array[String]): Unit = {
    val client = new MemcacheRequestClient()
    try {
      client.start(args(0), args(1))
    } finally {
      // Close the writers even when processing fails part-way, so they are not leaked.
      // NOTE(review): closeAllWriters is defined elsewhere; assumed safe to call when no
      // writer was opened - confirm.
      WriteToFileClient.closeAllWriters()
    }
  }
}


/**
* Runs the TimeoutsClient on the input
*/

object ProcessTimeouts {
  /**
   * Entry point.
   * @param args args(0) and args(1) are forwarded to the client's start method (input and
   *             output filenames)
   */
  def main(args: Array[String]): Unit = {
    val client = new TimeoutsClient()
    try {
      client.start(args(0), args(1))
    } finally {
      // Close the writers even when processing fails part-way, so they are not leaked.
      // NOTE(review): closeAllWriters is defined elsewhere; assumed safe to call when no
      // writer was opened - confirm.
      WriteToFileClient.closeAllWriters()
    }
  }
}


/**
* Runs the ExpensiveEndpointsClient on the input
*/

object ProcessExpensiveEndpoints {

  /**
   * Entry point.
   * @param args args(0) and args(1) are forwarded to the client's start method (input and
   *             output filenames)
   */
  def main(args: Array[String]): Unit = {
    val client = new ExpensiveEndpointsClient()
    try {
      client.start(args(0), args(1))
    } finally {
      // Close the writers even when processing fails part-way, so they are not leaked.
      // NOTE(review): closeAllWriters is defined elsewhere; assumed safe to call when no
      // writer was opened - confirm.
      WriteToFileClient.closeAllWriters()
    }
  }

}

Loading

0 comments on commit bc52d61

Please sign in to comment.