Skip to content

Commit

Permalink
Allow users to "unset" SDK limits and evaluate default limits lazily …
Browse files Browse the repository at this point in the history
…instead of on import (#1839)
  • Loading branch information
owais authored May 25, 2021
1 parent eda57f7 commit 3dbbd1b
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1823](https://github.com/open-telemetry/opentelemetry-python/pull/1823))
- Added support for OTEL_SERVICE_NAME.
([#1829](https://github.com/open-telemetry/opentelemetry-python/pull/1829))
- Lazily read/configure limits and allow limits to be unset.
([#1839](https://github.com/open-telemetry/opentelemetry-python/pull/1839))

### Changed
- Fixed OTLP gRPC exporter silently failing if scheme is not specified in endpoint.
Expand Down
120 changes: 102 additions & 18 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,16 @@
from opentelemetry.sdk.util import BoundedDict, BoundedList
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.trace import SpanContext
from opentelemetry.trace.propagation import SPAN_KEY
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util import types
from opentelemetry.util._time import _time_ns

logger = logging.getLogger(__name__)

SPAN_ATTRIBUTE_COUNT_LIMIT = int(
environ.get(OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, 128)
)
_DEFAULT_SPAN_EVENTS_LIMIT = 128
_DEFAULT_SPAN_LINKS_LIMIT = 128
_DEFAULT_SPAN_ATTRIBUTES_LIMIT = 128

_SPAN_EVENT_COUNT_LIMIT = int(environ.get(OTEL_SPAN_EVENT_COUNT_LIMIT, 128))
_SPAN_LINK_COUNT_LIMIT = int(environ.get(OTEL_SPAN_LINK_COUNT_LIMIT, 128))
# pylint: disable=protected-access
_TRACE_SAMPLER = sampling._get_from_env_or_default()

Expand Down Expand Up @@ -502,6 +499,87 @@ def _format_links(links):
return f_links


class _Limits:
"""The limits that should be enforce on recorded data such as events, links, attributes etc.
This class does not enforce any limits itself. It only provides an a way read limits from env,
default values and in future from user provided arguments.
All limit must be either a non-negative integer or ``None``.
Setting a limit to ``None`` will not set any limits for that field/type.
Args:
max_events: Maximum number of events that can be added to a Span.
max_links: Maximum number of links that can be added to a Span.
max_attributes: Maximum number of attributes that can be added to a Span.
"""

UNSET = -1

max_attributes: int
max_events: int
max_links: int

def __init__(
self,
max_attributes: Optional[int] = None,
max_events: Optional[int] = None,
max_links: Optional[int] = None,
):
self.max_attributes = self._from_env_if_absent(
max_attributes,
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
_DEFAULT_SPAN_ATTRIBUTES_LIMIT,
)
self.max_events = self._from_env_if_absent(
max_events, OTEL_SPAN_EVENT_COUNT_LIMIT, _DEFAULT_SPAN_EVENTS_LIMIT
)
self.max_links = self._from_env_if_absent(
max_links, OTEL_SPAN_LINK_COUNT_LIMIT, _DEFAULT_SPAN_LINKS_LIMIT
)

def __repr__(self):
return "max_attributes={}, max_events={}, max_links={}".format(
self.max_attributes, self.max_events, self.max_links
)

@classmethod
def _from_env_if_absent(
cls, value: Optional[int], env_var: str, default: Optional[int]
) -> Optional[int]:
if value is cls.UNSET:
return None

err_msg = "{0} must be a non-negative integer but got {}"

if value is None:
str_value = environ.get(env_var, "").strip().lower()
if not str_value:
return default
if str_value == "unset":
return None

try:
value = int(str_value)
except ValueError:
raise ValueError(err_msg.format(env_var, str_value))

if value < 0:
raise ValueError(err_msg.format(env_var, value))
return value


_UnsetLimits = _Limits(
max_attributes=_Limits.UNSET,
max_events=_Limits.UNSET,
max_links=_Limits.UNSET,
)

SPAN_ATTRIBUTE_COUNT_LIMIT = _Limits._from_env_if_absent(
None, OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, _DEFAULT_SPAN_ATTRIBUTES_LIMIT
)


class Span(trace_api.Span, ReadableSpan):
"""See `opentelemetry.trace.Span`.
Expand Down Expand Up @@ -566,7 +644,7 @@ def __init__(
self._attributes = self._new_attributes()
else:
self._attributes = BoundedDict.from_map(
SPAN_ATTRIBUTE_COUNT_LIMIT, attributes
self._limits.max_attributes, attributes
)

self._events = self._new_events()
Expand All @@ -582,24 +660,21 @@ def __init__(
if links is None:
self._links = self._new_links()
else:
self._links = BoundedList.from_seq(_SPAN_LINK_COUNT_LIMIT, links)
self._links = BoundedList.from_seq(self._limits.max_links, links)

def __repr__(self):
return '{}(name="{}", context={})'.format(
type(self).__name__, self._name, self._context
)

@staticmethod
def _new_attributes():
return BoundedDict(SPAN_ATTRIBUTE_COUNT_LIMIT)
def _new_attributes(self):
return BoundedDict(self._limits.max_attributes)

@staticmethod
def _new_events():
return BoundedList(_SPAN_EVENT_COUNT_LIMIT)
def _new_events(self):
return BoundedList(self._limits.max_events)

@staticmethod
def _new_links():
return BoundedList(_SPAN_LINK_COUNT_LIMIT)
def _new_links(self):
return BoundedList(self._limits.max_links)

def get_span_context(self):
return self._context
Expand Down Expand Up @@ -772,6 +847,10 @@ class _Span(Span):
by other mechanisms than through the `Tracer`.
"""

def __init__(self, *args, limits=_UnsetLimits, **kwargs):
self._limits = limits
super().__init__(*args, **kwargs)


class Tracer(trace_api.Tracer):
"""See `opentelemetry.trace.Tracer`."""
Expand All @@ -791,6 +870,7 @@ def __init__(
self.span_processor = span_processor
self.id_generator = id_generator
self.instrumentation_info = instrumentation_info
self._limits = None

@contextmanager
def start_as_current_span(
Expand Down Expand Up @@ -892,6 +972,7 @@ def start_span( # pylint: disable=too-many-locals
instrumentation_info=self.instrumentation_info,
record_exception=record_exception,
set_status_on_exception=set_status_on_exception,
limits=self._limits,
)
span.start(start_time=start_time, parent_context=context)
else:
Expand Down Expand Up @@ -921,6 +1002,7 @@ def __init__(
self.id_generator = id_generator
self._resource = resource
self.sampler = sampler
self._limits = _Limits()
self._atexit_handler = None
if shutdown_on_exit:
self._atexit_handler = atexit.register(self.shutdown)
Expand All @@ -937,7 +1019,7 @@ def get_tracer(
if not instrumenting_module_name: # Reject empty strings too.
instrumenting_module_name = ""
logger.error("get_tracer called with missing module name.")
return Tracer(
tracer = Tracer(
self.sampler,
self.resource,
self._active_span_processor,
Expand All @@ -946,6 +1028,8 @@ def get_tracer(
instrumenting_module_name, instrumenting_library_version
),
)
tracer._limits = self._limits
return tracer

def add_span_processor(self, span_processor: SpanProcessor) -> None:
"""Registers a new :class:`SpanProcessor` for this `TracerProvider`.
Expand Down
30 changes: 18 additions & 12 deletions opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import threading
from collections import OrderedDict, deque
from collections.abc import MutableMapping, Sequence
from typing import Optional


def ns_to_iso_str(nanoseconds):
Expand Down Expand Up @@ -45,7 +46,7 @@ class BoundedList(Sequence):
not enough room.
"""

def __init__(self, maxlen):
def __init__(self, maxlen: Optional[int]):
self.dropped = 0
self._dq = deque(maxlen=maxlen) # type: deque
self._lock = threading.Lock()
Expand All @@ -67,15 +68,19 @@ def __iter__(self):

def append(self, item):
with self._lock:
if len(self._dq) == self._dq.maxlen:
if (
self._dq.maxlen is not None
and len(self._dq) == self._dq.maxlen
):
self.dropped += 1
self._dq.append(item)

def extend(self, seq):
with self._lock:
to_drop = len(seq) + len(self._dq) - self._dq.maxlen
if to_drop > 0:
self.dropped += to_drop
if self._dq.maxlen is not None:
to_drop = len(seq) + len(self._dq) - self._dq.maxlen
if to_drop > 0:
self.dropped += to_drop
self._dq.extend(seq)

@classmethod
Expand All @@ -93,11 +98,12 @@ class BoundedDict(MutableMapping):
added.
"""

def __init__(self, maxlen):
if not isinstance(maxlen, int):
raise ValueError
if maxlen < 0:
raise ValueError
def __init__(self, maxlen: Optional[int]):
if maxlen is not None:
if not isinstance(maxlen, int):
raise ValueError
if maxlen < 0:
raise ValueError
self.maxlen = maxlen
self.dropped = 0
self._dict = OrderedDict() # type: OrderedDict
Expand All @@ -113,13 +119,13 @@ def __getitem__(self, key):

def __setitem__(self, key, value):
with self._lock:
if self.maxlen == 0:
if self.maxlen is not None and self.maxlen == 0:
self.dropped += 1
return

if key in self._dict:
del self._dict[key]
elif len(self._dict) == self.maxlen:
elif self.maxlen is not None and len(self._dict) == self.maxlen:
del self._dict[next(iter(self._dict.keys()))]
self.dropped += 1
self._dict[key] = value
Expand Down
16 changes: 16 additions & 0 deletions opentelemetry-sdk/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ def test_extend_drop(self):
self.assertEqual(len(blist), list_len)
self.assertEqual(blist.dropped, len(other_list))

def test_no_limit(self):
blist = BoundedList(maxlen=None)
for num in range(100):
blist.append(num)

for num in range(100):
self.assertEqual(blist[num], num)


class TestBoundedDict(unittest.TestCase):
base = collections.OrderedDict(
Expand Down Expand Up @@ -214,3 +222,11 @@ def test_bounded_dict(self):

with self.assertRaises(KeyError):
_ = bdict["new-name"]

def test_no_limit_code(self):
bdict = BoundedDict(maxlen=None)
for num in range(100):
bdict[num] = num

for num in range(100):
self.assertEqual(bdict[num], num)
Loading

0 comments on commit 3dbbd1b

Please sign in to comment.