Skip to content

Commit

Permalink
Added Span limits support to the tracing SDK
Browse files Browse the repository at this point in the history
- Added SpanLimits class
- TracerProvider now optionally accepts an instance of SpanLimits which
is passed all the way down to Spans.
- Spans use the limits to created bounded lists or dicts for different
fields.
  • Loading branch information
owais committed May 11, 2021
1 parent cc18b73 commit 518a6cb
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 44 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1803](https://github.com/open-telemetry/opentelemetry-python/pull/1803))
- Added support for OTEL_SERVICE_NAME.
([#1829](https://github.com/open-telemetry/opentelemetry-python/pull/1829))
- Lazily configure Span limits and allow limits limits to be configured via code.
([#1839](https://github.com/open-telemetry/opentelemetry-python/pull/1839))

### Changed
- Fixed OTLP gRPC exporter silently failing if scheme is not specified in endpoint.
Expand Down
111 changes: 94 additions & 17 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@

logger = logging.getLogger(__name__)

SPAN_ATTRIBUTE_COUNT_LIMIT = int(
environ.get(OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, 128)
)

_SPAN_EVENT_COUNT_LIMIT = int(environ.get(OTEL_SPAN_EVENT_COUNT_LIMIT, 128))
_SPAN_LINK_COUNT_LIMIT = int(environ.get(OTEL_SPAN_LINK_COUNT_LIMIT, 128))
_DEFAULT_SPAN_EVENTS_LIMIT = 128
_DEFAULT_SPAN_LINKS_LIMIT = 128
_DEFAULT_SPAN_ATTRIBUTES_LIMIT = 128
_VALID_ATTR_VALUE_TYPES = (bool, str, int, float)

# pylint: disable=protected-access
_TRACE_SAMPLER = sampling._get_from_env_or_default()

Expand Down Expand Up @@ -564,6 +562,80 @@ def _format_links(links):
return f_links


class SpanLimits:
"""The limits Spans should enforce on recorded data such as events, links, attributes etc.
This class does not enforce any limits itself. It only provides an API to set the limits
when creating a tracer provider.
All arguments must either be a non-negative integer or a reference to :attr:`~UNSET`.
Setting a limit to `SpanLimits.UNSET` will not set an limits for that data type.
Args:
max_events: Maximum number of events that can be added to a Span.
max_links: Maximum number of links that can be added to a Span.
max_attributes: Maximum number of attributes that can be added to a Span.
"""

UNSET = -1

max_events: int
max_links: int
max_attributes: int

def __init__(
self,
max_events: Optional[int] = None,
max_links: Optional[int] = None,
max_attributes: Optional[int] = None,
):
self.max_events = SpanLimits._value_or_default(
max_events, OTEL_SPAN_EVENT_COUNT_LIMIT, _DEFAULT_SPAN_EVENTS_LIMIT
)
self.max_links = SpanLimits._value_or_default(
max_links, OTEL_SPAN_LINK_COUNT_LIMIT, _DEFAULT_SPAN_LINKS_LIMIT
)
self.max_attributes = SpanLimits._value_or_default(
max_attributes,
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
_DEFAULT_SPAN_ATTRIBUTES_LIMIT,
)

@classmethod
def _value_or_default(
cls, value: Optional[int], name: str, default: int
) -> Optional[int]:
if value is SpanLimits.UNSET:
return None

if value is not None:
if not isinstance(value, int):
raise ValueError("SpanLimit value must be an integer")
if value < 0:
raise ValueError(
"SpanLimit value must be a non-negative number"
)
return value

value = environ.get(name, "").strip().lower()
if value == "unset":
return None
if value:
return int(value)
return default


_UnsetSpanLimits = SpanLimits(
max_events=SpanLimits.UNSET,
max_links=SpanLimits.UNSET,
max_attributes=SpanLimits.UNSET,
)

SPAN_ATTRIBUTE_COUNT_LIMIT = SpanLimits._value_or_default(
None, OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, _DEFAULT_SPAN_ATTRIBUTES_LIMIT
)


class Span(trace_api.Span, ReadableSpan):
"""See `opentelemetry.trace.Span`.
Expand Down Expand Up @@ -604,6 +676,7 @@ def __init__(
links: Sequence[trace_api.Link] = (),
kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL,
span_processor: SpanProcessor = SpanProcessor(),
limits: SpanLimits = _UnsetSpanLimits,
instrumentation_info: InstrumentationInfo = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
Expand All @@ -621,14 +694,15 @@ def __init__(
self._record_exception = record_exception
self._set_status_on_exception = set_status_on_exception
self._span_processor = span_processor
self._limits = limits
self._lock = threading.Lock()

_filter_attribute_values(attributes)
if not attributes:
self._attributes = self._new_attributes()
else:
self._attributes = BoundedDict.from_map(
SPAN_ATTRIBUTE_COUNT_LIMIT, attributes
self._limits.max_attributes, attributes
)

self._events = self._new_events()
Expand All @@ -644,24 +718,21 @@ def __init__(
if links is None:
self._links = self._new_links()
else:
self._links = BoundedList.from_seq(_SPAN_LINK_COUNT_LIMIT, links)
self._links = BoundedList.from_seq(self._limits.max_links, links)

def __repr__(self):
return '{}(name="{}", context={})'.format(
type(self).__name__, self._name, self._context
)

@staticmethod
def _new_attributes():
return BoundedDict(SPAN_ATTRIBUTE_COUNT_LIMIT)
def _new_attributes(self):
return BoundedDict(self._limits.max_attributes)

@staticmethod
def _new_events():
return BoundedList(_SPAN_EVENT_COUNT_LIMIT)
def _new_events(self):
return BoundedList(self._limits.max_events)

@staticmethod
def _new_links():
return BoundedList(_SPAN_LINK_COUNT_LIMIT)
def _new_links(self):
return BoundedList(self._limits.max_links)

def get_span_context(self):
return self._context
Expand Down Expand Up @@ -847,11 +918,13 @@ def __init__(
],
id_generator: IdGenerator,
instrumentation_info: InstrumentationInfo,
span_limits: SpanLimits,
) -> None:
self.sampler = sampler
self.resource = resource
self.span_processor = span_processor
self.id_generator = id_generator
self._span_limits = span_limits
self.instrumentation_info = instrumentation_info

@contextmanager
Expand Down Expand Up @@ -954,6 +1027,7 @@ def start_span( # pylint: disable=too-many-locals
instrumentation_info=self.instrumentation_info,
record_exception=record_exception,
set_status_on_exception=set_status_on_exception,
limits=self._span_limits,
)
span.start(start_time=start_time, parent_context=context)
else:
Expand All @@ -973,6 +1047,7 @@ def __init__(
SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor
] = None,
id_generator: IdGenerator = None,
span_limits=None,
):
self._active_span_processor = (
active_span_processor or SynchronousMultiSpanProcessor()
Expand All @@ -983,6 +1058,7 @@ def __init__(
self.id_generator = id_generator
self._resource = resource
self.sampler = sampler
self._span_limits = span_limits or SpanLimits()
self._atexit_handler = None
if shutdown_on_exit:
self._atexit_handler = atexit.register(self.shutdown)
Expand All @@ -1007,6 +1083,7 @@ def get_tracer(
InstrumentationInfo(
instrumenting_module_name, instrumenting_library_version
),
self._span_limits,
)

def add_span_processor(self, span_processor: SpanProcessor) -> None:
Expand Down
34 changes: 20 additions & 14 deletions opentelemetry-sdk/src/opentelemetry/sdk/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import threading
from collections import OrderedDict, deque
from collections.abc import MutableMapping, Sequence
from typing import Optional


def ns_to_iso_str(nanoseconds):
Expand Down Expand Up @@ -45,7 +46,7 @@ class BoundedList(Sequence):
not enough room.
"""

def __init__(self, maxlen):
def __init__(self, maxlen: Optional[int]):
self.dropped = 0
self._dq = deque(maxlen=maxlen) # type: deque
self._lock = threading.Lock()
Expand All @@ -67,21 +68,25 @@ def __iter__(self):

def append(self, item):
with self._lock:
if len(self._dq) == self._dq.maxlen:
if (
self._dq.maxlen is not None
and len(self._dq) == self._dq.maxlen
):
self.dropped += 1
self._dq.append(item)

def extend(self, seq):
with self._lock:
to_drop = len(seq) + len(self._dq) - self._dq.maxlen
if to_drop > 0:
self.dropped += to_drop
if self._dq.maxlen is not None:
to_drop = len(seq) + len(self._dq) - self._dq.maxlen
if to_drop > 0:
self.dropped += to_drop
self._dq.extend(seq)

@classmethod
def from_seq(cls, maxlen, seq):
seq = tuple(seq)
if len(seq) > maxlen:
if maxlen is not None and len(seq) > maxlen:
raise ValueError
bounded_list = cls(maxlen)
# pylint: disable=protected-access
Expand All @@ -96,11 +101,12 @@ class BoundedDict(MutableMapping):
added.
"""

def __init__(self, maxlen):
if not isinstance(maxlen, int):
raise ValueError
if maxlen < 0:
raise ValueError
def __init__(self, maxlen: Optional[int]):
if maxlen is not None:
if not isinstance(maxlen, int):
raise ValueError
if maxlen < 0:
raise ValueError
self.maxlen = maxlen
self.dropped = 0
self._dict = OrderedDict() # type: OrderedDict
Expand All @@ -116,13 +122,13 @@ def __getitem__(self, key):

def __setitem__(self, key, value):
with self._lock:
if self.maxlen == 0:
if self.maxlen is not None and self.maxlen == 0:
self.dropped += 1
return

if key in self._dict:
del self._dict[key]
elif len(self._dict) == self.maxlen:
elif self.maxlen is not None and len(self._dict) == self.maxlen:
del self._dict[next(iter(self._dict.keys()))]
self.dropped += 1
self._dict[key] = value
Expand All @@ -140,7 +146,7 @@ def __len__(self):
@classmethod
def from_map(cls, maxlen, mapping):
mapping = OrderedDict(mapping)
if len(mapping) > maxlen:
if maxlen is not None and len(mapping) > maxlen:
raise ValueError
bounded_dict = cls(maxlen)
# pylint: disable=protected-access
Expand Down
16 changes: 16 additions & 0 deletions opentelemetry-sdk/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ def test_extend_drop(self):
self.assertEqual(len(blist), list_len)
self.assertEqual(blist.dropped, len(other_list))

def test_no_limit(self):
blist = BoundedList(maxlen=None)
for num in range(100):
blist.append(num)

for num in range(100):
self.assertEqual(blist[num], num)


class TestBoundedDict(unittest.TestCase):
base = collections.OrderedDict(
Expand Down Expand Up @@ -211,3 +219,11 @@ def test_bounded_dict(self):

with self.assertRaises(KeyError):
_ = bdict["new-name"]

def test_no_limit_code(self):
bdict = BoundedDict(maxlen=None)
for num in range(100):
bdict[num] = num

for num in range(100):
self.assertEqual(bdict[num], num)
Loading

0 comments on commit 518a6cb

Please sign in to comment.