Skip to content

Commit

Permalink
Switch to Zipkin v2 span format
Browse files Browse the repository at this point in the history
  • Loading branch information
ykitamura-mdsol committed Mar 5, 2019
1 parent 76ea494 commit f0435f9
Show file tree
Hide file tree
Showing 16 changed files with 151 additions and 242 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 0.33.0
* Switch to Zipkin v2 span format

# 0.32.4
* Remove the ':service_port' configuration.
* Fix 'sa' annotation encoding.
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ It can be used to measure the performance of process, record value of variables,

When `local_component_span` method is called, it creates a new span and a local component, and provides the following methods to create annotations.
* record(key) - annotation
* record_tag(key, value) - binary annotation
* record_tag(key, value) - tag

Example:
```ruby
Expand All @@ -105,7 +105,7 @@ Only one of the following tracers can be used at a given time.

Sends traces as JSON over HTTP. This is the preferred tracer to use as the openzipkin project moves away from Thrift.

You need to specify the `:json_api_host` parameter to wherever your zipkin collector is running. It will POST traces to the `/api/v1/spans` path.
You need to specify the `:json_api_host` parameter to wherever your zipkin collector is running. It will POST traces to the `/api/v2/spans` path.


### Kafka
Expand Down
28 changes: 9 additions & 19 deletions lib/zipkin-tracer/excon/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ def request_call(datum)
def response_call(datum)
if span = datum[:span]
status = response_status(datum)
if status
record_response_tags(span, status, local_endpoint)
end
span.record(Trace::Annotation::CLIENT_RECV, local_endpoint)
record_response_tags(span, status) if status
Trace.tracer.end_span(span)
end

Expand All @@ -53,12 +50,8 @@ def b3_headers
}
end

def local_endpoint
Trace.default_endpoint # The rack middleware set this up for us.
end

def remote_endpoint(url, service_name)
Trace::Endpoint.remote_endpoint(url, service_name, local_endpoint.ip_format) # The endpoint we are calling.
Trace::Endpoint.remote_endpoint(url, service_name, Trace.default_endpoint.ip_format) # The endpoint we are calling.
end

def service_name(datum, default)
Expand All @@ -69,12 +62,9 @@ def response_status(datum)
datum[:response] && datum[:response][:status] && datum[:response][:status].to_s
end

def record_response_tags(span, status, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::STATUS, status, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
if STATUS_ERROR_REGEXP.match(status)
span.record_tag(Trace::BinaryAnnotation::ERROR, status,
Trace::BinaryAnnotation::Type::STRING, local_endpoint)
end
def record_response_tags(span, status)
span.record_tag(Trace::Span::Tag::STATUS, status)
span.record_tag(Trace::Span::Tag::ERROR, status) if STATUS_ERROR_REGEXP.match(status)
end

def trace!(datum, trace_id)
Expand All @@ -85,10 +75,10 @@ def trace!(datum, trace_id)

span = Trace.tracer.start_span(trace_id, method.downcase)
# annotate with method (GET/POST/etc.) and uri path
span.record_tag(Trace::BinaryAnnotation::METHOD, method.upcase, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::PATH, url.path, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::SERVER_ADDRESS, SERVER_ADDRESS_SPECIAL_VALUE, Trace::BinaryAnnotation::Type::BOOL, remote_endpoint(url, service_name))
span.record(Trace::Annotation::CLIENT_SEND, local_endpoint)
span.kind = Trace::Span::Kind::CLIENT
span.remote_endpoint = remote_endpoint(url, service_name)
span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
span.record_tag(Trace::Span::Tag::PATH, url.path)

# store the span in the datum hash so it can be used in the response_call
datum[:span] = span
Expand Down
33 changes: 14 additions & 19 deletions lib/zipkin-tracer/faraday/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,37 @@ def trace!(env, trace_id)
# handle either a URI object (passed by Faraday v0.8.x in testing), or something string-izable
method = env[:method].to_s
url = env[:url].respond_to?(:host) ? env[:url] : URI.parse(env[:url].to_s)
local_endpoint = Trace.default_endpoint # The rack middleware set this up for us.
remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name, local_endpoint.ip_format) # The endpoint we are calling.
remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name, Trace.default_endpoint.ip_format) # The endpoint we are calling.
Trace.tracer.with_new_span(trace_id, method.downcase) do |span|
@span = span # So we can record on exceptions
# annotate with method (GET/POST/etc.) and uri path
span.record_tag(Trace::BinaryAnnotation::METHOD, method.upcase, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::PATH, url.path, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::SERVER_ADDRESS, SERVER_ADDRESS_SPECIAL_VALUE, Trace::BinaryAnnotation::Type::BOOL, remote_endpoint)
span.record(Trace::Annotation::CLIENT_SEND, local_endpoint)
span.kind = Trace::Span::Kind::CLIENT
span.remote_endpoint = remote_endpoint
span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
span.record_tag(Trace::Span::Tag::PATH, url.path)
response = @app.call(env).on_complete do |renv|
record_response_tags(span, renv[:status].to_s, local_endpoint)
record_response_tags(span, renv[:status].to_s)
end
span.record(Trace::Annotation::CLIENT_RECV, local_endpoint)
end
response
rescue Net::ReadTimeout
record_error(@span, 'Request timed out.', local_endpoint)
record_error(@span, 'Request timed out.')
raise
rescue Faraday::ConnectionFailed
record_error(@span, 'Request connection failed.', local_endpoint)
record_error(@span, 'Request connection failed.')
raise
rescue Faraday::ClientError
record_error(@span, 'Generic Faraday client error.', local_endpoint)
record_error(@span, 'Generic Faraday client error.')
raise
end

def record_error(span, msg, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::ERROR, msg, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
def record_error(span, msg)
span.record_tag(Trace::Span::Tag::ERROR, msg)
end

def record_response_tags(span, status, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::STATUS, status, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
if STATUS_ERROR_REGEXP.match(status)
span.record_tag(Trace::BinaryAnnotation::ERROR, status,
Trace::BinaryAnnotation::Type::STRING, local_endpoint)
end
def record_response_tags(span, status)
span.record_tag(Trace::Span::Tag::STATUS, status)
span.record_tag(Trace::Span::Tag::ERROR, status) if STATUS_ERROR_REGEXP.match(status)
end

end
Expand Down
25 changes: 10 additions & 15 deletions lib/zipkin-tracer/hostname_resolver.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
require 'resolv'

module ZipkinTracer
# Resolves hostnames in the endpoints of the annotations.
# Resolves hostnames in the endpoints of the spans.
# Resolving hostnames is a very expensive operation. We want to store them raw in the main thread
# and resolve them in a different thread where we do not affect execution times.
class HostnameResolver
def spans_with_ips(spans)
host_to_ip = hosts_to_ipv4(spans)

each_annotation(spans) do |annotation|
hostname = annotation.host.ipv4
each_endpoint(spans) do |endpoint|
hostname = endpoint.ipv4
unless resolved_ip_address?(hostname.to_s)
annotation.host.ipv4 = host_to_ip[hostname]
endpoint.ipv4 = host_to_ip[hostname]
end
end
end
Expand All @@ -28,24 +28,19 @@ def resolved_ip_address?(ip_string)
ip_string.to_i.to_s == ip_string
end

def each_annotation(spans, &block)
def each_endpoint(spans, &block)
spans.each do |span|
span.annotations.each do |annotation|
yield annotation
end
span.binary_annotations.each do |annotation|
yield annotation
[span.local_endpoint, span.remote_endpoint].each do |endpoint|
yield endpoint if endpoint
end
end
end

# Annotations come in pairs like CS/CR, SS/SR.
# Each annnotation has a hostname so we, for sure, will have the same host multiple times.
# Using this to resolve only once per host
def hosts_to_ipv4(spans)
hosts = []
each_annotation(spans) do |annotation|
hosts.push(annotation.host)
each_endpoint(spans) do |endpoint|
hosts.push(endpoint)
end
hosts.uniq!
resolve(hosts)
Expand All @@ -61,7 +56,7 @@ def resolve(hosts)
end

def host_to_ip(hostname, ip_format)
ipv4 = begin
begin
ip_format == :string ? Socket.getaddrinfo(hostname, nil, :INET).first[IP_FIELD] : Trace::Endpoint.host_to_i32(hostname)
rescue
ip_format == :string ? LOCALHOST : LOCALHOST_I32
Expand Down
5 changes: 2 additions & 3 deletions lib/zipkin-tracer/rack/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class RackHandler
REQUEST_METHOD = Rack::REQUEST_METHOD rescue 'REQUEST_METHOD'.freeze

DEFAULT_SERVER_RECV_TAGS = {
Trace::BinaryAnnotation::PATH => PATH_INFO
Trace::Span::Tag::PATH => PATH_INFO
}.freeze

def initialize(app, config = nil)
Expand Down Expand Up @@ -55,12 +55,11 @@ def annotate_plugin(span, env, status, response_headers, response_body)

def trace!(span, zipkin_env, &block)
trace_request_information(span, zipkin_env)
span.record(Trace::Annotation::SERVER_RECV)
span.kind = Trace::Span::Kind::SERVER
span.record('whitelisted') if zipkin_env.force_sample?
status, headers, body = yield
ensure
annotate_plugin(span, zipkin_env.env, status, headers, body)
span.record(Trace::Annotation::SERVER_SEND)
end

def trace_request_information(span, zipkin_env)
Expand Down
85 changes: 33 additions & 52 deletions lib/zipkin-tracer/trace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,54 +48,17 @@ def default_endpoint
# Moved here as a first step, eventually move them out of the Trace module

class Annotation
CLIENT_SEND = "cs"
CLIENT_RECV = "cr"
SERVER_SEND = "ss"
SERVER_RECV = "sr"

attr_reader :value, :host, :timestamp
def initialize(value, host)
attr_reader :value, :timestamp

def initialize(value)
@timestamp = (Time.now.to_f * 1000 * 1000).to_i # micros
@value = value
@host = host
end

def to_h
{
value: @value,
timestamp: @timestamp,
endpoint: host.to_h
}
end
end

class BinaryAnnotation
SERVER_ADDRESS = 'sa'.freeze
URI = 'http.url'.freeze
METHOD = 'http.method'.freeze
PATH = 'http.path'.freeze
STATUS = 'http.status'.freeze
LOCAL_COMPONENT = 'lc'.freeze
ERROR = 'error'.freeze

module Type
BOOL = "BOOL"
STRING = "STRING"
end
attr_reader :key, :value, :host

def initialize(key, value, annotation_type, host)
@key = key
@value = value
@annotation_type = annotation_type
@host = host
end

def to_h
{
key: @key,
value: @value,
endpoint: host.to_h
timestamp: @timestamp
}
end
end
Expand Down Expand Up @@ -199,12 +162,29 @@ def to_i; @i128; end

# A span may contain many annotations
class Span
attr_accessor :name, :annotations, :binary_annotations, :debug
module Tag
METHOD = "http.method".freeze
PATH = "http.path".freeze
STATUS = "http.status".freeze
LOCAL_COMPONENT = "lc".freeze
ERROR = "error".freeze
end

module Kind
CLIENT = "CLIENT".freeze
SERVER = "SERVER".freeze
end

attr_accessor :name, :kind, :local_endpoint, :remote_endpoint, :annotations, :tags, :debug

def initialize(name, span_id)
@name = name
@span_id = span_id
@kind = nil
@local_endpoint = nil
@remote_endpoint = nil
@annotations = []
@binary_annotations = []
@tags = {}
@debug = span_id.debug?
@timestamp = to_microseconds(Time.now)
@duration = UNKNOWN_DURATION
Expand All @@ -219,28 +199,30 @@ def to_h
name: @name,
traceId: @span_id.trace_id.to_s,
id: @span_id.span_id.to_s,
annotations: @annotations.map(&:to_h),
binaryAnnotations: @binary_annotations.map(&:to_h),
localEndpoint: @local_endpoint.to_h,
timestamp: @timestamp,
duration: @duration,
debug: @debug
}
h[:parentId] = @span_id.parent_id.to_s unless @span_id.parent_id.nil?
h[:kind] = @kind unless @kind.nil?
h[:remoteEndpoint] = @remote_endpoint.to_h unless @remote_endpoint.nil?
h[:annotations] = @annotations.map(&:to_h) unless @annotations.empty?
h[:tags] = @tags unless @tags.empty?
h
end

# We record information into spans, then we send these spans to zipkin
def record(value, endpoint = Trace.default_endpoint)
annotations << Trace::Annotation.new(value.to_s, endpoint)
def record(value)
annotations << Trace::Annotation.new(value.to_s)
end

def record_tag(key, value, type = Trace::BinaryAnnotation::Type::STRING, endpoint = Trace.default_endpoint)
value = value.to_s if type == Trace::BinaryAnnotation::Type::STRING
binary_annotations << Trace::BinaryAnnotation.new(key, value, type, endpoint)
def record_tag(key, value)
@tags[key] = value
end

def record_local_component(value)
record_tag(BinaryAnnotation::LOCAL_COMPONENT, value)
record_tag(Tag::LOCAL_COMPONENT, value)
end

def has_parent_span?
Expand Down Expand Up @@ -293,6 +275,5 @@ def to_h
hsh[:port] = port if port
hsh
end

end
end
2 changes: 1 addition & 1 deletion lib/zipkin-tracer/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module ZipkinTracer
VERSION = '0.32.4'.freeze
VERSION = '0.33.0'.freeze
end
2 changes: 1 addition & 1 deletion lib/zipkin-tracer/zipkin_json_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class AsyncJsonApiClient
include SuckerPunch::Job
SPANS_PATH = '/api/v1/spans'
SPANS_PATH = '/api/v2/spans'

def perform(json_api_host, spans)
spans_with_ips = ::ZipkinTracer::HostnameResolver.new.spans_with_ips(spans).map(&:to_h)
Expand Down
7 changes: 3 additions & 4 deletions lib/zipkin-tracer/zipkin_tracer_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ def with_new_span(trace_id, name)

def end_span(span)
span.close
# If in a thread not handling incoming http requests, it will not have Annotation::SERVER_SEND, so the span
# If in a thread not handling incoming http requests, it will not have Kind::SERVER, so the span
# will never be flushed and will cause memory leak.
# It will have CLIENT_SEND and CLIENT_RECV if the thread sends out http requests, so use CLIENT_RECV as the sign
# to flush the span.
# If no parent span, then current span needs to flush when it ends.
if !span.has_parent_span? || span.annotations.any? { |ann| ann.value == Annotation::SERVER_SEND }
if !span.has_parent_span? || span.kind == Trace::Span::Kind::SERVER
flush!
reset
end
end

def start_span(trace_id, name)
span = Span.new(name, trace_id)
span.local_endpoint = Trace.default_endpoint
store_span(trace_id, span)
span
end
Expand Down
Loading

0 comments on commit f0435f9

Please sign in to comment.