Skip to content

Commit

Permalink
Hadoop jobs
Browse files Browse the repository at this point in the history
Added a bunch of scalding queries and tests

DependencyTree : Finds out how often services call each other throughout the entire system

MemcacheRequest : Find out how often each service does memcache accesses

MostCommonCalls : For each service finds the services that it most commonly calls

PopularKeys : Per service, find the 100 most common keys used to annotate spans involving that service

Timeouts : Find which service calls timeout the most

WorstRuntimes : Obtain the IDs and the durations of the one hundred service calls which take the longest per service

sources/Preprocessed : Preprocesses the data by merging different pieces of the same span and finds the best client side and service names possible, if any exist

sources/Util : Added a collection of useful functions throughout the library

Author: @jerryli9876
Fixes #47
URL: #47
  • Loading branch information
jerryli9876 committed Jun 28, 2012
1 parent dd9ca09 commit c169c8c
Show file tree
Hide file tree
Showing 19 changed files with 1,103 additions and 13 deletions.
1 change: 1 addition & 0 deletions project/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object Zipkin extends Build {

name := "zipkin-hadoop",
version := "0.2.0-SNAPSHOT",
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"com.twitter" % "scalding_2.9.1" % "0.5.3",
"com.twitter.elephantbird" % "elephant-bird-cascading2" % "3.0.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner._
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotation}

/**
* Find out how often services call each other throughout the entire system
*/

class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name ) }

// TODO: account for possible differences between sent and received service names
val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)

/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.groupBy('service, 'parentService){ _.size('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import java.nio.ByteBuffer
import java.util.Arrays
import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Constants, Annotation}
import sources.{PrepNoMergeSpanSource, Util}

/**
* Find out how often each service does memcache accesses
*/
class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = PrepNoMergeSpanSource()
.read
.mapTo(0 -> ('annotations, 'binary_annotations))
{ s: Span => (s.annotations.toList, s.binary_annotations.toList) }


val result = preprocessed
// from the annotations, find the service name
.flatMap(('annotations, 'binary_annotations) -> ('service, 'memcacheNames)){ abl : (List[Annotation], List[BinaryAnnotation]) =>
var clientSent: Option[Annotation] = None
abl match { case (al, bl) =>
al.foreach { a : Annotation =>
if (Constants.CLIENT_SEND.equals(a.getValue)) clientSent = Some(a)
}
// from the binary annotations, find the value of the memcache visits if there are any
var memcachedKeys : Option[BinaryAnnotation] = None
bl.foreach { ba : BinaryAnnotation => if (ba.key == "memcached.keys") memcachedKeys = Some(ba) }
for (cs <- clientSent; key <- memcachedKeys)
yield (cs.getHost.service_name, new String(Util.getArrayFromBuffer(key.value)))
}
}
.project('service, 'memcacheNames)
.groupBy('service, 'memcacheNames){ _.size('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner.LeftJoin
import com.twitter.zipkin.gen.{SpanServiceName}
import sources.{PreprocessedSpanSource, Util}

/**
* For each service finds the services that it most commonly calls
*/

class MostCommonCalls(args : Args) extends Job(args) with DefaultDateRangeJob {
val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name) }

val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)

val result = spanInfo
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.groupBy('service, 'parentService){ _.size('count) }
.groupBy('service){ _.sortBy('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span}

/**
* Per service, find the 100 most common keys used to annotate spans involving that service
*/
class PopularKeys(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = PreprocessedSpanSource()
.read
.mapTo(0 -> ('service, 'binary_annotations))
{ s: SpanServiceName => (s.service_name, s.binary_annotations.toList) }


val result = preprocessed
.filter('binary_annotations){ ba : List[BinaryAnnotation] => (ba != null) && (ba.size > 0) }
.flatMap('binary_annotations -> 'key) { ba : List[BinaryAnnotation] => ba.map{b: BinaryAnnotation => b.key} }
.groupBy('service, 'key){ _.size('keyCount) }
.groupBy('service) { _.sortBy('keyCount).reverse.take(100) }
.write(Tsv(args("output")))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner.LeftJoin
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, Annotation}

/**
* Find which services timeout the most
*/

class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {

// TODO: Support retry as well in a way that doesn't involve messing with the code
val ERROR_TYPE = List("finagle.timeout", "finagle.retry")

val input = args.required("error_type")
if (!ERROR_TYPE.contains(input)) {
throw new IllegalArgumentException("Invalid error type : " + input)
}

// Preprocess the data into (trace_id, id, parent_id, annotations, client service name, service name)
val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'annotations, 'cService, 'service) )
{ s: SpanServiceName => (s.id, s.parent_id, s.annotations.toList, s.client_service, s.service_name) }


// Project to (id, service name)
val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)

// Left join with idName to find the parent's service name, if applicable
val result = spanInfo
.filter('annotations){annotations : List[Annotation] => annotations.exists({a : Annotation => a.value == input})}
.project('id, 'parent_id, 'cService, 'service)
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.project('service, 'parentService)
.groupBy('service, 'parentService){ _.size('numTimeouts) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import com.twitter.zipkin.gen.{Span, Constants, Annotation}
import sources.{PrepNoMergeSpanSource}

/**
* Obtain the IDs and the durations of the one hundred service calls which take the longest per service
*/

class WorstRuntimes(args: Args) extends Job(args) with DefaultDateRangeJob {

val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND)

val preprocessed = PrepNoMergeSpanSource()
.read
.mapTo(0 -> ('id, 'annotations)) {
s : Span => (s.id, s.annotations.toList)
}

val result = preprocessed
.project('id, 'annotations)
// let's find those client annotations and convert into service name and duration
.flatMap('annotations -> ('service, 'duration)) { annotations: List[Annotation] =>
var clientSend: Option[Annotation] = None
var clientReceived: Option[Annotation] = None
annotations.foreach { a =>
if (Constants.CLIENT_SEND.equals(a.getValue)) clientSend = Some(a)
if (Constants.CLIENT_RECV.equals(a.getValue)) clientReceived = Some(a)
}
// only return a value if we have both annotations
for (cs <- clientSend; cr <- clientReceived)
yield (cs.getHost.service_name, (cr.timestamp - cs.timestamp) / 1000)
}.discard('annotations)
//sort by duration, find the 100 largest
.groupBy('service) { _.sortBy('duration).reverse.take(100)}
.write(Tsv(args("output")))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package com.twitter.zipkin.hadoop.sources

import com.twitter.zipkin.gen.{BinaryAnnotation, Span, SpanServiceName, Annotation}
import com.twitter.scalding._
import com.twitter.zipkin.gen
import scala.collection.JavaConverters._

/**
* Preprocesses the data by merging different pieces of the same span and finds the best client side
* and service names possible, if any exist
*/
class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
val preprocessed = SpanSource()
.read
.mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
}
.groupBy('trace_id, 'name, 'id, 'parent_id) {
_.reduce('annotations, 'binary_annotations) {
(left: (List[Annotation], List[BinaryAnnotation]), right: (List[Annotation], List[BinaryAnnotation])) =>
(left._1 ++ right._1, left._2 ++ right._2)
}
}
.flatMap('annotations -> ('cService, 'service)) { Util.getClientAndServiceName }
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'cService, 'service) -> 'spanWithServiceNames) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String, String) =>
a match {
case (tid, name, id, pid, annotations, binary_annotations, cService, service) =>
{
val s = new gen.SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, cService, service)
s.setParent_id(pid)
}
}
}.write(PreprocessedSpanSource())
}
Loading

0 comments on commit c169c8c

Please sign in to comment.