Modified scripts to fix minor bug
Modifications to run_job.rb and run_all_jobs.rb and SpanSource to allow us to
run jobs remotely

Author: @jerryli9876
Fixes #79
jerryli9876 committed Jul 25, 2012
1 parent b03bb86 commit ad22592
Showing 6 changed files with 226 additions and 149 deletions.
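
With this change the whole pipeline can be driven from a single machine. A typical invocation of the new driver might look like the following (output and config paths are illustrative; dates use the scripts' %Y-%m-%dT%H:%M format):

    ruby run_all_jobs.rb -d 2012-07-24T00:00,2012-07-25T00:00 -o /user/zipkin/reports -c /etc/hadoop/conf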
@@ -19,7 +19,7 @@ package com.twitter.zipkin.hadoop
import com.twitter.scalding._
import cascading.pipe.joiner._
import com.twitter.zipkin.gen.{SpanServiceName, BinaryAnnotation, Span, Annotation}
-import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSourceTest, PreprocessedSpanSource, Util}
+import com.twitter.zipkin.hadoop.sources.{PrepTsvSource, PreprocessedSpanSource, Util}

/**
* Find out how often services call each other throughout the entire system
@@ -83,31 +83,27 @@ trait LzoTsv extends DelimitedScheme {
*/
case class SpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("/logs/zipkin/", dateRange)

-case class SpanSource1(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("good_data", dateRange)

case class FixedSpanSource(p : String) extends FixedPathSource(p) with LzoThrift[Span] {
def column = classOf[Span]
}

/**
* This is the source for trace data that has been merged. Directories are like in SpanSource
*/
-case class PrepNoNamesSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("test", dateRange)
+case class PrepNoNamesSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[Span]("Preprocessed", dateRange)

/**
* This is the source for trace data that has been merged and for which we've found
* the best possible client side and service names. Directories are like in SpanSource
*/
-case class PreprocessedSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[SpanServiceName]("testagain", dateRange)
-
-case class PreprocessedSpanSourceTest(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[SpanServiceName]("testalpha", dateRange)
+case class PreprocessedSpanSource(implicit dateRange: DateRange) extends HourlySuffixLzoThrift[SpanServiceName]("FindNames", dateRange)

/**
* This is the source for data of the form (id, service name)
*/

case class PrepTsvSource()(implicit dateRange : DateRange)
-extends DailySuffixSource("id_names", dateRange)
+extends HourlySuffixSource("FindIDtoName", dateRange)
with LzoTsv
with Mappable[(Long, String)]
with SuccessFileSource {
70 changes: 0 additions & 70 deletions zipkin-hadoop/src/scripts/run.sh

This file was deleted.

97 changes: 97 additions & 0 deletions zipkin-hadoop/src/scripts/run_all_jobs.rb
@@ -0,0 +1,97 @@
#!/usr/bin/env ruby
# Runs all scalding jobs in package com.twitter.zipkin.hadoop

require 'optparse'
require 'ostruct'
require 'pp'
require 'date'

class OptparseAllJobArguments

#
# Return a structure describing the options.
#
def self.parse(args)
# The options specified on the command line will be collected in *options*.
# We set default values here.
options = OpenStruct.new
options.uses_hadoop_config = false
options.dates = []
options.output = ""

opts = OptionParser.new do |opts|
opts.banner = "Usage: run_job.rb -d DATE -o OUTPUT -c CONFIG"

opts.separator ""
opts.separator "Specific options:"

opts.on("-d", "--date STARTDATE,ENDDATE", Array, "The DATES to run the jobs over. Expected format for dates are is %Y-%m-%dT%H:%M") do |list|
options.dates = list
end

opts.on("-o", "--output OUTPUT",
"The OUTPUT file to write to") do |output|
options.output = output
end

opts.on("-c", "--config [CONFIG]", "Optional hadoop configurations for the job.") do |config|
options.uses_hadoop_config = true
options.hadoop_config = config || ''
end

opts.separator ""
opts.separator "Common options:"
opts.on_tail("-h", "--help", "Show this message") do
puts opts
exit
end
end
opts.parse!(args)
options
end
end

options = OptparseAllJobArguments.parse(ARGV)

date_cmd = options.dates.join(",")
end_date_cmd = options.dates.length > 1 ? options.dates.at(1) : options.dates.at(0)
config_string = options.uses_hadoop_config ? " --config " + options.hadoop_config : ""
run_job_cmd = "ruby " + File.dirname(__FILE__) + "/run_job.rb" + config_string

puts "Run_job_command = " + run_job_cmd
puts "Date = " + date_cmd

# Launch the prep job and the given jobs concurrently. We block until the prep
# job finishes (the next stage needs its output) but return the still-running
# job threads so the caller can join them once every stage has been launched.
def run_jobs_in_parallel(prep, jobs)
threads = []
prepThread = Thread.new(prep) { |prep| system(prep) }
for job in jobs
threads << Thread.new(job) { |job| system(job) }
end
prepThread.join
return threads
end

# Run the commands. Preprocessing must finish before anything else; each stage
# below then overlaps one prep job with the report jobs that don't depend on it.
system(run_job_cmd + " -j Preprocessed -p -d " + date_cmd)

jobs_set_1 = Array[ run_job_cmd + " -j WorstRuntimes -o " + options.output + "/WorstRuntimes -d " + end_date_cmd,
run_job_cmd + " -j MemcacheRequest -o " + options.output + "/MemcacheRequest -d " + end_date_cmd]

jobs_set_2 = Array[ run_job_cmd + " -j PopularKeys -o " + options.output + "/PopularKeys -d " + end_date_cmd,
run_job_cmd + " -j PopularAnnotations -o " + options.output + "/PopularAnnotations -d " + end_date_cmd
# run_job_cmd + " -j WhaleReport -o " + options.output + "/WhaleReport -d " + end_date_cmd,
]

jobs_set_3 = Array[ run_job_cmd + " -j DependencyTree -o " + options.output + "/DependencyTree -d " + end_date_cmd,
run_job_cmd + " -j Timeouts -s \" --error_type finagle.timeout \" -o " + options.output + "/Timeouts -d " + end_date_cmd,
run_job_cmd + " -j Timeouts -s \" --error_type finagle.retry \" -o " + options.output + "/Retries -d " + end_date_cmd ]

jobs = run_jobs_in_parallel(run_job_cmd + " -j FindNames -p -d " + end_date_cmd, jobs_set_1)
jobs += run_jobs_in_parallel(run_job_cmd + " -j FindIDtoName -p -d " + end_date_cmd, jobs_set_2)
jobs += run_jobs_in_parallel("", jobs_set_3)

jobs.each { |aThread| aThread.join }

puts "All jobs finished!"

125 changes: 125 additions & 0 deletions zipkin-hadoop/src/scripts/run_job.rb
@@ -0,0 +1,125 @@
#!/usr/bin/env ruby
# Script that runs a single scalding job

require 'optparse'
require 'ostruct'
require 'pp'
require 'date'

# Host with direct HDFS access; commands are sent to it over ssh when this
# machine is not itself a Hadoop gateway ("my.remote.host" is a placeholder).
$HOST = "my.remote.host"

class OptparseJobArguments

#
# Return a structure describing the options.
#
def self.parse(args)
# The options specified on the command line will be collected in *options*.
# We set default values here.
options = OpenStruct.new
options.job = nil
options.uses_settings = false
options.uses_hadoop_config = false
options.dates = []
options.output = ""
options.preprocessor = false

opts = OptionParser.new do |opts|
opts.banner = "Usage: run_job.rb -j JOB -d DATE -o OUTPUT -p -s SETTINGS -c CONFIG"

opts.separator ""
opts.separator "Specific options:"

opts.on("-j", "--job JOBNAME",
"The JOBNAME to run") do |job|
options.job = job
end

opts.on("-d", "--date DATES", Array,
"The DATES to run the job over. Expected format is %Y-%m-%dT%H:%M") do |dates|
options.dates = dates.map{|date| DateTime.strptime(date, '%Y-%m-%dT%H:%M')}
end

opts.on("-o", "--output OUTPUT",
"The OUTPUT file to write to") do |output|
options.output = output
end

opts.on("-p", "--[no-]prep", "Run as preprocessor") do |v|
options.preprocessor = true
end

opts.on("-s", "--settings [SETTINGS]", "Optional settings for the job") do |settings|
options.uses_settings = true
options.settings = settings || ''
end

opts.on("-c", "--config [CONFIG]", "Optional hadoop configurations for the job.") do |config|
options.uses_hadoop_config = true
options.hadoop_config = config || ''
end


opts.separator ""
opts.separator "Common options:"
opts.on_tail("-h", "--help", "Show this message") do
puts opts
exit
end
end
opts.parse!(args)
options
end
end

options = OptparseJobArguments.parse(ARGV)
start_date = options.dates.at(0)
end_date = options.dates.length > 1 ? options.dates.at(1) : options.dates.at(0)

def time_to_remote_file(time, prefix)
return prefix + time.year.to_s() + "/" + append_zero(time.month) + "/" + append_zero(time.day) + "/" + append_zero(time.hour)
end

def append_zero(x)
if 0 <= x and x <= 9
0.to_s() + x.to_s()
else
x.to_s()
end
end
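
# For example (illustrative values):
#   time_to_remote_file(DateTime.new(2012, 7, 25, 3), "FindNames/")
#   # => "FindNames/2012/07/25/03"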

# TODO: hacky check; we treat this machine as "local" if it can run hadoop dfs itself
def is_hadoop_local_machine?()
return system("hadoop dfs -test -e .")
end

def remote_file_exists?(pathname, options)
cmd = is_hadoop_local_machine?() ? "" : "ssh -C " + $HOST + " "
cmd += "hadoop"
cmd += options.uses_hadoop_config ? " --config " + options.hadoop_config : ""
cmd += " dfs -test -e " + pathname
result = system(cmd)
puts "In run_job, remote_file_exists for " + pathname + ": " + result.to_s()
return result
end
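
# Off-cluster, the assembled check looks roughly like (config path illustrative):
#   ssh -C my.remote.host hadoop --config /etc/hadoop/conf dfs -test -e FindNames/2012/07/25/03/_SUCCESS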

def date_to_cmd(date)
return date.to_s()[0..18]
end
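
# DateTime#to_s yields e.g. "2012-07-25T03:00:00+00:00"; slicing [0..18] keeps
# "2012-07-25T03:00:00", the form handed to scald.rb's --date flag below.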

cmd_head = File.dirname(__FILE__) + "/scald.rb --hdfs com.twitter.zipkin.hadoop."
settings_string = options.uses_settings ? " " + options.settings : ""
cmd_date = date_to_cmd(start_date) + " " + date_to_cmd(end_date)
cmd_args = options.job + settings_string + " --date " + cmd_date + " --tz UTC"

if options.preprocessor
if not remote_file_exists?(time_to_remote_file(end_date, options.job + "/") + "/_SUCCESS", options)
cmd = cmd_head + "sources." + cmd_args
system(cmd)
end
else
if not remote_file_exists?(options.output + "/_SUCCESS", options)
cmd = cmd_head + cmd_args + " --output " + options.output
system(cmd)
end
end
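
For reference, the two branches assemble scald.rb commands of roughly this shape (dates and paths illustrative):

    ./scald.rb --hdfs com.twitter.zipkin.hadoop.sources.Preprocessed --date 2012-07-24T00:00:00 2012-07-25T00:00:00 --tz UTC
    ./scald.rb --hdfs com.twitter.zipkin.hadoop.WorstRuntimes --date 2012-07-25T00:00:00 2012-07-25T00:00:00 --tz UTC --output /user/zipkin/reports/WorstRuntimes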
