From 3bb263fa964c5ab0e6d7a4c518835c1dce1e3382 Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Tue, 11 May 2021 06:02:44 +0530 Subject: [PATCH] Added Span limits support to the tracing SDK - Added SpanLimits class - TracerProvider now optionally accepts an instance of SpanLimits which is passed all the way down to Spans. - Spans use the limits to create bounded lists or dicts for different fields. --- CHANGELOG.md | 2 + .../src/opentelemetry/sdk/trace/__init__.py | 115 +++++++++++++++--- .../src/opentelemetry/sdk/util/__init__.py | 34 +++--- opentelemetry-sdk/tests/test_util.py | 16 +++ opentelemetry-sdk/tests/trace/test_trace.py | 47 ++++++- 5 files changed, 180 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4fb4c5b200..d3d395f0b41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1823](https://github.com/open-telemetry/opentelemetry-python/pull/1823)) - Added support for OTEL_SERVICE_NAME. ([#1829](https://github.com/open-telemetry/opentelemetry-python/pull/1829)) +- Lazily read/configure limits and allow limits to be unset. + ([#1839](https://github.com/open-telemetry/opentelemetry-python/pull/1839)) ### Changed - Fixed OTLP gRPC exporter silently failing if scheme is not specified in endpoint. 
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index ece7da6d225..6fb2f7fd8a4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -58,13 +58,11 @@ logger = logging.getLogger(__name__) -SPAN_ATTRIBUTE_COUNT_LIMIT = int( - environ.get(OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, 128) -) - -_SPAN_EVENT_COUNT_LIMIT = int(environ.get(OTEL_SPAN_EVENT_COUNT_LIMIT, 128)) -_SPAN_LINK_COUNT_LIMIT = int(environ.get(OTEL_SPAN_LINK_COUNT_LIMIT, 128)) +_DEFAULT_SPAN_EVENTS_LIMIT = 128 +_DEFAULT_SPAN_LINKS_LIMIT = 128 +_DEFAULT_SPAN_ATTRIBUTES_LIMIT = 128 _VALID_ATTR_VALUE_TYPES = (bool, str, int, float) + # pylint: disable=protected-access _TRACE_SAMPLER = sampling._get_from_env_or_default() @@ -564,6 +562,81 @@ def _format_links(links): return f_links +class _Limits: + """The limits that should be enforced on recorded data such as events, links, attributes etc. + + This class does not enforce any limits itself. It only provides a way to read limits from env, + default values and in future from user provided arguments. + + All limits must be either non-negative integers or None. + Setting a limit to ``None`` will not set any limit for that data type. + + Args: + max_events: Maximum number of events that can be added to a Span. + max_links: Maximum number of links that can be added to a Span. + max_attributes: Maximum number of attributes that can be added to a Span. 
+ """ + + max_attributes: int + max_events: int + max_links: int + + def __init__( + self, + max_events: Optional[int] = None, + max_links: Optional[int] = None, + max_attributes: Optional[int] = None, + ): + self.max_attributes = max_attributes + self.max_events = max_events + self.max_links = max_links + + def __repr__(self): + return "max_attributes={}, max_events={}, max_links={}".format( + self.max_attributes, self.max_events, self.max_links + ) + + @classmethod + def _create(cls): + return cls( + max_attributes=cls._from_env( + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, _DEFAULT_SPAN_ATTRIBUTES_LIMIT + ), + max_events=cls._from_env( + OTEL_SPAN_EVENT_COUNT_LIMIT, _DEFAULT_SPAN_EVENTS_LIMIT + ), + max_links=cls._from_env( + OTEL_SPAN_LINK_COUNT_LIMIT, _DEFAULT_SPAN_LINKS_LIMIT + ), + ) + + @classmethod + def _from_env(cls, env_var: str, default: int) -> Optional[int]: + value = environ.get(env_var, "").strip().lower() + if not value: + return default + elif value == "unset": + return None + + err_msg = "{0} must be a non-negative number but got {1}".format( + env_var, value + ) + try: + value = int(value) + except ValueError: + raise ValueError(err_msg) + if value < 0: + raise ValueError(err_msg) + return value + + +_UnsetLimits = _Limits() + +SPAN_ATTRIBUTE_COUNT_LIMIT = _Limits._from_env( + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, _DEFAULT_SPAN_ATTRIBUTES_LIMIT +) + + class Span(trace_api.Span, ReadableSpan): """See `opentelemetry.trace.Span`. 
@@ -604,6 +677,7 @@ def __init__( links: Sequence[trace_api.Link] = (), kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, span_processor: SpanProcessor = SpanProcessor(), + limits: _Limits = _UnsetLimits, instrumentation_info: InstrumentationInfo = None, record_exception: bool = True, set_status_on_exception: bool = True, @@ -621,6 +695,7 @@ def __init__( self._record_exception = record_exception self._set_status_on_exception = set_status_on_exception self._span_processor = span_processor + self._limits = limits self._lock = threading.Lock() _filter_attribute_values(attributes) @@ -628,7 +703,7 @@ def __init__( self._attributes = self._new_attributes() else: self._attributes = BoundedDict.from_map( - SPAN_ATTRIBUTE_COUNT_LIMIT, attributes + self._limits.max_attributes, attributes ) self._events = self._new_events() @@ -644,24 +719,21 @@ def __init__( if links is None: self._links = self._new_links() else: - self._links = BoundedList.from_seq(_SPAN_LINK_COUNT_LIMIT, links) + self._links = BoundedList.from_seq(self._limits.max_links, links) def __repr__(self): return '{}(name="{}", context={})'.format( type(self).__name__, self._name, self._context ) - @staticmethod - def _new_attributes(): - return BoundedDict(SPAN_ATTRIBUTE_COUNT_LIMIT) + def _new_attributes(self): + return BoundedDict(self._limits.max_attributes) - @staticmethod - def _new_events(): - return BoundedList(_SPAN_EVENT_COUNT_LIMIT) + def _new_events(self): + return BoundedList(self._limits.max_events) - @staticmethod - def _new_links(): - return BoundedList(_SPAN_LINK_COUNT_LIMIT) + def _new_links(self): + return BoundedList(self._limits.max_links) def get_span_context(self): return self._context @@ -853,6 +925,11 @@ def __init__( self.span_processor = span_processor self.id_generator = id_generator self.instrumentation_info = instrumentation_info + self._span_limits = None + + def _with_limits(self, span_limits: _Limits) -> "Tracer": + self._span_limits = span_limits + return self 
@contextmanager def start_as_current_span( @@ -954,6 +1031,7 @@ def start_span( # pylint: disable=too-many-locals instrumentation_info=self.instrumentation_info, record_exception=record_exception, set_status_on_exception=set_status_on_exception, + limits=self._span_limits, ) span.start(start_time=start_time, parent_context=context) else: @@ -983,6 +1061,7 @@ def __init__( self.id_generator = id_generator self._resource = resource self.sampler = sampler + self._span_limits = _Limits._create() self._atexit_handler = None if shutdown_on_exit: self._atexit_handler = atexit.register(self.shutdown) @@ -1007,7 +1086,7 @@ def get_tracer( InstrumentationInfo( instrumenting_module_name, instrumenting_library_version ), - ) + )._with_limits(self._span_limits) def add_span_processor(self, span_processor: SpanProcessor) -> None: """Registers a new :class:`SpanProcessor` for this `TracerProvider`. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py index 1bb8ed264fc..c4c3d009017 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py @@ -16,6 +16,7 @@ import threading from collections import OrderedDict, deque from collections.abc import MutableMapping, Sequence +from typing import Optional def ns_to_iso_str(nanoseconds): @@ -45,7 +46,7 @@ class BoundedList(Sequence): not enough room. 
""" - def __init__(self, maxlen): + def __init__(self, maxlen: Optional[int]): self.dropped = 0 self._dq = deque(maxlen=maxlen) # type: deque self._lock = threading.Lock() @@ -67,21 +68,25 @@ def __iter__(self): def append(self, item): with self._lock: - if len(self._dq) == self._dq.maxlen: + if ( + self._dq.maxlen is not None + and len(self._dq) == self._dq.maxlen + ): self.dropped += 1 self._dq.append(item) def extend(self, seq): with self._lock: - to_drop = len(seq) + len(self._dq) - self._dq.maxlen - if to_drop > 0: - self.dropped += to_drop + if self._dq.maxlen is not None: + to_drop = len(seq) + len(self._dq) - self._dq.maxlen + if to_drop > 0: + self.dropped += to_drop self._dq.extend(seq) @classmethod def from_seq(cls, maxlen, seq): seq = tuple(seq) - if len(seq) > maxlen: + if maxlen is not None and len(seq) > maxlen: raise ValueError bounded_list = cls(maxlen) # pylint: disable=protected-access @@ -96,11 +101,12 @@ class BoundedDict(MutableMapping): added. """ - def __init__(self, maxlen): - if not isinstance(maxlen, int): - raise ValueError - if maxlen < 0: - raise ValueError + def __init__(self, maxlen: Optional[int]): + if maxlen is not None: + if not isinstance(maxlen, int): + raise ValueError + if maxlen < 0: + raise ValueError self.maxlen = maxlen self.dropped = 0 self._dict = OrderedDict() # type: OrderedDict @@ -116,13 +122,13 @@ def __getitem__(self, key): def __setitem__(self, key, value): with self._lock: - if self.maxlen == 0: + if self.maxlen is not None and self.maxlen == 0: self.dropped += 1 return if key in self._dict: del self._dict[key] - elif len(self._dict) == self.maxlen: + elif self.maxlen is not None and len(self._dict) == self.maxlen: del self._dict[next(iter(self._dict.keys()))] self.dropped += 1 self._dict[key] = value @@ -140,7 +146,7 @@ def __len__(self): @classmethod def from_map(cls, maxlen, mapping): mapping = OrderedDict(mapping) - if len(mapping) > maxlen: + if maxlen is not None and len(mapping) > maxlen: raise ValueError 
bounded_dict = cls(maxlen) # pylint: disable=protected-access diff --git a/opentelemetry-sdk/tests/test_util.py b/opentelemetry-sdk/tests/test_util.py index cacc77dbd98..7b37289613c 100644 --- a/opentelemetry-sdk/tests/test_util.py +++ b/opentelemetry-sdk/tests/test_util.py @@ -134,6 +134,14 @@ def test_extend_drop(self): self.assertEqual(len(blist), list_len) self.assertEqual(blist.dropped, len(other_list)) + def test_no_limit(self): + blist = BoundedList(maxlen=None) + for num in range(100): + blist.append(num) + + for num in range(100): + self.assertEqual(blist[num], num) + class TestBoundedDict(unittest.TestCase): base = collections.OrderedDict( @@ -211,3 +219,11 @@ def test_bounded_dict(self): with self.assertRaises(KeyError): _ = bdict["new-name"] + + def test_no_limit_code(self): + bdict = BoundedDict(maxlen=None) + for num in range(100): + bdict[num] = num + + for num in range(100): + self.assertEqual(bdict[num], num) diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index f12dc7c75cf..5a1376a5f41 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -18,6 +18,7 @@ import unittest from importlib import reload from logging import ERROR, WARNING +from random import randint from typing import Optional from unittest import mock @@ -1281,8 +1282,7 @@ class TestSpanLimits(unittest.TestCase): OTEL_SPAN_LINK_COUNT_LIMIT: "30", }, ) - def test_span_environment_limits(self): - reload(trace) + def test_span_limits_env(self): tracer = new_tracer() id_generator = RandomIdGenerator() some_links = [ @@ -1306,3 +1306,46 @@ def test_span_environment_limits(self): self.assertEqual(len(root.attributes), 10) self.assertEqual(len(root.events), 20) + + @mock.patch.dict( + "os.environ", + { + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: "unset", + OTEL_SPAN_EVENT_COUNT_LIMIT: "unset", + OTEL_SPAN_LINK_COUNT_LIMIT: "unset", + }, + ) + def test_span_no_limits_env(self): + # pylint: 
disable=protected-access + num_links = int(trace._DEFAULT_SPAN_LINKS_LIMIT) + randint(1, 100) + + tracer = new_tracer() + id_generator = RandomIdGenerator() + some_links = [ + trace_api.Link( + trace_api.SpanContext( + trace_id=id_generator.generate_trace_id(), + span_id=id_generator.generate_span_id(), + is_remote=False, + ) + ) + for _ in range(num_links) + ] + with tracer.start_as_current_span("root", links=some_links) as root: + self.assertEqual(len(root.links), num_links) + + num_events = int(trace._DEFAULT_SPAN_EVENTS_LIMIT) + randint(1, 100) + with tracer.start_as_current_span("root") as root: + for idx in range(num_events): + root.add_event("my_event_{}".format(idx)) + + self.assertEqual(len(root.events), num_events) + + num_attributes = int(trace._DEFAULT_SPAN_ATTRIBUTES_LIMIT) + randint( + 1, 100 + ) + with tracer.start_as_current_span("root") as root: + for idx in range(num_attributes): + root.set_attribute("my_attribute_{}".format(idx), 0) + + self.assertEqual(len(root.attributes), num_attributes)