Skip to content

Commit

Permalink
Add extraction function support for Druid queries (apache#4740)
Browse files Browse the repository at this point in the history
* add extraction fn support for Druid queries

* bump pydruid version to get extraction fn commits

* update and add tests for druid for filters with extraction fns

* conform to flake8 rules

* fix flake8 issues

* bump pyruid version for extraction function features
  • Loading branch information
jasnovak authored and michellethomas committed May 23, 2018
1 parent f0c89be commit f9136e4
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 38 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pandas==0.22.0
parsedatetime==2.0.0
pathlib2==2.3.0
polyline==1.3.2
pydruid==0.4.1
pydruid==0.4.2
pyhive==0.5.0
python-dateutil==2.6.1
python-geohash==0.8.5
Expand Down
145 changes: 127 additions & 18 deletions superset/connectors/druid/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from flask_babel import lazy_gettext as _
from pydruid.client import PyDruid
from pydruid.utils.aggregators import count
from pydruid.utils.filters import Bound, Dimension, Filter
from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.having import Aggregation
from pydruid.utils.postaggregator import (
Const, Field, HyperUniqueCardinality, Postaggregator, Quantile, Quantiles,
Expand Down Expand Up @@ -960,8 +961,25 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
for unused, row in df.iterrows():
fields = []
for dim in dimensions:
f = Dimension(dim) == row[dim]
fields.append(f)
f = None
# Check if this dimension uses an extraction function
# If so, create the appropriate pydruid extraction object
if isinstance(dim, dict) and 'extractionFn' in dim:
(col, extraction_fn) = DruidDatasource._create_extraction_fn(dim)
dim_val = dim['outputName']
f = Filter(
dimension=col,
value=row[dim_val],
extraction_function=extraction_fn,
)
elif isinstance(dim, dict):
dim_val = dim['outputName']
if dim_val:
f = Dimension(dim_val) == row[dim_val]
else:
f = Dimension(dim) == row[dim]
if f:
fields.append(f)
if len(fields) > 1:
term = Filter(type='and', fields=fields)
new_filters.append(term)
Expand Down Expand Up @@ -1065,7 +1083,9 @@ def _dimensions_to_values(dimensions):
values = []
for dimension in dimensions:
if isinstance(dimension, dict):
if 'dimension' in dimension:
if 'extractionFn' in dimension:
values.append(dimension)
elif 'dimension' in dimension:
values.append(dimension['dimension'])
else:
values.append(dimension)
Expand Down Expand Up @@ -1132,7 +1152,7 @@ def run_query( # noqa / druid
intervals=self.intervals_from_dttms(from_dttm, to_dttm),
)

filters = DruidDatasource.get_filters(filter, self.num_cols)
filters = DruidDatasource.get_filters(filter, self.num_cols, columns_dict)
if filters:
qry['filter'] = filters

Expand Down Expand Up @@ -1217,7 +1237,14 @@ def run_query( # noqa / druid

pre_qry = deepcopy(qry)
pre_qry_dims = self._dimensions_to_values(qry['dimensions'])
pre_qry['dimensions'] = list(set(pre_qry_dims))

# Can't use set on an array with dicts
# Use set with non-dict items only
non_dict_dims = list(
set([x for x in pre_qry_dims if not isinstance(x, dict)]),
)
dict_dims = [x for x in pre_qry_dims if isinstance(x, dict)]
pre_qry['dimensions'] = non_dict_dims + dict_dims

order_by = metrics[0] if metrics else pre_qry_dims[0]

Expand Down Expand Up @@ -1341,8 +1368,31 @@ def increment_timestamp(ts):
query=query_str,
duration=datetime.now() - qry_start_dttm)

@staticmethod
def _create_extraction_fn(dim_spec):
extraction_fn = None
if dim_spec and 'extractionFn' in dim_spec:
col = dim_spec['dimension']
fn = dim_spec['extractionFn']
ext_type = fn.get('type')
if ext_type == 'lookup' and fn['lookup'].get('type') == 'map':
replace_missing_values = fn.get('replaceMissingValueWith')
retain_missing_values = fn.get('retainMissingValue', False)
injective = fn.get('isOneToOne', False)
extraction_fn = MapLookupExtraction(
fn['lookup']['map'],
replace_missing_values=replace_missing_values,
retain_missing_values=retain_missing_values,
injective=injective,
)
elif ext_type == 'regex':
extraction_fn = RegexExtraction(fn['expr'])
else:
raise Exception(_('Unsupported extraction function: ' + ext_type))
return (col, extraction_fn)

@classmethod
def get_filters(cls, raw_filters, num_cols): # noqa
def get_filters(cls, raw_filters, num_cols, columns_dict): # noqa
"""Given Superset filter data structure, returns pydruid Filter(s)"""
filters = None
for flt in raw_filters:
Expand All @@ -1354,21 +1404,42 @@ def get_filters(cls, raw_filters, num_cols): # noqa
not op or
(eq is None and op not in ('IS NULL', 'IS NOT NULL'))):
continue

# Check if this dimension uses an extraction function
# If so, create the appropriate pydruid extraction object
column_def = columns_dict.get(col)
dim_spec = column_def.dimension_spec if column_def else None
extraction_fn = None
if dim_spec and 'extractionFn' in dim_spec:
(col, extraction_fn) = DruidDatasource._create_extraction_fn(dim_spec)

cond = None
is_numeric_col = col in num_cols
is_list_target = op in ('in', 'not in')
eq = cls.filter_values_handler(
eq, is_list_target=is_list_target,
target_column_is_numeric=is_numeric_col)

# For these two ops, could have used Dimension,
# but it doesn't support extraction functions
if op == '==':
cond = Dimension(col) == eq
cond = Filter(dimension=col, value=eq, extraction_function=extraction_fn)
elif op == '!=':
cond = Dimension(col) != eq
cond = ~Filter(dimension=col, value=eq, extraction_function=extraction_fn)
elif op in ('in', 'not in'):
fields = []
# ignore the filter if it has no value
if not len(eq):
continue
# if it uses an extraction fn, use the "in" operator
# as Dimension isn't supported
elif extraction_fn is not None:
cond = Filter(
dimension=col,
values=eq,
type='in',
extraction_function=extraction_fn,
)
elif len(eq) == 1:
cond = Dimension(col) == eq[0]
else:
Expand All @@ -1378,20 +1449,58 @@ def get_filters(cls, raw_filters, num_cols): # noqa
if op == 'not in':
cond = ~cond
elif op == 'regex':
cond = Filter(type='regex', pattern=eq, dimension=col)
cond = Filter(
extraction_function=extraction_fn,
type='regex',
pattern=eq,
dimension=col,
)

# For the ops below, could have used pydruid's Bound,
# but it doesn't support extraction functions
elif op == '>=':
cond = Bound(col, eq, None, alphaNumeric=is_numeric_col)
cond = Filter(
type='bound',
extraction_function=extraction_fn,
dimension=col,
lowerStrict=False,
upperStrict=False,
lower=eq,
upper=None,
alphaNumeric=is_numeric_col,
)
elif op == '<=':
cond = Bound(col, None, eq, alphaNumeric=is_numeric_col)
cond = Filter(
type='bound',
extraction_function=extraction_fn,
dimension=col,
lowerStrict=False,
upperStrict=False,
lower=None,
upper=eq,
alphaNumeric=is_numeric_col,
)
elif op == '>':
cond = Bound(
col, eq, None,
lowerStrict=True, alphaNumeric=is_numeric_col,
cond = Filter(
type='bound',
extraction_function=extraction_fn,
lowerStrict=True,
upperStrict=False,
dimension=col,
lower=eq,
upper=None,
alphaNumeric=is_numeric_col,
)
elif op == '<':
cond = Bound(
col, None, eq,
upperStrict=True, alphaNumeric=is_numeric_col,
cond = Filter(
type='bound',
extraction_function=extraction_fn,
upperStrict=True,
lowerStrict=False,
dimension=col,
lower=None,
upper=eq,
alphaNumeric=is_numeric_col,
)
elif op == 'IS NULL':
cond = Dimension(col) == None # NOQA
Expand Down
Loading

0 comments on commit f9136e4

Please sign in to comment.