diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ed8a038b08..efd5539cf1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2964](https://github.com/open-telemetry/opentelemetry-python/pull/2964)) - Add OpenCensus trace bridge/shim ([#3210](https://github.com/open-telemetry/opentelemetry-python/pull/3210)) +- Add speced out environment variables and arguments for BatchLogRecordProcessor + ([#3237](https://github.com/open-telemetry/opentelemetry-python/pull/3237)) ## Version 1.16.0/0.37b0 (2023-02-17) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index 3f19b79e100..aecf74b5a9e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -19,12 +19,18 @@ import os import sys import threading -from os import linesep +from os import environ, linesep from time import time_ns from typing import IO, Callable, Deque, List, Optional, Sequence from opentelemetry.context import attach, detach, set_value from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor +from opentelemetry.sdk.environment_variables import ( + OTEL_BLRP_EXPORT_TIMEOUT, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + OTEL_BLRP_MAX_QUEUE_SIZE, + OTEL_BLRP_SCHEDULE_DELAY, +) from opentelemetry.util._once import Once _logger = logging.getLogger(__name__) @@ -137,20 +143,66 @@ class BatchLogRecordProcessor(LogRecordProcessor): """This is an implementation of LogRecordProcessor which creates batches of received logs in the export-friendly LogData representation and send to the configured LogExporter, as soon as they are emitted. + + `BatchLogRecordProcessor` is configurable with the following environment + variables which correspond to constructor parameters: + + - :envvar:`OTEL_BLRP_SCHEDULE_DELAY` + - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` + - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` + - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` """ def __init__( self, exporter: LogExporter, - schedule_delay_millis: int = 5000, - max_export_batch_size: int = 512, - export_timeout_millis: int = 30000, + max_queue_size: int = None, + schedule_delay_millis: int = None, + max_export_batch_size: int = None, + export_timeout_millis: int = None, ): + if max_queue_size is None: + max_queue_size = int(environ.get(OTEL_BLRP_MAX_QUEUE_SIZE, 2048)) + + if schedule_delay_millis is None: + schedule_delay_millis = int( + environ.get(OTEL_BLRP_SCHEDULE_DELAY, 5000) + ) + + if max_export_batch_size is None: + max_export_batch_size = int( + environ.get(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, 512) + ) + + if export_timeout_millis is None: + export_timeout_millis = int( + environ.get(OTEL_BLRP_EXPORT_TIMEOUT, 30000) + ) + + if max_queue_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + if max_export_batch_size <= 0: + raise ValueError( + "max_export_batch_size must be a positive integer." + ) + + if max_export_batch_size > max_queue_size: + raise ValueError( + "max_export_batch_size must be less than or equal to max_queue_size." + ) + self._exporter = exporter + self._max_queue_size = max_queue_size self._schedule_delay_millis = schedule_delay_millis self._max_export_batch_size = max_export_batch_size self._export_timeout_millis = export_timeout_millis - self._queue = collections.deque() # type: Deque[LogData] + self._queue = collections.deque( + [], max_queue_size + ) # type: Deque[LogData] self._worker_thread = threading.Thread( name="OtelBatchLogRecordProcessor", target=self.worker, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index 38819943265..0d90270a53b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -66,11 +66,43 @@ i.e. the SDK behaves as if OTEL_TRACES_SAMPLER_ARG is not set. """ +OTEL_BLRP_SCHEDULE_DELAY = "OTEL_BLRP_SCHEDULE_DELAY" +""" +.. envvar:: OTEL_BLRP_SCHEDULE_DELAY + +The :envvar:`OTEL_BLRP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchLogRecordProcessor. +Default: 5000 +""" + +OTEL_BLRP_EXPORT_TIMEOUT = "OTEL_BLRP_EXPORT_TIMEOUT" +""" +.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT + +The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor. +Default: 30000 +""" + +OTEL_BLRP_MAX_QUEUE_SIZE = "OTEL_BLRP_MAX_QUEUE_SIZE" +""" +.. envvar:: OTEL_BLRP_MAX_QUEUE_SIZE + +The :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchLogRecordProcessor. +Default: 2048 +""" + +OTEL_BLRP_MAX_EXPORT_BATCH_SIZE = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" +""" +.. envvar:: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE + +The :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchLogRecordProcessor. +Default: 512 +""" + OTEL_BSP_SCHEDULE_DELAY = "OTEL_BSP_SCHEDULE_DELAY" """ .. envvar:: OTEL_BSP_SCHEDULE_DELAY -The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports. +The :envvar:`OTEL_BSP_SCHEDULE_DELAY` represents the delay interval between two consecutive exports of the BatchSpanProcessor. Default: 5000 """ @@ -78,7 +110,7 @@ """ .. envvar:: OTEL_BSP_EXPORT_TIMEOUT -The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data. +The :envvar:`OTEL_BSP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchSpanProcessor. Default: 30000 """ @@ -86,7 +118,7 @@ """ .. envvar:: OTEL_BSP_MAX_QUEUE_SIZE -The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export. +The :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` represents the maximum queue size for the data export of the BatchSpanProcessor. Default: 2048 """ @@ -94,7 +126,7 @@ """ .. envvar:: OTEL_BSP_MAX_EXPORT_BATCH_SIZE -The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export. +The :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` represents the maximum batch size for the data export of the BatchSpanProcessor. Default: 512 """ diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 6a6b1d5bf86..09a328b5f2f 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -35,6 +35,12 @@ InMemoryLogExporter, SimpleLogRecordProcessor, ) +from opentelemetry.sdk.environment_variables import ( + OTEL_BLRP_EXPORT_TIMEOUT, + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + OTEL_BLRP_MAX_QUEUE_SIZE, + OTEL_BLRP_SCHEDULE_DELAY, +) from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.test.concurrency_test import ConcurrencyTestBase @@ -175,6 +181,109 @@ def test_emit_call_log_record(self): logger.error("error") self.assertEqual(log_record_processor.emit.call_count, 1) + def test_args(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor( + exporter, + max_queue_size=1024, + schedule_delay_millis=2500, + max_export_batch_size=256, + export_timeout_millis=15000, + ) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 1024) + self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._max_export_batch_size, 256) + self.assertEqual(log_record_processor._export_timeout_millis, 15000) + + @patch.dict( + "os.environ", + { + OTEL_BLRP_MAX_QUEUE_SIZE: "1024", + OTEL_BLRP_SCHEDULE_DELAY: "2500", + OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: "256", + OTEL_BLRP_EXPORT_TIMEOUT: "15000", + }, + ) + def test_env_vars(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor(exporter) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 1024) + self.assertEqual(log_record_processor._schedule_delay_millis, 2500) + self.assertEqual(log_record_processor._max_export_batch_size, 256) + self.assertEqual(log_record_processor._export_timeout_millis, 15000) + + def test_args_defaults(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor(exporter) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 2048) + self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._max_export_batch_size, 512) + self.assertEqual(log_record_processor._export_timeout_millis, 30000) + + def test_args_none_defaults(self): + exporter = InMemoryLogExporter() + log_record_processor = BatchLogRecordProcessor( + exporter, + max_queue_size=None, + schedule_delay_millis=None, + max_export_batch_size=None, + export_timeout_millis=None, + ) + self.assertEqual(log_record_processor._exporter, exporter) + self.assertEqual(log_record_processor._max_queue_size, 2048) + self.assertEqual(log_record_processor._schedule_delay_millis, 5000) + self.assertEqual(log_record_processor._max_export_batch_size, 512) + self.assertEqual(log_record_processor._export_timeout_millis, 30000) + + def test_validation_negative_max_queue_size(self): + exporter = InMemoryLogExporter() + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_queue_size=0, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_queue_size=-1, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + schedule_delay_millis=0, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + schedule_delay_millis=-1, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_export_batch_size=0, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_export_batch_size=-1, + ) + self.assertRaises( + ValueError, + BatchLogRecordProcessor, + exporter, + max_queue_size=100, + max_export_batch_size=101, + ) + def test_shutdown(self): exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor(exporter)