Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add speced out environment variables and arguments for BatchLogRecordProcessor #3237

Merged
merged 5 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
- Fix exporting of ExponentialBucketHistogramAggregation from opentelemetry.sdk.metrics.view
([#3240](https://github.com/open-telemetry/opentelemetry-python/pull/3240))

- Fix headers types mismatch for OTLP Exporters
([#3226](https://github.com/open-telemetry/opentelemetry-python/pull/3226))
- Fix suppress instrumentation for log batch processor
([#3223](https://github.com/open-telemetry/opentelemetry-python/pull/3223))
- Add speced out environment variables and arguments for BatchLogRecordProcessor
([#3237](https://github.com/open-telemetry/opentelemetry-python/pull/3237))

## Version 1.17.0/0.38b0 (2023-03-22)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import os
import sys
import threading
from os import linesep
from os import environ, linesep
from time import time_ns
from typing import IO, Callable, Deque, List, Optional, Sequence

Expand All @@ -30,8 +30,22 @@
set_value,
)
from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
from opentelemetry.sdk.environment_variables import (
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_SCHEDULE_DELAY,
)
from opentelemetry.util._once import Once

_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
_DEFAULT_MAX_QUEUE_SIZE = 2048
_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
"Unable to parse value for %s as integer. Defaulting to %s."
)

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -142,20 +156,54 @@ class BatchLogRecordProcessor(LogRecordProcessor):
"""This is an implementation of LogRecordProcessor which creates batches of
received logs in the export-friendly LogData representation and
send to the configured LogExporter, as soon as they are emitted.

`BatchLogRecordProcessor` is configurable with the following environment
variables which correspond to constructor parameters:

- :envvar:`OTEL_BLRP_SCHEDULE_DELAY`
- :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
- :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
- :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
"""

def __init__(
self,
exporter: LogExporter,
schedule_delay_millis: int = 5000,
max_export_batch_size: int = 512,
export_timeout_millis: int = 30000,
schedule_delay_millis: float = None,
max_export_batch_size: int = None,
export_timeout_millis: float = None,
max_queue_size: int = None,
):
if max_queue_size is None:
max_queue_size = BatchLogRecordProcessor._default_max_queue_size()

if schedule_delay_millis is None:
schedule_delay_millis = (
BatchLogRecordProcessor._default_schedule_delay_millis()
)

if max_export_batch_size is None:
max_export_batch_size = (
BatchLogRecordProcessor._default_max_export_batch_size()
)

if export_timeout_millis is None:
export_timeout_millis = (
BatchLogRecordProcessor._default_export_timeout_millis()
)

BatchLogRecordProcessor._validate_arguments(
max_queue_size, schedule_delay_millis, max_export_batch_size
)

self._exporter = exporter
self._max_queue_size = max_queue_size
self._schedule_delay_millis = schedule_delay_millis
self._max_export_batch_size = max_export_batch_size
self._export_timeout_millis = export_timeout_millis
self._queue = collections.deque() # type: Deque[LogData]
self._queue = collections.deque(
[], max_queue_size
) # type: Deque[LogData]
self._worker_thread = threading.Thread(
name="OtelBatchLogRecordProcessor",
target=self.worker,
Expand Down Expand Up @@ -333,3 +381,86 @@ def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
if not ret:
_logger.warning("Timeout was exceeded in force_flush().")
return ret

@staticmethod
def _default_max_queue_size():
try:
return int(
environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE)
)
except ValueError:
_logger.exception(
_ENV_VAR_INT_VALUE_ERROR_MESSAGE,
OTEL_BLRP_MAX_QUEUE_SIZE,
_DEFAULT_MAX_QUEUE_SIZE,
)
return _DEFAULT_MAX_QUEUE_SIZE

@staticmethod
def _default_schedule_delay_millis():
try:
return int(
environ.get(
OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
)
)
except ValueError:
_logger.exception(
_ENV_VAR_INT_VALUE_ERROR_MESSAGE,
OTEL_BLRP_SCHEDULE_DELAY,
_DEFAULT_SCHEDULE_DELAY_MILLIS,
)
return _DEFAULT_SCHEDULE_DELAY_MILLIS

@staticmethod
def _default_max_export_batch_size():
try:
return int(
environ.get(
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
_DEFAULT_MAX_EXPORT_BATCH_SIZE,
)
)
except ValueError:
_logger.exception(
_ENV_VAR_INT_VALUE_ERROR_MESSAGE,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
_DEFAULT_MAX_EXPORT_BATCH_SIZE,
)
return _DEFAULT_MAX_EXPORT_BATCH_SIZE

@staticmethod
def _default_export_timeout_millis():
try:
return int(
environ.get(
OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
)
)
except ValueError:
_logger.exception(
_ENV_VAR_INT_VALUE_ERROR_MESSAGE,
OTEL_BLRP_EXPORT_TIMEOUT,
_DEFAULT_EXPORT_TIMEOUT_MILLIS,
)
return _DEFAULT_EXPORT_TIMEOUT_MILLIS

@staticmethod
def _validate_arguments(
max_queue_size, schedule_delay_millis, max_export_batch_size
):
if max_queue_size <= 0:
raise ValueError("max_queue_size must be a positive integer.")

if schedule_delay_millis <= 0:
raise ValueError("schedule_delay_millis must be positive.")

if max_export_batch_size <= 0:
raise ValueError(
"max_export_batch_size must be a positive integer."
)

if max_export_batch_size > max_queue_size:
raise ValueError(
"max_export_batch_size must be less than or equal to max_queue_size."
)
Original file line number Diff line number Diff line change
Expand Up @@ -66,35 +66,67 @@
i.e. the SDK behaves as if OTEL_TRACES_SAMPLER_ARG is not set.
"""

OTEL_BLRP_SCHEDULE_DELAY = "OTEL_BLRP_SCHEDULE_DELAY"
"""
.. envvar:: OTEL_BLRP_SCHEDULE_DELAY

The :envvar:`OTEL_BLRP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchLogRecordProcessor.
Default: 5000
"""

OTEL_BLRP_EXPORT_TIMEOUT = "OTEL_BLRP_EXPORT_TIMEOUT"
"""
.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT

The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor.
Default: 30000
"""

OTEL_BLRP_MAX_QUEUE_SIZE = "OTEL_BLRP_MAX_QUEUE_SIZE"
"""
.. envvar:: OTEL_BLRP_MAX_QUEUE_SIZE

The :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchLogRecordProcessor.
Default: 2048
"""

OTEL_BLRP_MAX_EXPORT_BATCH_SIZE = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
"""
.. envvar:: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE

The :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchLogRecordProcessor.
Default: 512
"""

OTEL_BSP_SCHEDULE_DELAY = "OTEL_BSP_SCHEDULE_DELAY"
"""
.. envvar:: OTEL_BSP_SCHEDULE_DELAY

The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports.
The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchSpanProcessor.
Default: 5000
"""

OTEL_BSP_EXPORT_TIMEOUT = "OTEL_BSP_EXPORT_TIMEOUT"
"""
.. envvar:: OTEL_BSP_EXPORT_TIMEOUT

The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data.
The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchSpanProcessor.
Default: 30000
"""

OTEL_BSP_MAX_QUEUE_SIZE = "OTEL_BSP_MAX_QUEUE_SIZE"
"""
.. envvar:: OTEL_BSP_MAX_QUEUE_SIZE

The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export.
The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchSpanProcessor.
Default: 2048
"""

OTEL_BSP_MAX_EXPORT_BATCH_SIZE = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
"""
.. envvar:: OTEL_BSP_MAX_EXPORT_BATCH_SIZE

The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export.
The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchSpanProcessor.
Default: 512
"""

Expand Down
Loading