Skip to content

Commit

Permalink
Switch to Zipkin v2 span format (#141)
Browse files Browse the repository at this point in the history
* Switch to Zipkin v2 span format

* Replace 'http.status' with 'http.status_code'

* Add shared

* Add todo to LOCAL_COMPONENT
  • Loading branch information
ykitamura-mdsol authored and jcarres-mdsol committed Mar 8, 2019
1 parent 76ea494 commit 4ff2a58
Show file tree
Hide file tree
Showing 18 changed files with 238 additions and 266 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 0.33.0
* Switch to Zipkin v2 span format

# 0.32.4
* Remove the ':service_port' configuration.
* Fix 'sa' annotation encoding.
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ It can be used to measure the performance of process, record value of variables,

When `local_component_span` method is called, it creates a new span and a local component, and provides the following methods to create annotations.
* record(key) - annotation
* record_tag(key, value) - binary annotation
* record_tag(key, value) - tag

Example:
```ruby
Expand All @@ -105,7 +105,7 @@ Only one of the following tracers can be used at a given time.

Sends traces as JSON over HTTP. This is the preferred tracer to use as the openzipkin project moves away from Thrift.

You need to specify the `:json_api_host` parameter to wherever your zipkin collector is running. It will POST traces to the `/api/v1/spans` path.
You need to specify the `:json_api_host` parameter to wherever your zipkin collector is running. It will POST traces to the `/api/v2/spans` path.


### Kafka
Expand Down Expand Up @@ -164,7 +164,7 @@ lambda do |span, env, status, response_headers, response_body|
span.record_tag('http.referrer', env['HTTP_REFERRER'])
# integer annotation
span.record_tag('http.content_size', env['CONTENT_SIZE'].to_s)
span.record_tag('http.status', status)
span.record_tag('http.status_code', status)
end
```

Expand Down
28 changes: 9 additions & 19 deletions lib/zipkin-tracer/excon/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ def request_call(datum)
def response_call(datum)
if span = datum[:span]
status = response_status(datum)
if status
record_response_tags(span, status, local_endpoint)
end
span.record(Trace::Annotation::CLIENT_RECV, local_endpoint)
record_response_tags(span, status) if status
Trace.tracer.end_span(span)
end

Expand All @@ -53,12 +50,8 @@ def b3_headers
}
end

def local_endpoint
Trace.default_endpoint # The rack middleware set this up for us.
end

def remote_endpoint(url, service_name)
Trace::Endpoint.remote_endpoint(url, service_name, local_endpoint.ip_format) # The endpoint we are calling.
Trace::Endpoint.remote_endpoint(url, service_name, Trace.default_endpoint.ip_format) # The endpoint we are calling.
end

def service_name(datum, default)
Expand All @@ -69,12 +62,9 @@ def response_status(datum)
datum[:response] && datum[:response][:status] && datum[:response][:status].to_s
end

def record_response_tags(span, status, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::STATUS, status, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
if STATUS_ERROR_REGEXP.match(status)
span.record_tag(Trace::BinaryAnnotation::ERROR, status,
Trace::BinaryAnnotation::Type::STRING, local_endpoint)
end
def record_response_tags(span, status)
span.record_tag(Trace::Span::Tag::STATUS, status)
span.record_tag(Trace::Span::Tag::ERROR, status) if STATUS_ERROR_REGEXP.match(status)
end

def trace!(datum, trace_id)
Expand All @@ -85,10 +75,10 @@ def trace!(datum, trace_id)

span = Trace.tracer.start_span(trace_id, method.downcase)
# annotate with method (GET/POST/etc.) and uri path
span.record_tag(Trace::BinaryAnnotation::METHOD, method.upcase, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::PATH, url.path, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::SERVER_ADDRESS, SERVER_ADDRESS_SPECIAL_VALUE, Trace::BinaryAnnotation::Type::BOOL, remote_endpoint(url, service_name))
span.record(Trace::Annotation::CLIENT_SEND, local_endpoint)
span.kind = Trace::Span::Kind::CLIENT
span.remote_endpoint = remote_endpoint(url, service_name)
span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
span.record_tag(Trace::Span::Tag::PATH, url.path)

# store the span in the datum hash so it can be used in the response_call
datum[:span] = span
Expand Down
33 changes: 14 additions & 19 deletions lib/zipkin-tracer/faraday/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,37 @@ def trace!(env, trace_id)
# handle either a URI object (passed by Faraday v0.8.x in testing), or something string-izable
method = env[:method].to_s
url = env[:url].respond_to?(:host) ? env[:url] : URI.parse(env[:url].to_s)
local_endpoint = Trace.default_endpoint # The rack middleware set this up for us.
remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name, local_endpoint.ip_format) # The endpoint we are calling.
remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name, Trace.default_endpoint.ip_format) # The endpoint we are calling.
Trace.tracer.with_new_span(trace_id, method.downcase) do |span|
@span = span # So we can record on exceptions
# annotate with method (GET/POST/etc.) and uri path
span.record_tag(Trace::BinaryAnnotation::METHOD, method.upcase, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::PATH, url.path, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::SERVER_ADDRESS, SERVER_ADDRESS_SPECIAL_VALUE, Trace::BinaryAnnotation::Type::BOOL, remote_endpoint)
span.record(Trace::Annotation::CLIENT_SEND, local_endpoint)
span.kind = Trace::Span::Kind::CLIENT
span.remote_endpoint = remote_endpoint
span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
span.record_tag(Trace::Span::Tag::PATH, url.path)
response = @app.call(env).on_complete do |renv|
record_response_tags(span, renv[:status].to_s, local_endpoint)
record_response_tags(span, renv[:status].to_s)
end
span.record(Trace::Annotation::CLIENT_RECV, local_endpoint)
end
response
rescue Net::ReadTimeout
record_error(@span, 'Request timed out.', local_endpoint)
record_error(@span, 'Request timed out.')
raise
rescue Faraday::ConnectionFailed
record_error(@span, 'Request connection failed.', local_endpoint)
record_error(@span, 'Request connection failed.')
raise
rescue Faraday::ClientError
record_error(@span, 'Generic Faraday client error.', local_endpoint)
record_error(@span, 'Generic Faraday client error.')
raise
end

