diff --git a/singer_sdk/metrics.py b/singer_sdk/metrics.py new file mode 100644 index 0000000000..f9e5c8b6b6 --- /dev/null +++ b/singer_sdk/metrics.py @@ -0,0 +1,388 @@ +"""Singer metrics logging.""" + +from __future__ import annotations + +import abc +import enum +import json +import logging +import logging.config +from dataclasses import asdict, dataclass, field +from time import time +from types import TracebackType +from typing import Any, Generic, Mapping, TypeVar + +DEFAULT_LOG_INTERVAL = 60.0 +METRICS_LOGGER_NAME = "singer.metrics" +METRICS_LOG_LEVEL_SETTING = "metrics_log_level" + +_T = TypeVar("_T") + + +class Status(str, enum.Enum): + """Constants for commonly used status values.""" + + SUCCEEDED = "succeeded" + FAILED = "failed" + + +class Tag(str, enum.Enum): + """Constants for commonly used tags.""" + + STREAM = "stream" + CONTEXT = "context" + ENDPOINT = "endpoint" + JOB_TYPE = "job_type" + HTTP_STATUS_CODE = "http_status_code" + STATUS = "status" + + +class Metric(str, enum.Enum): + """Common metric types.""" + + RECORD_COUNT = "record_count" + BATCH_COUNT = "batch_count" + HTTP_REQUEST_DURATION = "http_request_duration" + HTTP_REQUEST_COUNT = "http_request_count" + JOB_DURATION = "job_duration" + SYNC_DURATION = "sync_duration" + + +@dataclass +class Point(Generic[_T]): + """An individual metric measurement.""" + + metric_type: str + metric: Metric + value: _T + tags: dict[str, Any] = field(default_factory=dict) + + def __str__(self) -> str: + """Get string representation of this measurement. + + Returns: + A string representation of this measurement. + """ + return self.to_json() + + def to_json(self) -> str: + """Convert this measure to a JSON object. + + Returns: + A JSON object. + """ + return json.dumps(asdict(self)) + + +def log(logger: logging.Logger, point: Point) -> None: + """Log a measurement. + + Args: + logger: An logger instance. + point: A measurement. + """ + logger.info("INFO METRIC: %s", point) + + +class Meter(metaclass=abc.ABCMeta): + """Base class for all meters.""" + + def __init__(self, metric: Metric, tags: dict | None = None) -> None: + """Initialize a meter. + + Args: + metric: The metric type. + tags: Tags to add to the measurement. + """ + self.metric = metric + self.tags = tags or {} + self.logger = get_metrics_logger() + + @abc.abstractmethod + def __enter__(self) -> Meter: + """Enter the meter context.""" + ... + + @abc.abstractmethod + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit the meter context. + + Args: + exc_type: The exception type. + exc_val: The exception value. + exc_tb: The exception traceback. + """ + ... + + +class Counter(Meter): + """A meter for counting things.""" + + def __init__( + self, + metric: Metric, + tags: dict | None = None, + log_interval: float = DEFAULT_LOG_INTERVAL, + ) -> None: + """Initialize a counter. + + Args: + metric: The metric type. + tags: Tags to add to the measurement. + log_interval: The interval at which to log the count. + """ + super().__init__(metric, tags) + self.value = 0 + self.log_interval = log_interval + self.last_log_time = time() + + def __enter__(self) -> Counter: + """Enter the counter context. + + Returns: + The counter instance. + """ + self.last_log_time = time() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit the counter context. + + Args: + exc_type: The exception type. + exc_val: The exception value. + exc_tb: The exception traceback. + """ + self._pop() + + def _pop(self) -> None: + """Log and reset the counter.""" + log(self.logger, Point("counter", self.metric, self.value, self.tags)) + self.value = 0 + self.last_log_time = time() + + def increment(self, value: int = 1) -> None: + """Increment the counter. + + Args: + value: The value to increment by. + """ + self.value += value + if self._ready_to_log(): + self._pop() + + def _ready_to_log(self) -> bool: + """Check if the counter is ready to log. + + Returns: + True if the counter is ready to log. + """ + return time() - self.last_log_time > self.log_interval + + +class Timer(Meter): + """A meter for timing things.""" + + def __init__(self, metric: Metric, tags: dict | None = None) -> None: + """Initialize a timer. + + Args: + metric: The metric type. + tags: Tags to add to the measurement. + """ + super().__init__(metric, tags) + self.start_time = time() + + def __enter__(self) -> Timer: + """Enter the timer context. + + Returns: + The timer instance. + """ + self.start_time = time() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit the timer context. + + Args: + exc_type: The exception type. + exc_val: The exception value. + exc_tb: The exception traceback. + """ + if Tag.STATUS not in self.tags: + if exc_type is None: + self.tags[Tag.STATUS] = Status.SUCCEEDED + else: + self.tags[Tag.STATUS] = Status.FAILED + log(self.logger, Point("timer", self.metric, self.elapsed(), self.tags)) + + def elapsed(self) -> float: + """Get the elapsed time. + + Returns: + The elapsed time. + """ + return time() - self.start_time + + +def get_metrics_logger() -> logging.Logger: + """Get a logger for emitting metrics. + + Returns: + A logger that can be used to emit metrics. + """ + return logging.getLogger(METRICS_LOGGER_NAME) + + +def record_counter( + stream: str, + endpoint: str | None = None, + log_interval: float = DEFAULT_LOG_INTERVAL, + **tags: Any, +) -> Counter: + """Use for counting records retrieved from the source. + + with singer.metrics.record_counter(endpoint="users") as counter: + for record in my_records: + # Do something with the record + counter.increment() + + Args: + stream: The stream name. + endpoint: The endpoint name. + log_interval: The interval at which to log the count. + tags: Tags to add to the measurement. + + Returns: + A counter for counting records. + """ + tags[Tag.STREAM] = stream + if endpoint: + tags[Tag.ENDPOINT] = endpoint + return Counter(Metric.RECORD_COUNT, tags, log_interval=log_interval) + + +def batch_counter(stream: str, **tags: Any) -> Counter: + """Use for counting batches sent to the target. + + with singer.metrics.batch_counter() as counter: + for batch in my_batches: + # Do something with the batch + counter.increment() + + Args: + stream: The stream name. + tags: Tags to add to the measurement. + + Returns: + A counter for counting batches. + """ + tags[Tag.STREAM] = stream + return Counter(Metric.BATCH_COUNT, tags) + + +def http_request_counter( + stream: str, + endpoint: str, + log_interval: float = DEFAULT_LOG_INTERVAL, + **tags: Any, +) -> Counter: + """Use for counting HTTP requests. + + with singer.metrics.http_request_counter() as counter: + for record in my_records: + # Do something with the record + counter.increment() + + Args: + stream: The stream name. + endpoint: The endpoint name. + log_interval: The interval at which to log the count. + tags: Tags to add to the measurement. + + Returns: + A counter for counting HTTP requests. + """ + tags.update({Tag.STREAM: stream, Tag.ENDPOINT: endpoint}) + return Counter(Metric.HTTP_REQUEST_COUNT, tags, log_interval=log_interval) + + +def sync_timer(stream: str, **tags: Any) -> Timer: + """Use for timing the sync of a stream. + + with singer.metrics.sync_timer() as timer: + # Do something + print(f"Sync took {timer.elapsed()} seconds") + + Args: + stream: The stream name. + tags: Tags to add to the measurement. + + Returns: + A timer for timing the sync of a stream. + """ + tags[Tag.STREAM] = stream + return Timer(Metric.SYNC_DURATION, tags) + + +def _get_logging_config(config: Mapping[str, Any] | None = None) -> dict: + """Get a logging configuration. + + Args: + config: A logging configuration. + + Returns: + A logging configuration. + """ + config = config or {} + metrics_log_level = config.get(METRICS_LOG_LEVEL_SETTING, "INFO") + + return { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "metrics": { + "format": "{asctime} {message}", + "style": "{", + }, + }, + "handlers": { + "metrics": { + "class": "logging.FileHandler", + "formatter": "metrics", + "filename": "metrics.log", + }, + }, + "loggers": { + METRICS_LOGGER_NAME: { + "level": metrics_log_level, + "handlers": ["metrics"], + "propagate": True, + }, + }, + } + + +def _setup_logging(config: Mapping[str, Any]) -> None: + """Setup logging. + + Args: + config: A plugin configuration dictionary. + """ + logging.config.dictConfig(_get_logging_config(config)) diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 7ef4d48577..9c2258baad 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -23,6 +23,7 @@ import click from jsonschema import Draft4Validator, SchemaError, ValidationError +from singer_sdk import metrics from singer_sdk.configuration._dict_config import parse_environment_config from singer_sdk.exceptions import ConfigValidationError from singer_sdk.helpers._classproperty import classproperty @@ -120,6 +121,9 @@ def __init__( self._validate_config(raise_errors=validate_config) self.mapper: PluginMapper + metrics._setup_logging(self.config) + self.metrics_logger = metrics.get_metrics_logger() + @classproperty def capabilities(self) -> List[CapabilitiesEnum]: """Get capabilities. diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 9c7d2e2d60..0145979b27 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -12,13 +12,13 @@ from os import PathLike from pathlib import Path from types import MappingProxyType -from typing import Any, Callable, Generator, Iterable, Iterator, Mapping, TypeVar, cast +from typing import Any, Generator, Iterable, Iterator, Mapping, TypeVar, cast from uuid import uuid4 import pendulum -import requests import singer_sdk._singerlib as singer +from singer_sdk import metrics from singer_sdk.exceptions import InvalidStreamSortException, MaxRecordsLimitException from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, @@ -52,8 +52,6 @@ FactoryType = TypeVar("FactoryType", bound="Stream") _T = TypeVar("_T") -METRICS_LOG_LEVEL_SETTING = "metrics_log_level" - def lazy_chunked_generator( iterable: Iterable[_T], @@ -116,6 +114,7 @@ def __init__( raise ValueError("Missing argument or class variable 'name'.") self.logger: logging.Logger = tap.logger + self.metrics_logger = tap.metrics_logger self.tap_name: str = tap.name self._config: dict = dict(tap.config) self._tap = tap @@ -832,95 +831,13 @@ def _write_batch_message( ) ) - @property - def _metric_logging_function(self) -> Callable | None: - """Return the metrics logging function. - - Returns: - The logging function for emitting metrics. - - Raises: - ValueError: If logging level setting is an unsupported value. - """ - if METRICS_LOG_LEVEL_SETTING not in self.config: - return self.logger.info - - if self.config[METRICS_LOG_LEVEL_SETTING].upper() == "INFO": - return self.logger.info - - if self.config[METRICS_LOG_LEVEL_SETTING].upper() == "DEBUG": - return self.logger.debug - - if self.config[METRICS_LOG_LEVEL_SETTING].upper() == "NONE": - return None - - raise ValueError( - "Unexpected logging level for metrics: " - + self.config[METRICS_LOG_LEVEL_SETTING] - ) - - def _write_metric_log(self, metric: dict, extra_tags: dict | None) -> None: - """Emit a metric log. Optionally with appended tag info. - - Args: - metric: TODO - extra_tags: TODO - - Returns: - None - """ - if not self._metric_logging_function: - return None - - if extra_tags: - metric["tags"].update(extra_tags) - self._metric_logging_function(f"INFO METRIC: {json.dumps(metric)}") - - def _write_record_count_log(self, record_count: int, context: dict | None) -> None: - """Emit a metric log. Optionally with appended tag info. + def _log_metric(self, point: metrics.Point) -> None: + """Log a single measurement. Args: - record_count: TODO - context: Stream partition or context dictionary. + point: A single measurement value. """ - extra_tags = {} if not context else {"context": context} - counter_metric: dict[str, Any] = { - "type": "counter", - "metric": "record_count", - "value": record_count, - "tags": {"stream": self.name}, - } - self._write_metric_log(counter_metric, extra_tags=extra_tags) - - def _write_request_duration_log( - self, - endpoint: str, - response: requests.Response, - context: dict | None, - extra_tags: dict | None, - ) -> None: - """TODO. - - Args: - endpoint: TODO - response: TODO - context: Stream partition or context dictionary. - extra_tags: TODO - """ - request_duration_metric: dict[str, Any] = { - "type": "timer", - "metric": "http_request_duration", - "value": response.elapsed.total_seconds(), - "tags": { - "endpoint": endpoint, - "http_status_code": response.status_code, - "status": "succeeded" if response.status_code < 400 else "failed", - }, - } - extra_tags = extra_tags or {} - if context: - extra_tags["context"] = context - self._write_metric_log(metric=request_duration_metric, extra_tags=extra_tags) + metrics.log(self.metrics_logger, point=point) def log_sync_costs(self) -> None: """Log a summary of Sync costs. @@ -1047,6 +964,8 @@ def _sync_records( selected = self.selected for current_context in context_list or [{}]: + counter = metrics.record_counter(self.name, context=current_context) + timer = metrics.sync_timer(self.name, context=current_context) partition_record_count = 0 current_context = current_context or None state = self.get_context_state(current_context) @@ -1056,45 +975,47 @@ def _sync_records( None if current_context is None else copy.copy(current_context) ) - for record_result in self.get_records(current_context): - if isinstance(record_result, tuple): - # Tuple items should be the record and the child context - record, child_context = record_result - else: - record = record_result - try: - self._process_record( - record, - child_context=child_context, - partition_context=state_partition_context, - ) - except InvalidStreamSortException as ex: - log_sort_error( - log_fn=self.logger.error, - ex=ex, - record_count=record_count + 1, - partition_record_count=partition_record_count + 1, - current_context=current_context, - state_partition_context=state_partition_context, - stream_name=self.name, - ) - raise ex + with counter, timer: + for record_result in self.get_records(current_context): + if isinstance(record_result, tuple): + # Tuple items should be the record and the child context + record, child_context = record_result + else: + record = record_result + try: + self._process_record( + record, + child_context=child_context, + partition_context=state_partition_context, + ) + except InvalidStreamSortException as ex: + log_sort_error( + log_fn=self.logger.error, + ex=ex, + record_count=record_count + 1, + partition_record_count=partition_record_count + 1, + current_context=current_context, + state_partition_context=state_partition_context, + stream_name=self.name, + ) + raise ex - self._check_max_record_limit(record_count) + self._check_max_record_limit(record_count) - if selected: - if ( - record_count - 1 - ) % self.STATE_MSG_FREQUENCY == 0 and write_messages: - self._write_state_message() - if write_messages: - self._write_record_message(record) - self._increment_stream_state(record, context=current_context) + if selected: + if ( + record_count - 1 + ) % self.STATE_MSG_FREQUENCY == 0 and write_messages: + self._write_state_message() + if write_messages: + self._write_record_message(record) + self._increment_stream_state(record, context=current_context) - yield record + yield record - record_count += 1 - partition_record_count += 1 + counter.increment() + record_count += 1 + partition_record_count += 1 if current_context == state_partition_context: # Finalize per-partition state only if 1:1 with context @@ -1103,7 +1024,6 @@ def _sync_records( # Finalize total stream only if we have the full full context. # Otherwise will be finalized by tap at end of sync. finalize_state_progress_markers(self.stream_state) - self._write_record_count_log(record_count=record_count, context=context) if write_messages: # Reset interim bookmarks before emitting final STATE message: @@ -1120,9 +1040,11 @@ def _sync_batches( batch_config: The batch configuration. context: Stream partition or context dictionary. """ - for encoding, manifest in self.get_batches(batch_config, context): - self._write_batch_message(encoding=encoding, manifest=manifest) - self._write_state_message() + with metrics.batch_counter(self.name, context=context) as counter: + for encoding, manifest in self.get_batches(batch_config, context): + counter.increment() + self._write_batch_message(encoding=encoding, manifest=manifest) + self._write_state_message() # Public methods ("final", not recommended to be overridden) diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index cd7709edce..871cb71e40 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -14,6 +14,7 @@ import requests from singer_sdk._singerlib import Schema +from singer_sdk import metrics from singer_sdk.authenticators import APIAuthenticatorBase, SimpleAuthenticator from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.helpers.jsonpath import extract_jsonpath @@ -241,16 +242,14 @@ def _request( TODO """ response = self.requests_session.send(prepared_request, timeout=self.timeout) - if self._LOG_REQUEST_METRICS: - extra_tags = {} - if self._LOG_REQUEST_METRIC_URLS: - extra_tags["url"] = prepared_request.path_url - self._write_request_duration_log( - endpoint=self.path, - response=response, - context=context, - extra_tags=extra_tags, - ) + self._write_request_duration_log( + endpoint=self.path, + response=response, + context=context, + extra_tags={"url": prepared_request.path_url} + if self._LOG_REQUEST_METRIC_URLS + else None, + ) self.validate_response(response) logging.debug("Response received successfully.") return response @@ -347,16 +346,57 @@ def request_records(self, context: dict | None) -> Iterable[dict]: paginator = self.get_new_paginator() decorated_request = self.request_decorator(self._request) - while not paginator.finished: - prepared_request = self.prepare_request( - context, - next_page_token=paginator.current_value, - ) - resp = decorated_request(prepared_request, context) - self.update_sync_costs(prepared_request, resp, context) - yield from self.parse_response(resp) + with metrics.http_request_counter(self.name, self.path) as request_counter: + if context is not None: + request_counter.tags[metrics.Tag.CONTEXT] = context + while not paginator.finished: + prepared_request = self.prepare_request( + context, + next_page_token=paginator.current_value, + ) + resp = decorated_request(prepared_request, context) + request_counter.increment() + self.update_sync_costs(prepared_request, resp, context) + yield from self.parse_response(resp) + + paginator.advance(resp) + + def _write_request_duration_log( + self, + endpoint: str, + response: requests.Response, + context: dict | None, + extra_tags: dict | None, + ) -> None: + """TODO. - paginator.advance(resp) + Args: + endpoint: TODO + response: TODO + context: Stream partition or context dictionary. + extra_tags: TODO + """ + extra_tags = extra_tags or {} + if context: + extra_tags[metrics.Tag.CONTEXT] = context + + point = metrics.Point( + "timer", + metric=metrics.Metric.HTTP_REQUEST_DURATION, + value=response.elapsed.total_seconds(), + tags={ + metrics.Tag.STREAM: self.name, + metrics.Tag.ENDPOINT: self.path, + metrics.Tag.HTTP_STATUS_CODE: response.status_code, + metrics.Tag.STATUS: ( + metrics.Status.SUCCEEDED + if response.status_code < 400 + else metrics.Status.FAILED + ), + **extra_tags, + }, + ) + self._log_metric(point) def update_sync_costs( self, diff --git a/tests/core/test_metrics.py b/tests/core/test_metrics.py new file mode 100644 index 0000000000..e1f2d00637 --- /dev/null +++ b/tests/core/test_metrics.py @@ -0,0 +1,57 @@ +import logging +import time + +import pytest + +from singer_sdk import metrics + + +def test_record_counter(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME) + with metrics.record_counter("test_stream", custom_tag="pytest") as counter: + for _ in range(100): + counter.increment() + + total = 0 + for record in caplog.records: + assert record.levelname == "INFO" + assert record.msg == "INFO METRIC: %s" + + point: metrics.Point[int] = record.args[0] + assert point.metric_type == "counter" + assert point.metric == "record_count" + assert point.tags == { + metrics.Tag.STREAM: "test_stream", + "custom_tag": "pytest", + } + + total += point.value + + assert total == 100 + + +def test_sync_timer(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME) + with metrics.sync_timer("test_stream", custom_tag="pytest") as timer: + start_time = timer.start_time + for _ in range(1000): + time.sleep(0.001) + end_time = time.time() + + total = 0.0 + for record in caplog.records: + assert record.levelname == "INFO" + assert record.msg == "INFO METRIC: %s" + + point: metrics.Point[float] = record.args[0] + assert point.metric_type == "timer" + assert point.metric == "sync_duration" + assert point.tags == { + metrics.Tag.STREAM: "test_stream", + metrics.Tag.STATUS: "succeeded", + "custom_tag": "pytest", + } + + total += point.value + + assert pytest.approx(total, rel=0.001) == end_time - start_time