Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LogEmitter.flush() to align with OTel Log Spec #2863

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2843](https://github.com/open-telemetry/opentelemetry-python/pull/2843))
- Instrument instances are always created through a Meter
([#2844](https://github.com/open-telemetry/opentelemetry-python/pull/2844))
- Fix: Remove LogEmitter.flush() to align with OTel Log spec
([#2863](https://github.com/open-telemetry/opentelemetry-python/pull/2863))
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved

## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2) - 2022-07-04

Expand Down
5 changes: 3 additions & 2 deletions docs/examples/logs/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@

exporter = OTLPLogExporter(insecure=True)
log_emitter_provider.add_log_processor(BatchLogProcessor(exporter))
log_emitter = log_emitter_provider.get_log_emitter(__name__, "0.1")
handler = LoggingHandler(level=logging.NOTSET, log_emitter=log_emitter)
handler = LoggingHandler(
level=logging.NOTSET, log_emitter_provider=log_emitter_provider
)

# Attach OTLP handler to root logger
logging.getLogger().addHandler(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ def _init_logging(
BatchLogProcessor(exporter_class(**exporter_args))
)

log_emitter = provider.get_log_emitter(__name__)
handler = LoggingHandler(level=logging.NOTSET, log_emitter=log_emitter)
handler = LoggingHandler(
level=logging.NOTSET, log_emitter_provider=provider
)

logging.getLogger().addHandler(handler)

Expand Down
17 changes: 8 additions & 9 deletions opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,15 @@ class LoggingHandler(logging.Handler):
def __init__(
self,
level=logging.NOTSET,
log_emitter=None,
log_emitter_provider=None,
) -> None:
super().__init__(level=level)
self._log_emitter = log_emitter or get_log_emitter(__name__)
self._log_emitter_provider = (
log_emitter_provider or get_log_emitter_provider()
)
self._log_emitter = get_log_emitter(
__name__, log_emitter_provider=self._log_emitter_provider
)

@staticmethod
def _get_attributes(record: logging.LogRecord) -> Attributes:
Expand Down Expand Up @@ -369,7 +374,7 @@ def flush(self) -> None:
"""
Flushes the logging output.
"""
self._log_emitter.flush()
self._log_emitter_provider.force_flush()


class LogEmitter:
Expand All @@ -396,12 +401,6 @@ def emit(self, record: LogRecord):
log_data = LogData(record, self._instrumentation_scope)
self._multi_log_processor.emit(log_data)

# TODO: Should this flush everything in pipeline?
# Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290
def flush(self):
"""Ensure all logging output has been flushed."""
self._multi_log_processor.force_flush()


class LogEmitterProvider:
def __init__(
Expand Down
38 changes: 18 additions & 20 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ class TestSimpleLogProcessor(unittest.TestCase):
def test_simple_log_processor_default_level(self):
exporter = InMemoryLogExporter()
log_emitter_provider = LogEmitterProvider()
log_emitter = log_emitter_provider.get_log_emitter(__name__)

log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter))

logger = logging.getLogger("default_level")
logger.addHandler(LoggingHandler(log_emitter=log_emitter))
logger.addHandler(
LoggingHandler(log_emitter_provider=log_emitter_provider)
)

logger.warning("Something is wrong")
finished_logs = exporter.get_finished_logs()
Expand All @@ -68,13 +69,14 @@ def test_simple_log_processor_default_level(self):
def test_simple_log_processor_custom_level(self):
exporter = InMemoryLogExporter()
log_emitter_provider = LogEmitterProvider()
log_emitter = log_emitter_provider.get_log_emitter(__name__)

log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter))

logger = logging.getLogger("custom_level")
logger.setLevel(logging.ERROR)
logger.addHandler(LoggingHandler(log_emitter=log_emitter))
logger.addHandler(
LoggingHandler(log_emitter_provider=log_emitter_provider)
)

logger.warning("Warning message")
logger.debug("Debug message")
Expand All @@ -99,12 +101,13 @@ def test_simple_log_processor_custom_level(self):
def test_simple_log_processor_trace_correlation(self):
exporter = InMemoryLogExporter()
log_emitter_provider = LogEmitterProvider()
log_emitter = log_emitter_provider.get_log_emitter("name", "version")

log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter))

logger = logging.getLogger("trace_correlation")
logger.addHandler(LoggingHandler(log_emitter=log_emitter))
logger.addHandler(
LoggingHandler(log_emitter_provider=log_emitter_provider)
)

logger.warning("Warning message")
finished_logs = exporter.get_finished_logs()
Expand Down Expand Up @@ -137,12 +140,13 @@ def test_simple_log_processor_trace_correlation(self):
def test_simple_log_processor_shutdown(self):
exporter = InMemoryLogExporter()
log_emitter_provider = LogEmitterProvider()
log_emitter = log_emitter_provider.get_log_emitter(__name__)

log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter))

