Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hadoop jobs #47

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
47509e8
Adding a few hadoop jobs
jerryli9876 Jun 14, 2012
bb06b90
Merge branch 'master' of https://github.com/twitter/zipkin into hadoo…
jerryli9876 Jun 14, 2012
d4035e6
Merge branch 'master' of https://github.com/twitter/zipkin into hadoo…
jerryli9876 Jun 14, 2012
897b4ed
Work in progress
jerryli9876 Jun 19, 2012
921f00b
Don't run tests in parallel
johanoskarsson Jun 20, 2012
0ae4151
Added scalding queries and their test files
jerryli9876 Jun 26, 2012
4d0a21c
Added Util.scala
jerryli9876 Jun 26, 2012
9965fe7
Modified Util.scala
jerryli9876 Jun 26, 2012
0dfea64
Added DependencyTree and its test file
jerryli9876 Jun 26, 2012
500bbac
Merge branch 'master' of github.com:twitter/zipkin into hadoop_jobs
johanoskarsson Jun 26, 2012
85043e9
Added javadocs for specs and used preprocessing unit
jerryli9876 Jun 27, 2012
17934de
Merge branch 'hadoop_jobs' of https://github.com/twitter/zipkin into …
jerryli9876 Jun 27, 2012
6a53091
Merge branch 'master' of https://github.com/twitter/zipkin into hadoo…
jerryli9876 Jun 27, 2012
aefbc1e
removed old files
jerryli9876 Jun 27, 2012
e8048a5
Added full preprocessing and removed some incomplete and test files
jerryli9876 Jun 28, 2012
08cdc49
Removed printlns from tests, added UtilSpec, other minor changes
jerryli9876 Jun 28, 2012
38788c7
Added missing comments and preliminary edit distance stuff
jerryli9876 Jun 28, 2012
3bc3390
Removed incomplete stuff
jerryli9876 Jun 28, 2012
8070184
Timeouts accepts a command line argument determining type of error
jerryli9876 Jun 28, 2012
dac24b7
modified TimeoutSpec
jerryli9876 Jun 28, 2012
0f6720d
Indentation fixes, added comments and other minor changes
jerryli9876 Jun 28, 2012
0143a18
Removed bad import statements
jerryli9876 Jun 28, 2012
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions project/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object Zipkin extends Build {

name := "zipkin-hadoop",
version := "0.2.0-SNAPSHOT",
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"com.twitter" % "scalding_2.9.1" % "0.5.3",
"com.twitter.elephantbird" % "elephant-bird-cascading2" % "3.0.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner._
import sources.{PreprocessedSpanSource, PrepSpanSource, Util}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm getting this now:
[error] /Users/johan/Dev/zipkin/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/DependencyTree.scala:21: PrepSpanSource is not a member of sources
[error] import sources.{PreprocessedSpanSource, PrepSpanSource, Util}

import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotation}

/**
* Find out how often services call each other throughout the entire system
*/

class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name ) }

// TODO: account for possible differences between sent and received service names
val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)

