From c29b8f614dfff3335e7350ae3d9ca68b1d46823a Mon Sep 17 00:00:00 2001
From: Laurent VAYLET
Date: Wed, 2 Nov 2022 17:56:45 +0000
Subject: [PATCH 1/2] refactor shared code in Cloud Monitoring backends to
 Abstract Base Class (ABC)

---
 slo_generator/backends/cloud_monitoring.py | 277 +++++-------------
 .../backends/cloud_monitoring_abc.py | 213 ++++++++++++++
 .../backends/cloud_monitoring_mql.py | 266 +++++------------
 .../backends/test_cloud_monitoring_mql.py | 60 ++--
 4 files changed, 388 insertions(+), 428 deletions(-)
 create mode 100644 slo_generator/backends/cloud_monitoring_abc.py

diff --git a/slo_generator/backends/cloud_monitoring.py b/slo_generator/backends/cloud_monitoring.py
index eb9e8036..a0ff47fd 100644
--- a/slo_generator/backends/cloud_monitoring.py
+++ b/slo_generator/backends/cloud_monitoring.py
@@ -13,181 +13,57 @@
 # limitations under the License.
 """
 `cloud_monitoring.py`
-Cloud Monitoring backend implementation.
+Cloud Monitoring backend implementation with MQF (Monitoring Query Filters).
 """
 import logging
 import pprint
-import warnings
-from collections import OrderedDict
+from typing import List, Optional
 
-from google.cloud import monitoring_v3
+from google.api.distribution_pb2 import Distribution
+from google.cloud.monitoring_v3 import Aggregation, ListTimeSeriesRequest, TimeInterval
+from google.cloud.monitoring_v3.services.metric_service import MetricServiceClient
+from google.cloud.monitoring_v3.services.metric_service.pagers import (
+    ListTimeSeriesPager,
+)
+from google.cloud.monitoring_v3.types.metric import TimeSeries
 
-from slo_generator.constants import NO_DATA
+from .cloud_monitoring_abc import CloudMonitoringBackendABC
 
 LOGGER = logging.getLogger(__name__)
 
 
-class CloudMonitoringBackend:
-    """Backend for querying metrics from Cloud Monitoring.
+class CloudMonitoringBackend(CloudMonitoringBackendABC):
    """Backend for querying metrics from Cloud Monitoring with MQF.
 
     Args:
         project_id (str): Cloud Monitoring host project id.
-        client (google.cloud.monitoring_v3.MetricServiceClient, optional):
-            Existing Cloud Monitoring Metrics client. Initialize a new client
-            if omitted.
+        client (monitoring_v3.services.metric_service.MetricServiceClient, optional):
+            Existing Cloud Monitoring Metrics client. Initialize a new client if
+            omitted.
     """
 
-    def __init__(self, project_id, client=None):
+    def __init__(self, project_id: str, client=None):
         self.client = client
         if client is None:
-            self.client = monitoring_v3.MetricServiceClient()
+            self.client = MetricServiceClient()
         self.parent = self.client.common_project_path(project_id)
 
-    # pylint: disable=duplicate-code
-    def good_bad_ratio(self, timestamp, window, slo_config):
-        """Query two timeseries, one containing 'good' events, one containing
-        'bad' events.
-
-        Args:
-            timestamp (int): UNIX timestamp.
-            window (int): Window size (in seconds).
-            slo_config (dict): SLO configuration.
- - Returns: - tuple: A tuple (good_event_count, bad_event_count) - """ - measurement = slo_config["spec"]["service_level_indicator"] - filter_good = measurement["filter_good"] - filter_bad = measurement.get("filter_bad") - filter_valid = measurement.get("filter_valid") - - # Query 'good events' timeseries - good_ts = self.query( - timestamp=timestamp, - window=window, - filter=filter_good, - ) - good_ts = list(good_ts) - good_event_count = CM.count(good_ts) - - # Query 'bad events' timeseries - if filter_bad: - bad_ts = self.query( - timestamp=timestamp, - window=window, - filter=filter_bad, - ) - bad_ts = list(bad_ts) - bad_event_count = CM.count(bad_ts) - elif filter_valid: - valid_ts = self.query( - timestamp=timestamp, - window=window, - filter=filter_valid, - ) - valid_ts = list(valid_ts) - bad_event_count = CM.count(valid_ts) - good_event_count - else: - raise Exception("One of `filter_bad` or `filter_valid` is required.") - - LOGGER.debug( - f"Good events: {good_event_count} | " f"Bad events: {bad_event_count}" - ) - - return good_event_count, bad_event_count - - # pylint: disable=duplicate-code,too-many-locals - def distribution_cut(self, timestamp, window, slo_config): - """Query one timeseries of type 'exponential'. - - Args: - timestamp (int): UNIX timestamp. - window (int): Window size (in seconds). - slo_config (dict): SLO configuration. - - Returns: - tuple: A tuple (good_event_count, bad_event_count). - """ - measurement = slo_config["spec"]["service_level_indicator"] - filter_valid = measurement["filter_valid"] - threshold_bucket = int(measurement["threshold_bucket"]) - good_below_threshold = measurement.get("good_below_threshold", True) - - # Query 'valid' events - series = self.query( - timestamp=timestamp, - window=window, - filter=filter_valid, - ) - series = list(series) - - if not series: - return NO_DATA, NO_DATA # no timeseries - - distribution_value = series[0].points[0].value.distribution_value - # bucket_options = distribution_value.bucket_options - bucket_counts = distribution_value.bucket_counts - valid_events_count = distribution_value.count - # growth_factor = bucket_options.exponential_buckets.growth_factor - # scale = bucket_options.exponential_buckets.scale - - # Explicit the exponential distribution result - count_sum = 0 - distribution = OrderedDict() - for i, bucket_count in enumerate(bucket_counts): - count_sum += bucket_count - # upper_bound = scale * math.pow(growth_factor, i) - distribution[i] = { - # 'upper_bound': upper_bound, - # 'bucket_count': bucket_count, - "count_sum": count_sum - } - LOGGER.debug(pprint.pformat(distribution)) - - if len(distribution) - 1 < threshold_bucket: - # maximum measured metric is below the cut after bucket number - lower_events_count = valid_events_count - upper_events_count = 0 - else: - lower_events_count = distribution[threshold_bucket]["count_sum"] - upper_events_count = valid_events_count - lower_events_count - - if good_below_threshold: - good_event_count = lower_events_count - bad_event_count = upper_events_count - else: - good_event_count = upper_events_count - bad_event_count = lower_events_count - - return good_event_count, bad_event_count - - def exponential_distribution_cut(self, *args, **kwargs): - """Alias for `distribution_cut` method to allow for backwards - compatibility. 
- """ - warnings.warn( - "exponential_distribution_cut will be deprecated in version 2.0, " - "please use distribution_cut instead", - FutureWarning, - ) - return self.distribution_cut(*args, **kwargs) - # pylint: disable=redefined-builtin,too-many-arguments def query( self, - timestamp, - window, - filter, - aligner="ALIGN_SUM", - reducer="REDUCE_SUM", - group_by=None, - ): - """Query timeseries from Cloud Monitoring. + timestamp: float, + window: int, + filter_or_query: str, + aligner: str = "ALIGN_SUM", + reducer: str = "REDUCE_SUM", + group_by: Optional[List[str]] = None, + ) -> List[TimeSeries]: + """Query timeseries from Cloud Monitoring using MQF. Args: - timestamp (int): Current timestamp. + timestamp (float): Current timestamp. window (int): Window size (in seconds). - filter (str): Query filter. + filter_or_query (str): Query filter. aligner (str, optional): Aligner to use. reducer (str, optional): Reducer to use. group_by (list, optional): List of fields to group by. @@ -197,53 +73,41 @@ def query( """ if group_by is None: group_by = [] - measurement_window = CM.get_window(timestamp, window) - aggregation = CM.get_aggregation( - window, aligner=aligner, reducer=reducer, group_by=group_by + measurement_window = self.get_window(timestamp, window) + aggregation = self.get_aggregation(window, aligner, reducer, group_by) + request = ListTimeSeriesRequest( + { + "name": self.parent, + "filter": filter_or_query, + "interval": measurement_window, + "view": ListTimeSeriesRequest.TimeSeriesView.FULL, + "aggregation": aggregation, + } ) - request = monitoring_v3.ListTimeSeriesRequest() - request.name = self.parent - request.filter = filter - request.interval = measurement_window - request.view = monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL - request.aggregation = aggregation - timeseries = self.client.list_time_series(request) + timeseries_pager: ListTimeSeriesPager = self.client.list_time_series(request) + timeseries: List[TimeSeries] = list(timeseries_pager) LOGGER.debug(pprint.pformat(timeseries)) return timeseries @staticmethod - def count(timeseries): - """Count events in time series assuming it was aligned with ALIGN_SUM - and reduced with REDUCE_SUM (default). - - Args: - :obj:`monitoring_v3.TimeSeries`: Timeseries object. - - Returns: - int: Event count. - """ - try: - return timeseries[0].points[0].value.int64_value - except (IndexError, AttributeError) as exception: - LOGGER.debug(exception, exc_info=True) - return NO_DATA # no events in timeseries - - @staticmethod - def get_window(timestamp, window): + def get_window( + timestamp: float, + window: int, + ) -> TimeInterval: """Helper for measurement window. Args: - timestamp (int): Current timestamp. + timestamp (float): Current timestamp. window (int): Window size (in seconds). Returns: - :obj:`monitoring_v3.types.TimeInterval`: Measurement window object. + :obj:`monitoring_v3.TimeInterval`: Measurement window object. 
""" end_time_seconds = int(timestamp) end_time_nanos = int((timestamp - end_time_seconds) * 10**9) start_time_seconds = int(timestamp - window) start_time_nanos = end_time_nanos - measurement_window = monitoring_v3.TimeInterval( + measurement_window = TimeInterval( { "end_time": { "seconds": end_time_seconds, @@ -260,11 +124,11 @@ def get_window(timestamp, window): @staticmethod def get_aggregation( - window, - aligner="ALIGN_SUM", - reducer="REDUCE_SUM", - group_by=None, - ): + window: int, + aligner: str = "ALIGN_SUM", + reducer: str = "REDUCE_SUM", + group_by: Optional[List[str]] = None, + ) -> Aggregation: """Helper for aggregation object. Default aggregation is `ALIGN_SUM`. @@ -274,27 +138,46 @@ def get_aggregation( window (int): Window size (in seconds). aligner (str): Aligner type. reducer (str): Reducer type. - group_by (list): List of fields to group by. + group_by (list, optional): List of fields to group by. Returns: - :obj:`monitoring_v3.types.Aggregation`: Aggregation object. + :obj:`monitoring_v3.Aggregation`: Aggregation object. """ if group_by is None: group_by = [] - aggregation = monitoring_v3.Aggregation( + aggregation = Aggregation( { "alignment_period": {"seconds": window}, - "per_series_aligner": getattr( - monitoring_v3.Aggregation.Aligner, aligner - ), - "cross_series_reducer": getattr( - monitoring_v3.Aggregation.Reducer, reducer - ), + "per_series_aligner": getattr(Aggregation.Aligner, aligner), + "cross_series_reducer": getattr(Aggregation.Reducer, reducer), "group_by_fields": group_by, } ) LOGGER.debug(pprint.pformat(aggregation)) return aggregation + @staticmethod + def get_distribution_value_from_timeseries( + timeseries: List[TimeSeries], + ) -> Distribution: + """Extract a distribution from a list of timeseries. + + Args: + timeseries (list): List of timeseries. + + Returns: + :obj:`google.api.distribution_pb2.Distribution`: Distribution. + """ + return timeseries[0].points[0].value.distribution_value -CM = CloudMonitoringBackend + @staticmethod + def get_nb_events_from_timeseries(timeseries: List[TimeSeries]) -> int: + """Count the events from a list of timeseries. + + Args: + timeseries (list): List of timeseries. + + Returns: + int: Number of events. + """ + return timeseries[0].points[0].value.int64_value diff --git a/slo_generator/backends/cloud_monitoring_abc.py b/slo_generator/backends/cloud_monitoring_abc.py new file mode 100644 index 00000000..7e0de384 --- /dev/null +++ b/slo_generator/backends/cloud_monitoring_abc.py @@ -0,0 +1,213 @@ +# Copyright 2022 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +`cloud_monitoring_base.py` +Abstract Base Class (ABC) for Cloud Monitoring backend implementations. 
+""" +import logging +import pprint +import warnings +from abc import ABC, abstractmethod +from collections import OrderedDict +from typing import List, Optional, Tuple + +from google.api.distribution_pb2 import Distribution +from google.cloud.monitoring_v3.types.metric import TimeSeries + +from slo_generator.constants import NO_DATA + +LOGGER = logging.getLogger(__name__) + + +class CloudMonitoringBackendABC(ABC): + """Abstract Base Class (ABC) for Cloud Monitoring backend implementations. + + Args: + project_id (str): Cloud Monitoring host project ID. + client (optional): Existing Cloud Monitoring client. Initialize a new client if + omitted. + """ + + def good_bad_ratio( + self, + timestamp: float, + window: int, + slo_config: dict, + ) -> Tuple[int, int]: + """Query two timeseries, one containing 'good' events, one containing 'bad' + events. + + Args: + timestamp (float): UNIX timestamp. + window (int): Window size (in seconds). + slo_config (dict): SLO configuration. + + Returns: + tuple: A tuple (good_event_count, bad_event_count) + """ + measurement: dict = slo_config["spec"]["service_level_indicator"] + filter_good: str = measurement["filter_good"] + filter_bad: Optional[str] = measurement.get("filter_bad") + filter_valid: Optional[str] = measurement.get("filter_valid") + + # Query 'good events' timeseries + good_ts: List[TimeSeries] = self.query(timestamp, window, filter_good) + good_event_count: int = self.count(good_ts) + + # Query 'bad events' timeseries + bad_event_count: int + if filter_bad: + bad_ts: List[TimeSeries] = self.query(timestamp, window, filter_bad) + bad_event_count = self.count(bad_ts) + elif filter_valid: + valid_ts: List[TimeSeries] = self.query(timestamp, window, filter_valid) + bad_event_count = self.count(valid_ts) - good_event_count + else: + raise Exception("One of `filter_bad` or `filter_valid` is required.") + + LOGGER.debug( + f"Good events: {good_event_count} | " f"Bad events: {bad_event_count}" + ) + + return good_event_count, bad_event_count + + # pylint: disable=too-many-locals + def distribution_cut( + self, + timestamp: float, + window: int, + slo_config: dict, + ) -> Tuple[int, int]: + """Query one timeseries of type 'exponential'. + + Args: + timestamp (float): UNIX timestamp. + window (int): Window size (in seconds). + slo_config (dict): SLO configuration. + + Returns: + tuple: A tuple (good_event_count, bad_event_count). + """ + measurement: dict = slo_config["spec"]["service_level_indicator"] + filter_valid: str = measurement["filter_valid"] + threshold_bucket: int = int(measurement["threshold_bucket"]) + good_below_threshold: bool = measurement.get("good_below_threshold", True) + + # Query 'valid' events. + series = self.query(timestamp, window, filter_valid) + + if not series: + return NO_DATA, NO_DATA # No timeseries. + + distribution_value = self.get_distribution_value_from_timeseries(series) + bucket_counts: list[int] = distribution_value.bucket_counts + valid_events_count: int = distribution_value.count + + # Explicit the exponential distribution result. + count_sum: int = 0 + distribution = OrderedDict() + for i, bucket_count in enumerate(bucket_counts): + count_sum += bucket_count + distribution[i] = {"count_sum": count_sum} + LOGGER.debug(pprint.pformat(distribution)) + + lower_events_count: int + upper_events_count: int + if len(distribution) - 1 < threshold_bucket: + # Maximum measured metric is below the cut after bucket number. 
+            lower_events_count = valid_events_count
+            upper_events_count = 0
+        else:
+            lower_events_count = distribution[threshold_bucket]["count_sum"]
+            upper_events_count = valid_events_count - lower_events_count
+
+        good_event_count: int
+        bad_event_count: int
+        if good_below_threshold:
+            good_event_count = lower_events_count
+            bad_event_count = upper_events_count
+        else:
+            good_event_count = upper_events_count
+            bad_event_count = lower_events_count
+
+        return good_event_count, bad_event_count
+
+    def exponential_distribution_cut(self, *args, **kwargs) -> Tuple[int, int]:
+        """Alias for `distribution_cut` method to allow for backwards compatibility."""
+        warnings.warn(
+            "exponential_distribution_cut will be deprecated in version 2.0, "
+            "please use distribution_cut instead",
+            FutureWarning,
+        )
+        return self.distribution_cut(*args, **kwargs)
+
+    def count(self, timeseries: List[TimeSeries]) -> int:
+        """Count events in time series.
+
+        Args:
+            timeseries (list): List of Timeseries objects.
+
+        Returns:
+            int: Event count.
+        """
+        try:
+            return self.get_nb_events_from_timeseries(timeseries)
+        except (IndexError, AttributeError) as exception:
+            LOGGER.debug(exception, exc_info=True)
+            return NO_DATA  # No events in timeseries.
+
+    @abstractmethod
+    def query(
+        self,
+        timestamp: float,
+        window: int,
+        filter_or_query: str,
+    ) -> List[TimeSeries]:
+        """Query timeseries from Cloud Monitoring.
+
+        Args:
+            timestamp (float): Current timestamp.
+            window (int): Window size (in seconds).
+            filter_or_query (str): Cloud Monitoring filter or query.
+
+        Returns:
+            list: List of timeseries objects.
+        """
+
+    @staticmethod
+    @abstractmethod
+    def get_distribution_value_from_timeseries(
+        timeseries: List[TimeSeries],
+    ) -> Distribution:
+        """Extract a distribution from a list of timeseries.
+
+        Args:
+            timeseries (list): List of timeseries.
+
+        Returns:
+            :obj:`google.api.distribution_pb2.Distribution`: Distribution.
+        """
+
+    @staticmethod
+    @abstractmethod
+    def get_nb_events_from_timeseries(timeseries: List[TimeSeries]) -> int:
+        """Count the events from a list of timeseries.
+
+        Args:
+            timeseries (list): List of timeseries.
+
+        Returns:
+            int: Number of events.
+        """
+        return NO_DATA
diff --git a/slo_generator/backends/cloud_monitoring_mql.py b/slo_generator/backends/cloud_monitoring_mql.py
index 56ea49ba..bb4b494e 100644
--- a/slo_generator/backends/cloud_monitoring_mql.py
+++ b/slo_generator/backends/cloud_monitoring_mql.py
@@ -1,4 +1,4 @@
-# Copyright 2019 Google Inc.
+# Copyright 2022 Google Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,174 +17,47 @@ """ import logging import pprint -import re -import typing -import warnings -from collections import OrderedDict -from typing import List, Tuple +from datetime import datetime +from typing import List from google.api.distribution_pb2 import Distribution +from google.cloud.monitoring_v3 import QueryTimeSeriesRequest from google.cloud.monitoring_v3.services.query_service import QueryServiceClient from google.cloud.monitoring_v3.services.query_service.pagers import ( QueryTimeSeriesPager, ) -from google.cloud.monitoring_v3.types import metric_service -from google.cloud.monitoring_v3.types.metric import TimeSeriesData +from google.cloud.monitoring_v3.types.metric import TimeSeries -from slo_generator.constants import NO_DATA +from .cloud_monitoring_abc import CloudMonitoringBackendABC LOGGER = logging.getLogger(__name__) -class CloudMonitoringMqlBackend: +class CloudMonitoringMqlBackend(CloudMonitoringBackendABC): """Backend for querying metrics from Cloud Monitoring with MQL. Args: project_id (str): Cloud Monitoring host project id. - client (google.cloud.monitoring_v3.QueryServiceClient, optional): - Existing Cloud Monitoring Query client. Initialize a new client - if omitted. + client (monitoring_v3.services.query_service.QueryServiceClient, optional): + Existing Cloud Monitoring Query client. Initialize a new client if omitted. """ - def __init__(self, project_id: str, client: QueryServiceClient = None): + def __init__(self, project_id: str, client=None): self.client = client if client is None: self.client = QueryServiceClient() - self.parent = self.client.common_project_path( # type: ignore[union-attr] - project_id - ) - - def good_bad_ratio( - self, - timestamp: int, # pylint: disable=unused-argument - window: int, - slo_config: dict, - ) -> Tuple[int, int]: - """Query two timeseries, one containing 'good' events, one containing - 'bad' events. - - Args: - timestamp (int): UNIX timestamp. - window (int): Window size (in seconds). - slo_config (dict): SLO configuration. - - Returns: - tuple: A tuple (good_event_count, bad_event_count) - """ - measurement: dict = slo_config["spec"]["service_level_indicator"] - filter_good: str = measurement["filter_good"] - filter_bad: typing.Optional[str] = measurement.get("filter_bad") - filter_valid: typing.Optional[str] = measurement.get("filter_valid") - - # Query 'good events' timeseries - good_ts: List[TimeSeriesData] = self.query(query=filter_good, window=window) - good_event_count: int = CM.count(good_ts) - - # Query 'bad events' timeseries - bad_event_count: int - if filter_bad: - bad_ts: List[TimeSeriesData] = self.query(query=filter_bad, window=window) - bad_event_count = CM.count(bad_ts) - elif filter_valid: - valid_ts: List[TimeSeriesData] = self.query( - query=filter_valid, window=window - ) - bad_event_count = CM.count(valid_ts) - good_event_count - else: - raise Exception("One of `filter_bad` or `filter_valid` is required.") - - LOGGER.debug( - f"Good events: {good_event_count} | " f"Bad events: {bad_event_count}" - ) - - return good_event_count, bad_event_count - - # pylint: disable=too-many-locals,disable=unused-argument - def distribution_cut( - self, - timestamp: int, - window: int, - slo_config: dict, - ) -> Tuple[int, int]: - """Query one timeseries of type 'exponential'. - - Args: - timestamp (int): UNIX timestamp. - window (int): Window size (in seconds). - slo_config (dict): SLO configuration. - - Returns: - tuple: A tuple (good_event_count, bad_event_count). 
-        """
-        measurement: dict = slo_config["spec"]["service_level_indicator"]
-        filter_valid: str = measurement["filter_valid"]
-        threshold_bucket: int = int(measurement["threshold_bucket"])
-        good_below_threshold: typing.Optional[bool] = measurement.get(
-            "good_below_threshold", True
-        )
-
-        # Query 'valid' events
-        series = self.query(query=filter_valid, window=window)
-
-        if not series:
-            return NO_DATA, NO_DATA  # no timeseries
-
-        distribution_value: Distribution = (
-            series[0].point_data[0].values[0].distribution_value
-        )
-        bucket_counts: list = distribution_value.bucket_counts
-        valid_events_count: int = distribution_value.count
-
-        # Explicit the exponential distribution result
-        count_sum: int = 0
-        distribution = OrderedDict()
-        for i, bucket_count in enumerate(bucket_counts):
-            count_sum += bucket_count
-            distribution[i] = {"count_sum": count_sum}
-        LOGGER.debug(pprint.pformat(distribution))
-
-        lower_events_count: int
-        upper_events_count: int
-        if len(distribution) - 1 < threshold_bucket:
-            # maximum measured metric is below the cut after bucket number
-            lower_events_count = valid_events_count
-            upper_events_count = 0
-        else:
-            lower_events_count = distribution[threshold_bucket]["count_sum"]
-            upper_events_count = valid_events_count - lower_events_count
-
-        good_event_count: int
-        bad_event_count: int
-        if good_below_threshold:
-            good_event_count = lower_events_count
-            bad_event_count = upper_events_count
-        else:
-            good_event_count = upper_events_count
-            bad_event_count = lower_events_count
-
-        return good_event_count, bad_event_count
-
-    def exponential_distribution_cut(self, *args, **kwargs) -> Tuple[int, int]:
-        """Alias for `distribution_cut` method to allow for backwards
-        compatibility.
-        """
-        warnings.warn(
-            "exponential_distribution_cut will be deprecated in version 2.0, "
-            "please use distribution_cut instead",
-            FutureWarning,
-        )
-        return self.distribution_cut(*args, **kwargs)
+        self.parent = self.client.common_project_path(project_id)
 
     def query_sli(
         self,
-        timestamp: int,  # pylint: disable=unused-argument
+        timestamp: float,
         window: int,
         slo_config: dict,
     ) -> float:
         """Query SLI value from a given MQL query.
 
         Args:
-            timestamp (int): UNIX timestamp.
+            timestamp (float): UNIX timestamp.
             window (int): Window (in seconds).
             slo_config (dict): SLO configuration.
 
@@ -193,90 +66,95 @@ def query_sli(
         """
         measurement: dict = slo_config["spec"]["service_level_indicator"]
         query: str = measurement["query"]
-        series: List[TimeSeriesData] = self.query(query=query, window=window)
+        series: List[TimeSeries] = self.query(timestamp, window, query)
         sli_value: float = series[0].point_data[0].values[0].double_value
         LOGGER.debug(f"SLI value: {sli_value}")
         return sli_value
 
-    def query(self, query: str, window: int) -> List[TimeSeriesData]:
+    def query(
+        self,
+        timestamp: float,
+        window: int,
+        filter_or_query: str,
+    ) -> List[TimeSeries]:
         """Query timeseries from Cloud Monitoring using MQL.
 
         Args:
-            query (str): MQL query.
+            timestamp (float): Current timestamp.
             window (int): Window size (in seconds).
+            filter_or_query (str): MQL query.
 
         Returns:
             list: List of timeseries objects.
         """
-        # Enrich query to aggregate and reduce the time series over the
-        # desired window.
-        formatted_query: str = self._fmt_query(query, window)
-        request = metric_service.QueryTimeSeriesRequest(
-            {"name": self.parent, "query": formatted_query}
+        # Enrich query to aggregate and reduce time series over target window.
+        query_with_time_horizon_and_period: str = (
+            self.enrich_query_with_time_horizon_and_period(
+                timestamp, window, filter_or_query
+            )
         )
-        # fmt: off
-        timeseries_pager: QueryTimeSeriesPager = (
-            self.client.query_time_series(request)  # type: ignore[union-attr]
+        request = QueryTimeSeriesRequest(
+            {
+                "name": self.parent,
+                "query": query_with_time_horizon_and_period,
+            }
         )
-        # fmt: on
-        timeseries: list = list(timeseries_pager)  # convert pager to flat list
+        timeseries_pager: QueryTimeSeriesPager = self.client.query_time_series(request)
+        timeseries: List[TimeSeries] = list(timeseries_pager)
         LOGGER.debug(pprint.pformat(timeseries))
         return timeseries
 
     @staticmethod
-    def count(timeseries: List[TimeSeriesData]) -> int:
-        """Count events in time series assuming it was aligned with ALIGN_SUM
-        and reduced with REDUCE_SUM (default).
+    def enrich_query_with_time_horizon_and_period(
+        timestamp: float,
+        window: int,
+        query: str,
+    ) -> str:
+        """Enrich MQL query with time period and horizon.
 
         Args:
-            :obj:`monitoring_v3.TimeSeries`: Timeseries object.
+            timestamp (float): UNIX timestamp.
+            window (int): Query window (in seconds).
+            query (str): Base query from the YAML config.
 
         Returns:
-            int: Event count.
+            str: Enriched query.
         """
-        try:
-            return timeseries[0].point_data[0].values[0].int64_value
-        except (IndexError, AttributeError) as exception:
-            LOGGER.debug(exception, exc_info=True)
-            return NO_DATA  # no events in timeseries
+        # Python uses floating point numbers to represent time in seconds since the
+        # epoch, in UTC, with the fractional part representing sub-second precision.
+        # MQL expects dates formatted like "%Y/%m/%d %H:%M:%S" or "%Y/%m/%d-%H:%M:%S".
+        # Reference: https://cloud.google.com/monitoring/mql/reference#lexical-elements
+        end_time_str: str = datetime.fromtimestamp(timestamp).strftime(
+            "%Y/%m/%d %H:%M:%S"
+        )
+        query_with_time_horizon_and_period: str = (
+            query
+            + f"| group_by [] | within {window}s, d'{end_time_str}' | every {window}s"
+        )
+        return query_with_time_horizon_and_period
 
     @staticmethod
-    def _fmt_query(query: str, window: int) -> str:
-        """Format MQL query:
+    def get_distribution_value_from_timeseries(
+        timeseries: List[TimeSeries],
+    ) -> Distribution:
+        """Extract a distribution from a list of timeseries.
 
-        * If the MQL expression has a `window` placeholder, replace it by the
-        current window. Otherwise, append it to the expression.
+        Args:
+            timeseries (list): List of timeseries.
 
-        * If the MQL expression has a `every` placeholder, replace it by the
-        current window. Otherwise, append it to the expression.
+        Returns:
+            :obj:`google.api.distribution_pb2.Distribution`: Distribution.
+        """
+        return timeseries[0].point_data[0].values[0].distribution_value
 
-        * If the MQL expression has a `group_by` placeholder, replace it.
-        Otherwise, append it to the expression.
+    @staticmethod
+    def get_nb_events_from_timeseries(timeseries: List[TimeSeries]) -> int:
+        """Count the events from a list of timeseries.
 
         Args:
-            query (str): Original query in YAMLconfig.
-            window (int): Query window (in seconds).
+            timeseries (list): List of timeseries.
 
         Returns:
-            str: Formatted query.
+            int: Number of events.
""" - formatted_query: str = query.strip() - if "group_by" in formatted_query: - formatted_query = re.sub( - r"\|\s+group_by\s+\[.*\]\s*", "| group_by [] ", formatted_query - ) - else: - formatted_query += "| group_by [] " - for mql_time_interval_keyword in ["within", "every"]: - if mql_time_interval_keyword in formatted_query: - formatted_query = re.sub( - rf"\|\s+{mql_time_interval_keyword}\s+\w+\s*", - f"| {mql_time_interval_keyword} {window}s ", - formatted_query, - ) - else: - formatted_query += f"| {mql_time_interval_keyword} {window}s " - return formatted_query.strip() - - -CM = CloudMonitoringMqlBackend + return timeseries[0].point_data[0].values[0].int64_value diff --git a/tests/unit/backends/test_cloud_monitoring_mql.py b/tests/unit/backends/test_cloud_monitoring_mql.py index 9e5e9d2e..addb6ef3 100644 --- a/tests/unit/backends/test_cloud_monitoring_mql.py +++ b/tests/unit/backends/test_cloud_monitoring_mql.py @@ -20,42 +20,28 @@ class TestCloudMonitoringMqlBackend(unittest.TestCase): - def test_fmt_query(self): - queries = [ - """ fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [metric.response_code] | within 1h """, - """ fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [metric.response_code, response_code_class] - | within 1h - | every 1h """, - """ fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [metric.response_code,response_code_class] - | within 1h - | every 1h """, - ] + def test_enrich_query_with_time_horizon_and_period(self): + timestamp: float = 1666995015.5144777 # = 2022/10/28 22:10:15.5144777 + window: int = 3600 # in seconds + query: str = """fetch gae_app +| metric 'appengine.googleapis.com/http/server/response_count' +| filter resource.project_id == 'slo-generator-demo' +| filter + metric.response_code == 429 + || metric.response_code == 200 +""" - formatted_query = """fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [] | within 3600s | every 3600s""" + enriched_query = """fetch gae_app +| metric 'appengine.googleapis.com/http/server/response_count' +| filter resource.project_id == 'slo-generator-demo' +| filter + metric.response_code == 429 + || metric.response_code == 200 +| group_by [] | within 3600s, d'2022/10/28 22:10:15' | every 3600s""" - for query in queries: - assert CloudMonitoringMqlBackend._fmt_query(query, 3600) == formatted_query + assert ( + CloudMonitoringMqlBackend.enrich_query_with_time_horizon_and_period( + timestamp, window, query + ) + == enriched_query + ) From 21490c5c6ad6a1784a4a17293f5cf7205e953097 Mon Sep 17 00:00:00 2001 From: Laurent VAYLET Date: Thu, 3 Nov 2022 13:38:16 +0000 Subject: [PATCH 2/2] ensure type hint is compatible with Python 3.7 and 3.8 --- slo_generator/backends/cloud_monitoring_abc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slo_generator/backends/cloud_monitoring_abc.py b/slo_generator/backends/cloud_monitoring_abc.py 
index 7e0de384..32ac83c9 100644
--- a/slo_generator/backends/cloud_monitoring_abc.py
+++ b/slo_generator/backends/cloud_monitoring_abc.py
@@ -111,7 +111,7 @@ def distribution_cut(
             return NO_DATA, NO_DATA  # No timeseries.
 
         distribution_value = self.get_distribution_value_from_timeseries(series)
-        bucket_counts: list[int] = distribution_value.bucket_counts
+        bucket_counts: List[int] = distribution_value.bucket_counts
         valid_events_count: int = distribution_value.count
 
         # Make the exponential distribution result explicit.
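
Note: after these two patches, a backend subclass only supplies the query and
extraction logic; `good_bad_ratio`, `distribution_cut` and `count` are inherited
from `CloudMonitoringBackendABC`. A minimal sketch of a third backend plugging
into the ABC (not part of this patch; `MyCustomBackend` and its transport are
hypothetical, and the accessors assume the same proto shape as the MQF backend):

    from typing import List

    from google.api.distribution_pb2 import Distribution
    from google.cloud.monitoring_v3.types.metric import TimeSeries

    from slo_generator.backends.cloud_monitoring_abc import CloudMonitoringBackendABC


    class MyCustomBackend(CloudMonitoringBackendABC):
        """Hypothetical backend: only the three abstract members are defined."""

        def query(
            self,
            timestamp: float,
            window: int,
            filter_or_query: str,
        ) -> List[TimeSeries]:
            # Fetch raw timeseries for [timestamp - window, timestamp] here,
            # with any client that returns Cloud Monitoring TimeSeries protos.
            raise NotImplementedError

        @staticmethod
        def get_distribution_value_from_timeseries(
            timeseries: List[TimeSeries],
        ) -> Distribution:
            # Same accessor as the MQF backend; adjust to the proto shape
            # actually returned by query().
            return timeseries[0].points[0].value.distribution_value

        @staticmethod
        def get_nb_events_from_timeseries(timeseries: List[TimeSeries]) -> int:
            return timeseries[0].points[0].value.int64_value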