Skip to content

Commit

Permalink
feat: Add post processing to QueryObject (#9427)
Browse files Browse the repository at this point in the history
* Add post processing to QueryObject

* Simplify sort signature and require explicit sort order

* Add new operations and unit tests

* linting

* Address comments

* Simplify test method names

* Address comments

* Linting

* remove unnecessary logic

* Apply strict whitelisting to all getattr calls

* Add checking of rolling_type_options and add/improve docs
  • Loading branch information
villebro authored Apr 10, 2020
1 parent 5ec0192 commit a8ce3bc
Show file tree
Hide file tree
Showing 9 changed files with 899 additions and 12 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ marshmallow==2.19.5 # via flask-appbuilder, marshmallow-enum, marshmallow-
more-itertools==8.1.0 # via zipp
msgpack==0.6.2 # via apache-superset (setup.py)
numpy==1.18.1 # via pandas, pyarrow
pandas==0.25.3 # via apache-superset (setup.py)
pandas==1.0.3 # via apache-superset (setup.py)
parsedatetime==2.5 # via apache-superset (setup.py)
pathlib2==2.3.5 # via apache-superset (setup.py)
polyline==1.4.0 # via apache-superset (setup.py)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def get_git_sha():
"isodate",
"markdown>=3.0",
"msgpack>=0.6.1, <0.7.0",
"pandas>=0.25.3, <1.0",
"pandas>=1.0.3, <1.1",
"parsedatetime",
"pathlib2",
"polyline",
Expand Down
9 changes: 6 additions & 3 deletions superset/common/query_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class QueryContext:
custom_cache_timeout: Optional[int]

# TODO: Type datasource and query_object dictionary with TypedDict when it becomes
# a vanilla python type https://github.com/python/mypy/issues/5288
# a vanilla python type https://github.com/python/mypy/issues/5288
def __init__(
self,
datasource: Dict[str, Any],
Expand All @@ -70,8 +70,8 @@ def get_query_result(self, query_object: QueryObject) -> Dict[str, Any]:
"""Returns a pandas dataframe based on the query object"""

# Here, we assume that all the queries will use the same datasource, which is
# is a valid assumption for current setting. In a long term, we may or maynot
# support multiple queries from different data source.
# a valid assumption for current setting. In the long term, we may
# support multiple queries from different data sources.

timestamp_format = None
if self.datasource.type == "table":
Expand Down Expand Up @@ -105,6 +105,9 @@ def get_query_result(self, query_object: QueryObject) -> Dict[str, Any]:
self.df_metrics_to_num(df, query_object)

# DataFrame.replace returns a new frame unless inplace=True is given, so the
# result must be rebound; otherwise the +/-inf values are left untouched.
df = df.replace([np.inf, -np.inf], np.nan)

df = query_object.exec_post_processing(df)

return {
"query": result.query,
"status": result.status,
Expand Down
40 changes: 37 additions & 3 deletions superset/common/query_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
from typing import Any, Dict, List, Optional, Union

import simplejson as json
from flask_babel import gettext as _
from pandas import DataFrame

from superset import app
from superset.utils import core as utils
from superset.exceptions import QueryObjectValidationError
from superset.utils import core as utils, pandas_postprocessing
from superset.views.utils import get_time_range_endpoints

# TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
# https://github.com/python/mypy/issues/5288
# https://github.com/python/mypy/issues/5288


class QueryObject:
Expand All @@ -50,6 +53,7 @@ class QueryObject:
extras: Dict
columns: List[str]
orderby: List[List]
post_processing: List[Dict[str, Any]]

def __init__(
self,
Expand All @@ -67,6 +71,7 @@ def __init__(
extras: Optional[Dict] = None,
columns: Optional[List[str]] = None,
orderby: Optional[List[List]] = None,
post_processing: Optional[List[Dict[str, Any]]] = None,
relative_start: str = app.config["DEFAULT_RELATIVE_START_TIME"],
relative_end: str = app.config["DEFAULT_RELATIVE_END_TIME"],
):
Expand All @@ -81,8 +86,9 @@ def __init__(
self.time_range = time_range
self.time_shift = utils.parse_human_timedelta(time_shift)
self.groupby = groupby or []
self.post_processing = post_processing or []

# Temporal solution for backward compatability issue due the new format of
# Temporary solution for backward compatibility issue due to the new format of
# non-ad-hoc metric which needs to adhere to superset-ui per
# https://git.io/Jvm7P.
self.metrics = [
Expand Down Expand Up @@ -138,9 +144,37 @@ def cache_key(self, **extra: Any) -> str:
if self.time_range:
cache_dict["time_range"] = self.time_range
json_data = self.json_dumps(cache_dict, sort_keys=True)
if self.post_processing:
cache_dict["post_processing"] = self.post_processing
return hashlib.md5(json_data.encode("utf-8")).hexdigest()

def json_dumps(self, obj: Any, sort_keys: bool = False) -> str:
    """
    Serialize ``obj`` to a JSON string.

    Delegates to ``json.dumps`` with ``ignore_nan=True`` and
    ``utils.json_int_dttm_ser`` as the ``default`` fallback serializer.

    :param obj: object to serialize
    :param sort_keys: forwarded to ``json.dumps`` as ``sort_keys``
    :return: the JSON string produced by ``json.dumps``
    """
    serialized = json.dumps(
        obj,
        sort_keys=sort_keys,
        ignore_nan=True,
        default=utils.json_int_dttm_ser,
    )
    return serialized

def exec_post_processing(self, df: DataFrame) -> DataFrame:
    """
    Perform post processing operations on DataFrame.

    Each entry of ``self.post_processing`` is a dict with an ``operation`` key
    naming a function in ``pandas_postprocessing`` and an optional ``options``
    dict that is passed to that function as keyword arguments. Entries are
    applied in list order, each one receiving the previous entry's result.

    :param df: DataFrame returned from database model.
    :return: DataFrame resulting from applying all post processing operations;
             ``df`` itself when no operations are defined
    :raises QueryObjectValidationError: If ``operation`` is missing or does not
            name a supported post processing operation
    """
    for post_process in self.post_processing:
        operation = post_process.get("operation")
        if not operation:
            raise QueryObjectValidationError(
                _("`operation` property of post processing object undefined")
            )

        # Only public, callable attributes of the module may be dispatched to:
        # a bare hasattr() check would also accept private helpers and
        # non-callable module attributes.
        # NOTE(review): `operation` presumably originates from client-supplied
        # query data, and names merely imported into pandas_postprocessing are
        # still reachable here - consider an explicit allowlist.
        handler = (
            getattr(pandas_postprocessing, operation, None)
            if isinstance(operation, str) and not operation.startswith("_")
            else None
        )
        if not callable(handler):
            raise QueryObjectValidationError(
                _(
                    "Unsupported post processing operation: %(operation)s",
                    # The keyword must match the %(operation)s placeholder,
                    # otherwise formatting the message raises KeyError.
                    operation=operation,
                )
            )

        # `or {}` also covers an explicit null/None "options" value, which
        # would otherwise fail on the ** expansion.
        options = post_process.get("options") or {}
        df = handler(df, **options)
    return df
4 changes: 4 additions & 0 deletions superset/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ class CertificateException(SupersetException):

class DatabaseNotFound(SupersetException):
    """SupersetException subclass whose ``status`` class attribute is 400."""

    # NOTE(review): presumably raised when a requested database does not exist,
    # with `status` used as the HTTP response code - confirm against
    # SupersetException and its handlers.
    status = 400


class QueryObjectValidationError(SupersetException):
    """
    Raised by ``QueryObject.exec_post_processing`` when a post processing entry
    has no ``operation`` or names an unsupported one.
    """

    # NOTE(review): presumably used as the HTTP response code - confirm against
    # SupersetException and its handlers.
    status = 400
Loading

0 comments on commit a8ce3bc

Please sign in to comment.