diff --git a/docs/implementation/metrics.md b/docs/implementation/metrics.md index 87678cbe5..3b2a2a6a5 100644 --- a/docs/implementation/metrics.md +++ b/docs/implementation/metrics.md @@ -1,8 +1,14 @@ -# Tap Metrics +# Tap and Target Metrics Metrics logging is specified in the -[Singer Spec](https://hub.meltano.com/singer/spec#metrics). The SDK will automatically -emit metrics for `record_count`, `http_request_duration` and `sync_duration`. +[Singer Spec](https://hub.meltano.com/singer/spec#metrics). + +The SDK will automatically emit the following metrics: + +- `record_count`: The number of records processed by the tap or target. +- `http_request_duration`: The duration of HTTP requests made by the tap. +- `sync_duration`: The duration of the sync operation. +- `batch_processing_time`: The duration of processing a batch of records. ## Customization options diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py index 50d7d3926..fb97e693c 100644 --- a/singer_sdk/metrics.py +++ b/singer_sdk/metrics.py @@ -47,6 +47,7 @@ class Tag(str, enum.Enum): JOB_TYPE = "job_type" HTTP_STATUS_CODE = "http_status_code" STATUS = "status" + PID = "pid" class Metric(str, enum.Enum): @@ -58,6 +59,7 @@ class Metric(str, enum.Enum): HTTP_REQUEST_COUNT = "http_request_count" JOB_DURATION = "job_duration" SYNC_DURATION = "sync_duration" + BATCH_PROCESSING_TIME = "batch_processing_time" @dataclass @@ -116,6 +118,7 @@ def __init__(self, metric: Metric, tags: dict | None = None) -> None: """ self.metric = metric self.tags = tags or {} + self.tags[Tag.PID] = os.getpid() self.logger = get_metrics_logger() @property @@ -182,6 +185,10 @@ def __init__( self.log_interval = log_interval self.last_log_time = time() + def exit(self) -> None: + """Exit the counter context.""" + self._pop() + def __enter__(self) -> Counter: """Enter the counter context. @@ -204,7 +211,7 @@ def __exit__( exc_val: The exception value. exc_tb: The exception traceback. """ - self._pop() + self.exit() def _pop(self) -> None: """Log and reset the counter.""" diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index e3c1ef566..5a936a634 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -17,6 +17,7 @@ import jsonschema.validators from typing_extensions import override +from singer_sdk import metrics from singer_sdk._singerlib.json import deserialize_json from singer_sdk.exceptions import ( InvalidJSONSchema, @@ -193,6 +194,31 @@ def __init__( ) self._validator: BaseJSONSchemaValidator | None = self.get_validator() + self._record_counter: metrics.Counter = metrics.record_counter(self.stream_name) + self._batch_timer = metrics.Timer( + metrics.Metric.BATCH_PROCESSING_TIME, + tags={ + metrics.Tag.STREAM: self.stream_name, + }, + ) + + @property + def record_counter_metric(self) -> metrics.Counter: + """Get the record counter for this sink. + + Returns: + The Meter instance for the record counter. + """ + return self._record_counter + + @property + def batch_processing_timer(self) -> metrics.Timer: + """Get the batch processing timer for this sink. + + Returns: + The Meter instance for the batch processing timer. + """ + return self._batch_timer @cached_property def validate_schema(self) -> bool: @@ -685,6 +711,7 @@ def clean_up(self) -> None: should not be relied on, it's recommended to use a uuid as well. """ self.logger.info("Cleaning up %s", self.stream_name) + self.record_counter_metric.exit() def process_batch_files( self, diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 81c991a09..22ad28176 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -359,6 +359,7 @@ def _process_record_message(self, message_dict: dict) -> None: sink.tally_record_read() sink.process_record(transformed_record, context) + sink.record_counter_metric.increment() sink._after_process_record(context) # noqa: SLF001 if sink.is_full: @@ -510,7 +511,8 @@ def drain_one(self, sink: Sink) -> None: # noqa: PLR6301 return draining_status = sink.start_drain() - sink.process_batch(draining_status) + with sink.batch_processing_timer: + sink.process_batch(draining_status) sink.mark_drained() def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None: diff --git a/tests/core/test_metrics.py b/tests/core/test_metrics.py index c78969f86..b0de4c9bb 100644 --- a/tests/core/test_metrics.py +++ b/tests/core/test_metrics.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import os import pytest import time_machine @@ -18,6 +19,8 @@ def __str__(self) -> str: def test_meter(): + pid = os.getpid() + class _MyMeter(metrics.Meter): def __enter__(self): return self @@ -27,11 +30,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): meter = _MyMeter(metrics.Metric.RECORD_COUNT) - assert meter.tags == {} + assert meter.tags == {metrics.Tag.PID: pid} stream_context = {"parent_id": 1} meter.context = stream_context - assert meter.tags == {metrics.Tag.CONTEXT: stream_context} + assert meter.tags == { + metrics.Tag.CONTEXT: stream_context, + metrics.Tag.PID: pid, + } meter.context = None assert metrics.Tag.CONTEXT not in meter.tags @@ -39,6 +45,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def test_record_counter(caplog: pytest.LogCaptureFixture): caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME) + pid = os.getpid() custom_object = CustomObject("test", 1) with metrics.record_counter( @@ -68,6 +75,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture): assert point.tags == { metrics.Tag.STREAM: "test_stream", metrics.Tag.ENDPOINT: "test_endpoint", + metrics.Tag.PID: pid, "custom_tag": "pytest", "custom_obj": custom_object, } @@ -79,6 +87,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture): def test_sync_timer(caplog: pytest.LogCaptureFixture): caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME) + pid = os.getpid() traveler = time_machine.travel(0, tick=False) traveler.start() @@ -100,6 +109,7 @@ def test_sync_timer(caplog: pytest.LogCaptureFixture): assert point.tags == { metrics.Tag.STREAM: "test_stream", metrics.Tag.STATUS: "succeeded", + metrics.Tag.PID: pid, "custom_tag": "pytest", }