From 32df79343167b2ede9bf7a8a8927c1de003e2305 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 18 Jun 2024 17:02:54 +0200 Subject: [PATCH 1/7] feat: Emit some target metrics --- singer_sdk/io_base.py | 53 +++++++++++++++++++++++++-------------- singer_sdk/metrics.py | 8 +++++- singer_sdk/sinks/core.py | 20 +++++++++++++++ singer_sdk/target_base.py | 1 + 4 files changed, 62 insertions(+), 20 deletions(-) diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index 2c5698e29..3ccec3738 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -6,10 +6,12 @@ import decimal import json import logging +import os import sys import typing as t from collections import Counter, defaultdict +from singer_sdk import metrics from singer_sdk._singerlib.messages import Message, SingerMessageType from singer_sdk._singerlib.messages import format_message as singer_format_message from singer_sdk._singerlib.messages import write_message as singer_write_message @@ -33,8 +35,14 @@ def listen(self, file_input: t.IO[str] | None = None) -> None: if not file_input: file_input = sys.stdin - self._process_lines(file_input) - self._process_endofpipe() + load_timer = metrics.Timer( + metrics.Metric.SYNC_DURATION, + {"pid": os.getpid()}, + ) + + with load_timer: + self._process_lines(file_input) + self._process_endofpipe() @staticmethod def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: @@ -83,30 +91,37 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: A counter object for the processed lines. """ stats: dict[str, int] = defaultdict(int) - for line in file_input: - line_dict = self.deserialize_json(line) - self._assert_line_requires(line_dict, requires={"type"}) + record_message_counter = metrics.Counter( + metrics.Metric.MESSAGE_COUNT, + {"pid": os.getpid()}, + log_interval=metrics.DEFAULT_LOG_INTERVAL, + ) + with record_message_counter: + for line in file_input: + line_dict = self.deserialize_json(line) + self._assert_line_requires(line_dict, requires={"type"}) - record_type: SingerMessageType = line_dict["type"] - if record_type == SingerMessageType.SCHEMA: - self._process_schema_message(line_dict) + record_type: SingerMessageType = line_dict["type"] + if record_type == SingerMessageType.SCHEMA: + self._process_schema_message(line_dict) - elif record_type == SingerMessageType.RECORD: - self._process_record_message(line_dict) + elif record_type == SingerMessageType.RECORD: + record_message_counter.increment() + self._process_record_message(line_dict) - elif record_type == SingerMessageType.ACTIVATE_VERSION: - self._process_activate_version_message(line_dict) + elif record_type == SingerMessageType.ACTIVATE_VERSION: + self._process_activate_version_message(line_dict) - elif record_type == SingerMessageType.STATE: - self._process_state_message(line_dict) + elif record_type == SingerMessageType.STATE: + self._process_state_message(line_dict) - elif record_type == SingerMessageType.BATCH: - self._process_batch_message(line_dict) + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) - else: - self._process_unknown_message(line_dict) + else: + self._process_unknown_message(line_dict) # pragma: no cover - stats[record_type] += 1 + stats[record_type] += 1 return Counter(**stats) diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py index 990285ae0..9bb5dd759 100644 --- a/singer_sdk/metrics.py +++ b/singer_sdk/metrics.py @@ -45,6 +45,7 @@ class Tag(str, enum.Enum): JOB_TYPE = "job_type" HTTP_STATUS_CODE = "http_status_code" STATUS = "status" + PID = "pid" class Metric(str, enum.Enum): @@ -56,6 +57,7 @@ class Metric(str, enum.Enum): HTTP_REQUEST_COUNT = "http_request_count" JOB_DURATION = "job_duration" SYNC_DURATION = "sync_duration" + MESSAGE_COUNT = "message_count" @dataclass @@ -180,6 +182,10 @@ def __init__( self.log_interval = log_interval self.last_log_time = time() + def exit(self) -> None: + """Exit the counter context.""" + self._pop() + def __enter__(self) -> Counter: """Enter the counter context. @@ -202,7 +208,7 @@ def __exit__( exc_val: The exception value. exc_tb: The exception traceback. """ - self._pop() + self.exit() def _pop(self) -> None: """Log and reset the counter.""" diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index a2f54c8ca..c4787ce56 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -7,6 +7,7 @@ import datetime import importlib.util import json +import os import time import typing as t from functools import cached_property @@ -17,6 +18,7 @@ import jsonschema from typing_extensions import override +from singer_sdk import metrics from singer_sdk.exceptions import ( InvalidJSONSchema, InvalidRecord, @@ -188,6 +190,23 @@ def __init__( ) self._validator: BaseJSONSchemaValidator | None = self.get_validator() + self._record_counter: metrics.Counter = metrics.Counter( + metrics.Metric.RECORD_COUNT, + { + metrics.Tag.STREAM: stream_name, + metrics.Tag.PID: os.getpid(), + }, + log_interval=metrics.DEFAULT_LOG_INTERVAL, + ) + + @property + def record_counter_metric(self) -> metrics.Counter: + """Get the record counter metric. + + Returns: + The record counter metric. + """ + return self._record_counter @cached_property def validate_schema(self) -> bool: @@ -680,6 +699,7 @@ def clean_up(self) -> None: should not be relied on, it's recommended to use a uuid as well. """ self.logger.info("Cleaning up %s", self.stream_name) + self.record_counter_metric.exit() def process_batch_files( self, diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d560a555e..8e5622c94 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -360,6 +360,7 @@ def _process_record_message(self, message_dict: dict) -> None: sink.tally_record_read() sink.process_record(transformed_record, context) + sink.record_counter_metric.increment() sink._after_process_record(context) # noqa: SLF001 if sink.is_full: From 4b166ac6b20d4f353c78ffed02529eeb787f0cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 6 Aug 2024 09:11:00 -0600 Subject: [PATCH 2/7] Remove unused Metric --- singer_sdk/metrics.py | 1 - 1 file changed, 1 deletion(-) diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py index 8a8903024..3d9defd54 100644 --- a/singer_sdk/metrics.py +++ b/singer_sdk/metrics.py @@ -59,7 +59,6 @@ class Metric(str, enum.Enum): HTTP_REQUEST_COUNT = "http_request_count" JOB_DURATION = "job_duration" SYNC_DURATION = "sync_duration" - MESSAGE_COUNT = "message_count" @dataclass From 93a87a094215ba780f42ff19661237f64a2a3279 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 6 Aug 2024 09:14:35 -0600 Subject: [PATCH 3/7] Use PID tag in all metrics --- singer_sdk/metrics.py | 1 + singer_sdk/sinks/core.py | 10 +--------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py index 3d9defd54..15c00fbab 100644 --- a/singer_sdk/metrics.py +++ b/singer_sdk/metrics.py @@ -117,6 +117,7 @@ def __init__(self, metric: Metric, tags: dict | None = None) -> None: """ self.metric = metric self.tags = tags or {} + self.tags[Tag.PID] = os.getpid() self.logger = get_metrics_logger() @property diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index d8a0b004c..e5e5a0dd6 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -6,7 +6,6 @@ import copy import datetime import importlib.util -import os import time import typing as t from functools import cached_property @@ -190,14 +189,7 @@ def __init__( ) self._validator: BaseJSONSchemaValidator | None = self.get_validator() - self._record_counter: metrics.Counter = metrics.Counter( - metrics.Metric.RECORD_COUNT, - { - metrics.Tag.STREAM: stream_name, - metrics.Tag.PID: os.getpid(), - }, - log_interval=metrics.DEFAULT_LOG_INTERVAL, - ) + self._record_counter: metrics.Counter = metrics.record_counter(stream_name) @property def record_counter_metric(self) -> metrics.Counter: From a7e15fe87b0565a364130f998724c24be668c8f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 6 Aug 2024 10:46:18 -0600 Subject: [PATCH 4/7] Update tests --- tests/core/test_metrics.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/core/test_metrics.py b/tests/core/test_metrics.py index c78969f86..b0de4c9bb 100644 --- a/tests/core/test_metrics.py +++ b/tests/core/test_metrics.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import os import pytest import time_machine @@ -18,6 +19,8 @@ def __str__(self) -> str: def test_meter(): + pid = os.getpid() + class _MyMeter(metrics.Meter): def __enter__(self): return self @@ -27,11 +30,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): meter = _MyMeter(metrics.Metric.RECORD_COUNT) - assert meter.tags == {} + assert meter.tags == {metrics.Tag.PID: pid} stream_context = {"parent_id": 1} meter.context = stream_context - assert meter.tags == {metrics.Tag.CONTEXT: stream_context} + assert meter.tags == { + metrics.Tag.CONTEXT: stream_context, + metrics.Tag.PID: pid, + } meter.context = None assert metrics.Tag.CONTEXT not in meter.tags @@ -39,6 +45,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def test_record_counter(caplog: pytest.LogCaptureFixture): caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME) + pid = os.getpid() custom_object = CustomObject("test", 1) with metrics.record_counter( @@ -68,6 +75,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture): assert point.tags == { metrics.Tag.STREAM: "test_stream", metrics.Tag.ENDPOINT: "test_endpoint", + metrics.Tag.PID: pid, "custom_tag": "pytest", "custom_obj": custom_object, } @@ -79,6 +87,7 @@ def test_record_counter(caplog: pytest.LogCaptureFixture): def test_sync_timer(caplog: pytest.LogCaptureFixture): caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME) + pid = os.getpid() traveler = time_machine.travel(0, tick=False) traveler.start() @@ -100,6 +109,7 @@ def test_sync_timer(caplog: pytest.LogCaptureFixture): assert point.tags == { metrics.Tag.STREAM: "test_stream", metrics.Tag.STATUS: "succeeded", + metrics.Tag.PID: pid, "custom_tag": "pytest", } From 2eb36e83efc65a3f9fb3781a8f62193a85c52fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 6 Aug 2024 10:53:10 -0600 Subject: [PATCH 5/7] Update docs --- docs/implementation/metrics.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/implementation/metrics.md b/docs/implementation/metrics.md index 87678cbe5..f16d4eee5 100644 --- a/docs/implementation/metrics.md +++ b/docs/implementation/metrics.md @@ -1,8 +1,13 @@ -# Tap Metrics +# Tap and Target Metrics Metrics logging is specified in the -[Singer Spec](https://hub.meltano.com/singer/spec#metrics). The SDK will automatically -emit metrics for `record_count`, `http_request_duration` and `sync_duration`. +[Singer Spec](https://hub.meltano.com/singer/spec#metrics). + +The SDK will automatically the following metrics: + +- `record_count`: The number of records processed by the tap or target. +- `http_request_duration`: The duration of HTTP requests made by the tap. +- `sync_duration`: The duration of the sync operation. ## Customization options From 1dbb65fcca3005dd9b050e046558dd8d9a2161ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 6 Aug 2024 11:09:46 -0600 Subject: [PATCH 6/7] Add batch processing timer --- singer_sdk/metrics.py | 1 + singer_sdk/sinks/core.py | 21 ++++++++++++++++++--- singer_sdk/target_base.py | 3 ++- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py index 15c00fbab..fb97e693c 100644 --- a/singer_sdk/metrics.py +++ b/singer_sdk/metrics.py @@ -59,6 +59,7 @@ class Metric(str, enum.Enum): HTTP_REQUEST_COUNT = "http_request_count" JOB_DURATION = "job_duration" SYNC_DURATION = "sync_duration" + BATCH_PROCESSING_TIME = "batch_processing_time" @dataclass diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index e5e5a0dd6..215d952e6 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -189,17 +189,32 @@ def __init__( ) self._validator: BaseJSONSchemaValidator | None = self.get_validator() - self._record_counter: metrics.Counter = metrics.record_counter(stream_name) + self._record_counter: metrics.Counter = metrics.record_counter(self.stream_name) + self._batch_timer = metrics.Timer( + metrics.Metric.BATCH_PROCESSING_TIME, + tags={ + metrics.Tag.STREAM: self.stream_name, + }, + ) @property def record_counter_metric(self) -> metrics.Counter: - """Get the record counter metric. + """Get the record counter for this sink. Returns: - The record counter metric. + The Meter instance for the record counter. """ return self._record_counter + @property + def batch_processing_timer(self) -> metrics.Timer: + """Get the batch processing timer for this sink. + + Returns: + The Meter instance for the batch processing timer. + """ + return self._batch_timer + @cached_property def validate_schema(self) -> bool: """Enable JSON schema record validation. diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index b1810d0ab..d1f51705e 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -512,7 +512,8 @@ def drain_one(self, sink: Sink) -> None: # noqa: PLR6301 return draining_status = sink.start_drain() - sink.process_batch(draining_status) + with sink.batch_processing_timer: + sink.process_batch(draining_status) sink.mark_drained() def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None: From 69d599ceaddcbe4c4d75a47ca0f66f024674ee1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 6 Aug 2024 11:10:22 -0600 Subject: [PATCH 7/7] Add docs --- docs/implementation/metrics.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/implementation/metrics.md b/docs/implementation/metrics.md index f16d4eee5..3b2a2a6a5 100644 --- a/docs/implementation/metrics.md +++ b/docs/implementation/metrics.md @@ -3,11 +3,12 @@ Metrics logging is specified in the [Singer Spec](https://hub.meltano.com/singer/spec#metrics). -The SDK will automatically the following metrics: +The SDK will automatically emit the following metrics: - `record_count`: The number of records processed by the tap or target. - `http_request_duration`: The duration of HTTP requests made by the tap. - `sync_duration`: The duration of the sync operation. +- `batch_processing_time`: The duration of processing a batch of records. ## Customization options