logger = logging.getLogger("shutdown")
logger.addHandler(LoggingHandler(log_emitter=log_emitter))
logger.addHandler(
LoggingHandler(log_emitter_provider=log_emitter_provider)
)

logger.warning("Something is wrong")
finished_logs = exporter.get_finished_logs()
Expand All @@ -167,9 +171,8 @@ def test_emit_call_log_record(self):
provider = LogEmitterProvider()
provider.add_log_processor(log_processor)

emitter = provider.get_log_emitter(__name__)
logger = logging.getLogger("emit_call")
logger.addHandler(LoggingHandler(log_emitter=emitter))
logger.addHandler(LoggingHandler(log_emitter_provider=provider))

logger.error("error")
self.assertEqual(log_processor.emit.call_count, 1)
Expand All @@ -181,9 +184,8 @@ def test_shutdown(self):
provider = LogEmitterProvider()
provider.add_log_processor(log_processor)

emitter = provider.get_log_emitter(__name__)
logger = logging.getLogger("shutdown")
logger.addHandler(LoggingHandler(log_emitter=emitter))
logger.addHandler(LoggingHandler(log_emitter_provider=provider))

logger.warning("warning message: %s", "possible upcoming heatwave")
logger.error("Very high rise in temperatures across the globe")
Expand Down Expand Up @@ -214,9 +216,8 @@ def test_force_flush(self):
provider = LogEmitterProvider()
provider.add_log_processor(log_processor)

emitter = provider.get_log_emitter(__name__)
logger = logging.getLogger("force_flush")
logger.addHandler(LoggingHandler(log_emitter=emitter))
logger.addHandler(LoggingHandler(log_emitter_provider=provider))

logger.critical("Earth is burning")
log_processor.force_flush()
Expand All @@ -233,9 +234,8 @@ def test_log_processor_too_many_logs(self):
provider = LogEmitterProvider()
provider.add_log_processor(log_processor)

emitter = provider.get_log_emitter(__name__)
logger = logging.getLogger("many_logs")
logger.addHandler(LoggingHandler(log_emitter=emitter))
logger.addHandler(LoggingHandler(log_emitter_provider=provider))

for log_no in range(1000):
logger.critical("Log no: %s", log_no)
Expand All @@ -251,9 +251,8 @@ def test_with_multiple_threads(self):
provider = LogEmitterProvider()
provider.add_log_processor(log_processor)

emitter = provider.get_log_emitter(__name__)
logger = logging.getLogger("threads")
logger.addHandler(LoggingHandler(log_emitter=emitter))
logger.addHandler(LoggingHandler(log_emitter_provider=provider))

def bulk_log_and_flush(num_logs):
for _ in range(num_logs):
Expand Down Expand Up @@ -286,9 +285,8 @@ def test_batch_log_processor_fork(self):
provider = LogEmitterProvider()
provider.add_log_processor(log_processor)

emitter = provider.get_log_emitter(__name__)
logger = logging.getLogger("test-fork")
logger.addHandler(LoggingHandler(log_emitter=emitter))
logger.addHandler(LoggingHandler(log_emitter_provider=provider))

logger.critical("yolo")
time.sleep(0.5) # give some time for the exporter to upload
Expand Down
63 changes: 46 additions & 17 deletions opentelemetry-sdk/tests/logs/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,32 @@
from unittest.mock import Mock

from opentelemetry.sdk import trace
from opentelemetry.sdk._logs import LogEmitter, LoggingHandler
from opentelemetry.sdk._logs import (
LogEmitterProvider,
LoggingHandler,
get_log_emitter,
)
from opentelemetry.sdk._logs.severity import SeverityNumber
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import INVALID_SPAN_CONTEXT


def get_logger(level=logging.NOTSET, log_emitter=None):
def get_logger(level=logging.NOTSET, log_emitter_provider=None):
logger = logging.getLogger(__name__)
handler = LoggingHandler(level=level, log_emitter=log_emitter)
handler = LoggingHandler(
level=level, log_emitter_provider=log_emitter_provider
)
logger.addHandler(handler)
return logger


class TestLoggingHandler(unittest.TestCase):
def test_handler_default_log_level(self):
emitter_mock = Mock(spec=LogEmitter)
logger = get_logger(log_emitter=emitter_mock)
emitter_provider_mock = Mock(spec=LogEmitterProvider)
emitter_mock = get_log_emitter(
__name__, log_emitter_provider=emitter_provider_mock
)
logger = get_logger(log_emitter_provider=emitter_provider_mock)
# Make sure debug messages are ignored by default
logger.debug("Debug message")
self.assertEqual(emitter_mock.emit.call_count, 0)
Expand All @@ -41,8 +50,13 @@ def test_handler_default_log_level(self):
self.assertEqual(emitter_mock.emit.call_count, 1)

