diff --git a/zipkin-hadoop-job-runner/src/main/resources/email.mustache b/zipkin-hadoop-job-runner/src/main/resources/email.mustache
index 9ed9639eb04..7713f019c2c 100644
--- a/zipkin-hadoop-job-runner/src/main/resources/email.mustache
+++ b/zipkin-hadoop-job-runner/src/main/resources/email.mustache
@@ -4,12 +4,13 @@
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
- Service Report for {{serviceName}}
+ Service Report for {{standardServiceName}}
{{/header}}
- {{#body}}
+ {{#services}}
+ {{serviceName}}
{{#oneLineResults}}
{{result}}
{{/oneLineResults}}
@@ -45,8 +46,8 @@
{{/tableUrlRows}}
{{/tableResults}}
+ {{/services}}
- {{/body}}
{{/html}}
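Note on the template change: the single {{#body}} section becomes a {{#services}} loop, so one report email can cover several services, each with its own {{serviceName}} heading and result sections. A minimal sketch of the view hash shape this implies, rendered with the `mustache` Ruby gem (field names come from the template; the sample data is invented):

    require 'mustache'  # gem install mustache

    template = <<-EOS
    {{#services}}
    {{serviceName}}
    {{#oneLineResults}}
      {{result}}
    {{/oneLineResults}}
    {{/services}}
    EOS

    view = {
      :services => [
        { :serviceName    => 'web',
          :oneLineResults => [{ :result => 'p99 latency: 250ms' }] },
        { :serviceName    => 'memcache',
          :oneLineResults => [{ :result => 'p99 latency: 40ms' }] }
      ]
    }
    puts Mustache.render(template, view)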
diff --git a/zipkin-hadoop-job-runner/src/scripts/has-sampled.rb b/zipkin-hadoop-job-runner/src/scripts/has-sampled.rb
index bcf2d2df9cc..f60b0738dac 100644
--- a/zipkin-hadoop-job-runner/src/scripts/has-sampled.rb
+++ b/zipkin-hadoop-job-runner/src/scripts/has-sampled.rb
@@ -63,14 +63,13 @@ def self.parse(args)
end
end
-options = OptparseHasSampledArguments.parse(ARGV)
-
$config = {
:zipkin_query_host => "localhost", #whatever the collector is
:zipkin_query_port => 9411,
:skip_zookeeper => true
}
+# Asks the query service which of the given trace ids are actually stored; returns the stored traces
def sampled_traces(trace_ids)
result = false
traces = nil
@@ -80,39 +79,28 @@ def sampled_traces(trace_ids)
return traces
end
+# Gets trace id from input line
def get_trace_id(line)
return line.split("\t")[1].to_i
end
-File.open(options.output, 'w') do |out_file|
- trace_list = []
- File.open(options.input, 'r').each do |line|
- trace_list = trace_list << get_trace_id(line)
- end
- sampled = sampled_traces(trace_list)
- File.open(options.input, 'r').each do |line|
- if (sampled.include?(get_trace_id(line)))
- out_file.print line
- puts line
+# Reads trace ids from the input file and writes the sampled (stored) lines to the output file
+def sample(inputfile, outputfile)
+ File.open(outputfile, 'w') do |out_file|
+ trace_list = []
+ File.open(inputfile, 'r').each do |line|
+ trace_list << get_trace_id(line)
end
- end
-end
-
-=begin
-h = Hash.new
-
-File.open(options.input, 'r').each do |line|
- ary = line.split("\t")
- if h[ary[0]] == nil
- h[ary[0]] = Array.new(1, ary[1].to_i)
- else
- h[ary[0]] = h[ary[0]] << ary[1].to_i
+ sampled = sampled_traces(trace_list)
+ File.open(inputfile, 'r').each do |line|
+ if (sampled.include?(get_trace_id(line)))
+ out_file.print line
+ end
+ end
end
end
-ary = Array.new()
-
-h.each do |service, traces|
- p sampled_traces(traces)
+if __FILE__ == $0
+ options = OptparseHasSampledArguments.parse(ARGV)
+ sample(options.input, options.output)
end
-=end
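Since the driver code in has-sampled.rb is now behind the __FILE__ == $0 guard, the script can also be required and driven programmatically. A hedged sketch (file names invented; input is tab-separated with the trace id in the second column, matching get_trace_id):

    require File.join(File.dirname(__FILE__), 'has-sampled.rb')

    # Writes only the lines whose trace ids the query service reports as stored.
    sample('candidate_traces.tsv', 'sampled_traces.tsv')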
diff --git a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala
index 5e2bc079494..a5ae9511f86 100644
--- a/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala
+++ b/zipkin-hadoop/src/main/scala/com/twitter/zipkin/hadoop/sources/Preprocessed.scala
@@ -28,8 +28,8 @@ import scala.collection.JavaConverters._
class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
val preprocessed = SpanSource()
.read
- .mapTo(0 ->('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
- s: Span => (s.trace_id, s.name, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
+ .mapTo(0 ->('trace_id, 'id, 'parent_id, 'annotations, 'binary_annotations)) {
+ s: Span => (s.trace_id, s.id, s.parent_id, s.annotations.toList, s.binary_annotations.toList)
}
- .groupBy('trace_id, 'name, 'id, 'parent_id) {
+ .groupBy('trace_id, 'id, 'parent_id) {
_.reduce('annotations, 'binary_annotations) {
@@ -39,11 +39,11 @@ class Preprocessed(args : Args) extends Job(args) with DefaultDateRangeJob {
}
val onlyMerge = preprocessed
- .mapTo(('trace_id, 'name, 'id, 'parent_id, 'annotations, 'binary_annotations) -> 'span) {
- a : (Long, String, Long, Long, List[Annotation], List[BinaryAnnotation]) =>
+ .mapTo(('trace_id, 'id, 'parent_id, 'annotations, 'binary_annotations) -> 'span) {
+ a : (Long, Long, Long, List[Annotation], List[BinaryAnnotation]) =>
a match {
- case (tid, name, id, pid, annotations, binary_annotations) =>
- new gen.Span(tid, name, id, annotations.asJava, binary_annotations.asJava).setParent_id(pid)
+ case (tid, id, pid, annotations, binary_annotations) =>
+ new gen.Span(tid, "", id, annotations.asJava, binary_annotations.asJava).setParent_id(pid)
}
}.write(PrepNoNamesSpanSource())
}
diff --git a/zipkin-hadoop/src/scripts/run_all_jobs.rb b/zipkin-hadoop/src/scripts/run_all_jobs.rb
index eddd4feeac2..034330f64a9 100755
--- a/zipkin-hadoop/src/scripts/run_all_jobs.rb
+++ b/zipkin-hadoop/src/scripts/run_all_jobs.rb
@@ -1,4 +1,4 @@
-#!/usr/bash/env ruby
+#!/usr/bin/env ruby
# Copyright 2012 Twitter Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,6 +19,7 @@
require 'ostruct'
require 'pp'
require 'date'
+require 'run_job.rb'
class OptparseAllJobArguments
@@ -40,6 +41,7 @@ def self.parse(args)
opts.separator "Specific options:"
opts.on("-d", "--date STARTDATE,ENDDATE", Array, "The DATES to run the jobs over. Expected format for dates are is %Y-%m-%dT%H:%M") do |list|
+ options.dates = list.map{|date| DateTime.strptime(date, '%Y-%m-%dT%H:%M')}
- options.dates = list
end
@@ -65,51 +67,47 @@ def self.parse(args)
end
end
-options = OptparseAllJobArguments.parse(ARGV)
-
-date_cmd_UTC = options.dates.join(" ") + " -t UTC"
-end_date_cmd = options.dates.length > 1 ? options.dates.at(1) : options.dates.at(0)
-end_date_cmd_UTC = end_date_cmd + " -t UTC"
-config_string = options.uses_hadoop_config ? " --config " + options.hadoop_config : ""
-run_job_cmd = "ruby " + File.dirname(__FILE__) + "/run_job.rb" + config_string
-
-puts "Run_job_command = " + run_job_cmd
-puts "Date = " + date_cmd_UTC
+# Runs the prep job first, then kicks off the given jobs in parallel
def run_jobs_in_parallel(prep, jobs)
threads = []
- prepThread = Thread.new(prep) { |prep| system(prep) }
+ prepThread = Thread.new(prep) { |prep| run_job(prep) }
for job in jobs
- threads << Thread.new(job) { |job| system(job) }
+ threads << Thread.new(job) { |job| run_job(job) }
end
prepThread.join
return threads
-# threads.each { |aThread| aThread.join }
end
-#Run the commands
-system(run_job_cmd + " -j Preprocessed -p -d " + date_cmd_UTC)
-
-jobs_set_1 = Array[ ]
-
-jobs_set_2 = Array[ run_job_cmd + " -j PopularKeys -o " + options.output + "/PopularKeys -d " + end_date_cmd_UTC,
- run_job_cmd + " -j PopularAnnotations -o " + options.output + "/PopularAnnotations -d " + end_date_cmd_UTC,
- run_job_cmd + " -j MemcacheRequest -o " + options.output + "/MemcacheRequest -d " + end_date_cmd_UTC,
- run_job_cmd + " -j WorstRuntimes -o " + options.output + "/WorstRuntimes -d " + end_date_cmd_UTC,
- run_job_cmd + " -j WorstRuntimesPerTrace -o " + options.output + "/WorstRuntimesPerTrace -d " + end_date_cmd_UTC,
- run_job_cmd + " -j WhaleReport -o " + options.output + "/WhaleReport -d " + end_date_cmd_UTC
- ]
-
-jobs_set_3 = Array[ run_job_cmd + " -j DependencyTree -o " + options.output + "/DependencyTree -d " + end_date_cmd_UTC,
- run_job_cmd + " -j ExpensiveEndpoints -o " + options.output + "/ExpensiveEndpoints -d " + end_date_cmd_UTC,
- run_job_cmd + " -j Timeouts -s \" --error_type finagle.timeout \" -o " + options.output + "/Timeouts -d " + end_date_cmd_UTC,
- run_job_cmd + " -j Timeouts -s \" --error_type finagle.retry \" -o " + options.output + "/Retries -d " + end_date_cmd_UTC ]
-
-jobs = run_jobs_in_parallel(run_job_cmd + " -j FindNames -p -d " + end_date_cmd_UTC, jobs_set_1)
-jobs += run_jobs_in_parallel(run_job_cmd + " -j FindIDtoName -p -d " + end_date_cmd_UTC, jobs_set_2)
-jobs += run_jobs_in_parallel("", jobs_set_3)
-
-jobs.each { |aThread| aThread.join }
-
-puts "All jobs finished!"
+def run_all_jobs(dates, output, hadoop_config)
+ uses_hadoop_config = hadoop_config != nil
+ config_string = uses_hadoop_config ? " --config " + hadoop_config : nil
+
+ jobs_set_1 = []
+
+ jobs_set_2 = Array[JobArguments.new(dates, nil, config_string, "UTC", "PopularKeys", output + "/PopularKeys", false),
+ JobArguments.new(dates, nil, config_string, "UTC", "PopularAnnotations", output + "/PopularAnnotations", false),
+ JobArguments.new(dates, nil, config_string, "UTC", "MemcacheRequest", output + "/MemcacheRequest",false),
+ JobArguments.new(dates, nil, config_string, "UTC", "WorstRuntimes", output + "/WorstRuntimes", false),
+ JobArguments.new(dates, nil, config_string, "UTC", "WorstRuntimesPerTrace", output + "/WorstRuntimesPerTrace", false),
+ JobArguments.new(dates, nil, config_string, "UTC", "WhaleReport", output + "/WhaleReport", false)]
+
+ jobs_set_3 = Array[JobArguments.new(dates, nil, config_string, "UTC", "DependencyTree", output + "/DependencyTree", false),
+ JobArguments.new(dates, nil, config_string, "UTC", "ExpensiveEndpoints", output + "/ExpensiveEndpoints", false),
+ JobArguments.new(dates, "\" --error_type finagle.timeout \"", config_string, "UTC", "Timeouts", output + "/Timeouts", false),
+ JobArguments.new(dates, " \" --error_type finagle.retry \"", config_string, "UTC", "Timeouts", output + "/Retries", false),]
+
+ run_job(JobArguments.new(dates, nil, config_string, "UTC", "Preprocessed", nil, true))
+ jobs = run_jobs_in_parallel(JobArguments.new(dates, nil, config_string, "UTC", "FindNames", nil, true), jobs_set_1)
+ jobs += run_jobs_in_parallel(JobArguments.new(dates, nil, config_string, "UTC", "FindIDtoName", nil, true), jobs_set_2)
+ jobs += run_jobs_in_parallel(nil, jobs_set_3)
+
+ jobs.each { |aThread| aThread.join }
+
+ puts "All jobs finished!"
+end
+if __FILE__ == $0
+ options = OptparseAllJobArguments.parse(ARGV)
+ run_all_jobs(options.dates, options.output, options.hadoop_config)
+end
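With run_all_jobs.rb refactored the same way, the whole pipeline can be kicked off from another script. A sketch, assuming run_job.rb and run_all_jobs.rb are on the load path (dates and output path invented):

    require 'date'
    require 'run_all_jobs.rb'

    dates = [DateTime.strptime('2012-07-01T00:00', '%Y-%m-%dT%H:%M'),
             DateTime.strptime('2012-07-02T00:00', '%Y-%m-%dT%H:%M')]
    run_all_jobs(dates, '/user/zipkin/reports', nil)  # nil: no hadoop --config dir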
diff --git a/zipkin-hadoop/src/scripts/run_job.rb b/zipkin-hadoop/src/scripts/run_job.rb
index 114c5add1ec..4c18c4e82b9 100755
--- a/zipkin-hadoop/src/scripts/run_job.rb
+++ b/zipkin-hadoop/src/scripts/run_job.rb
@@ -92,9 +92,21 @@ def self.parse(args)
end
end
-options = OptparseJobArguments.parse(ARGV)
-start_date = options.dates.at(0)
-end_date = options.dates.length > 1 ? options.dates.at(1) : options.dates.at(0)
+class JobArguments
+ attr_accessor :dates, :settings, :hadoop_config, :timezone, :job, :output, :prep
+
+ def initialize(dates, settings, hadoop_config, timezone, job, output, prep)
+ @dates = dates
+ @settings = settings
+ @hadoop_config = hadoop_config
+ @timezone = timezone
+ @job = job
+ @output = output
+ @prep = prep
+ end
+
+end
+
def time_to_remote_file(time, prefix)
return prefix + time.year.to_s() + "/" + append_zero(time.month) + "/" + append_zero(time.day) + "/" + append_zero(time.hour)
@@ -113,34 +125,51 @@ def is_hadoop_local_machine?()
return system("hadoop dfs -test -e .")
end
-def remote_file_exists?(pathname, options)
+def remote_file_exists?(pathname, hadoop_config)
cmd = is_hadoop_local_machine?() ? "" : "ssh -C " + $HOST + " "
- cmd += "hadoop"
- cmd += options.uses_hadoop_config ? " --config " + options.hadoop_config : ""
+ cmd += "hadoop "
+ cmd += (hadoop_config == nil) ? "" : hadoop_config
cmd += " dfs -test -e " + pathname
result = system(cmd)
puts "In run_job, remote_file_exists for " + pathname + ": " + result.to_s()
return result
end
-def date_to_cmd(date)
- return date.to_s()[0..18]
+def date_to_cmd(time)
+ return time.year.to_s() + "-" + append_zero(time.month) + "-" + append_zero(time.day) + "T" + append_zero(time.hour) + ":00"
end
-cmd_head = File.dirname(__FILE__) + "/scald.rb --hdfs com.twitter.zipkin.hadoop."
-settings_string = options.uses_settings ? " " + options.settings : ""
-cmd_date = date_to_cmd(start_date) + " " + date_to_cmd(end_date)
-timezone_cmd = options.set_timezone ? " --tz " + options.timezone : ""
-cmd_args = options.job + settings_string + " --date " + cmd_date + timezone_cmd
-
-if options.preprocessor
- if not remote_file_exists?(time_to_remote_file(end_date, options.job + "/") + "/_SUCCESS", options)
- cmd = cmd_head + "sources." + cmd_args
- system(cmd)
+def run_job(args)
+ if (args == nil)
+ return
end
-else
- if not remote_file_exists?(options.output + "/_SUCCESS", options)
- cmd = cmd_head + cmd_args + " --output " + options.output
- system(cmd)
+ uses_settings = args.settings != nil
+ set_timezone = args.timezone != nil
+ start_date = args.dates.at(0)
+ end_date = args.dates.length > 1 ? args.dates.at(1) : args.dates.at(0)
+ hadoop_config_cmd = (args.hadoop_config == nil) ? "" : args.hadoop_config
+ cmd_head = File.dirname(__FILE__) + "/scald.rb --hdfs com.twitter.zipkin.hadoop."
+ settings_string = uses_settings ? " " + args.settings : ""
+ cmd_date = date_to_cmd(start_date) + " " + date_to_cmd(end_date)
+ timezone_cmd = set_timezone ? " --tz " + args.timezone : ""
+ cmd_args = args.job + settings_string + " " + hadoop_config_cmd + " --date " + cmd_date + timezone_cmd
+
+ if args.prep
+ if not remote_file_exists?(time_to_remote_file(end_date, args.job + "/") + "/_SUCCESS", args.hadoop_config)
+ cmd = cmd_head + "sources." + cmd_args
+ puts cmd
+ system(cmd)
+ end
+ else
+ if not remote_file_exists?(args.output + "/_SUCCESS", args.hadoop_config)
+ cmd = cmd_head + cmd_args + " --output " + args.output
+ puts cmd
+ system(cmd)
+ end
end
end
+
+if __FILE__ == $0
+ options = OptparseJobArguments.parse(ARGV)
+ config_string = options.uses_hadoop_config ? " --config " + options.hadoop_config : nil
+ run_job(JobArguments.new(options.dates, options.settings, config_string, options.timezone, options.job, options.output, options.preprocessor))
+end
\ No newline at end of file
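For a single job, the new calling convention is a JobArguments value rather than parsed option flags. A sketch with invented values (argument order: dates, settings, hadoop_config, timezone, job, output, prep):

    require 'date'
    require 'run_job.rb'

    args = JobArguments.new(
      [DateTime.strptime('2012-07-02T00:00', '%Y-%m-%dT%H:%M')],
      nil,    # no extra scalding settings
      nil,    # no " --config <dir>" string
      'UTC',
      'WorstRuntimes',
      '/user/zipkin/reports/WorstRuntimes',
      false)  # not a preprocessing job
    run_job(args)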
diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/config/CassandraAggregatesConfig.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/config/CassandraAggregatesConfig.scala
index bcf6c487de0..3f26c2ec22a 100644
--- a/zipkin-server/src/main/scala/com/twitter/zipkin/config/CassandraAggregatesConfig.scala
+++ b/zipkin-server/src/main/scala/com/twitter/zipkin/config/CassandraAggregatesConfig.scala
@@ -23,14 +23,20 @@ trait CassandraAggregatesConfig extends AggregatesConfig { self =>
def cassandraConfig: CassandraConfig
var topAnnotationsCf: String = "TopAnnotations"
+ var dependenciesCf: String = "Dependencies"
def apply(): CassandraAggregates = {
val _topAnnotations = cassandraConfig.keyspace.columnFamily[String, Long, String](
topAnnotationsCf,Utf8Codec, LongCodec, Utf8Codec
).consistency(WriteConsistency.One).consistency(ReadConsistency.One)
+ val _dependencies = cassandraConfig.keyspace.columnFamily[String, Long, String](
+ dependenciesCf, Utf8Codec, LongCodec, Utf8Codec
+ ).consistency(WriteConsistency.One).consistency(ReadConsistency.One)
+
new CassandraAggregates {
val topAnnotations: ColumnFamily[String, Long, String] = _topAnnotations
+ val dependencies: ColumnFamily[String, Long, String] = _dependencies
}
}
}
diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala
index 3d2c3ef81e0..9eb018bcbb1 100644
--- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala
+++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala
@@ -256,6 +256,13 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
}
}
+ def getDependencies(serviceName: String): Future[Seq[String]] = {
+ log.debug("getDependencies: " + serviceName)
+ call("getDependencies") {
+ aggregates.getDependencies(serviceName)
+ }
+ }
+
def getTopAnnotations(serviceName: String): Future[Seq[String]] = {
log.debug("getTopAnnotations: " + serviceName)
call("getTopAnnotations") {
diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Aggregates.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Aggregates.scala
index 591781b1d62..603e1e14ea1 100644
--- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Aggregates.scala
+++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/Aggregates.scala
@@ -24,15 +24,19 @@ import com.twitter.util.Future
trait Aggregates {
def getTopAnnotations(serviceName: String): Future[Seq[String]]
def getTopKeyValueAnnotations(serviceName: String): Future[Seq[String]]
+ def getDependencies(serviceName: String): Future[Seq[String]]
+ def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit]
def storeTopAnnotations(serviceName: String, a: Seq[String]): Future[Unit]
def storeTopKeyValueAnnotations(serviceName: String, a: Seq[String]): Future[Unit]
}
class NullAggregates extends Aggregates {
+ def getDependencies(serviceName: String) = Future(Seq.empty[String])
def getTopAnnotations(serviceName: String) = Future(Seq.empty[String])
def getTopKeyValueAnnotations(serviceName: String) = Future(Seq.empty[String])
+ def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = Future.Unit
def storeTopAnnotations(serviceName: String, a: Seq[String]): Future[Unit] = Future.Unit
def storeTopKeyValueAnnotations(serviceName: String, a: Seq[String]): Future[Unit] = Future.Unit
}
diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala
index 315ec3912de..afba7107d13 100644
--- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala
+++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregates.scala
@@ -32,9 +32,19 @@ import com.twitter.cassie.{Column, ColumnFamily}
trait CassandraAggregates extends Aggregates with Cassandra {
val topAnnotations: ColumnFamily[String, Long, String]
+ val dependencies: ColumnFamily[String, Long, String]
val Delimiter: String = ":"
+ /**
+ * Get the dependencies for a service name
+ */
+ def getDependencies(serviceName: String): Future[Seq[String]] = {
+ dependencies.getRow(serviceName).map {
+ _.values().asScala.map { _.value }.toSeq
+ }
+ }
+
/**
* Get the top annotations for a service name
*/
@@ -69,6 +79,17 @@ trait CassandraAggregates extends Aggregates with Cassandra {
}
}
+ /** Synchronize these so we don't do concurrent writes from the same box */
+ def storeDependencies(serviceName: String, endpoints: Seq[String]): Future[Unit] = synchronized {
+ val remove = dependencies.removeRow(serviceName)
+ val batch = dependencies.batch()
+ endpoints.zipWithIndex.foreach { case (endpoint: String, index: Int) =>
+ batch.insert(serviceName, new Column[Long, String](index, endpoint))
+ }
+ remove()
+ Future.join(Seq(batch.execute()))
+ }
+
/** Synchronize these so we don't do concurrent writes from the same box */
private[cassandra] def storeAnnotations(key: String, annotations: Seq[String]): Future[Unit] = synchronized {
val remove = topAnnotations.removeRow(key)
diff --git a/zipkin-server/src/schema/cassandra-schema.txt b/zipkin-server/src/schema/cassandra-schema.txt
index b2b0164e6fe..251d8730c3c 100644
--- a/zipkin-server/src/schema/cassandra-schema.txt
+++ b/zipkin-server/src/schema/cassandra-schema.txt
@@ -13,4 +13,10 @@ create column family ServiceNameIndex with comparator = LongType;
create column family AnnotationsIndex with comparator = LongType;
create column family DurationIndex with comparator = LongType;
-create column family TopAnnotations with comparator = LongType;
\ No newline at end of file
+/*
+TopAnnotations stores the top normal and key value annotations per service,
+and Dependencies stores, for each service, its parents and the number of calls made to each parent
+*/
+
+create column family TopAnnotations with comparator = LongType;
+create column family Dependencies with comparator = LongType;
diff --git a/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregatesSpec.scala b/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregatesSpec.scala
index f934c50f54e..7566176bf81 100644
--- a/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregatesSpec.scala
+++ b/zipkin-server/src/test/scala/com/twitter/zipkin/storage/cassandra/CassandraAggregatesSpec.scala
@@ -27,10 +27,12 @@ import scala.collection.JavaConverters._
class CassandraAggregatesSpec extends Specification with JMocker with ClassMocker {
- val mockCf = mock[ColumnFamily[String, Long, String]]
+ val mockAnnotationsCf = mock[ColumnFamily[String, Long, String]]
+ val mockDependenciesCf = mock[ColumnFamily[String, Long, String]]
def cassandraAggregates = new CassandraAggregates {
- val topAnnotations = mockCf
+ val topAnnotations = mockAnnotationsCf
+ val dependencies = mockDependenciesCf
}
def column(name: Long, value: String) = new Column[Long, String](name, value)
@@ -41,6 +43,11 @@ class CassandraAggregatesSpec extends Specification with JMocker with ClassMocke
index.toLong -> column(index, ann)
}.toMap.asJava
+ val serviceCallsSeq = Seq("parent1:10", "parent2:20")
+ val serviceCalls = serviceCallsSeq.zipWithIndex.map {case (ann, index) =>
+ index.toLong -> column(index, ann)
+ }.toMap.asJava
+
"retrieval" in {
"getTopAnnotations" in {
val agg = cassandraAggregates
@@ -48,7 +55,7 @@ class CassandraAggregatesSpec extends Specification with JMocker with ClassMocke
val rowKey = agg.topAnnotationRowKey(serviceName)
expect {
- one(mockCf).getRow(rowKey) willReturn Future.value(topAnns)
+ one(mockAnnotationsCf).getRow(rowKey) willReturn Future.value(topAnns)
}
agg.getTopAnnotations(serviceName)() mustEqual topAnnsSeq
@@ -60,11 +67,20 @@ class CassandraAggregatesSpec extends Specification with JMocker with ClassMocke
val rowKey = agg.topKeyValueRowKey(serviceName)
expect {
- one(mockCf).getRow(rowKey) willReturn Future.value(topAnns)
+ one(mockAnnotationsCf).getRow(rowKey) willReturn Future.value(topAnns)
}
agg.getTopKeyValueAnnotations(serviceName)() mustEqual topAnnsSeq
}
+
+ "getDependencies" in {
+ val agg = cassandraAggregates
+ val serviceName = "mockingbird"
+ expect {
+ one(mockDependenciesCf).getRow(serviceName) willReturn Future.value(serviceCalls)
+ }
+ agg.getDependencies(serviceName)() mustEqual serviceCallsSeq
+ }
}
"storage" in {
@@ -96,15 +112,24 @@ class CassandraAggregatesSpec extends Specification with JMocker with ClassMocke
agg.getTopKeyValueAnnotations(serviceName).apply() mustEqual topAnnsSeq
}
+ "storeDependencies" in {
+ agg.storeDependencies(serviceName, serviceCallsSeq).apply()
+ agg.getDependencies(serviceName).apply() mustEqual serviceCallsSeq
+ }
+
"clobber old entries" in {
val anns1 = Seq("a1", "a2", "a3", "a4")
val anns2 = Seq("a5", "a6")
+ val calls1 = Seq("sc1", "sc2")
agg.storeTopAnnotations(serviceName, anns1).apply()
agg.getTopAnnotations(serviceName).apply() mustEqual anns1
agg.storeTopAnnotations(serviceName, anns2).apply()
agg.getTopAnnotations(serviceName).apply() mustEqual anns2
+
+ agg.storeDependencies(serviceName, calls1).apply()
+ agg.getDependencies(serviceName).apply() mustEqual calls1
}
}
}
diff --git a/zipkin-thrift/src/main/thrift/zipkinCollector.thrift b/zipkin-thrift/src/main/thrift/zipkinCollector.thrift
index b615747f63e..e528ed0fc6a 100644
--- a/zipkin-thrift/src/main/thrift/zipkinCollector.thrift
+++ b/zipkin-thrift/src/main/thrift/zipkinCollector.thrift
@@ -29,6 +29,7 @@ service ZipkinCollector extends scribe.scribe {
/** Aggregates methods */
void storeTopAnnotations(1: string service_name, 2: list<string> annotations) throws (1: StoreAggregatesException e);
void storeTopKeyValueAnnotations(1: string service_name, 2: list<string> annotations) throws (1: StoreAggregatesException e);
+ void storeDependencies(1: string service_name, 2: list<string> endpoints) throws (1: StoreAggregatesException e);
//************** ZK config changes **************
diff --git a/zipkin-thrift/src/main/thrift/zipkinQuery.thrift b/zipkin-thrift/src/main/thrift/zipkinQuery.thrift
index 01c391cae83..31bfda91196 100644
--- a/zipkin-thrift/src/main/thrift/zipkinQuery.thrift
+++ b/zipkin-thrift/src/main/thrift/zipkinQuery.thrift
@@ -194,6 +194,7 @@ service ZipkinQuery {
i32 getDataTimeToLive() throws (1: QueryException qe);
/** Aggregates related */
+ list<string> getDependencies(1: string service_name) throws (1: QueryException qe);
list<string> getTopAnnotations(1: string service_name) throws (1: QueryException qe);
list<string> getTopKeyValueAnnotations(1: string service_name) throws (1: QueryException qe);
}
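From Ruby, the new query method is reached through the thrift-generated client, the same way the controller change below does it. A sketch reusing zipkin-web's with_transport helper (the service name and printed format are placeholders):

    ZipkinQuery::Client.with_transport(Rails.configuration.zookeeper) do |client|
      # Returns the stored "parent:count" strings for the service, if any.
      deps = client.getDependencies('web')
      deps.each { |d| puts d }
    end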
diff --git a/zipkin-web/app/controllers/traces_controller.rb b/zipkin-web/app/controllers/traces_controller.rb
index 0de86243604..3b1df5594ac 100644
--- a/zipkin-web/app/controllers/traces_controller.rb
+++ b/zipkin-web/app/controllers/traces_controller.rb
@@ -117,6 +117,15 @@ def spans_json
render :json => Names.get_span_names(service_name).to_a
end
+ def dependencies
+ service_name = params[:service_name] || ""
+ dependencies = nil
+ ZipkinQuery::Client.with_transport(Rails.configuration.zookeeper) do |client|
+ dependencies = client.getDependencies(service_name)
+ end
+ render :json => dependencies || []
+ end
+
def top_annotations
service_name = params[:service_name] || ""
top_annotations = nil
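One loose end: the diff adds the dependencies action but no route for it, so the action is unreachable until config/routes.rb maps it. A hypothetical Rails 3 style entry, modeled on what the sibling JSON endpoints would need (the path is a guess):

    # config/routes.rb (not part of this diff) -- hypothetical route
    match '/traces/dependencies' => 'traces#dependencies'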