def record_error(span, msg, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::ERROR, msg, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
def record_error(span, msg)
span.record_tag(Trace::Span::Tag::ERROR, msg)
end

def record_response_tags(span, status, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::STATUS, status, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
if STATUS_ERROR_REGEXP.match(status)
span.record_tag(Trace::BinaryAnnotation::ERROR, status,
Trace::BinaryAnnotation::Type::STRING, local_endpoint)
end
def record_response_tags(span, status)
span.record_tag(Trace::Span::Tag::STATUS, status)
span.record_tag(Trace::Span::Tag::ERROR, status) if STATUS_ERROR_REGEXP.match(status)
end

end
Expand Down
25 changes: 10 additions & 15 deletions lib/zipkin-tracer/hostname_resolver.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
require 'resolv'

module ZipkinTracer
# Resolves hostnames in the endpoints of the annotations.
# Resolves hostnames in the endpoints of the spans.
# Resolving hostnames is a very expensive operation. We want to store them raw in the main thread
# and resolve them in a different thread where we do not affect execution times.
class HostnameResolver
def spans_with_ips(spans)
host_to_ip = hosts_to_ipv4(spans)

each_annotation(spans) do |annotation|
hostname = annotation.host.ipv4
each_endpoint(spans) do |endpoint|
hostname = endpoint.ipv4
unless resolved_ip_address?(hostname.to_s)
annotation.host.ipv4 = host_to_ip[hostname]
endpoint.ipv4 = host_to_ip[hostname]
end
end
end
Expand All @@ -28,24 +28,19 @@ def resolved_ip_address?(ip_string)
ip_string.to_i.to_s == ip_string
end

def each_annotation(spans, &block)
def each_endpoint(spans, &block)
spans.each do |span|
span.annotations.each do |annotation|
yield annotation
end
span.binary_annotations.each do |annotation|
yield annotation
[span.local_endpoint, span.remote_endpoint].each do |endpoint|
yield endpoint if endpoint
end
end
end

# Annotations come in pairs like CS/CR, SS/SR.
# Each annnotation has a hostname so we, for sure, will have the same host multiple times.
# Using this to resolve only once per host
def hosts_to_ipv4(spans)
hosts = []
each_annotation(spans) do |annotation|
hosts.push(annotation.host)
each_endpoint(spans) do |endpoint|
hosts.push(endpoint)
end
hosts.uniq!
resolve(hosts)
Expand All @@ -61,7 +56,7 @@ def resolve(hosts)
end

def host_to_ip(hostname, ip_format)
ipv4 = begin
begin
ip_format == :string ? Socket.getaddrinfo(hostname, nil, :INET).first[IP_FIELD] : Trace::Endpoint.host_to_i32(hostname)
rescue
ip_format == :string ? LOCALHOST : LOCALHOST_I32
Expand Down
5 changes: 2 additions & 3 deletions lib/zipkin-tracer/rack/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class RackHandler
REQUEST_METHOD = Rack::REQUEST_METHOD rescue 'REQUEST_METHOD'.freeze

DEFAULT_SERVER_RECV_TAGS = {
Trace::BinaryAnnotation::PATH => PATH_INFO
Trace::Span::Tag::PATH => PATH_INFO
}.freeze

def initialize(app, config = nil)
Expand Down Expand Up @@ -55,12 +55,11 @@ def annotate_plugin(span, env, status, response_headers, response_body)

def trace!(span, zipkin_env, &block)
trace_request_information(span, zipkin_env)
span.record(Trace::Annotation::SERVER_RECV)
span.kind = Trace::Span::Kind::SERVER
span.record('whitelisted') if zipkin_env.force_sample?
status, headers, body = yield
ensure
annotate_plugin(span, zipkin_env.env, status, headers, body)
span.record(Trace::Annotation::SERVER_SEND)
end

def trace_request_information(span, zipkin_env)
Expand Down
8 changes: 5 additions & 3 deletions lib/zipkin-tracer/rack/zipkin_env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ def initialize(env, config)
end

def trace_id(default_flags = Trace::Flags::EMPTY)
trace_id, span_id, parent_span_id = retrieve_or_generate_ids
trace_id, span_id, parent_span_id, shared = retrieve_or_generate_ids
sampled = sampled_header_value(@env['HTTP_X_B3_SAMPLED'])
flags = (@env['HTTP_X_B3_FLAGS'] || default_flags).to_i
Trace::TraceId.new(trace_id, parent_span_id, span_id, sampled, flags)
Trace::TraceId.new(trace_id, parent_span_id, span_id, sampled, flags, shared)
end

def called_with_zipkin_headers?
Expand All @@ -33,12 +33,14 @@ def retrieve_or_generate_ids
if called_with_zipkin_headers?
trace_id, span_id = @env.values_at(*B3_REQUIRED_HEADERS)
parent_span_id = @env['HTTP_X_B3_PARENTSPANID']
shared = true
else
span_id = TraceGenerator.new.generate_id
trace_id = TraceGenerator.new.generate_id_from_span_id(span_id)
parent_span_id = nil
shared = false
end
[trace_id, span_id, parent_span_id]
[trace_id, span_id, parent_span_id, shared]
end

def new_sampled_header_value(sampled)
Expand Down
Loading

0 comments on commit 4ff2a58

Please sign in to comment.