def test_handler_custom_log_level(self):
emitter_mock = Mock(spec=LogEmitter)
logger = get_logger(level=logging.ERROR, log_emitter=emitter_mock)
emitter_provider_mock = Mock(spec=LogEmitterProvider)
emitter_mock = get_log_emitter(
__name__, log_emitter_provider=emitter_provider_mock
)
logger = get_logger(
level=logging.ERROR, log_emitter_provider=emitter_provider_mock
)
logger.warning("Warning message test custom log level")
# Make sure any log with level < ERROR is ignored
self.assertEqual(emitter_mock.emit.call_count, 0)
Expand All @@ -51,8 +65,11 @@ def test_handler_custom_log_level(self):
self.assertEqual(emitter_mock.emit.call_count, 2)

def test_log_record_no_span_context(self):
emitter_mock = Mock(spec=LogEmitter)
logger = get_logger(log_emitter=emitter_mock)
emitter_provider_mock = Mock(spec=LogEmitterProvider)
emitter_mock = get_log_emitter(
__name__, log_emitter_provider=emitter_provider_mock
)
logger = get_logger(log_emitter_provider=emitter_provider_mock)
# Assert emit gets called for warning message
logger.warning("Warning message")
args, _ = emitter_mock.emit.call_args_list[0]
Expand All @@ -67,8 +84,11 @@ def test_log_record_no_span_context(self):

def test_log_record_user_attributes(self):
"""Attributes can be injected into logs by adding them to the LogRecord"""
emitter_mock = Mock(spec=LogEmitter)
logger = get_logger(log_emitter=emitter_mock)
emitter_provider_mock = Mock(spec=LogEmitterProvider)
emitter_mock = get_log_emitter(
__name__, log_emitter_provider=emitter_provider_mock
)
logger = get_logger(log_emitter_provider=emitter_provider_mock)
# Assert emit gets called for warning message
logger.warning("Warning message", extra={"http.status_code": 200})
args, _ = emitter_mock.emit.call_args_list[0]
Expand All @@ -79,8 +99,11 @@ def test_log_record_user_attributes(self):

def test_log_record_exception(self):
"""Exception information will be included in attributes"""
emitter_mock = Mock(spec=LogEmitter)
logger = get_logger(log_emitter=emitter_mock)
emitter_provider_mock = Mock(spec=LogEmitterProvider)
emitter_mock = get_log_emitter(
__name__, log_emitter_provider=emitter_provider_mock
)
logger = get_logger(log_emitter_provider=emitter_provider_mock)
try:
raise ZeroDivisionError("division by zero")
except ZeroDivisionError:
Expand Down Expand Up @@ -109,8 +132,11 @@ def test_log_record_exception(self):

def test_log_exc_info_false(self):
"""Exception information will be included in attributes"""
emitter_mock = Mock(spec=LogEmitter)
logger = get_logger(log_emitter=emitter_mock)
emitter_provider_mock = Mock(spec=LogEmitterProvider)
emitter_mock = get_log_emitter(
__name__, log_emitter_provider=emitter_provider_mock
)
logger = get_logger(log_emitter_provider=emitter_provider_mock)
try:
raise ZeroDivisionError("division by zero")
except ZeroDivisionError:
Expand All @@ -129,8 +155,11 @@ def test_log_exc_info_false(self):
)

def test_log_record_trace_correlation(self):
emitter_mock = Mock(spec=LogEmitter)
logger = get_logger(log_emitter=emitter_mock)
emitter_provider_mock = Mock(spec=LogEmitterProvider)
emitter_mock = get_log_emitter(
__name__, log_emitter_provider=emitter_provider_mock
)
logger = get_logger(log_emitter_provider=emitter_provider_mock)

tracer = trace.TracerProvider().get_tracer(__name__)
with tracer.start_as_current_span("test") as span:
Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ def force_flush(self, timeout_millis=30000):
class TestLogProcessor(unittest.TestCase):
def test_log_processor(self):
provider = LogEmitterProvider()
log_emitter = provider.get_log_emitter(__name__)
handler = LoggingHandler(log_emitter=log_emitter)
handler = LoggingHandler(log_emitter_provider=provider)

logs_list_1 = []
processor1 = AnotherLogProcessor(Mock(), logs_list_1)
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/tests/test_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ def __init__(self, resource=None):
def add_log_processor(self, processor):
self.processor = processor

def get_log_emitter(self, name):
def get_log_emitter(self, name, *args, **kwargs):
return DummyLogEmitter(name, self.resource, self.processor)

def force_flush(self, *args, **kwargs):
pass


class DummyMeterProvider(MeterProvider):
pass
Expand All @@ -85,9 +88,6 @@ def __init__(self, name, resource, processor):
def emit(self, record):
self.processor.emit(record)

def flush(self):
pass


class DummyLogProcessor:
def __init__(self, exporter):
Expand Down