Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to Zipkin v2 span format #141

Merged
merged 4 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 0.33.0
* Switch to Zipkin v2 span format

# 0.32.4
* Remove the ':service_port' configuration.
* Fix 'sa' annotation encoding.
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ It can be used to measure the performance of process, record value of variables,

When `local_component_span` method is called, it creates a new span and a local component, and provides the following methods to create annotations.
* record(key) - annotation
* record_tag(key, value) - binary annotation
* record_tag(key, value) - tag

Example:
```ruby
Expand All @@ -105,7 +105,7 @@ Only one of the following tracers can be used at a given time.

Sends traces as JSON over HTTP. This is the preferred tracer to use as the openzipkin project moves away from Thrift.

You need to specify the `:json_api_host` parameter to wherever your zipkin collector is running. It will POST traces to the `/api/v1/spans` path.
You need to specify the `:json_api_host` parameter to wherever your zipkin collector is running. It will POST traces to the `/api/v2/spans` path.


### Kafka
Expand Down
28 changes: 9 additions & 19 deletions lib/zipkin-tracer/excon/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ def request_call(datum)
def response_call(datum)
if span = datum[:span]
status = response_status(datum)
if status
record_response_tags(span, status, local_endpoint)
end
span.record(Trace::Annotation::CLIENT_RECV, local_endpoint)
record_response_tags(span, status) if status
Trace.tracer.end_span(span)
end

Expand All @@ -53,12 +50,8 @@ def b3_headers
}
end

def local_endpoint
Trace.default_endpoint # The rack middleware set this up for us.
end

def remote_endpoint(url, service_name)
Trace::Endpoint.remote_endpoint(url, service_name, local_endpoint.ip_format) # The endpoint we are calling.
Trace::Endpoint.remote_endpoint(url, service_name, Trace.default_endpoint.ip_format) # The endpoint we are calling.
end

def service_name(datum, default)
Expand All @@ -69,12 +62,9 @@ def response_status(datum)
datum[:response] && datum[:response][:status] && datum[:response][:status].to_s
end

def record_response_tags(span, status, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::STATUS, status, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
if STATUS_ERROR_REGEXP.match(status)
span.record_tag(Trace::BinaryAnnotation::ERROR, status,
Trace::BinaryAnnotation::Type::STRING, local_endpoint)
end
def record_response_tags(span, status)
span.record_tag(Trace::Span::Tag::STATUS, status)
span.record_tag(Trace::Span::Tag::ERROR, status) if STATUS_ERROR_REGEXP.match(status)
end

def trace!(datum, trace_id)
Expand All @@ -85,10 +75,10 @@ def trace!(datum, trace_id)

span = Trace.tracer.start_span(trace_id, method.downcase)
# annotate with method (GET/POST/etc.) and uri path
span.record_tag(Trace::BinaryAnnotation::METHOD, method.upcase, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::PATH, url.path, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::SERVER_ADDRESS, SERVER_ADDRESS_SPECIAL_VALUE, Trace::BinaryAnnotation::Type::BOOL, remote_endpoint(url, service_name))
span.record(Trace::Annotation::CLIENT_SEND, local_endpoint)
span.kind = Trace::Span::Kind::CLIENT
span.remote_endpoint = remote_endpoint(url, service_name)
span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
span.record_tag(Trace::Span::Tag::PATH, url.path)

# store the span in the datum hash so it can be used in the response_call
datum[:span] = span
Expand Down
33 changes: 14 additions & 19 deletions lib/zipkin-tracer/faraday/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,37 @@ def trace!(env, trace_id)
# handle either a URI object (passed by Faraday v0.8.x in testing), or something string-izable
method = env[:method].to_s
url = env[:url].respond_to?(:host) ? env[:url] : URI.parse(env[:url].to_s)
local_endpoint = Trace.default_endpoint # The rack middleware set this up for us.
remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name, local_endpoint.ip_format) # The endpoint we are calling.
remote_endpoint = Trace::Endpoint.remote_endpoint(url, @service_name, Trace.default_endpoint.ip_format) # The endpoint we are calling.
Trace.tracer.with_new_span(trace_id, method.downcase) do |span|
@span = span # So we can record on exceptions
# annotate with method (GET/POST/etc.) and uri path
span.record_tag(Trace::BinaryAnnotation::METHOD, method.upcase, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::PATH, url.path, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::SERVER_ADDRESS, SERVER_ADDRESS_SPECIAL_VALUE, Trace::BinaryAnnotation::Type::BOOL, remote_endpoint)
span.record(Trace::Annotation::CLIENT_SEND, local_endpoint)
span.kind = Trace::Span::Kind::CLIENT
span.remote_endpoint = remote_endpoint
span.record_tag(Trace::Span::Tag::METHOD, method.upcase)
span.record_tag(Trace::Span::Tag::PATH, url.path)
response = @app.call(env).on_complete do |renv|
record_response_tags(span, renv[:status].to_s, local_endpoint)
record_response_tags(span, renv[:status].to_s)
end
span.record(Trace::Annotation::CLIENT_RECV, local_endpoint)
end
response
rescue Net::ReadTimeout
record_error(@span, 'Request timed out.', local_endpoint)
record_error(@span, 'Request timed out.')
raise
rescue Faraday::ConnectionFailed
record_error(@span, 'Request connection failed.', local_endpoint)
record_error(@span, 'Request connection failed.')
raise
rescue Faraday::ClientError
record_error(@span, 'Generic Faraday client error.', local_endpoint)
record_error(@span, 'Generic Faraday client error.')
raise
end

def record_error(span, msg, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::ERROR, msg, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
def record_error(span, msg)
span.record_tag(Trace::Span::Tag::ERROR, msg)
end

def record_response_tags(span, status, local_endpoint)
span.record_tag(Trace::BinaryAnnotation::STATUS, status, Trace::BinaryAnnotation::Type::STRING, local_endpoint)
if STATUS_ERROR_REGEXP.match(status)
span.record_tag(Trace::BinaryAnnotation::ERROR, status,
Trace::BinaryAnnotation::Type::STRING, local_endpoint)
end
def record_response_tags(span, status)
span.record_tag(Trace::Span::Tag::STATUS, status)
span.record_tag(Trace::Span::Tag::ERROR, status) if STATUS_ERROR_REGEXP.match(status)
end

end
Expand Down
25 changes: 10 additions & 15 deletions lib/zipkin-tracer/hostname_resolver.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
require 'resolv'

module ZipkinTracer
# Resolves hostnames in the endpoints of the annotations.
# Resolves hostnames in the endpoints of the spans.
# Resolving hostnames is a very expensive operation. We want to store them raw in the main thread
# and resolve them in a different thread where we do not affect execution times.
class HostnameResolver
def spans_with_ips(spans)
host_to_ip = hosts_to_ipv4(spans)

each_annotation(spans) do |annotation|
hostname = annotation.host.ipv4
each_endpoint(spans) do |endpoint|
hostname = endpoint.ipv4
unless resolved_ip_address?(hostname.to_s)
annotation.host.ipv4 = host_to_ip[hostname]
endpoint.ipv4 = host_to_ip[hostname]
end
end
end
Expand All @@ -28,24 +28,19 @@ def resolved_ip_address?(ip_string)
ip_string.to_i.to_s == ip_string
end

def each_annotation(spans, &block)
def each_endpoint(spans, &block)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a comment about what this is doing? I'm guessing it is lazy parsing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment is explaining:

# Resolving hostnames is a very expensive operation. We want to store them raw in the main thread
# and resolve them in a different thread where we do not affect execution times.

spans.each do |span|
span.annotations.each do |annotation|
yield annotation
end
span.binary_annotations.each do |annotation|
yield annotation
[span.local_endpoint, span.remote_endpoint].each do |endpoint|
yield endpoint if endpoint
end
end
end

# Annotations come in pairs like CS/CR, SS/SR.
# Each annnotation has a hostname so we, for sure, will have the same host multiple times.
# Using this to resolve only once per host
def hosts_to_ipv4(spans)
hosts = []
each_annotation(spans) do |annotation|
hosts.push(annotation.host)
each_endpoint(spans) do |endpoint|
hosts.push(endpoint)
end
hosts.uniq!
resolve(hosts)
Expand All @@ -61,7 +56,7 @@ def resolve(hosts)
end

def host_to_ip(hostname, ip_format)
ipv4 = begin
begin
ip_format == :string ? Socket.getaddrinfo(hostname, nil, :INET).first[IP_FIELD] : Trace::Endpoint.host_to_i32(hostname)
rescue
ip_format == :string ? LOCALHOST : LOCALHOST_I32
Expand Down
5 changes: 2 additions & 3 deletions lib/zipkin-tracer/rack/zipkin-tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class RackHandler
REQUEST_METHOD = Rack::REQUEST_METHOD rescue 'REQUEST_METHOD'.freeze

DEFAULT_SERVER_RECV_TAGS = {
Trace::BinaryAnnotation::PATH => PATH_INFO
Trace::Span::Tag::PATH => PATH_INFO
}.freeze

def initialize(app, config = nil)
Expand Down Expand Up @@ -55,12 +55,11 @@ def annotate_plugin(span, env, status, response_headers, response_body)

def trace!(span, zipkin_env, &block)
trace_request_information(span, zipkin_env)
span.record(Trace::Annotation::SERVER_RECV)
span.kind = Trace::Span::Kind::SERVER
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to tell if the incoming span ID was shared (from b3 headers)? if so, add span.shared=true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in 1139cc5

span.record('whitelisted') if zipkin_env.force_sample?
status, headers, body = yield
ensure
annotate_plugin(span, zipkin_env.env, status, headers, body)
span.record(Trace::Annotation::SERVER_SEND)
end

def trace_request_information(span, zipkin_env)
Expand Down
85 changes: 33 additions & 52 deletions lib/zipkin-tracer/trace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,54 +48,17 @@ def default_endpoint
# Moved here as a first step, eventually move them out of the Trace module

class Annotation
CLIENT_SEND = "cs"
CLIENT_RECV = "cr"
SERVER_SEND = "ss"
SERVER_RECV = "sr"

attr_reader :value, :host, :timestamp
def initialize(value, host)
attr_reader :value, :timestamp

def initialize(value)
@timestamp = (Time.now.to_f * 1000 * 1000).to_i # micros
@value = value
@host = host
end

def to_h
{
value: @value,
timestamp: @timestamp,
endpoint: host.to_h
}
end
end

class BinaryAnnotation
SERVER_ADDRESS = 'sa'.freeze
URI = 'http.url'.freeze
METHOD = 'http.method'.freeze
PATH = 'http.path'.freeze
STATUS = 'http.status'.freeze
LOCAL_COMPONENT = 'lc'.freeze
ERROR = 'error'.freeze

module Type
BOOL = "BOOL"
STRING = "STRING"
end
attr_reader :key, :value, :host

def initialize(key, value, annotation_type, host)
@key = key
@value = value
@annotation_type = annotation_type
@host = host
end

def to_h
{
key: @key,
value: @value,
endpoint: host.to_h
timestamp: @timestamp
}
end
end
Expand Down Expand Up @@ -199,12 +162,29 @@ def to_i; @i128; end

# A span may contain many annotations
class Span
attr_accessor :name, :annotations, :binary_annotations, :debug
module Tag
METHOD = "http.method".freeze
PATH = "http.path".freeze
STATUS = "http.status".freeze
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is wrong, it should be http.status_code though possibly was always incorrect here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated d4b6254

LOCAL_COMPONENT = "lc".freeze
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ls is no longer needed unless there's a value for it. sometimes we had "lc"->"" just to add an endpoint to a span.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we are still using it in our library. Added TODO in 2cd021e

ERROR = "error".freeze
end

module Kind
CLIENT = "CLIENT".freeze
SERVER = "SERVER".freeze
end

attr_accessor :name, :kind, :local_endpoint, :remote_endpoint, :annotations, :tags, :debug

def initialize(name, span_id)
@name = name
@span_id = span_id
@kind = nil
@local_endpoint = nil
@remote_endpoint = nil
@annotations = []
@binary_annotations = []
@tags = {}
@debug = span_id.debug?
@timestamp = to_microseconds(Time.now)
@duration = UNKNOWN_DURATION
Expand All @@ -219,28 +199,30 @@ def to_h
name: @name,
traceId: @span_id.trace_id.to_s,
id: @span_id.span_id.to_s,
annotations: @annotations.map(&:to_h),
binaryAnnotations: @binary_annotations.map(&:to_h),
localEndpoint: @local_endpoint.to_h,
timestamp: @timestamp,
duration: @duration,
debug: @debug
}
h[:parentId] = @span_id.parent_id.to_s unless @span_id.parent_id.nil?
h[:kind] = @kind unless @kind.nil?
h[:remoteEndpoint] = @remote_endpoint.to_h unless @remote_endpoint.nil?
h[:annotations] = @annotations.map(&:to_h) unless @annotations.empty?
h[:tags] = @tags unless @tags.empty?
h
end

# We record information into spans, then we send these spans to zipkin
def record(value, endpoint = Trace.default_endpoint)
annotations << Trace::Annotation.new(value.to_s, endpoint)
def record(value)
annotations << Trace::Annotation.new(value.to_s)
end

def record_tag(key, value, type = Trace::BinaryAnnotation::Type::STRING, endpoint = Trace.default_endpoint)
value = value.to_s if type == Trace::BinaryAnnotation::Type::STRING
binary_annotations << Trace::BinaryAnnotation.new(key, value, type, endpoint)
def record_tag(key, value)
@tags[key] = value
end

def record_local_component(value)
record_tag(BinaryAnnotation::LOCAL_COMPONENT, value)
record_tag(Tag::LOCAL_COMPONENT, value)
end

def has_parent_span?
Expand Down Expand Up @@ -293,6 +275,5 @@ def to_h
hsh[:port] = port if port
hsh
end

end
end
2 changes: 1 addition & 1 deletion lib/zipkin-tracer/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module ZipkinTracer
VERSION = '0.32.4'.freeze
VERSION = '0.33.0'.freeze
end
2 changes: 1 addition & 1 deletion lib/zipkin-tracer/zipkin_json_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class AsyncJsonApiClient
include SuckerPunch::Job
SPANS_PATH = '/api/v1/spans'
SPANS_PATH = '/api/v2/spans'

def perform(json_api_host, spans)
spans_with_ips = ::ZipkinTracer::HostnameResolver.new.spans_with_ips(spans).map(&:to_h)
Expand Down
7 changes: 3 additions & 4 deletions lib/zipkin-tracer/zipkin_tracer_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ def with_new_span(trace_id, name)

def end_span(span)
span.close
# If in a thread not handling incoming http requests, it will not have Annotation::SERVER_SEND, so the span
# If in a thread not handling incoming http requests, it will not have Kind::SERVER, so the span
# will never be flushed and will cause memory leak.
# It will have CLIENT_SEND and CLIENT_RECV if the thread sends out http requests, so use CLIENT_RECV as the sign
# to flush the span.
# If no parent span, then current span needs to flush when it ends.
if !span.has_parent_span? || span.annotations.any? { |ann| ann.value == Annotation::SERVER_SEND }
if !span.has_parent_span? || span.kind == Trace::Span::Kind::SERVER
flush!
reset
end
end

def start_span(trace_id, name)
span = Span.new(name, trace_id)
span.local_endpoint = Trace.default_endpoint
store_span(trace_id, span)
span
end
Expand Down
Loading