-
Notifications
You must be signed in to change notification settings - Fork 38
/
trace.rb
293 lines (245 loc) · 7.95 KB
/
trace.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# frozen_string_literal: true
require 'zipkin-tracer/zipkin_sender_base'
require 'zipkin-tracer/trace_container'
# Most of this code is copied from Finagle
# https://github.com/twitter/finagle/blob/finagle-6.39.0/finagle-thrift/src/main/ruby/lib/finagle-thrift/trace.rb
# But moved and improved here.
module Trace
# These methods and attr_accessor below are used as global configuration of this gem
# Most of these are set by the config class and then used around.
# TODO: Move this out of the Trace module , take out that extend self and be happier
extend self
attr_accessor :trace_id_128bit, :write_b3_single_format
# This method is deprecated, please use TraceGenerator.current
# Note that this method will always return a trace, it will
# generate a new one if none was available.
def id
ZipkinTracer::TraceGenerator.new.current
end
def self.tracer
@tracer
end
def self.sample_rate
@sample_rate
end
def self.tracer=(tracer)
@tracer = tracer
end
def self.sample_rate=(sample_rate)
if sample_rate > 1 || sample_rate < 0
raise ArgumentError.new("sample rate must be [0,1]")
end
@sample_rate = sample_rate
end
def default_endpoint=(endpoint)
@default_endpoint = endpoint
end
def default_endpoint
@default_endpoint
end
# These classes all come from Finagle-thrift + some needed modifications (.to_h)
# Moved here as a first step, eventually move them out of the Trace module
class Annotation
attr_reader :value, :timestamp
def initialize(value)
@timestamp = (Time.now.to_f * 1000 * 1000).to_i # micros
@value = value
end
def to_h
{
value: @value,
timestamp: @timestamp
}
end
end
class Flags
# no flags set
EMPTY = 0
# the debug flag is used to ensure we pass all the sampling layers and that the trace is stored
DEBUG = 1
end
class SpanId
HEX_REGEX = /^[a-f0-9]{16,32}$/i
MAX_SIGNED_I64 = 9223372036854775807
MASK = (2 ** 64) - 1
def self.from_value(v)
if v.is_a?(String) && v =~ HEX_REGEX
# drops any bits higher than 64 by selecting right-most 16 characters
new(v.length > 16 ? v[v.length - 16, 16].hex : v.hex)
elsif v.is_a?(Numeric)
new(v)
elsif v.is_a?(SpanId)
v
end
end
def initialize(value)
@value = value
@i64 = if @value > MAX_SIGNED_I64
-1 * ((@value ^ MASK) + 1)
else
@value
end
end
def to_s; "%016x" % @value; end
def to_i; @i64; end
end
# A TraceId contains all the information of a given trace id
class TraceId
attr_reader :trace_id, :parent_id, :span_id, :sampled, :flags, :shared
def initialize(trace_id, parent_id, span_id, sampled, flags, shared = false)
@trace_id = Trace.trace_id_128bit ? TraceId128Bit.from_value(trace_id) : SpanId.from_value(trace_id)
@parent_id = parent_id.nil? ? nil : SpanId.from_value(parent_id)
@span_id = SpanId.from_value(span_id)
@sampled = sampled
@flags = flags
@shared = shared
end
def next_id
TraceId.new(trace_id, span_id, ZipkinTracer::TraceGenerator.new.generate_id, sampled, flags)
end
# the debug flag is used to ensure the trace passes ALL samplers
def debug?
flags & Flags::DEBUG == Flags::DEBUG
end
def sampled?
debug? || %w[1 true].include?(sampled)
end
def to_s
"TraceId(trace_id = #{trace_id}, parent_id = #{parent_id}, span_id = #{span_id}," \
" sampled = #{sampled}, flags = #{flags}, shared = #{shared})"
end
end
class TraceId128Bit < SpanId
HEX_REGEX_16 = /^[a-f0-9]{16}$/i
HEX_REGEX_32 = /^[a-f0-9]{32}$/i
MAX_SIGNED_I128 = (2 ** 128 / 2) -1
MASK = (2 ** 128) - 1
def self.from_value(v)
if v.is_a?(String) && v =~ HEX_REGEX_16
SpanId.new(v.hex)
elsif v.is_a?(String) && v =~ HEX_REGEX_32
new(v.hex)
elsif v.is_a?(Numeric)
new(v)
elsif v.is_a?(SpanId)
v
end
end
def initialize(value)
@value = value
@i128 = if @value > MAX_SIGNED_I128
-1 * ((@value ^ MASK) + 1)
else
@value
end
end
def to_s; '%032x' % @value; end
def to_i; @i128; end
end
# A span may contain many annotations
class Span
module Tag
METHOD = "http.method"
PATH = "http.path"
STATUS = "http.status_code"
LOCAL_COMPONENT = "lc" # TODO: Remove LOCAL_COMPONENT and related methods when no longer needed
ERROR = "error"
end
module Kind
CLIENT = "CLIENT"
SERVER = "SERVER"
# When present, "timestamp" is the moment a producer sent a message to a destination.
# "duration" represents delay sending the message, such as batching, while
# "remote_endpoint" indicates the destination, such as a broker.
#
# Unlike CLIENT, messaging spans never share a span ID. For example, the
# CONSUMER of the same message has "parent_id" set to this span's id.
PRODUCER = "PRODUCER"
# When present, "timestamp" is the moment a consumer received a message from an origin.
# "duration" represents delay consuming the message, such as from backlog,
# while "remote_endpoint" indicates the origin, such as a broker.
#
# Unlike SERVER, messaging spans never share a span ID. For example, the
# PRODUCER of this message is the "parent_id" of this span.
CONSUMER = "CONSUMER"
end
attr_accessor :name, :kind, :local_endpoint, :remote_endpoint, :annotations, :tags, :debug
def initialize(name, span_id, timestamp = Time.now)
@name = name
@span_id = span_id
@kind = nil
@local_endpoint = nil
@remote_endpoint = nil
@annotations = []
@tags = {}
@debug = span_id.debug?
@timestamp = to_microseconds(timestamp)
@duration = UNKNOWN_DURATION
end
def close(timestamp = Time.now)
@duration = to_microseconds(timestamp) - @timestamp
end
def to_h
h = {
name: @name,
traceId: @span_id.trace_id.to_s,
id: @span_id.span_id.to_s,
localEndpoint: @local_endpoint.to_h,
timestamp: @timestamp,
duration: @duration,
debug: @debug
}
h[:parentId] = @span_id.parent_id.to_s unless @span_id.parent_id.nil?
h[:kind] = @kind unless @kind.nil?
h[:remoteEndpoint] = @remote_endpoint.to_h unless @remote_endpoint.nil?
h[:annotations] = @annotations.map(&:to_h) unless @annotations.empty?
h[:tags] = @tags unless @tags.empty?
h[:shared] = true if @span_id.shared && @kind == Kind::SERVER
h
end
# We record information into spans, then we send these spans to zipkin
def record(value)
annotations << Trace::Annotation.new(value.to_s)
end
def record_tag(key, value)
@tags[key] = value.to_s
end
def record_local_component(value)
record_tag(Tag::LOCAL_COMPONENT, value)
end
def has_parent_span?
!@span_id.parent_id.nil?
end
STATUS_ERROR_REGEXP = /\A(4.*|5.*)\z/.freeze
def record_status(status)
return if status.nil?
status = status.to_s
record_tag(Tag::STATUS, status)
record_tag(Tag::ERROR, status) if STATUS_ERROR_REGEXP.match(status)
end
private
UNKNOWN_DURATION = 0 # mark duration was not set
def to_microseconds(time)
(time.to_f * 1_000_000).to_i
end
end
class Endpoint < Struct.new(:ipv4, :port, :service_name)
UNKNOWN_URL = 'unknown'.freeze
def self.local_endpoint(service_name)
hostname = Socket.gethostname
Endpoint.new(hostname, nil, service_name)
end
def self.remote_endpoint(url, remote_service_name)
service_name = remote_service_name || url&.host&.split('.')&.first || UNKNOWN_URL # default to url-derived service name
Endpoint.new(url&.host, url&.port, service_name)
end
def to_h
hsh = {
ipv4: ipv4,
serviceName: service_name
}
hsh[:port] = port if port
hsh
end
end
end