-
Notifications
You must be signed in to change notification settings - Fork 377
/
Copy pathsubscription.rb
159 lines (130 loc) · 4.46 KB
/
subscription.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
module Datadog
module Contrib
module ActiveSupport
module Notifications
# An ActiveSupport::Notification subscription that wraps events with tracing.
class Subscription
attr_accessor \
:span_name,
:options
def initialize(tracer, span_name, options, &block)
raise ArgumentError, 'Must be given a block!' unless block_given?
@tracer = tracer
@span_name = span_name
@options = options
@handler = Handler.new(&block)
@callbacks = Callbacks.new
end
def tracer
@tracer.is_a?(Proc) ? @tracer.call : @tracer
end
# ActiveSupport 3.x calls this
def call(name, start, finish, id, payload)
start_span(name, id, payload, start)
finish_span(name, id, payload, finish)
end
# ActiveSupport 4+ calls this on start
def start(name, id, payload)
start_span(name, id, payload)
end
# ActiveSupport 4+ calls this on finish
def finish(name, id, payload)
finish_span(name, id, payload)
end
def before_trace(&block)
callbacks.add(:before_trace, &block) if block_given?
end
def after_trace(&block)
callbacks.add(:after_trace, &block) if block_given?
end
def subscribe(pattern)
return false if subscribers.key?(pattern)
subscribers[pattern] = ::ActiveSupport::Notifications.subscribe(pattern, self)
true
end
def unsubscribe(pattern)
return false unless subscribers.key?(pattern)
::ActiveSupport::Notifications.unsubscribe(subscribers[pattern])
subscribers.delete(pattern)
true
end
def unsubscribe_all
return false if subscribers.empty?
subscribers.keys.each { |pattern| unsubscribe(pattern) }
true
end
protected
attr_reader \
:handler,
:callbacks
def start_span(name, id, payload, start = nil)
# Run callbacks
callbacks.run(name, :before_trace, id, payload, start)
# Start a trace
tracer.trace(@span_name, @options).tap do |span|
# Assign start time if provided
span.start_time = start unless start.nil?
payload[:datadog_span] = span
end
end
def finish_span(name, id, payload, finish = nil)
payload[:datadog_span].tap do |span|
# If no active span, return.
return nil if span.nil?
# Run handler for event
handler.run(span, name, id, payload)
# Finish the span
span.finish(finish)
# Run callbacks
callbacks.run(name, :after_trace, span, id, payload, finish)
end
end
# Pattern => ActiveSupport:Notifications::Subscribers
def subscribers
@subscribers ||= {}
end
# Wrapper for subscription handler
class Handler
attr_reader :block
def initialize(&block)
@block = block
end
def run(span, name, id, payload)
run!(span, name, id, payload)
rescue StandardError => e
Datadog.logger.debug("ActiveSupport::Notifications handler for '#{name}' failed: #{e.message}")
end
def run!(*args)
@block.call(*args)
end
end
# Wrapper for subscription callbacks
class Callbacks
attr_reader :blocks
def initialize
@blocks = {}
end
def add(key, &block)
blocks_for(key) << block if block_given?
end
def run(event, key, *args)
blocks_for(key).each do |callback|
begin
callback.call(event, key, *args)
rescue StandardError => e
Datadog.logger.debug(
"ActiveSupport::Notifications '#{key}' callback for '#{event}' failed: #{e.message}"
)
end
end
end
private
def blocks_for(key)
blocks[key] ||= []
end
end
end
end
end
end
end