Add dependency graph endpoints
Part 1 redux of several pull requests that will make up the dependency graph.

Fixed some minor issues with the Hadoop scripts and with email.mustache.
Added dependency graph endpoints on the server side.

Author: @jerryli9876
Fixes #128
URL: #128
jerryli9876 committed Aug 28, 2012
1 parent 2cc3378 commit cf30f5a
Showing 14 changed files with 200 additions and 104 deletions.
7 changes: 4 additions & 3 deletions zipkin-hadoop-job-runner/src/main/resources/email.mustache
@@ -4,12 +4,13 @@
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html>
<head>
<h2>Service Report for {{serviceName}}</h2>
<h2>Service Report for {{standardServiceName}}</h2>
</head>
{{/header}}

{{#body}}
<body>
{{#services}}
<h3>{{serviceName}}</h3>
{{#oneLineResults}}
<p>{{result}}</p> <br />
{{/oneLineResults}}
@@ -45,8 +46,8 @@
{{/tableUrlRows}}
</table>
{{/tableResults}}
{{/services}}
</body>
{{/body}}
</html>
{{/html}}

46 changes: 17 additions & 29 deletions zipkin-hadoop-job-runner/src/scripts/has-sampled.rb
@@ -63,14 +63,13 @@ def self.parse(args)
end
end

options = OptparseHasSampledArguments.parse(ARGV)

$config = {
:zipkin_query_host => "localhost", #whatever the collector is
:zipkin_query_port => 9411,
:skip_zookeeper => true
}

# Asks the query daemon which of the given trace ids were actually sampled (stored)
def sampled_traces(trace_ids)
result = false
traces = nil
@@ -80,39 +79,28 @@ def sampled_traces(trace_ids)
return traces
end

# Gets trace id from input line
def get_trace_id(line)
return line.split("\t")[1].to_i
end

File.open(options.output, 'w') do |out_file|
trace_list = []
File.open(options.input, 'r').each do |line|
trace_list = trace_list << get_trace_id(line)
end
sampled = sampled_traces(trace_list)
File.open(options.input, 'r').each do |line|
if (sampled.include?(get_trace_id(line)))
out_file.print line
puts line
# Reads the input file and writes to the output file only the lines whose trace ids were sampled
def sample(inputfile, outputfile)
File.open(outputfile, 'w') do |out_file|
trace_list = []
File.open(inputfile, 'r').each do |line|
trace_list = trace_list << get_trace_id(line)
end
end
end

=begin
h = Hash.new
File.open(options.input, 'r').each do |line|
ary = line.split("\t")
if h[ary[0]] == nil
h[ary[0]] = Array.new(1, ary[1].to_i)
else
h[ary[0]] = h[ary[0]] << ary[1].to_i
sampled = sampled_traces(trace_list)
File.open(inputfile, 'r').each do |line|
if (sampled.include?(get_trace_id(line)))
out_file.print line
end
end
end
end

ary = Array.new()
h.each do |service, traces|
p sampled_traces(traces)
if __FILE__ == $0
options = OptparseHasSampledArguments.parse(ARGV)
sample(options.input, options.output)
end
=end
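
The script is now structured as a reusable sample(inputfile, outputfile) function, with the command-line handling moved under the if __FILE__ == $0 guard. A minimal usage sketch, assuming the Zipkin query daemon configured in $config (localhost:9411 above) is reachable; the file paths below are purely illustrative:

# Load has-sampled.rb without triggering its command-line entry point,
# then keep only the lines of spans.tsv whose trace ids were actually sampled.
require_relative 'has-sampled'

sample('/tmp/spans.tsv', '/tmp/sampled-spans.tsv')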
@@ -28,8 +28,8 @@ import scala.collection.JavaConverters._
class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
val preprocessed = SpanSource()
.read
.mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
.mapTo(0 ->('trace_id, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
s: Span => (s.trace_id, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
}
.groupBy('trace_id, 'name, 'id, 'parent_id) {
_.reduce('annotations, 'binary_annotations) {
@@ -39,11 +39,11 @@ class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
}

val onlyMerge = preprocessed
.mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations) -> 'span) {
a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation]) =>
.mapTo(('trace_id, 'id, 'parent_id, 'annotations, 'binary_annotations) -> 'span) {
a : (Long, Long, Long, List[Annotation], List[BinaryAnnotation]) =>
a match {
case (tid, name, id, pid, annotations, binary_annotations) =>
new gen.Span(tid, name, id, annotations.asJava, binary_annotations.asJava).setParent_id(pid)
case (tid, id, pid, annotations, binary_annotations) =>
new gen.Span(tid, "", id, annotations.asJava, binary_annotations.asJava).setParent_id(pid)
}
}.write(PrepNoNamesSpanSource())
}
76 changes: 37 additions & 39 deletions zipkin-hadoop/src/scripts/run_all_jobs.rb
@@ -1,4 +1,4 @@
#!/usr/bash/env ruby
#!/usr/bin/env ruby
# Copyright 2012 Twitter Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,6 +19,7 @@
require 'ostruct'
require 'pp'
require 'date'
require 'run_job.rb'

class OptparseAllJobArguments

@@ -40,6 +41,7 @@ def self.parse(args)
opts.separator "Specific options:"

opts.on("-d", "--date STARTDATE,ENDDATE", Array, "The DATES to run the jobs over. Expected format for dates are is %Y-%m-%dT%H:%M") do |list|
options.dates = list.map{|date| DateTime.strptime(date, '%Y-%m-%dT%H:%M')}
options.dates = list
end

@@ -65,51 +67,47 @@ def self.parse(args)
end
end

options = OptparseAllJobArguments.parse(ARGV)

date_cmd_UTC = options.dates.join(" ") + " -t UTC"
end_date_cmd = options.dates.length > 1 ? options.dates.at(1) : options.dates.at(0)
end_date_cmd_UTC = end_date_cmd + " -t UTC"
config_string = options.uses_hadoop_config ? " --config " + options.hadoop_config : ""
run_job_cmd = "ruby " + File.dirname(__FILE__) + "/run_job.rb" + config_string

puts "Run_job_command = " + run_job_cmd
puts "Date = " + date_cmd_UTC
#Run the commands

def run_jobs_in_parallel(prep, jobs)
threads = []
prepThread = Thread.new(prep) { |prep| system(prep) }
prepThread = Thread.new(prep) { |prep| run_job(prep) }
for job in jobs
threads << Thread.new(job) { |job| system(job) }
threads << Thread.new(job) { |job| run_job(job) }
end
prepThread.join
return threads
# threads.each { |aThread| aThread.join }
end

#Run the commands
system(run_job_cmd + " -j Preprocessed -p -d " + date_cmd_UTC)

jobs_set_1 = Array[ ]

jobs_set_2 = Array[ run_job_cmd + " -j PopularKeys -o " + options.output + "/PopularKeys -d " + end_date_cmd_UTC,
run_job_cmd + " -j PopularAnnotations -o " + options.output + "/PopularAnnotations -d " + end_date_cmd_UTC,
run_job_cmd + " -j MemcacheRequest -o " + options.output + "/MemcacheRequest -d " + end_date_cmd_UTC,
run_job_cmd + " -j WorstRuntimes -o " + options.output + "/WorstRuntimes -d " + end_date_cmd_UTC,
run_job_cmd + " -j WorstRuntimesPerTrace -o " + options.output + "/WorstRuntimesPerTrace -d " + end_date_cmd_UTC,
run_job_cmd + " -j WhaleReport -o " + options.output + "/WhaleReport -d " + end_date_cmd_UTC
]

jobs_set_3 = Array[ run_job_cmd + " -j DependencyTree -o " + options.output + "/DependencyTree -d " + end_date_cmd_UTC,
run_job_cmd + " -j ExpensiveEndpoints -o " + options.output + "/ExpensiveEndpoints -d " + end_date_cmd_UTC,
run_job_cmd + " -j Timeouts -s \" --error_type finagle.timeout \" -o " + options.output + "/Timeouts -d " + end_date_cmd_UTC,
run_job_cmd + " -j Timeouts -s \" --error_type finagle.retry \" -o " + options.output + "/Retries -d " + end_date_cmd_UTC ]

jobs = run_jobs_in_parallel(run_job_cmd + " -j FindNames -p -d " + end_date_cmd_UTC, jobs_set_1)
jobs += run_jobs_in_parallel(run_job_cmd + " -j FindIDtoName -p -d " + end_date_cmd_UTC, jobs_set_2)
jobs += run_jobs_in_parallel("", jobs_set_3)

jobs.each { |aThread| aThread.join }

puts "All jobs finished!"
def run_all_jobs(dates, output, hadoop_config)
uses_hadoop_config = hadoop_config != nil
config_string = uses_hadoop_config ? " --config " + hadoop_config : nil

jobs_set_1 = []

jobs_set_2 = Array[JobArguments.new(dates, nil, config_string, "UTC", "PopularKeys", output + "/PopularKeys", false),
JobArguments.new(dates, nil, config_string, "UTC", "PopularAnnotations", output + "/PopularAnnotations", false),
JobArguments.new(dates, nil, config_string, "UTC", "MemcacheRequest", output + "/MemcacheRequest",false),
JobArguments.new(dates, nil, config_string, "UTC", "WorstRuntimes", output + "/WorstRuntimes", false),
JobArguments.new(dates, nil, config_string, "UTC", "WorstRuntimesPerTrace", output + "/WorstRuntimesPerTrace", false),
JobArguments.new(dates, nil, config_string, "UTC", "WhaleReport", output + "/WhaleReport", false)]

jobs_set_3 = Array[JobArguments.new(dates, nil, config_string, "UTC", "DependencyTree", output + "/DependencyTree", false),
JobArguments.new(dates, nil, config_string, "UTC", "ExpensiveEndpoints", output + "/ExpensiveEndpoints", false),
JobArguments.new(dates, "\" --error_type finagle.timeout \"", config_string, "UTC", "Timeouts", output + "/Timeouts", false),
JobArguments.new(dates, " \" --error_type finagle.retry \"", config_string, "UTC", "Timeouts", output + "/Retries", false),]

run_job(JobArguments.new(dates, nil, config_string, "UTC", "Preprocessed", nil, true))
jobs = run_jobs_in_parallel(JobArguments.new(dates, nil, config_string, "UTC", "FindNames", nil, true), jobs_set_1)
jobs += run_jobs_in_parallel(JobArguments.new(dates, nil, config_string, "UTC", "FindIDtoName", nil, true), jobs_set_2)
jobs += run_jobs_in_parallel(nil, jobs_set_3)

jobs.each { |aThread| aThread.join }

puts "All jobs finished!"
end

if __FILE__ == $0
options = OptparseAllJobArguments.parse(ARGV)
run_all_jobs(options.dates, options.output, options.hadoop_config)
end
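
run_all_jobs.rb no longer shells out to run_job.rb; it builds JobArguments objects (defined in run_job.rb, shown next) and calls run_job directly, running the Preprocessed job first and then each prep job alongside its dependent job set on parallel threads. A minimal sketch of invoking the driver programmatically, with illustrative dates and output path, and assuming run_job.rb is resolvable through the require 'run_job.rb' at the top of the script:

# Hypothetical direct invocation of the refactored driver.
require 'date'
require_relative 'run_all_jobs'

dates = ['2012-08-27T00:00', '2012-08-28T00:00'].map do |d|
  DateTime.strptime(d, '%Y-%m-%dT%H:%M')
end
# nil hadoop_config means no --config flag is passed through to hadoop.
run_all_jobs(dates, 'daily_output', nil)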
73 changes: 51 additions & 22 deletions zipkin-hadoop/src/scripts/run_job.rb
@@ -92,9 +92,21 @@ def self.parse(args)
end
end

options = OptparseJobArguments.parse(ARGV)
start_date = options.dates.at(0)
end_date = options.dates.length > 1 ? options.dates.at(1) : options.dates.at(0)
class JobArguments
attr_accessor :dates, :settings, :hadoop_config, :timezone, :job, :output, :prep

def initialize(dates, settings, hadoop_config, timezone, job, output, prep)
@dates = dates
@settings = settings
@hadoop_config = hadoop_config
@timezone = timezone
@job = job
@output = output
@prep = prep
end

end


def time_to_remote_file(time, prefix)
return prefix + time.year.to_s() + "/" + append_zero(time.month) + "/" + append_zero(time.day) + "/" + append_zero(time.hour)
@@ -113,34 +125,51 @@ def is_hadoop_local_machine?()
return system("hadoop dfs -test -e .")
end

def remote_file_exists?(pathname, options)
def remote_file_exists?(pathname, hadoop_config)
cmd = is_hadoop_local_machine?() ? "" : "ssh -C " + $HOST + " "
cmd += "hadoop"
cmd += options.uses_hadoop_config ? " --config " + options.hadoop_config : ""
cmd += "hadoop "
cmd += (hadoop_config == nil) ? "" : hadoop_config
cmd += " dfs -test -e " + pathname
result = system(cmd)
puts "In run_job, remote_file_exists for " + pathname + ": " + result.to_s()
return result
end

def date_to_cmd(date)
return date.to_s()[0..18]
def date_to_cmd(time)
return time.year.to_s() + "-" + append_zero(time.month) + "-" + append_zero(time.day) + "T" + append_zero(time.hour) + ":00"
end

cmd_head = File.dirname(__FILE__) + "/scald.rb --hdfs com.twitter.zipkin.hadoop."
settings_string = options.uses_settings ? " " + options.settings : ""
cmd_date = date_to_cmd(start_date) + " " + date_to_cmd(end_date)
timezone_cmd = options.set_timezone ? " --tz " + options.timezone : ""
cmd_args = options.job + settings_string + " --date " + cmd_date + timezone_cmd

if options.preprocessor
if not remote_file_exists?(time_to_remote_file(end_date, options.job + "/") + "/_SUCCESS", options)
cmd = cmd_head + "sources." + cmd_args
system(cmd)
def run_job(args)
if (args == nil)
return
end
else
if not remote_file_exists?(options.output + "/_SUCCESS", options)
cmd = cmd_head + cmd_args + " --output " + options.output
system(cmd)
uses_settings = args.settings != nil
set_timezone = args.timezone != nil
start_date = args.dates.at(0)
end_date = args.dates.length > 1 ? args.dates.at(1) : args.dates.at(0)
hadoop_config_cmd = (args.hadoop_config == nil) ? "" : args.hadoop_config
cmd_head = File.dirname(__FILE__) + "/scald.rb --hdfs com.twitter.zipkin.hadoop."
settings_string = uses_settings ? " " + args.settings : ""
cmd_date = date_to_cmd(start_date) + " " + date_to_cmd(end_date)
timezone_cmd = set_timezone ? " --tz " + args.timezone : ""
cmd_args = args.job + settings_string + " " + hadoop_config_cmd + " --date " + cmd_date + timezone_cmd

if args.prep
if not remote_file_exists?(time_to_remote_file(end_date, args.job + "/") + "/_SUCCESS", args.hadoop_config)
cmd = cmd_head + "sources." + cmd_args
puts cmd
system(cmd)
end
else
if not remote_file_exists?(args.output + "/_SUCCESS", args.hadoop_config)
cmd = cmd_head + cmd_args + " --output " + args.output
puts cmd
system(cmd)
end
end
end

if __FILE__ == $0
options = OptparseJobArguments.parse(ARGV)
run_job(JobArguments.new(options.dates, options.settings, options.hadoop_config, options.timezone, options.job, options.output, options.preprocessor))
end
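
run_job.rb now exposes a JobArguments value object and a run_job(args) function that assembles the scald.rb command line and skips any job whose _SUCCESS marker already exists on HDFS. A minimal sketch of calling it directly, with an illustrative job name, date, and output path, and assuming scald.rb plus the hadoop CLI are available on this machine:

# Hypothetical programmatic use of run_job.rb.
require 'date'
require_relative 'run_job'

dates = [DateTime.strptime('2012-08-28T00:00', '%Y-%m-%dT%H:%M')]
args = JobArguments.new(dates,            # start date (optionally followed by an end date)
                        nil,              # no extra job settings
                        nil,              # no hadoop --config string
                        'UTC',            # passed through as --tz UTC
                        'DependencyTree', # job class under com.twitter.zipkin.hadoop.
                        'daily_output/DependencyTree', # output path
                        false)            # not a preprocessing job
# Does nothing if daily_output/DependencyTree/_SUCCESS already exists.
run_job(args)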
@@ -23,14 +23,20 @@ trait CassandraAggregatesConfig extends AggregatesConfig { self =>

def cassandraConfig: CassandraConfig
var topAnnotationsCf: String = "TopAnnotations"
var dependenciesCf: String = "Dependencies"

def apply(): CassandraAggregates = {
val _topAnnotations = cassandraConfig.keyspace.columnFamily[String, Long, String](
topAnnotationsCf,Utf8Codec, LongCodec, Utf8Codec
).consistency(WriteConsistency.One).consistency(ReadConsistency.One)

val _dependencies = cassandraConfig.keyspace.columnFamily[String, Long, String](
dependenciesCf, Utf8Codec, LongCodec, Utf8Codec
).consistency(WriteConsistency.One).consistency(ReadConsistency.One)

new CassandraAggregates {
val topAnnotations: ColumnFamily[String, Long, String] = _topAnnotations
val dependencies: ColumnFamily[String, Long, String] = _dependencies
}
}
}
@@ -256,6 +256,13 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
}
}

def getDependencies(serviceName: String): Future[Seq[String]] = {
log.debug("getDependencies: " + serviceName)
call("getDependencies") {
aggregates.getDependencies(serviceName)
}
}

def getTopAnnotations(serviceName: String): Future[Seq[String]] = {
log.debug("getTopAnnotations: " + serviceName)
call("getTopAnnotations") {
@@ -24,15 +24,19 @@ import com.twitter.util.Future
trait Aggregates {
def getTopAnnotations(serviceName: String): Future[Seq[String]]
def getTopKeyValueAnnotations(serviceName: String): Future[Seq[String]]
def getDependencies(serviceName: String): Future[Seq[String]]

def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit]
def storeTopAnnotations(serviceName: String, a: Seq[String]): Future[Unit]
def storeTopKeyValueAnnotations(serviceName: String, a: Seq[String]): Future[Unit]
}

class NullAggregates extends Aggregates {
def getDependencies(serviceName: String) = Future(Seq.empty[String])
def getTopAnnotations(serviceName: String) = Future(Seq.empty[String])
def getTopKeyValueAnnotations(serviceName: String) = Future(Seq.empty[String])

def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = Future.Unit
def storeTopAnnotations(serviceName: String, a: Seq[String]): Future[Unit] = Future.Unit
def storeTopKeyValueAnnotations(serviceName: String, a: Seq[String]): Future[Unit] = Future.Unit
}