-
Notifications
You must be signed in to change notification settings - Fork 417
/
Copy pathcontext.py
144 lines (124 loc) · 5.28 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import logging
import threading
log = logging.getLogger(__name__)
class Context(object):
"""
Context is used to keep track of a hierarchy of spans for the current
execution flow. During each logical execution, the same ``Context`` is
used to represent a single logical trace, even if the trace is built
asynchronously.
A single code execution may use multiple ``Context`` if part of the execution
must not be related to the current tracing. As example, a delayed job may
compose a standalone trace instead of being related to the same trace that
generates the job itself. On the other hand, if it's part of the same
``Context``, it will be related to the original trace.
This data structure is thread-safe.
"""
def __init__(self):
"""
Initialize a new thread-safe ``Context``.
"""
self._trace = []
self._sampled = False
self._finished_spans = 0
self._current_span = None
self._lock = threading.Lock()
def get_current_span(self):
"""
Return the last active span that corresponds to the last inserted
item in the trace list. This cannot be considered as the current active
span in asynchronous environments, because some spans can be closed
earlier while child spans still need to finish their traced execution.
"""
with self._lock:
return self._current_span
def add_span(self, span):
"""
Add a span to the context trace list, keeping it as the last active span.
"""
with self._lock:
self._current_span = span
self._sampled = span.sampled
self._trace.append(span)
span._context = self
def close_span(self, span):
"""
Mark a span as a finished, increasing the internal counter to prevent
cycles inside _trace list.
"""
with self._lock:
self._finished_spans += 1
self._current_span = span._parent
# notify if the trace is not closed properly; this check is executed only
# if the tracer debug_logging is enabled and when the root span is closed
# for an unfinished trace. This logging is meant to be used for debugging
# reasons, and it doesn't mean that the trace is wrongly generated.
# In asynchronous environments, it's legit to close the root span before
# some children. On the other hand, asynchronous web frameworks still expect
# to close the root span after all the children.
tracer = getattr(span, '_tracer', None)
if tracer and tracer.debug_logging and span._parent is None and not self._is_finished():
opened_spans = len(self._trace) - self._finished_spans
log.debug('Root span "%s" closed, but the trace has %d unfinished spans:', span.name, opened_spans)
spans = [x for x in self._trace if not x._finished]
for wrong_span in spans:
log.debug('\n%s', wrong_span.pprint())
def is_finished(self):
"""
Returns if the trace for the current Context is finished or not. A Context
is considered finished if all spans in this context are finished.
"""
with self._lock:
return self._is_finished()
def is_sampled(self):
"""
Returns if the ``Context`` contains sampled spans.
"""
with self._lock:
return self._sampled
def get(self):
"""
Returns a tuple containing the trace list generated in the current context and
if the context is sampled or not. It returns (None, None) if the ``Context`` is
not finished. If a trace is returned, the ``Context`` will be reset so that it
can be re-used immediately.
This operation is thread-safe.
"""
with self._lock:
if self._is_finished():
trace = self._trace
sampled = self._sampled
# clean the current state
self._trace = []
self._sampled = False
self._finished_spans = 0
self._current_span = None
return trace, sampled
else:
return None, None
def _is_finished(self):
"""
Internal method that checks if the ``Context`` is finished or not.
"""
num_traces = len(self._trace)
return num_traces > 0 and num_traces == self._finished_spans
class ThreadLocalContext(object):
"""
ThreadLocalContext can be used as a tracer global reference to create
a different ``Context`` for each thread. In synchronous tracer, this
is required to prevent multiple threads sharing the same ``Context``
in different executions.
"""
def __init__(self):
self._locals = threading.local()
def set(self, ctx):
setattr(self._locals, 'context', ctx)
def get(self):
ctx = getattr(self._locals, 'context', None)
if not ctx:
# create a new Context if it's not available; this action
# is done once because the Context has the reset() method
# to reuse the same instance
ctx = Context()
self._locals.context = ctx
return ctx