Skip to content

Commit

Permalink
Made preprocessing more complete
Browse files Browse the repository at this point in the history
Preprocessed : Now only merges spans
FindNames : Finds client side and service
names
FindIDtoNames : Finds (id, service name)
Other files modified to
accommodate these

Author: @jerryli9876
Fixes #54
URL: #54
  • Loading branch information
jerryli9876 committed Jun 29, 2012
1 parent ff71364 commit fad482d
Show file tree
Hide file tree
Showing 16 changed files with 210 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner._
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotation}
import sources.{PrepTsvSource, PreprocessedSpanSourceTest, PreprocessedSpanSource, Util}

/**
* Find out how often services call each other throughout the entire system
Expand All @@ -28,22 +28,17 @@ import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotati
class DependencyTree(args: Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = PreprocessedSpanSource()
.read
.read
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name ) }

// TODO: account for possible differences between sent and received service names
val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)

val idName = PrepTsvSource()
.read
/* Join with the original on parent ID to get the parent's service name */
val spanInfoWithParent = spanInfo
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.groupBy('service, 'parentService){ _.size('count) }
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'name_1) -> 'name_1){ Util.getBestClientSideName }
.groupBy('service, 'name_1){ _.size('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import com.twitter.scalding._
import java.nio.ByteBuffer
import java.util.Arrays
import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Constants, Annotation}
import sources.{PrepNoMergeSpanSource, Util}
import sources.{PrepNoNamesSpanSource, Util}

/**
* Find out how often each service does memcache accesses
*/
class MemcacheRequest(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = PrepNoMergeSpanSource()
val preprocessed = PrepNoNamesSpanSource()
.read
.mapTo(0 -> ('annotations, 'binary_annotations))
{ s: Span => (s.annotations.toList, s.binary_annotations.toList) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.twitter.zipkin.hadoop
import com.twitter.scalding._
import cascading.pipe.joiner.LeftJoin
import com.twitter.zipkin.gen.{SpanServiceName}
import sources.{PreprocessedSpanSource, Util}
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* For each service finds the services that it most commonly calls
Expand All @@ -31,17 +31,13 @@ class MostCommonCalls(args : Args) extends Job(args) with DefaultDateRangeJob {
.mapTo(0 -> ('id, 'parent_id, 'cService, 'service))
{ s: SpanServiceName => (s.id, s.parent_id, s.client_service, s.service_name) }

val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)
val idName = PrepTsvSource()
.read

val result = spanInfo
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.groupBy('service, 'parentService){ _.size('count) }
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'name_1) -> 'name_1){ Util.getBestClientSideName }
.groupBy('service, 'name_1){ _.size('count) }
.groupBy('service){ _.sortBy('count) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import cascading.pipe.joiner.LeftJoin
import sources.{PreprocessedSpanSource, Util}
import com.twitter.zipkin.gen.{SpanServiceName, Annotation}
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* Find which services timeout the most
Expand All @@ -43,20 +43,16 @@ class Timeouts(args: Args) extends Job(args) with DefaultDateRangeJob {


// Project to (id, service name)
val idName = spanInfo
.project('id, 'service)
.filter('service) {n : String => n != null }
.unique('id, 'service)
.rename('id, 'id1)
.rename('service, 'parentService)
val idName = PrepTsvSource()
.read

// Left join with idName to find the parent's service name, if applicable
val result = spanInfo
.filter('annotations){annotations : List[Annotation] => annotations.exists({a : Annotation => a.value == input})}
.project('id, 'parent_id, 'cService, 'service)
.joinWithSmaller('parent_id -> 'id1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'parentService) -> 'parentService){ Util.getBestClientSideName }
.project('service, 'parentService)
.groupBy('service, 'parentService){ _.size('numTimeouts) }
.joinWithSmaller('parent_id -> 'id_1, idName, joiner = new LeftJoin)
.map(('parent_id, 'cService, 'name_1) -> 'name_1){ Util.getBestClientSideName }
.project('service, 'name_1)
.groupBy('service, 'name_1){ _.size('numTimeouts) }
.write(Tsv(args("output")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.zipkin.hadoop

import com.twitter.scalding._
import com.twitter.zipkin.gen.{Span, Constants, Annotation}
import sources.{PrepNoMergeSpanSource}
import sources.{PrepNoNamesSpanSource}

/**
* Obtain the IDs and the durations of the one hundred service calls which take the longest per service
Expand All @@ -28,7 +28,7 @@ class WorstRuntimes(args: Args) extends Job(args) with DefaultDateRangeJob {

val clientAnnotations = Seq(Constants.CLIENT_RECV, Constants.CLIENT_SEND)

val preprocessed = PrepNoMergeSpanSource()
val preprocessed = PrepNoNamesSpanSource()
.read
.mapTo(0 -> ('id, 'annotations)) {
s : Span => (s.id, s.annotations.toList)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop.sources

import com.twitter.scalding.{DefaultDateRangeJob, Job, Args}
import com.twitter.zipkin.gen.SpanServiceName

/**
* Finds the mapping from span ID to service name
*/

class FindIDtoName(args : Args) extends Job(args) with DefaultDateRangeJob {

val spanInfo = PreprocessedSpanSource()
.read
.mapTo(0 -> ('id_1, 'name_1))
{ s: SpanServiceName => (s.id, s.service_name ) }
.filter('name_1) {n : String => n != null }
.unique('id_1, 'name_1)
.write(PrepTsvSource())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.zipkin.hadoop.sources

import com.twitter.scalding._
import com.twitter.zipkin.gen.{Annotation, BinaryAnnotation, Span, SpanServiceName}
import scala.collection.JavaConverters._

/**
* Finds the best client side and service names for each span, if any exist
*/

class FindNames(args : Args) extends Job(args) with DefaultDateRangeJob {

val preprocessed = PrepNoNamesSpanSource()
.read
.mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
}

val findNames = preprocessed
.flatMap('annotations -> ('cService, 'service)) { Util.getClientAndServiceName }
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'cService, 'service) -> 'spanWithServiceNames) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String, String) =>
a match {
case (tid, name, id, pid, annotations, binary_annotations, cService, service) =>
new SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, cService, service).setParent_id(pid)
}
}.write(PreprocessedSpanSource())


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

package com.twitter.zipkin.hadoop.sources

import com.twitter.zipkin.gen.{BinaryAnnotation, Span, SpanServiceName, Annotation}
import com.twitter.zipkin.gen.{BinaryAnnotation, Span, Annotation}
import com.twitter.scalding._
import com.twitter.zipkin.gen
import scala.collection.JavaConverters._

/**
* Preprocesses the data by merging different pieces of the same span and finds the best client side
* and service names possible, if any exist
* Preprocesses the data by merging different pieces of the same span
*/
class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
val preprocessed = SpanSource()
// val preprocessed = FixedSpanSource("file.lzo")
.read
.mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
Expand All @@ -38,15 +38,13 @@ class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
(left._1 ++ right._1, left._2 ++ right._2)
}
}
.flatMap('annotations -> ('cService, 'service)) { Util.getClientAndServiceName }
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations, 'cService, 'service) -> 'spanWithServiceNames) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation], String, String) =>
a match {
case (tid, name, id, pid, annotations, binary_annotations, cService, service) =>
{
val s = new gen.SpanServiceName(tid, name, id, annotations.asJava, binary_annotations.asJava, cService, service)
s.setParent_id(pid)
}
}
}.write(PreprocessedSpanSource())

val onlyMerge = preprocessed
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations) -> 'span) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation]) =>
a match {
case (tid, name, id, pid, annotations, binary_annotations) =>
new gen.Span(tid, name, id, annotations.asJava, binary_annotations.asJava).setParent_id(pid)
}
}.write(PrepNoNamesSpanSource())
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ abstract class HourlySuffixLzoThrift[T <: TBase[_,_] : Manifest](prefix : String
abstract class HourlySuffixSource(prefixTemplate : String, dateRange : DateRange) extends
TimePathedSource(prefixTemplate + TimePathedSource.YEAR_MONTH_DAY_HOUR + "/*", dateRange, DateOps.UTC)

abstract class DailySuffixSource(prefixTemplate : String, dateRange : DateRange) extends
TimePathedSource(prefixTemplate + TimePathedSource.YEAR_MONTH_DAY + "/*", dateRange, DateOps.UTC)

trait LzoThrift[T <: TBase[_, _]] extends Mappable[T] {
def column: Class[_]
Expand All @@ -54,18 +56,63 @@ trait LzoThrift[T <: TBase[_, _]] extends Mappable[T] {
}

/**
* Ensures that a _SUCCESS file is present in the Source path.
*/
trait SuccessFileSource extends FileSource {
override protected def pathIsGood(p : String, conf : Configuration) = {
val path = new Path(p)
Option(path.getFileSystem(conf).globStatus(path)).
map{ statuses : Array[FileStatus] =>
// Must have a file that is called "_SUCCESS"
statuses.exists { fs : FileStatus =>
fs.getPath.getName == "_SUCCESS"
}
}.
getOrElse(false)
}
}

trait LzoTsv extends DelimitedScheme {
// TODO: This doesn't work locally
override def localScheme = {new TextDelimited(fields, separator, types) }
override def hdfsScheme = HadoopSchemeInstance(new LzoTextDelimited(fields, separator, types))
}

/**
* This is the source for trace data. Directories are like so: /logs/zipkin/yyyy/mm/dd/hh
*/
case class SpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("/logs/zipkin/", dateRange)

case class SpanSource1(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("good_data", dateRange)

case class FixedSpanSource(p : String) extends FixedPathSource(p) with LzoThrift[Span] {
def column = classOf[Span]
}

/**
* This is the source for trace data that has been merged. Directories are like in SpanSource
*/
case class PrepNoMergeSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("test", dateRange)
case class PrepNoNamesSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("test", dateRange)

/**
* This is the source for trace data that has been merged and for which we've found
* the best possible client side and service names. Directories are like in SpanSource
*/
case class PreprocessedSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[SpanServiceName]("testagain", dateRange)

case class PreprocessedSpanSourceTest(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[SpanServiceName]("testalpha", dateRange)

/**
* This is the source for data of the form (id, service name)
*/

case class PrepTsvSource()(implicit dateRange : DateRange)
extends DailySuffixSource("id_names", dateRange)
with LzoTsv
with Mappable[(Long, String)]
with SuccessFileSource {
override val fields = new Fields("id_1", "name_1")
override val types : Array[Class[_]] = Array(classOf[Long], classOf[String])
override val columnNums = (0 until types.size)
}

Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ object Util {
((0 to count).toSeq map { i: Int => span.deepCopy().setId(i + offset).setParent_id(i + parentOffset) -> (i + offset)}).toList
}

/**
* Given a list of spans and their IDs, creates a list of (id, service name)
* @param spans a list of (Span, SpanID)
* @return a list of (SpanID, service name)
*/
def getSpanIDtoNames(spans : List[(gen.SpanServiceName, Int)]) : List[(Long, String)] = {
spans.map { s : (gen.SpanServiceName, Int) => {
val (span, _) = s
(span.id, span.service_name)
}
}
}

/**
* Given two strings, finds the minimum edit distance between them given only substitutions, deletions, and additions with the
* Levenshtein algorithm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.twitter.scalding._
import gen.AnnotationType
import scala.collection.JavaConverters._
import collection.mutable.HashMap
import sources.{PreprocessedSpanSource, Util}
import sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* Tests that MostCommonCalls finds the most commonly called services per service
Expand All @@ -46,14 +46,16 @@ class CommonServiceCallsSpec extends Specification with TupleConversions {
List(new gen.Annotation(1000, "cs").setHost(endpoint2), new gen.Annotation(3000, "cr").setHost(endpoint2)).asJava,
List(new gen.BinaryAnnotation("bye", null, AnnotationType.BOOL)).asJava, "service2", "service2")

val spans = (Util.repeatSpan(span, 30, 32, 1) ++ Util.repeatSpan(span1, 50, 100, 32))

"MostCommonCalls" should {
"Return the most common service calls" in {
JobTest("com.twitter.zipkin.hadoop.MostCommonCalls").
arg("input", "inputFile").
arg("output", "outputFile").
arg("date", "2012-01-01T01:00").
source(PreprocessedSpanSource(), (Util.repeatSpan(span, 30, 32, 1) ++ Util.repeatSpan(span1, 50, 100, 32))).
source(PreprocessedSpanSource(), spans).
source(PrepTsvSource(), Util.getSpanIDtoNames(spans)).
sink[(String, String, Long)](Tsv("outputFile")) {
val result = new HashMap[String, Long]()
result("service, Unknown Service Name") = 0
Expand Down
Loading

0 comments on commit fad482d

Please sign in to comment.