From 3dbbd1b7b2574a4c51254b6d7abb76f8097869d8 Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Tue, 25 May 2021 09:29:59 +0530 Subject: [PATCH] Allow users to "unset" SDK limits and evaluate default limits lazily instead of on import (#1839) --- CHANGELOG.md | 2 + .../src/opentelemetry/sdk/trace/__init__.py | 120 +++++++++++++++--- .../src/opentelemetry/sdk/util/__init__.py | 30 +++-- opentelemetry-sdk/tests/test_util.py | 16 +++ opentelemetry-sdk/tests/trace/test_trace.py | 95 +++++++++++++- 5 files changed, 229 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed90e63bc85..5477ae7940d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1823](https://github.com/open-telemetry/opentelemetry-python/pull/1823)) - Added support for OTEL_SERVICE_NAME. ([#1829](https://github.com/open-telemetry/opentelemetry-python/pull/1829)) +- Lazily read/configure limits and allow limits to be unset. + ([#1839](https://github.com/open-telemetry/opentelemetry-python/pull/1839)) ### Changed - Fixed OTLP gRPC exporter silently failing if scheme is not specified in endpoint. 
class _Limits:
    """The limits that should be enforced on recorded data such as events, links, attributes etc.

    This class does not enforce any limits itself. It only provides a way to read limits from
    the environment and from default values, and in future from user provided arguments.

    All limits must be either a non-negative integer or ``None``.
    Setting a limit to ``None`` will not set any limit for that field/type.

    For every constructor argument, ``None`` (the default) means "look the value up in the
    corresponding environment variable and fall back to the built-in default", while
    ``_Limits.UNSET`` means "explicitly no limit".

    Args:
        max_attributes: Maximum number of attributes that can be added to a Span.
        max_events: Maximum number of events that can be added to a Span.
        max_links: Maximum number of links that can be added to a Span.

    Raises:
        ValueError: If a limit (given directly or through the environment) is
            negative, or an environment value is neither an integer nor ``unset``.
    """

    # Sentinel accepted in place of a limit to request "no limit at all".
    UNSET = -1

    # Resolved limits; ``None`` means the field is unbounded.
    max_attributes: Optional[int]
    max_events: Optional[int]
    max_links: Optional[int]

    def __init__(
        self,
        max_attributes: Optional[int] = None,
        max_events: Optional[int] = None,
        max_links: Optional[int] = None,
    ):
        self.max_attributes = self._from_env_if_absent(
            max_attributes,
            OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
            _DEFAULT_SPAN_ATTRIBUTES_LIMIT,
        )
        self.max_events = self._from_env_if_absent(
            max_events, OTEL_SPAN_EVENT_COUNT_LIMIT, _DEFAULT_SPAN_EVENTS_LIMIT
        )
        self.max_links = self._from_env_if_absent(
            max_links, OTEL_SPAN_LINK_COUNT_LIMIT, _DEFAULT_SPAN_LINKS_LIMIT
        )

    def __repr__(self):
        return "max_attributes={}, max_events={}, max_links={}".format(
            self.max_attributes, self.max_events, self.max_links
        )

    @classmethod
    def _from_env_if_absent(
        cls, value: Optional[int], env_var: str, default: Optional[int]
    ) -> Optional[int]:
        """Resolve a single limit from an explicit value, the environment or a default.

        Args:
            value: Explicit limit. ``cls.UNSET`` disables the limit; ``None``
                defers to the environment variable ``env_var``.
            env_var: Name of the environment variable consulted when ``value``
                is ``None``. Its value is stripped and lower-cased; an empty
                value yields ``default`` and ``unset`` yields ``None``.
            default: Returned when ``value`` is ``None`` and ``env_var`` is
                missing or blank.

        Returns:
            The resolved non-negative limit, or ``None`` for "no limit".

        Raises:
            ValueError: If the environment value is not an integer, or the
                resolved limit is negative.
        """
        # Compare by equality: identity on ints only works by accident of
        # CPython's small-integer cache.
        if value == cls.UNSET:
            return None

        # Both placeholders use automatic numbering; mixing "{0}" with "{}"
        # makes str.format itself raise and hides the real error.
        err_msg = "{} must be a non-negative integer but got {}"

        if value is None:
            str_value = environ.get(env_var, "").strip().lower()
            if not str_value:
                return default
            if str_value == "unset":
                return None

            try:
                value = int(str_value)
            except ValueError as err:
                raise ValueError(err_msg.format(env_var, str_value)) from err

        if value < 0:
            raise ValueError(err_msg.format(env_var, value))
        return value
@@ -566,7 +644,7 @@ def __init__( self._attributes = self._new_attributes() else: self._attributes = BoundedDict.from_map( - SPAN_ATTRIBUTE_COUNT_LIMIT, attributes + self._limits.max_attributes, attributes ) self._events = self._new_events() @@ -582,24 +660,21 @@ def __init__( if links is None: self._links = self._new_links() else: - self._links = BoundedList.from_seq(_SPAN_LINK_COUNT_LIMIT, links) + self._links = BoundedList.from_seq(self._limits.max_links, links) def __repr__(self): return '{}(name="{}", context={})'.format( type(self).__name__, self._name, self._context ) - @staticmethod - def _new_attributes(): - return BoundedDict(SPAN_ATTRIBUTE_COUNT_LIMIT) + def _new_attributes(self): + return BoundedDict(self._limits.max_attributes) - @staticmethod - def _new_events(): - return BoundedList(_SPAN_EVENT_COUNT_LIMIT) + def _new_events(self): + return BoundedList(self._limits.max_events) - @staticmethod - def _new_links(): - return BoundedList(_SPAN_LINK_COUNT_LIMIT) + def _new_links(self): + return BoundedList(self._limits.max_links) def get_span_context(self): return self._context @@ -772,6 +847,10 @@ class _Span(Span): by other mechanisms than through the `Tracer`. 
""" + def __init__(self, *args, limits=_UnsetLimits, **kwargs): + self._limits = limits + super().__init__(*args, **kwargs) + class Tracer(trace_api.Tracer): """See `opentelemetry.trace.Tracer`.""" @@ -791,6 +870,7 @@ def __init__( self.span_processor = span_processor self.id_generator = id_generator self.instrumentation_info = instrumentation_info + self._limits = None @contextmanager def start_as_current_span( @@ -892,6 +972,7 @@ def start_span( # pylint: disable=too-many-locals instrumentation_info=self.instrumentation_info, record_exception=record_exception, set_status_on_exception=set_status_on_exception, + limits=self._limits, ) span.start(start_time=start_time, parent_context=context) else: @@ -921,6 +1002,7 @@ def __init__( self.id_generator = id_generator self._resource = resource self.sampler = sampler + self._limits = _Limits() self._atexit_handler = None if shutdown_on_exit: self._atexit_handler = atexit.register(self.shutdown) @@ -937,7 +1019,7 @@ def get_tracer( if not instrumenting_module_name: # Reject empty strings too. instrumenting_module_name = "" logger.error("get_tracer called with missing module name.") - return Tracer( + tracer = Tracer( self.sampler, self.resource, self._active_span_processor, @@ -946,6 +1028,8 @@ def get_tracer( instrumenting_module_name, instrumenting_library_version ), ) + tracer._limits = self._limits + return tracer def add_span_processor(self, span_processor: SpanProcessor) -> None: """Registers a new :class:`SpanProcessor` for this `TracerProvider`. 
diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py index 981368049fb..746f100277d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py @@ -16,6 +16,7 @@ import threading from collections import OrderedDict, deque from collections.abc import MutableMapping, Sequence +from typing import Optional def ns_to_iso_str(nanoseconds): @@ -45,7 +46,7 @@ class BoundedList(Sequence): not enough room. """ - def __init__(self, maxlen): + def __init__(self, maxlen: Optional[int]): self.dropped = 0 self._dq = deque(maxlen=maxlen) # type: deque self._lock = threading.Lock() @@ -67,15 +68,19 @@ def __iter__(self): def append(self, item): with self._lock: - if len(self._dq) == self._dq.maxlen: + if ( + self._dq.maxlen is not None + and len(self._dq) == self._dq.maxlen + ): self.dropped += 1 self._dq.append(item) def extend(self, seq): with self._lock: - to_drop = len(seq) + len(self._dq) - self._dq.maxlen - if to_drop > 0: - self.dropped += to_drop + if self._dq.maxlen is not None: + to_drop = len(seq) + len(self._dq) - self._dq.maxlen + if to_drop > 0: + self.dropped += to_drop self._dq.extend(seq) @classmethod @@ -93,11 +98,12 @@ class BoundedDict(MutableMapping): added. 
""" - def __init__(self, maxlen): - if not isinstance(maxlen, int): - raise ValueError - if maxlen < 0: - raise ValueError + def __init__(self, maxlen: Optional[int]): + if maxlen is not None: + if not isinstance(maxlen, int): + raise ValueError + if maxlen < 0: + raise ValueError self.maxlen = maxlen self.dropped = 0 self._dict = OrderedDict() # type: OrderedDict @@ -113,13 +119,13 @@ def __getitem__(self, key): def __setitem__(self, key, value): with self._lock: - if self.maxlen == 0: + if self.maxlen is not None and self.maxlen == 0: self.dropped += 1 return if key in self._dict: del self._dict[key] - elif len(self._dict) == self.maxlen: + elif self.maxlen is not None and len(self._dict) == self.maxlen: del self._dict[next(iter(self._dict.keys()))] self.dropped += 1 self._dict[key] = value diff --git a/opentelemetry-sdk/tests/test_util.py b/opentelemetry-sdk/tests/test_util.py index e003e70789c..f90576afe70 100644 --- a/opentelemetry-sdk/tests/test_util.py +++ b/opentelemetry-sdk/tests/test_util.py @@ -135,6 +135,14 @@ def test_extend_drop(self): self.assertEqual(len(blist), list_len) self.assertEqual(blist.dropped, len(other_list)) + def test_no_limit(self): + blist = BoundedList(maxlen=None) + for num in range(100): + blist.append(num) + + for num in range(100): + self.assertEqual(blist[num], num) + class TestBoundedDict(unittest.TestCase): base = collections.OrderedDict( @@ -214,3 +222,11 @@ def test_bounded_dict(self): with self.assertRaises(KeyError): _ = bdict["new-name"] + + def test_no_limit_code(self): + bdict = BoundedDict(maxlen=None) + for num in range(100): + bdict[num] = num + + for num in range(100): + self.assertEqual(bdict[num], num) diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index a8b8330fe85..c9c523c3fab 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -18,6 +18,7 @@ import unittest from importlib import reload from logging 
import ERROR, WARNING +from random import randint from typing import Optional from unittest import mock @@ -538,7 +539,7 @@ def test_disallow_direct_span_creation(self): def test_surplus_span_links(self): # pylint: disable=protected-access - max_links = trace._SPAN_LINK_COUNT_LIMIT + max_links = trace._Limits().max_links links = [ trace_api.Link(trace_api.SpanContext(0x1, idx, is_remote=False)) for idx in range(0, 16 + max_links) @@ -548,7 +549,8 @@ def test_surplus_span_links(self): self.assertEqual(len(root.links), max_links) def test_surplus_span_attributes(self): - max_attrs = trace.SPAN_ATTRIBUTE_COUNT_LIMIT + # pylint: disable=protected-access + max_attrs = trace._Limits().max_attributes attributes = {str(idx): idx for idx in range(0, 16 + max_attrs)} tracer = new_tracer() with tracer.start_as_current_span( @@ -1270,6 +1272,50 @@ def test_attributes_to_json(self): class TestSpanLimits(unittest.TestCase): + # pylint: disable=protected-access + + def test_limits_defaults(self): + limits = trace._Limits() + self.assertEqual( + limits.max_attributes, trace._DEFAULT_SPAN_ATTRIBUTES_LIMIT + ) + self.assertEqual(limits.max_events, trace._DEFAULT_SPAN_EVENTS_LIMIT) + self.assertEqual(limits.max_links, trace._DEFAULT_SPAN_LINKS_LIMIT) + + def test_limits_values_code(self): + max_attributes, max_events, max_links = ( + randint(0, 10000), + randint(0, 10000), + randint(0, 10000), + ) + limits = trace._Limits( + max_attributes=max_attributes, + max_events=max_events, + max_links=max_links, + ) + self.assertEqual(limits.max_attributes, max_attributes) + self.assertEqual(limits.max_events, max_events) + self.assertEqual(limits.max_links, max_links) + + def test_limits_values_env(self): + max_attributes, max_events, max_links = ( + randint(0, 10000), + randint(0, 10000), + randint(0, 10000), + ) + with mock.patch.dict( + "os.environ", + { + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: str(max_attributes), + OTEL_SPAN_EVENT_COUNT_LIMIT: str(max_events), + OTEL_SPAN_LINK_COUNT_LIMIT: 
str(max_links), + }, + ): + limits = trace._Limits() + self.assertEqual(limits.max_attributes, max_attributes) + self.assertEqual(limits.max_events, max_events) + self.assertEqual(limits.max_links, max_links) + @mock.patch.dict( "os.environ", { @@ -1278,8 +1324,7 @@ class TestSpanLimits(unittest.TestCase): OTEL_SPAN_LINK_COUNT_LIMIT: "30", }, ) - def test_span_environment_limits(self): - reload(trace) + def test_span_limits_env(self): tracer = new_tracer() id_generator = RandomIdGenerator() some_links = [ @@ -1307,3 +1352,45 @@ def test_span_environment_limits(self): self.assertEqual(len(root.attributes), 10) self.assertEqual(len(root.events), 20) + + @mock.patch.dict( + "os.environ", + { + OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT: "unset", + OTEL_SPAN_EVENT_COUNT_LIMIT: "unset", + OTEL_SPAN_LINK_COUNT_LIMIT: "unset", + }, + ) + def test_span_no_limits_env(self): + num_links = int(trace._DEFAULT_SPAN_LINKS_LIMIT) + randint(1, 100) + + tracer = new_tracer() + id_generator = RandomIdGenerator() + some_links = [ + trace_api.Link( + trace_api.SpanContext( + trace_id=id_generator.generate_trace_id(), + span_id=id_generator.generate_span_id(), + is_remote=False, + ) + ) + for _ in range(num_links) + ] + with tracer.start_as_current_span("root", links=some_links) as root: + self.assertEqual(len(root.links), num_links) + + num_events = int(trace._DEFAULT_SPAN_EVENTS_LIMIT) + randint(1, 100) + with tracer.start_as_current_span("root") as root: + for idx in range(num_events): + root.add_event("my_event_{}".format(idx)) + + self.assertEqual(len(root.events), num_events) + + num_attributes = int(trace._DEFAULT_SPAN_ATTRIBUTES_LIMIT) + randint( + 1, 100 + ) + with tracer.start_as_current_span("root") as root: + for idx in range(num_attributes): + root.set_attribute("my_attribute_{}".format(idx), 0) + + self.assertEqual(len(root.attributes), num_attributes)