Remove flush method on LogEmitter #2584

Closed
srikanthccv opened this issue Apr 3, 2022 · 11 comments · Fixed by #2863

Comments

@srikanthccv
Member

Following this spec change open-telemetry/opentelemetry-specification@441bafa

@pranavmarla
Contributor

Hey folks, I'm new to contributing, and was hoping to give this issue a shot. Skimming through the source code, it looks like solving this issue should be as simple as deleting this snippet of code:

# TODO: Should this flush everything in pipeline?
# Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290
def flush(self):
    """Ensure all logging output has been flushed."""
    self._multi_log_processor.force_flush()

Feel free to let me know if I'm missing anything, or if you have any tips/recommendations!

@lzchen
Contributor

lzchen commented Jul 19, 2022

@srikanthccv
The trace/metrics specs don't say anything about a flush method, but we have force_flush defined in TracerProvider and MeterProvider. Was there a reason not to be consistent in our language-specific implementation?

@srikanthccv
Member Author

Leighton, you are confusing the Emitter and the EmitterProvider. The Emitter's equivalent is the Tracer/Meter, and we don't have a flush operation for them. The proposed change in the spec and this issue exist to make them consistent.

@pranavmarla, deleting the code alone is not enough. The SDK should complete the Handler interface, so it needs to implement flush on it. As I mentioned in the spec commit, one way we could achieve this is to let the handler get access to the provider.
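
For context on why the handler needs a working flush: the standard library's logging.shutdown() calls flush() and then close() on the registered handlers, so a handler that buffers records depends on flush() to get them out at exit. Below is a minimal stdlib-only sketch of that behaviour. BufferingDemoHandler and the "flush_demo" logger name are hypothetical and are not part of the SDK.

```python
import logging


class BufferingDemoHandler(logging.Handler):
    """Hypothetical handler that buffers messages until flush() is called."""

    def __init__(self) -> None:
        super().__init__()
        self.buffer = []    # messages received but not yet flushed
        self.exported = []  # messages that have been flushed

    def emit(self, record: logging.LogRecord) -> None:
        self.buffer.append(record.getMessage())

    def flush(self) -> None:
        # Move everything buffered so far to the "exported" list.
        self.exported.extend(self.buffer)
        self.buffer.clear()


handler = BufferingDemoHandler()
demo_logger = logging.getLogger("flush_demo")
demo_logger.propagate = False  # keep the demo away from the root logger
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)

demo_logger.info("hello")
pending_before = list(handler.buffer)  # still buffered at this point

# logging.shutdown() flushes and closes the registered handlers.
logging.shutdown()
```

After logging.shutdown() runs, the message has moved from handler.buffer to handler.exported without flush() ever being called explicitly.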

@pranavmarla
Contributor

Thanks @srikanthccv. I went through all the related PRs and issues to gather context -- if I understand correctly, this is what you're saying needs to be done:

In the file opentelemetry/sdk/_logs/__init__.py:

  1. Delete flush() method from LogEmitter class.
  2. Modify __init__() method of LoggingHandler class to take in an additional log_emitter_provider arg.
  3. Modify flush() method of LoggingHandler class to replace self._log_emitter.flush() with self._log_emitter_provider.force_flush()

Does that sound right?

@srikanthccv
Member Author

LoggingHandler class to take in an additional log_emitter_provider arg

Replace the log_emitter arg with the provider instance and use it.

@pranavmarla
Contributor

pranavmarla commented Jul 19, 2022

LoggingHandler class to take in an additional log_emitter_provider arg

replace the log_emitter arg with provider instance and use it.

But the LoggingHandler class still needs access to log_emitter, so it can call self._log_emitter.emit(...) in the emit() function, right? So I assumed I could not get rid of the log_emitter arg.

@srikanthccv
Member Author

You need to obtain the emitter from the provider. It will look something like
self._emitter = get_log_emitter(name, log_emitter_provider=log_emitter_provider)
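
To make the suggestion concrete, here is a minimal, self-contained sketch of a handler that takes only a provider, obtains its emitter from that provider, and delegates flush() to the provider's force_flush(). It is an illustration under stated assumptions, not the SDK code or the eventual fix: DemoProcessor, DemoEmitter, DemoProvider, DemoLoggingHandler, and the "demo.handler" name are hypothetical stand-ins so that the example runs without opentelemetry installed.

```python
import logging


class DemoProcessor:
    """Hypothetical stand-in for the SDK's multi log processor."""

    def __init__(self) -> None:
        self.pending = []
        self.exported = []

    def emit(self, item) -> None:
        self.pending.append(item)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        # The timeout is accepted for shape only; this stand-in never blocks.
        self.exported.extend(self.pending)
        self.pending.clear()
        return True


class DemoEmitter:
    """Stand-in emitter: it can emit, but deliberately has no flush()."""

    def __init__(self, name: str, processor: DemoProcessor) -> None:
        self._name = name
        self._processor = processor

    def emit(self, body: str) -> None:
        self._processor.emit((self._name, body))


class DemoProvider:
    """Stand-in provider: owns the processor and exposes force_flush()."""

    def __init__(self) -> None:
        self._processor = DemoProcessor()

    def get_log_emitter(self, name: str) -> DemoEmitter:
        return DemoEmitter(name, self._processor)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return self._processor.force_flush(timeout_millis)


class DemoLoggingHandler(logging.Handler):
    """Handler that is given a provider rather than an emitter."""

    def __init__(self, level=logging.NOTSET, log_emitter_provider=None) -> None:
        super().__init__(level=level)
        self._log_emitter_provider = log_emitter_provider or DemoProvider()
        # The emitter is obtained from the provider, not passed in.
        self._log_emitter = self._log_emitter_provider.get_log_emitter(
            "demo.handler"
        )

    def emit(self, record: logging.LogRecord) -> None:
        self._log_emitter.emit(record.getMessage())

    def flush(self) -> None:
        # Flushing goes through the provider, since the emitter has no flush().
        self._log_emitter_provider.force_flush()


provider = DemoProvider()
handler = DemoLoggingHandler(log_emitter_provider=provider)
demo_logger = logging.getLogger("provider_demo")
demo_logger.propagate = False
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)

demo_logger.info("first record")
handler.flush()
```

With this shape the handler keeps a single constructor argument, the emitter stays flush-free like a Tracer or Meter, and the flush path runs through the provider.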

@pranavmarla
Contributor

You need to obtain the emitter from the provider. It will look something like self._emitter = get_log_emitter(name, log_emitter_provider=log_emitter_provider)

Ahh gotcha -- thanks!

@pranavmarla
Contributor

pranavmarla commented Jul 26, 2022

FYI, for my own future reference, here are some links I found containing important context for this issue:

  • class LoggingHandler(logging.Handler):
        """A handler class which writes logging records, in OTLP format, to
        a network destination or file. Supports signals from the `logging` module.
        https://docs.python.org/3/library/logging.html
        """

        def __init__(
            self,
            level=logging.NOTSET,
            log_emitter=None,
        ) -> None:
            super().__init__(level=level)
            self._log_emitter = log_emitter or get_log_emitter(__name__)

        @staticmethod
        def _get_attributes(record: logging.LogRecord) -> Attributes:
            attributes = {
                k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS
            }
            if record.exc_info:
                exc_type = ""
                message = ""
                stack_trace = ""
                exctype, value, tb = record.exc_info
                if exctype is not None:
                    exc_type = exctype.__name__
                if value is not None and value.args:
                    message = value.args[0]
                if tb is not None:
                    # https://github.com/open-telemetry/opentelemetry-specification/blob/9fa7c656b26647b27e485a6af7e38dc716eba98a/specification/trace/semantic_conventions/exceptions.md#stacktrace-representation
                    stack_trace = "".join(
                        traceback.format_exception(*record.exc_info)
                    )
                attributes[SpanAttributes.EXCEPTION_TYPE] = exc_type
                attributes[SpanAttributes.EXCEPTION_MESSAGE] = message
                attributes[SpanAttributes.EXCEPTION_STACKTRACE] = stack_trace
            return attributes

        def _translate(self, record: logging.LogRecord) -> LogRecord:
            timestamp = int(record.created * 1e9)
            span_context = get_current_span().get_span_context()
            attributes = self._get_attributes(record)
            severity_number = std_to_otlp(record.levelno)
            return LogRecord(
                timestamp=timestamp,
                trace_id=span_context.trace_id,
                span_id=span_context.span_id,
                trace_flags=span_context.trace_flags,
                severity_text=record.levelname,
                severity_number=severity_number,
                body=record.getMessage(),
                resource=self._log_emitter.resource,
                attributes=attributes,
            )

        def emit(self, record: logging.LogRecord) -> None:
            """
            Emit a record.
            The record is translated to OTLP format, and then sent across the pipeline.
            """
            self._log_emitter.emit(self._translate(record))

        def flush(self) -> None:
            """
            Flushes the logging output.
            """
            self._log_emitter.flush()


    class LogEmitter:
        def __init__(
            self,
            resource: Resource,
            multi_log_processor: Union[
                SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor
            ],
            instrumentation_scope: InstrumentationScope,
        ):
            self._resource = resource
            self._multi_log_processor = multi_log_processor
            self._instrumentation_scope = instrumentation_scope

        @property
        def resource(self):
            return self._resource

        def emit(self, record: LogRecord):
            """Emits the :class:`LogData` by associating :class:`LogRecord`
            and instrumentation info.
            """
            log_data = LogData(record, self._instrumentation_scope)
            self._multi_log_processor.emit(log_data)

        # TODO: Should this flush everything in pipeline?
        # Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290
        def flush(self):
            """Ensure all logging output has been flushed."""
            self._multi_log_processor.force_flush()


    class LogEmitterProvider:
        def __init__(
            self,
            resource: Resource = Resource.create(),
            shutdown_on_exit: bool = True,
            multi_log_processor: Union[
                SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor
            ] = None,
        ):
            self._resource = resource
            self._multi_log_processor = (
                multi_log_processor or SynchronousMultiLogProcessor()
            )
            self._at_exit_handler = None
            if shutdown_on_exit:
                self._at_exit_handler = atexit.register(self.shutdown)

        @property
        def resource(self):
            return self._resource

        def get_log_emitter(
            self,
            instrumenting_module_name: str,
            instrumenting_module_version: str = "",
        ) -> LogEmitter:
            return LogEmitter(
                self._resource,
                self._multi_log_processor,
                InstrumentationScope(
                    instrumenting_module_name, instrumenting_module_version
                ),
            )

        def add_log_processor(self, log_processor: LogProcessor):
            """Registers a new :class:`LogProcessor` for this `LogEmitterProvider` instance.
            The log processors are invoked in the same order they are registered.
            """
            self._multi_log_processor.add_log_processor(log_processor)

        def shutdown(self):
            """Shuts down the log processors."""
            self._multi_log_processor.shutdown()
            if self._at_exit_handler is not None:
                atexit.unregister(self._at_exit_handler)
                self._at_exit_handler = None

        def force_flush(self, timeout_millis: int = 30000) -> bool:
            """Force flush the log processors.

            Args:
                timeout_millis: The maximum amount of time to wait for logs to be
                    exported.

            Returns:
                True if all the log processors flushes the logs within timeout,
                False otherwise.
            """
            return self._multi_log_processor.force_flush(timeout_millis)


    _LOG_EMITTER_PROVIDER = None


    def get_log_emitter_provider() -> LogEmitterProvider:
        """Gets the current global :class:`~.LogEmitterProvider` object."""
        global _LOG_EMITTER_PROVIDER  # pylint: disable=global-statement
        if _LOG_EMITTER_PROVIDER is None:
            if _OTEL_PYTHON_LOG_EMITTER_PROVIDER not in os.environ:
                _LOG_EMITTER_PROVIDER = LogEmitterProvider()
                return _LOG_EMITTER_PROVIDER
            _LOG_EMITTER_PROVIDER = cast(
                "LogEmitterProvider",
                _load_provider(
                    _OTEL_PYTHON_LOG_EMITTER_PROVIDER, "log_emitter_provider"
                ),
            )
        return _LOG_EMITTER_PROVIDER


    def set_log_emitter_provider(log_emitter_provider: LogEmitterProvider) -> None:
        """Sets the current global :class:`~.LogEmitterProvider` object.
        This can only be done once, a warning will be logged if any furter attempt
        is made.
        """
        global _LOG_EMITTER_PROVIDER  # pylint: disable=global-statement
        if _LOG_EMITTER_PROVIDER is not None:
            _logger.warning(
                "Overriding of current LogEmitterProvider is not allowed"
            )
            return
        _LOG_EMITTER_PROVIDER = log_emitter_provider


    def get_log_emitter(
        instrumenting_module_name: str,
        instrumenting_library_version: str = "",
        log_emitter_provider: Optional[LogEmitterProvider] = None,
    ) -> LogEmitter:
        """Returns a `LogEmitter` for use within a python process.
        This function is a convenience wrapper for
        opentelemetry.sdk._logs.LogEmitterProvider.get_log_emitter.
        If log_emitter_provider param is omitted the current configured one is used.
        """
        if log_emitter_provider is None:
            log_emitter_provider = get_log_emitter_provider()
        return log_emitter_provider.get_log_emitter(
            instrumenting_module_name, instrumenting_library_version
        )
  • https://docs.python.org/3/library/logging.html#logging.Handler
  • Add LogProcessors implementation #1916 (comment)
  • Is LogEmitter.flush necessary? opentelemetry-specification#2342
  • open-telemetry/opentelemetry-specification@441bafa
  • https://github.com/srikanthccv/opentelemetry-specification/blob/ca593258953811eb33197976b8e565299b874853/specification/logs/logging-library-sdk.md

@pranavmarla
Contributor

Update: I believe I've successfully completed the fix!
It still needs to undergo internal review at my company but, once that's done, I'll be able to open a pull request here.

@pranavmarla
Contributor

pranavmarla commented Aug 6, 2022

@srikanthccv
Update: I've opened a pull request with my fix here: #2863
Please check it out when you get a chance -- feel free to let me know if you need me to change/add anything!