/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.groupBy('service, 'parentService){ _.size('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import java.nio.ByteBuffer
import java.util.Arrays
import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Constants, Annotation}
import sources.{PrepNoMergeSpanSource, Util}

/**
* Find out how often each service does memcache accesses
*/
class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = PrepNoMergeSpanSource()
.read
.mapTo(0 -> ('annotations, 'binary_annotations))
{ s: Span => (s.annotations.toList, s.binary_annotations.toList) }


val result = preprocessed
// from the annotations, find the service name
.flatMap(('annotations, 'binary_annotations) -> ('service, 'memcacheNames)){ abl : (List[Annotation], List[BinaryAnnotation]) =>
var clientSent: Option[Annotation] = None
abl match { case (al, bl) =>
al.foreach { a : Annotation =>
if (Constants.CLIENT_SEND.equals(a.getValue)) clientSent = Some(a)
}
// from the binary annotations, find the value of the memcache visits if there are any
var memcachedKeys : Option[BinaryAnnotation] = None
bl.foreach { ba : BinaryAnnotation => if (ba.key == "memcached.keys") memcachedKeys = Some(ba) }
for (cs <- clientSent; key <- memcachedKeys)
yield (cs.getHost.service_name, new String(Util.getArrayFromBuffer(key.value)))
}
}
.project('service, 'memcacheNames)
.groupBy('service, 'memcacheNames){ _.size('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner.LeftJoin
import com.twitter.zipkin.gen.{SpanServiceName}
import sources.{PreprocessedSpanSource, Util}

/**
* For each service finds the services that it most commonly calls
*/

class MostCommonCalls(args : Args) extends Job(args) with DefaultDateRangeJob {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment here describing what the job does. Also add Apache header.

val spanInfo = PreprocessedSpanSource()
.read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this use the preprocessing util?

.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name) }

val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)

val result = spanInfo
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.groupBy('service, 'parentService){ _.size('count) }
.groupBy('service){ _.sortBy('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop


import com.twitter.scalding._
import sources.{PreprocessedSpanSource, PrepSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span}

/**
* Per service, find the 100 most common keys used to annotate spans involving that service
*/
class PopularKeys(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = PreprocessedSpanSource()
.read
.mapTo(0 -> ('service, 'binary_annotations))
{ s: SpanServiceName => (s.service_name, s.binary_annotations.toList) }


val result = preprocessed
.filter('binary_annotations){ ba : List[BinaryAnnotation] => (ba != null) && (ba.size > 0) }
.flatMap('binary_annotations -> 'key) { ba : List[BinaryAnnotation] => ba.map{b: BinaryAnnotation => b.key} }
.groupBy('service, 'key){ _.size('keyCount) }
.groupBy('service) { _.sortBy('keyCount).reverse.take(100) }
.write(Tsv(args("output")))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner.LeftJoin
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, Annotation}

/**
* Find which services timeout the most
*/

class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {

// TODO: Support retry as well in a way that doesn't involve messing with the code
val ERROR_TYPE = List("finagle.timeout", "finagle.retry")

val input = args.required("error_type")
if (!ERROR_TYPE.contains(input)) {
throw new IllegalArgumentException("Invalid error type : " + input)
}

// Preprocess the data into (trace_id, id, parent_id, annotations, client service name, service name)
val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id, 'parent_id, 'annotations, 'cService, 'service) )
{ s: SpanServiceName => (s.id, s.parent_id, s.annotations.toList, s.client_service, s.service_name) }


// Project to (id, service name)
val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)

// Left join with idName to find the parent's service name, if applicable
val result = spanInfo
.filter('annotations){annotations : List[Annotation] => annotations.exists({a : Annotation => a.value == input})}
.project('id, 'parent_id, 'cService, 'service)
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.project('service, 'parentService)
.groupBy('service, 'parentService){ _.size('numTimeouts) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import com.twitter.zipkin.gen.{Span, Constants, Annotation}
import sources.{PrepNoMergeSpanSource}

/**
* Obtain the IDs and the durations of the one hundred service calls which take the longest per service
*/

class WorstRuntimes(args: Args) extends Job(args) with DefaultDateRangeJob {

val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND)

val preprocessed = PrepNoMergeSpanSource()
.read
.mapTo(0 -> ('id, 'annotations)) {
s : Span => (s.id, s.annotations.toList)
}

val result = preprocessed
.project('id, 'annotations)
// let's find those client annotations and convert into service name and duration
.flatMap('annotations -> ('service, 'duration)) { annotations: List[Annotation] =>
var clientSend: Option[Annotation] = None
var clientReceived: Option[Annotation] = None
annotations.foreach { a =>
if (Constants.CLIENT_SEND.equals(a.getValue)) clientSend = Some(a)
if (Constants.CLIENT_RECV.equals(a.getValue)) clientReceived = Some(a)
}
// only return a value if we have both annotations
for (cs <- clientSend; cr <- clientReceived)
yield (cs.getHost.service_name, (cr.timestamp - cs.timestamp) / 1000)
}.discard('annotations)
//sort by duration, find the 100 largest
.groupBy('service) { _.sortBy('duration).reverse.take(100)}
.write(Tsv(args("output")))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package com.twitter.zipkin.hadoop.sources

import com.twitter.zipkin.gen.{BinaryAnnotation, Span, SpanServiceName, Annotation}
import com.twitter.scalding._
import com.twitter.zipkin.gen
import scala.collection.JavaConverters._

/**
* Preprocesses the data by merging different pieces of the same span and finds the best client side
* and service names possible, if any exist
*/
class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment here describing what it does

val preprocessed = SpanSource()
.read
.mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
}
.groupBy('trace_id, 'name, 'id, 'parent_id) {
_.reduce('annotations, 'binary_annotations) {
(left: (List[Annotation], List[BinaryAnnotation]), right: (List[Annotation], List[BinaryAnnotation])) =>
(left._1 ++ right._1, left._2 ++ right._2)
}
}
.flatMap('annotations -> ('cService, 'service)) { Util.getClientAndServiceName }
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'cService, 'service) -> 'spanWithServiceNames) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String, String) =>
a match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this match needed? I don't know if Scalding supports it but normally you can do things like:

collection foreach { case (traceId: Long, spanId: Long) => println(traceId) }

or something similar.

case (tid, name, id, pid, annotations, binary_annotations, cService, service) =>
{
val s = new gen.SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, cService, service)
s.setParent_id(pid)
}
}
}.write(PreprocessedSpanSource())
}
Loading