From 98c5766edb10087f3b50224c3a5be551040bb829 Mon Sep 17 00:00:00 2001 From: Kushal Batra <34571348+s0nicboOm@users.noreply.github.com> Date: Wed, 1 May 2024 17:46:25 -0700 Subject: [PATCH] Static filter (#373) Static_filters support --------- Signed-off-by: s0nicboOm --- numalogic/connectors/_config.py | 7 ++++++ numalogic/connectors/druid/_druid.py | 35 ++++++++++++++++++++++++---- numalogic/udfs/trainer/_druid.py | 1 + pyproject.toml | 2 +- 4 files changed, 40 insertions(+), 5 deletions(-) diff --git a/numalogic/connectors/_config.py b/numalogic/connectors/_config.py index a788c66a..a598e2a4 100644 --- a/numalogic/connectors/_config.py +++ b/numalogic/connectors/_config.py @@ -36,9 +36,16 @@ class Pivot: value: list[str] = field(default_factory=lambda: ["count"]) +@dataclass +class FilterConf: + inclusion_filters: Optional[list[dict]] = None + exclusion_filters: Optional[list[dict]] = None + + @dataclass class DruidFetcherConf: datasource: str + static_filters: Optional[FilterConf] = None dimensions: list[str] = field(default_factory=list) aggregations: dict = field(default_factory=dict) group_by: list[str] = field(default_factory=list) diff --git a/numalogic/connectors/druid/_druid.py b/numalogic/connectors/druid/_druid.py index c62d4fec..99b9abe3 100644 --- a/numalogic/connectors/druid/_druid.py +++ b/numalogic/connectors/druid/_druid.py @@ -10,7 +10,7 @@ from pydruid.utils.filters import Filter from numalogic.connectors._base import DataFetcher -from numalogic.connectors._config import Pivot +from numalogic.connectors._config import Pivot, FilterConf from typing import Optional, Final from numalogic.tools.exceptions import DruidFetcherError @@ -34,6 +34,22 @@ def make_filter_pairs(filter_keys: list[str], filter_values: list[str]) -> dict[ return dict(zip(filter_keys, filter_values)) +def _combine_in_filters(filters_list) -> Filter: + return Filter(type="and", fields=[Filter(**item) for item in filters_list]) + + +def _combine_ex_filters(filters_list) -> 
Filter: + filters = _combine_in_filters(filters_list) + return Filter(type="not", field=filters) + + +def _make_static_filters(filters: FilterConf) -> Filter: + in_filters, ex_filters = _combine_in_filters(filters.inclusion_filters), _combine_ex_filters( + filters.exclusion_filters + ) + return Filter(type="and", fields=[in_filters, ex_filters]) + + def build_params( datasource: str, dimensions: list[str], @@ -41,6 +57,7 @@ def build_params( granularity: str, hours: float, delay: float, + static_filters: Optional[FilterConf] = None, aggregations: Optional[list[str]] = None, post_aggregations: Optional[list[str]] = None, reference_dt: Optional[datetime] = None, @@ -52,6 +69,7 @@ def build_params( dimensions: The dimensions to group by filter_pairs: Indicates which rows of data to include in the query + static_filters: Static filters passed from config granularity: Time bucket to aggregate data by hour, day, minute, etc., hours: Hours from now to skip training. delay: Added delay to the fetch query from current time. @@ -69,6 +87,11 @@ def build_params( type="and", fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()], ) + if static_filters: + _LOGGER.debug("Static Filters are present!") + _static_filters = _make_static_filters(static_filters) + _filter = Filter(type="and", fields=[_static_filters, _filter]) + reference_dt = reference_dt or datetime.now(pytz.utc) end_dt = reference_dt - timedelta(hours=delay) _LOGGER.debug("Querying with end_dt: %s, that is with delay of %s hrs", end_dt, delay) @@ -118,6 +141,7 @@ def fetch( dimensions: list[str], delay: float = 3.0, granularity: str = "minute", + static_filters: Optional[FilterConf] = None, aggregations: Optional[dict] = None, post_aggregations: Optional[dict] = None, group_by: Optional[list[str]] = None, @@ -135,6 +159,7 @@ def fetch( dimensions: The dimensions to group by delay: Added delay to the fetch query from current time. 
granularity: Time bucket to aggregate data by hour, day, minute, etc. + static_filters: user defined filters aggregations: A map from aggregator name to one of the ``pydruid.utils.aggregators`` e.g., ``doublesum`` post_aggregations: postaggregations map @@ -152,6 +177,7 @@ def fetch( datasource=datasource, dimensions=dimensions, filter_pairs=filter_pairs, + static_filters=static_filters, granularity=granularity, hours=hours, delay=delay, @@ -193,6 +219,7 @@ def chunked_fetch( dimensions: list[str], delay: float = 3.0, granularity: str = "minute", + static_filters: Optional[FilterConf] = None, aggregations: Optional[dict] = None, post_aggregations: Optional[dict] = None, group_by: Optional[list[str]] = None, @@ -213,6 +240,7 @@ def chunked_fetch( granularity: Time bucket to aggregate data by hour, day, minute, etc. aggregations: A map from aggregator name to one of the ``pydruid.utils.aggregators`` e.g., ``doublesum`` + static_filters: user defined filters post_aggregations: postaggregations map group_by: List of columns to group by pivot: Pivot configuration @@ -245,6 +273,7 @@ def chunked_fetch( datasource=datasource, dimensions=dimensions, filter_pairs=filter_pairs, + static_filters=static_filters, granularity=granularity, hours=min(chunked_hours, hours - hours_elapsed), delay=delay, @@ -259,9 +288,7 @@ def chunked_fetch( _LOGGER.debug("Fetching data concurrently with %s threads", max_threads) with ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [executor.submit(self._fetch, **params) for params in qparams] - for future in futures: - chunked_dfs.append(future.result()) - + chunked_dfs.extend(future.result() for future in futures) df = pd.concat(chunked_dfs, axis=0, ignore_index=True) df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6 diff --git a/numalogic/udfs/trainer/_druid.py b/numalogic/udfs/trainer/_druid.py index f21f9d77..7a348bae 100644 --- a/numalogic/udfs/trainer/_druid.py +++ b/numalogic/udfs/trainer/_druid.py @@ 
-120,6 +120,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]: datasource=_fetcher_conf.datasource, filter_keys=_stream_conf.composite_keys, filter_values=payload.composite_keys, + static_filters=_fetcher_conf.static_filters, dimensions=list(_fetcher_conf.dimensions), delay=self.dataconn_conf.delay_hrs, granularity=_fetcher_conf.granularity, diff --git a/pyproject.toml b/pyproject.toml index 0949c8f0..82af7bec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "numalogic" -version = "0.9.1a7" +version = "0.9.1a8" description = "Collection of operational Machine Learning models and tools." authors = ["Numalogic Developers"] packages = [{ include = "numalogic" }]