Skip to content

Commit

Permalink
Added Span limits support to the tracing SDK
Browse files Browse the repository at this point in the history
- Added SpanLimits class
- TracerProvider now optionally accepts an instance of SpanLimits which
is passed all the way down to Spans.
- Spans use the limits to create bounded lists or dicts for different
fields.
  • Loading branch information
owais committed May 15, 2021
1 parent f816287 commit e7e77c1
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1823](https://github.com/open-telemetry/opentelemetry-python/pull/1823))
- Added support for OTEL_SERVICE_NAME.
([#1829](https://github.com/open-telemetry/opentelemetry-python/pull/1829))
- Lazily read/configure limits and allow limits to be unset.
([#1839](https://github.com/open-telemetry/opentelemetry-python/pull/1839))

### Changed
- Fixed OTLP gRPC exporter silently failing if scheme is not specified in endpoint.
Expand Down
116 changes: 98 additions & 18 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@

logger = logging.getLogger(__name__)

SPAN_ATTRIBUTE_COUNT_LIMIT = int(
environ.get(OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, 128)
)

_SPAN_EVENT_COUNT_LIMIT = int(environ.get(OTEL_SPAN_EVENT_COUNT_LIMIT, 128))
_SPAN_LINK_COUNT_LIMIT = int(environ.get(OTEL_SPAN_LINK_COUNT_LIMIT, 128))
_DEFAULT_SPAN_EVENTS_LIMIT = 128
_DEFAULT_SPAN_LINKS_LIMIT = 128
_DEFAULT_SPAN_ATTRIBUTES_LIMIT = 128
_VALID_ATTR_VALUE_TYPES = (bool, str, int, float)

# pylint: disable=protected-access
_TRACE_SAMPLER = sampling._get_from_env_or_default()

Expand Down Expand Up @@ -564,6 +562,82 @@ def _format_links(links):
return f_links


class _Limits:
    """The limits that should be enforced on recorded data such as events, links, attributes etc.

    This class does not enforce any limits itself. It only provides a way to read limits from
    the environment, default values and, in future, from user provided arguments.

    All limits must be either non-negative integers or ``None``.
    Setting a limit to ``None`` means no limit is applied for that data type.

    Args:
        max_events: Maximum number of events that can be added to a Span.
        max_links: Maximum number of links that can be added to a Span.
        max_attributes: Maximum number of attributes that can be added to a Span.
    """

    # ``None`` is a valid value for every limit (it means "unbounded"), so the
    # attributes are Optional rather than plain ints.
    max_attributes: Optional[int]
    max_events: Optional[int]
    max_links: Optional[int]

    def __init__(
        self,
        max_events: Optional[int] = None,
        max_links: Optional[int] = None,
        max_attributes: Optional[int] = None,
    ):
        self.max_attributes = max_attributes
        self.max_events = max_events
        self.max_links = max_links

    def __repr__(self):
        return "max_attributes={}, max_events={}, max_links={}".format(
            self.max_attributes, self.max_events, self.max_links
        )

    @classmethod
    def _create(cls):
        """Build an instance whose limits come from the environment.

        Each limit is resolved independently through ``_from_env`` using its
        own environment variable and default value.
        """
        return cls(
            max_attributes=cls._from_env(
                OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, _DEFAULT_SPAN_ATTRIBUTES_LIMIT
            ),
            max_events=cls._from_env(
                OTEL_SPAN_EVENT_COUNT_LIMIT, _DEFAULT_SPAN_EVENTS_LIMIT
            ),
            max_links=cls._from_env(
                OTEL_SPAN_LINK_COUNT_LIMIT, _DEFAULT_SPAN_LINKS_LIMIT
            ),
        )

    @classmethod
    def _from_env(cls, env_var: str, default: int) -> Optional[int]:
        """Read a single limit from the environment variable ``env_var``.

        Args:
            env_var: Name of the environment variable to read.
            default: Value returned when the variable is missing or blank.

        Returns:
            ``default`` when the variable is missing or blank, ``None`` when
            it is set to ``unset`` (case-insensitive, surrounding whitespace
            ignored), otherwise the parsed non-negative integer.

        Raises:
            ValueError: If the value is neither ``unset`` nor a non-negative
                integer.
        """
        raw = environ.get(env_var, "").strip().lower()
        if not raw:
            return default

        if raw == "unset":
            return None

        err_msg = "{0} must be a non-negative number but got {1}".format(
            env_var, raw
        )
        try:
            limit = int(raw)
        except ValueError:
            # The message already carries the offending value; suppress the
            # implicit chaining so the traceback is not reported as an error
            # raised "during handling" of int()'s own ValueError.
            raise ValueError(err_msg) from None
        if limit < 0:
            raise ValueError(err_msg)
        return limit


# Limits object built with no arguments, so max_attributes, max_events and
# max_links are all ``None``. It is the default value of the ``limits``
# parameter of ``Span.__init__``.
_UnsetLimits = _Limits()

# Attribute-count limit resolved from the environment once, when this module
# is imported: the default if the variable is missing or blank, ``None`` if it
# is "unset", otherwise the parsed integer (ValueError on an invalid value).
# NOTE(review): presumably retained so existing imports of this public name
# keep working — confirm against callers.
SPAN_ATTRIBUTE_COUNT_LIMIT = _Limits._from_env(
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, _DEFAULT_SPAN_ATTRIBUTES_LIMIT
)


class Span(trace_api.Span, ReadableSpan):
"""See `opentelemetry.trace.Span`.
Expand Down Expand Up @@ -604,6 +678,7 @@ def __init__(
links: Sequence[trace_api.Link] = (),
kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL,
span_processor: SpanProcessor = SpanProcessor(),
limits: _Limits = _UnsetLimits,
instrumentation_info: InstrumentationInfo = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
Expand All @@ -621,14 +696,15 @@ def __init__(
self._record_exception = record_exception
self._set_status_on_exception = set_status_on_exception
self._span_processor = span_processor
self._limits = limits
self._lock = threading.Lock()

_filter_attribute_values(attributes)
if not attributes:
self._attributes = self._new_attributes()
else:
self._attributes = BoundedDict.from_map(
SPAN_ATTRIBUTE_COUNT_LIMIT, attributes
self._limits.max_attributes, attributes
)

self._events = self._new_events()
Expand All @@ -644,24 +720,21 @@ def __init__(
if links is None:
self._links = self._new_links()
else:
self._links = BoundedList.from_seq(_SPAN_LINK_COUNT_LIMIT, links)
self._links = BoundedList.from_seq(self._limits.max_links, links)

def __repr__(self):
return '{}(name="{}", context={})'.format(
type(self).__name__, self._name, self._context
)

@staticmethod
def _new_attributes():
return BoundedDict(SPAN_ATTRIBUTE_COUNT_LIMIT)
def _new_attributes(self):
return BoundedDict(self._limits.max_attributes)

@staticmethod
def _new_events():
return BoundedList(_SPAN_EVENT_COUNT_LIMIT)
def _new_events(self):
return BoundedList(self._limits.max_events)

@staticmethod
def _new_links():
return BoundedList(_SPAN_LINK_COUNT_LIMIT)
def _new_links(self):
return BoundedList(self._limits.max_links)

def get_span_context(self):
return self._context
Expand Down Expand Up @@ -853,6 +926,11 @@ def __init__(
self.span_processor = span_processor
self.id_generator = id_generator
self.instrumentation_info = instrumentation_info
self._span_limits = None

def _with_limits(self, span_limits: _Limits) -> "Tracer":
    """Store ``span_limits`` on this tracer and return the tracer itself.

    Returning ``self`` lets the call be chained directly onto construction.
    """
    self._span_limits = span_limits
    return self

@contextmanager
def start_as_current_span(
Expand Down Expand Up @@ -954,6 +1032,7 @@ def start_span( # pylint: disable=too-many-locals
instrumentation_info=self.instrumentation_info,
record_exception=record_exception,
set_status_on_exception=set_status_on_exception,
limits=self._span_limits,
)
span.start(start_time=start_time, parent_context=context)
else:
Expand Down Expand Up @@ -983,6 +1062,7 @@ def __init__(
self.id_generator = id_generator
self._resource = resource
self.sampler = sampler
self._span_limits = _Limits._create()
self._atexit_handler = None
if shutdown_on_exit:
self._atexit_handler = atexit.register(self.shutdown)
Expand All @@ -1007,7 +1087,7 @@ def get_tracer(
InstrumentationInfo(
instrumenting_module_name, instrumenting_library_version
),
)
)._with_limits(self._span_limits)

def add_span_processor(self, span_processor: SpanProcessor) -> None:
"""Registers a new :class:`SpanProcessor` for this `TracerProvider`.
Expand Down
34 changes: 20 additions & 14 deletions opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import threading
from collections import OrderedDict, deque
from collections.abc import MutableMapping, Sequence
from typing import Optional


def ns_to_iso_str(nanoseconds):
Expand Down Expand Up @@ -45,7 +46,7 @@ class BoundedList(Sequence):
not enough room.
"""

def __init__(self, maxlen):
def __init__(self, maxlen: Optional[int]):
self.dropped = 0
self._dq = deque(maxlen=maxlen) # type: deque
self._lock = threading.Lock()
Expand All @@ -67,21 +68,25 @@ def __iter__(self):

def append(self, item):
with self._lock:
if len(self._dq) == self._dq.maxlen:
if (
self._dq.maxlen is not None
and len(self._dq) == self._dq.maxlen
):
self.dropped += 1
self._dq.append(item)

def extend(self, seq):
with self._lock:
to_drop = len(seq) + len(self._dq) - self._dq.maxlen
if to_drop > 0:
self.dropped += to_drop
if self._dq.maxlen is not None:
to_drop = len(seq) + len(self._dq) - self._dq.maxlen
if to_drop > 0:
self.dropped += to_drop
self._dq.extend(seq)

@classmethod
def from_seq(cls, maxlen, seq):
seq = tuple(seq)
if len(seq) > maxlen:
if maxlen is not None and len(seq) > maxlen:
raise ValueError
bounded_list = cls(maxlen)
# pylint: disable=protected-access
Expand All @@ -96,11 +101,12 @@ class BoundedDict(MutableMapping):
added.
"""

def __init__(self, maxlen):
if not isinstance(maxlen, int):
raise ValueError
if maxlen < 0:
raise ValueError
def __init__(self, maxlen: Optional[int]):
if maxlen is not None:
if not isinstance(maxlen, int):
raise ValueError
if maxlen < 0:
raise ValueError
self.maxlen = maxlen
self.dropped = 0
self._dict = OrderedDict() # type: OrderedDict
Expand All @@ -116,13 +122,13 @@ def __getitem__(self, key):

def __setitem__(self, key, value):
with self._lock:
if self.maxlen == 0:
if self.maxlen is not None and self.maxlen == 0:
self.dropped += 1
return

if key in self._dict:
del self._dict[key]
elif len(self._dict) == self.maxlen:
elif self.maxlen is not None and len(self._dict) == self.maxlen:
del self._dict[next(iter(self._dict.keys()))]
self.dropped += 1
self._dict[key] = value
Expand All @@ -140,7 +146,7 @@ def __len__(self):
@classmethod
def from_map(cls, maxlen, mapping):
mapping = OrderedDict(mapping)
if len(mapping) > maxlen:
if maxlen is not None and len(mapping) > maxlen:
raise ValueError
bounded_dict = cls(maxlen)
# pylint: disable=protected-access
Expand Down
16 changes: 16 additions & 0 deletions opentelemetry-sdk/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ def test_extend_drop(self):
self.assertEqual(len(blist), list_len)
self.assertEqual(blist.dropped, len(other_list))

def test_no_limit(self):
    """A list created with ``maxlen=None`` keeps every appended item."""
    unbounded = BoundedList(maxlen=None)
    for value in range(100):
        unbounded.append(value)

    # Every item must still be retrievable at the position it was appended.
    for index in range(100):
        self.assertEqual(unbounded[index], index)


class TestBoundedDict(unittest.TestCase):
base = collections.OrderedDict(
Expand Down Expand Up @@ -211,3 +219,11 @@ def test_bounded_dict(self):

with self.assertRaises(KeyError):
_ = bdict["new-name"]

def test_no_limit_code(self):
    """A dict created with ``maxlen=None`` keeps every inserted key."""
    unbounded = BoundedDict(maxlen=None)
    for key in range(100):
        unbounded[key] = key

    # Every key must still map to the value it was stored with.
    for key in range(100):
        self.assertEqual(unbounded[key], key)
47 changes: 45 additions & 2 deletions opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import unittest
from importlib import reload
from logging import ERROR, WARNING
from random import randint
from typing import Optional
from unittest import mock

Expand Down Expand Up @@ -1281,8 +1282,7 @@ class TestSpanLimits(unittest.TestCase):
OTEL_SPAN_LINK_COUNT_LIMIT: "30",
},
)
def test_span_environment_limits(self):
reload(trace)
def test_span_limits_env(self):
tracer = new_tracer()
id_generator = RandomIdGenerator()
some_links = [
Expand All @@ -1306,3 +1306,46 @@ def test_span_environment_limits(self):

self.assertEqual(len(root.attributes), 10)
self.assertEqual(len(root.events), 20)

@mock.patch.dict(
    "os.environ",
    {
        OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: "unset",
        OTEL_SPAN_EVENT_COUNT_LIMIT: "unset",
        OTEL_SPAN_LINK_COUNT_LIMIT: "unset",
    },
)
def test_span_no_limits_env(self):
    """With every limit variable set to "unset", spans keep all recorded data.

    Each count is chosen to exceed the corresponding default limit so the
    test would fail if the default were still applied.
    """
    # pylint: disable=protected-access
    link_total = int(trace._DEFAULT_SPAN_LINKS_LIMIT) + randint(1, 100)

    tracer = new_tracer()
    id_generator = RandomIdGenerator()
    some_links = []
    for _ in range(link_total):
        span_context = trace_api.SpanContext(
            trace_id=id_generator.generate_trace_id(),
            span_id=id_generator.generate_span_id(),
            is_remote=False,
        )
        some_links.append(trace_api.Link(span_context))
    with tracer.start_as_current_span("root", links=some_links) as root:
        self.assertEqual(len(root.links), link_total)

    event_total = int(trace._DEFAULT_SPAN_EVENTS_LIMIT) + randint(1, 100)
    with tracer.start_as_current_span("root") as root:
        for event_index in range(event_total):
            root.add_event("my_event_{}".format(event_index))

        self.assertEqual(len(root.events), event_total)

    attribute_total = int(trace._DEFAULT_SPAN_ATTRIBUTES_LIMIT) + randint(
        1, 100
    )
    with tracer.start_as_current_span("root") as root:
        for attribute_index in range(attribute_total):
            root.set_attribute("my_attribute_{}".format(attribute_index), 0)

        self.assertEqual(len(root.attributes), attribute_total)

0 comments on commit e7e77c1

Please sign in to comment.