Skip to content

Commit

Permalink
bugfix: batch processor doesn't work with uwsgi (#2277)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Jun 8, 2022
1 parent b83c2ae commit 9fbc93b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from opentelemetry.context import attach, detach, set_value
from opentelemetry.sdk._logs import LogData, LogProcessor, LogRecord
from opentelemetry.util._once import Once
from opentelemetry.util._time import _time_ns

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -129,6 +130,9 @@ def __init__(self):
self.num_log_records = 0


_BSP_RESET_ONCE = Once()


class BatchLogProcessor(LogProcessor):
"""This is an implementation of LogProcessor which creates batches of
received logs in the export-friendly LogData representation and
Expand Down Expand Up @@ -164,6 +168,7 @@ def __init__(
os.register_at_fork(
after_in_child=self._at_fork_reinit
) # pylint: disable=protected-access
self._pid = os.getpid()

def _at_fork_reinit(self):
self._condition = threading.Condition(threading.Lock())
Expand All @@ -174,6 +179,7 @@ def _at_fork_reinit(self):
daemon=True,
)
self._worker_thread.start()
self._pid = os.getpid()

def worker(self):
timeout = self._schedule_delay_millis / 1e3
Expand Down Expand Up @@ -293,6 +299,9 @@ def emit(self, log_data: LogData) -> None:
"""
if self._shutdown:
return
if self._pid != os.getpid():
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)

self._queue.appendleft(log_data)
if len(self._queue) >= self._max_export_batch_size:
with self._condition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
OTEL_BSP_SCHEDULE_DELAY,
)
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.util._once import Once
from opentelemetry.util._time import _time_ns

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -119,6 +120,9 @@ def __init__(self):
self.num_spans = 0


_BSP_RESET_ONCE = Once()


class BatchSpanProcessor(SpanProcessor):
"""Batch span processor implementation.
Expand Down Expand Up @@ -203,6 +207,7 @@ def __init__(
os.register_at_fork(
after_in_child=self._at_fork_reinit
) # pylint: disable=protected-access
self._pid = os.getpid()

def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
Expand All @@ -215,6 +220,9 @@ def on_end(self, span: ReadableSpan) -> None:
return
if not span.context.trace_flags.sampled:
return
if self._pid != os.getpid():
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)

if len(self.queue) == self.max_queue_size:
if not self._spans_dropped:
logger.warning("Queue is full, likely spans will be dropped.")
Expand All @@ -236,6 +244,7 @@ def _at_fork_reinit(self):
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
)
self.worker_thread.start()
self._pid = os.getpid()

def worker(self):
timeout = self.schedule_delay_millis / 1e3
Expand Down
9 changes: 3 additions & 6 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import logging
import multiprocessing
import os
import sys
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
Expand Down Expand Up @@ -44,8 +43,6 @@
from opentelemetry.trace import TraceFlags
from opentelemetry.trace.span import INVALID_SPAN_CONTEXT

supports_register_at_fork = hasattr(os, "fork") and sys.version_info >= (3, 7)


class TestSimpleLogProcessor(unittest.TestCase):
def test_simple_log_processor_default_level(self):
Expand Down Expand Up @@ -274,9 +271,9 @@ def bulk_log_and_flush(num_logs):
finished_logs = exporter.get_finished_logs()
self.assertEqual(len(finished_logs), 2415)

@unittest.skipIf(
not supports_register_at_fork,
"needs *nix and minor version 7 or later",
@unittest.skipUnless(
hasattr(os, "fork"),
"needs *nix",
)
def test_batch_log_processor_fork(self):
# pylint: disable=invalid-name
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import multiprocessing
import os
import sys
import threading
import time
import unittest
Expand Down Expand Up @@ -369,8 +368,8 @@ def _check_fork_trace(self, exporter, expected):
self.assertIn(span.name, expected)

@unittest.skipUnless(
hasattr(os, "fork") and sys.version_info >= (3, 7),
"needs *nix and minor version 7 or later",
hasattr(os, "fork"),
"needs *nix",
)
def test_batch_span_processor_fork(self):
# pylint: disable=invalid-name
Expand Down

0 comments on commit 9fbc93b

Please sign in to comment.