From ca32740fc732f4754d2beb7473affc261b080951 Mon Sep 17 00:00:00 2001 From: khtruong Date: Thu, 23 May 2019 13:01:09 -0700 Subject: [PATCH 1/6] feat: rough check in for Presto rows and arrays --- superset/assets/src/SqlLab/main.less | 1 + superset/db_engine_specs.py | 145 ++++++++++++++++++++++++--- superset/sql_lab.py | 8 +- 3 files changed, 137 insertions(+), 17 deletions(-) diff --git a/superset/assets/src/SqlLab/main.less b/superset/assets/src/SqlLab/main.less index 822e7d84ce4dc..c7be8fbc20c6a 100644 --- a/superset/assets/src/SqlLab/main.less +++ b/superset/assets/src/SqlLab/main.less @@ -238,6 +238,7 @@ div.Workspace { .schemaPane { flex: 0 0 300px; + max-width: 300px; transition: all .3s ease-in-out; } diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 35a591fa10202..15f500c5b7564 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -28,7 +28,7 @@ The general idea is to use static classes and an inheritance scheme. """ -from collections import namedtuple +from collections import namedtuple, OrderedDict import hashlib import inspect import logging @@ -159,6 +159,10 @@ def fetch_data(cls, cursor, limit): return cursor.fetchmany(limit) return cursor.fetchall() + @classmethod + def expand_data(cls, columns, data): + return columns, data + @classmethod def alter_new_orm_column(cls, orm_col): """Allow altering default column attributes when first detected/added @@ -827,7 +831,7 @@ def get_view_names(cls, inspector, schema): return [] @classmethod - def _create_column_info(cls, column: RowProxy, name: str, data_type: str) -> dict: + def _create_column_info(cls, name: str, data_type: str, column: RowProxy = None) -> dict: """ Create column info object :param column: column object @@ -839,7 +843,7 @@ def _create_column_info(cls, column: RowProxy, name: str, data_type: str) -> dic 'name': name, 'type': data_type, # newer Presto no longer includes this column - 'nullable': getattr(column, 'Null', True), + 'nullable': 
getattr(column, 'Null', True) if column else True, 'default': None, } @@ -879,13 +883,12 @@ def _split_data_type(cls, data_type: str, delimiter: str) -> List[str]: r'{}(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'.format(delimiter), data_type) @classmethod - def _parse_structural_column(cls, column: RowProxy, result: List[dict]) -> None: + def _parse_structural_column(cls, full_data_type: str, result: List[dict], column: RowProxy = None) -> None: """ Parse a row or array column :param column: column :param result: list tracking the results """ - full_data_type = '{} {}'.format(column.Column, column.Type) # split on open parenthesis ( to get the structural # data type and its component types data_types = cls._split_data_type(full_data_type, r'\(') @@ -900,8 +903,9 @@ def _parse_structural_column(cls, column: RowProxy, result: List[dict]) -> None: stack.pop() elif cls._has_nested_data_types(inner_type): # split on comma , to get individual data types - single_fields = cls._split_data_type(inner_type, ', ') + single_fields = cls._split_data_type(inner_type, ',') for single_field in single_fields: + single_field = single_field.strip() # If component type starts with a comma, the first single field # will be an empty string. Disregard this empty string. 
if not single_field: @@ -914,13 +918,14 @@ def _parse_structural_column(cls, column: RowProxy, result: List[dict]) -> None: stack.append((field_info[0], field_info[1])) full_parent_path = cls._get_full_name(stack) result.append(cls._create_column_info( - column, full_parent_path, - presto_type_map[field_info[1]]())) + full_parent_path, + presto_type_map[field_info[1]](), + column)) else: # otherwise this field is a basic data type full_parent_path = cls._get_full_name(stack) column_name = '{}.{}'.format(full_parent_path, field_info[0]) result.append(cls._create_column_info( - column, column_name, presto_type_map[field_info[1]]())) + column_name, presto_type_map[field_info[1]](), column)) # If the component type ends with a structural data type, do not pop # the stack. We have run across a structural data type within the # overall structural data type. Otherwise, we have completely parsed @@ -972,7 +977,8 @@ def get_columns( try: # parse column if it is a row or array if 'array' in column.Type or 'row' in column.Type: - cls._parse_structural_column(column, result) + full_data_type = '{} {}'.format(column.Column, column.Type) + cls._parse_structural_column(full_data_type, result, column) continue else: # otherwise column is a basic data type column_type = presto_type_map[column.Type]() @@ -980,7 +986,7 @@ def get_columns( logging.info('Did not recognize type {} of column {}'.format( column.Type, column.Column)) column_type = types.NullType - result.append(cls._create_column_info(column, column.Column, column_type)) + result.append(cls._create_column_info(column.Column, column_type, column)) return result @classmethod @@ -1025,7 +1031,7 @@ def _get_fields(cls, cols: List[dict]) -> List[ColumnClause]: return column_clauses @classmethod - def _filter_presto_cols(cls, cols: List[dict]) -> List[dict]: + def _filter_presto_cols(cls, cols: List[dict]) -> Tuple[List[dict], List[dict]]: """ We want to filter out columns that correspond to array content because expanding 
arrays would require us to use unnest and join. This can lead to a large, @@ -1047,20 +1053,23 @@ def _filter_presto_cols(cls, cols: List[dict]) -> List[dict]: :return: filtered list of columns """ filtered_cols = [] + array_cols = [] curr_array_col_name = '' - for col in cols: + for index, col in enumerate(cols): # col corresponds to an array's content and should be skipped if curr_array_col_name and col['name'].startswith(curr_array_col_name): + array_cols.append(col) continue # col is an array so we need to check if subsequent # columns correspond to the array's contents elif str(col['type']) == 'ARRAY': curr_array_col_name = col['name'] + array_cols.append(col) filtered_cols.append(col) else: curr_array_col_name = '' filtered_cols.append(col) - return filtered_cols + return filtered_cols, array_cols @classmethod def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None, @@ -1073,7 +1082,10 @@ def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None, """ presto_cols = cols if show_cols: - presto_cols = cls._filter_presto_cols(cols) + # presto_cols = cls._filter_presto_cols(cols) + dot_regex = r'\.(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)' + presto_cols = [ + col for col in presto_cols if re.search(dot_regex, col['name']) is None] return super(PrestoEngineSpec, cls).select_star( my_db, table_name, engine, schema, limit, show_cols, indent, latest_partition, presto_cols, @@ -1126,6 +1138,109 @@ def fetch_result_sets(cls, db, datasource_type): row['table_schema'], row['table_name'])) return result_sets + @classmethod + def create_column_hierarchy(cls, columns, column_hierarchy, parent_columns): + if len(columns) == 0: + return + # get root column name + root_column = columns.pop(0) + # empty list + nested_columns = {'type': root_column['type'], 'children': []} + column_hierarchy[root_column['name']] = nested_columns + # while there are columns to explore + while columns: + column_head = columns[0] + if not 
column_head['name'].startswith('{}.'.format(root_column['name'])): + break + # if column is of parent type, recursive call + if str(column_head['type']) in parent_columns: + cls.create_column_hierarchy(columns, column_hierarchy, parent_columns) + nested_columns['children'].append(column_head['name']) + continue + # assign column to list + else: + nested_columns['children'].append(column_head['name']) + # remove column + columns.pop(0) + + @classmethod + def expand_row_data(cls, datum, column, column_hierarchy): + row_data = datum[column] + row_children = column_hierarchy[column]['children'] + if row_data and len(row_data) != len(row_children): + raise Exception("mismatched arrays") + elif row_data: + for index, data_value in enumerate(row_data): + datum[row_children[index]] = data_value + else: + for index, row_child in enumerate(row_children): + datum[row_child] = '' + + @classmethod + def expand_data(cls, columns, data): + expanded_columns = [] + for column in columns: + if column['type'].startswith('ARRAY') or column['type'].startswith('ROW'): + full_data_type = '{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, expanded_columns) + else: + expanded_columns.append(column) + + row_column_hierarchy = OrderedDict() + array_column_hierarchy = OrderedDict() + expanded_array_columns = [] + for column in columns: + if column['type'].startswith('ROW'): + parsed_row_columns = [] + full_data_type = '{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, parsed_row_columns) + filtered_row_columns, array_columns = cls._filter_presto_cols(parsed_row_columns) + expanded_array_columns = expanded_array_columns + array_columns + cls.create_column_hierarchy(filtered_row_columns, row_column_hierarchy, ['ROW']) + cls.create_column_hierarchy(array_columns, array_column_hierarchy, ['ROW', 'ARRAY']) + elif column['type'].startswith('ARRAY'): + parsed_array_columns = [] + full_data_type = 
'{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, parsed_array_columns) + expanded_array_columns = expanded_array_columns + parsed_array_columns + cls.create_column_hierarchy(parsed_array_columns, array_column_hierarchy, ['ROW', 'ARRAY']) + + ordered_row_columns = row_column_hierarchy.keys() + for data_index, datum in enumerate(data): + for row_column in ordered_row_columns: + cls.expand_row_data(datum, row_column, row_column_hierarchy) + + # This part of the code addresses arrays and is buggy + ordered_array_columns = array_column_hierarchy.keys() + expanded_array_dict = {} + for data_index, datum in enumerate(data): + expanded_array_data = [datum.copy()] + datum_copy = expanded_array_data[0] + for array_column in ordered_array_columns: + array_data = datum_copy[array_column] + array_children = array_column_hierarchy[array_column] + if str(array_column_hierarchy[array_column]['type']) == 'ROW': + new_data = expanded_array_dict.values() + for expanded_array_datum in expanded_array_data: + cls.expand_row(expanded_array_datum, array_column, array_column_hierarchy) + elif array_data and array_children: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + expanded_array_data.append({}) + for index, datum_value in enumerate(data_value): + expanded_array_data[array_index][array_children['children'][index]] = datum_value + elif array_data: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + expanded_array_data.append({}) + expanded_array_data[array_index][array_column] = data_value + else: + for index, array_child in enumerate(array_children): + for expanded_array_datum in expanded_array_data: + expanded_array_datum[array_children['children'][index]] = '' + expanded_array_dict[data_index] = expanded_array_data + return expanded_columns, data + @classmethod def extra_table_metadata(cls, database, table_name, 
schema_name): indexes = database.get_indexes(table_name, schema_name) diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 6a5ee8ebc53ae..67821f502ed95 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -279,10 +279,14 @@ def execute_sql_statements( latest_partition=False) query.end_time = now_as_float() + columns = cdf.columns if cdf.columns else [] + data = cdf.data if cdf.data else [] + columns, data = db_engine_spec.expand_data(columns, data) + payload.update({ 'status': query.status, - 'data': cdf.data if cdf.data else [], - 'columns': cdf.columns if cdf.columns else [], + 'data': data, + 'columns': columns, 'query': query.to_dict(), }) From 835c1c1830a16a1dd6f00073605bd37fc4aa28e4 Mon Sep 17 00:00:00 2001 From: khtruong Date: Fri, 24 May 2019 00:24:42 -0700 Subject: [PATCH 2/6] fix: presto arrays --- superset/db_engine_specs.py | 92 ++++++++++++++++++++++++++----------- 1 file changed, 65 insertions(+), 27 deletions(-) diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 15f500c5b7564..cb025a54fdeb2 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -1210,35 +1210,73 @@ def expand_data(cls, columns, data): for row_column in ordered_row_columns: cls.expand_row_data(datum, row_column, row_column_hierarchy) - # This part of the code addresses arrays and is buggy - ordered_array_columns = array_column_hierarchy.keys() - expanded_array_dict = {} - for data_index, datum in enumerate(data): - expanded_array_data = [datum.copy()] - datum_copy = expanded_array_data[0] + while array_column_hierarchy: + ready_array_columns = [] + child_arrays = set() + unprocessed_array_columns = set() + ordered_array_columns = list(array_column_hierarchy.keys()) + child_array = '' for array_column in ordered_array_columns: - array_data = datum_copy[array_column] - array_children = array_column_hierarchy[array_column] - if str(array_column_hierarchy[array_column]['type']) == 'ROW': - new_data = 
expanded_array_dict.values() - for expanded_array_datum in expanded_array_data: - cls.expand_row(expanded_array_datum, array_column, array_column_hierarchy) - elif array_data and array_children: - for array_index, data_value in enumerate(array_data): - if array_index >= len(expanded_array_data): - expanded_array_data.append({}) - for index, datum_value in enumerate(data_value): - expanded_array_data[array_index][array_children['children'][index]] = datum_value - elif array_data: - for array_index, data_value in enumerate(array_data): - if array_index >= len(expanded_array_data): - expanded_array_data.append({}) - expanded_array_data[array_index][array_column] = data_value - else: - for index, array_child in enumerate(array_children): + if array_column in data[0]: + ready_array_columns.append(array_column) + elif str(array_column_hierarchy[array_column]['type']) == 'ARRAY': + child_array = array_column + child_arrays.add(array_column) + unprocessed_array_columns.add(child_array) + elif child_array and array_column.startswith(child_array): + unprocessed_array_columns.add(array_column) + + filtered_array_data = {} + for data_index, datum in enumerate(data): + filtered_array_datum = {} + for array_column in ready_array_columns: + filtered_array_datum[array_column] = datum[array_column] + filtered_array_data[data_index] = [filtered_array_datum] + + for org_data_index, expanded_array_data in filtered_array_data.items(): + for array_column in ordered_array_columns: + if array_column in unprocessed_array_columns: + continue + if str(array_column_hierarchy[array_column]['type']) == 'ROW': for expanded_array_datum in expanded_array_data: - expanded_array_datum[array_children['children'][index]] = '' - expanded_array_dict[data_index] = expanded_array_data + cls.expand_row_data(expanded_array_datum, array_column, array_column_hierarchy) + continue + array_data = expanded_array_data[0][array_column] + array_children = array_column_hierarchy[array_column] + if not array_data and 
not array_children['children']: + continue + elif array_data and array_children['children']: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + expanded_array_data.append({}) + for index, datum_value in enumerate(data_value): + expanded_array_data[array_index][array_children['children'][index]] = datum_value + elif array_data: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + expanded_array_data.append({}) + expanded_array_data[array_index][array_column] = data_value + else: + for index, array_child in enumerate(array_children): + for expanded_array_datum in expanded_array_data: + expanded_array_datum[array_children['children'][index]] = '' + + data_index = 0 + org_data_index = 0 + while data_index < len(data): + data[data_index] = {**filtered_array_data[org_data_index][0], **data[data_index]} + filtered_array_data[org_data_index].pop(0) + data[data_index + 1:data_index + 1] = filtered_array_data[org_data_index] + data_index = data_index + len(filtered_array_data[org_data_index]) + 1 + org_data_index = org_data_index + 1 + + next_processed_arrays = unprocessed_array_columns + for array_column in ordered_array_columns: + if array_column in next_processed_arrays: + continue + else: + del array_column_hierarchy[array_column] + return expanded_columns, data @classmethod From af7346374e26afb44452ec079631d3ed21d47c27 Mon Sep 17 00:00:00 2001 From: khtruong Date: Wed, 29 May 2019 10:16:29 -0700 Subject: [PATCH 3/6] fix: return selected and expanded columns --- superset/db_engine_specs.py | 48 ++++++++++++++++++++++--------------- superset/sql_lab.py | 8 ++++--- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index cb025a54fdeb2..21af7a981060d 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -161,7 +161,7 @@ def fetch_data(cls, cursor, limit): @classmethod def 
expand_data(cls, columns, data): - return columns, data + return columns, data, [] @classmethod def alter_new_orm_column(cls, orm_col): @@ -1165,26 +1165,28 @@ def create_column_hierarchy(cls, columns, column_hierarchy, parent_columns): @classmethod def expand_row_data(cls, datum, column, column_hierarchy): - row_data = datum[column] - row_children = column_hierarchy[column]['children'] - if row_data and len(row_data) != len(row_children): - raise Exception("mismatched arrays") - elif row_data: - for index, data_value in enumerate(row_data): - datum[row_children[index]] = data_value - else: - for index, row_child in enumerate(row_children): - datum[row_child] = '' + if column in datum: + row_data = datum[column] + row_children = column_hierarchy[column]['children'] + if row_data and len(row_data) != len(row_children): + raise Exception("mismatched arrays") + elif row_data: + for index, data_value in enumerate(row_data): + datum[row_children[index]] = data_value + else: + for index, row_child in enumerate(row_children): + datum[row_child] = '' @classmethod def expand_data(cls, columns, data): + all_columns = [] expanded_columns = [] for column in columns: if column['type'].startswith('ARRAY') or column['type'].startswith('ROW'): full_data_type = '{} {}'.format(column['name'], column['type'].lower()) - cls._parse_structural_column(full_data_type, expanded_columns) + cls._parse_structural_column(full_data_type, all_columns) else: - expanded_columns.append(column) + all_columns.append(column) row_column_hierarchy = OrderedDict() array_column_hierarchy = OrderedDict() @@ -1194,6 +1196,7 @@ def expand_data(cls, columns, data): parsed_row_columns = [] full_data_type = '{} {}'.format(column['name'], column['type'].lower()) cls._parse_structural_column(full_data_type, parsed_row_columns) + expanded_columns = expanded_columns + parsed_row_columns[1:] filtered_row_columns, array_columns = cls._filter_presto_cols(parsed_row_columns) expanded_array_columns = 
expanded_array_columns + array_columns cls.create_column_hierarchy(filtered_row_columns, row_column_hierarchy, ['ROW']) @@ -1202,6 +1205,7 @@ def expand_data(cls, columns, data): parsed_array_columns = [] full_data_type = '{} {}'.format(column['name'], column['type'].lower()) cls._parse_structural_column(full_data_type, parsed_array_columns) + expanded_columns = expanded_columns + parsed_array_columns[1:] expanded_array_columns = expanded_array_columns + parsed_array_columns cls.create_column_hierarchy(parsed_array_columns, array_column_hierarchy, ['ROW', 'ARRAY']) @@ -1248,23 +1252,29 @@ def expand_data(cls, columns, data): elif array_data and array_children['children']: for array_index, data_value in enumerate(array_data): if array_index >= len(expanded_array_data): - expanded_array_data.append({}) + empty_dict = {} + for expanded_column in all_columns: + empty_dict[expanded_column['name']] = '' + expanded_array_data.append(empty_dict) for index, datum_value in enumerate(data_value): expanded_array_data[array_index][array_children['children'][index]] = datum_value elif array_data: for array_index, data_value in enumerate(array_data): if array_index >= len(expanded_array_data): - expanded_array_data.append({}) + empty_dict = {} + for expanded_column in all_columns: + empty_dict[expanded_column['name']] = '' + expanded_array_data.append(empty_dict) expanded_array_data[array_index][array_column] = data_value else: - for index, array_child in enumerate(array_children): + for index, array_child in enumerate(array_children['children']): for expanded_array_datum in expanded_array_data: - expanded_array_datum[array_children['children'][index]] = '' + expanded_array_datum[array_child] = '' data_index = 0 org_data_index = 0 while data_index < len(data): - data[data_index] = {**filtered_array_data[org_data_index][0], **data[data_index]} + data[data_index].update(filtered_array_data[org_data_index][0]) filtered_array_data[org_data_index].pop(0) data[data_index + 1:data_index 
+ 1] = filtered_array_data[org_data_index] data_index = data_index + len(filtered_array_data[org_data_index]) + 1 @@ -1277,7 +1287,7 @@ def expand_data(cls, columns, data): else: del array_column_hierarchy[array_column] - return expanded_columns, data + return all_columns, data, expanded_columns @classmethod def extra_table_metadata(cls, database, table_name, schema_name): diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 67821f502ed95..b153246a98b54 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -279,14 +279,16 @@ def execute_sql_statements( latest_partition=False) query.end_time = now_as_float() - columns = cdf.columns if cdf.columns else [] + selected_columns = cdf.columns if cdf.columns else [] data = cdf.data if cdf.data else [] - columns, data = db_engine_spec.expand_data(columns, data) + all_columns, data, expanded_columns = db_engine_spec.expand_data(selected_columns, data) payload.update({ 'status': query.status, 'data': data, - 'columns': columns, + 'columns': all_columns, + 'selected_columns': selected_columns, + 'expanded_columns': expanded_columns, 'query': query.to_dict(), }) From 0e1683c2c9c15d1bdf5956f9b557f67b47fb246b Mon Sep 17 00:00:00 2001 From: khtruong Date: Thu, 30 May 2019 14:25:16 -0700 Subject: [PATCH 4/6] fix: add helper methods and unit tests --- superset/db_engine_specs.py | 498 ++++++++++++++++++++++++---------- superset/sql_lab.py | 3 +- tests/db_engine_specs_test.py | 122 ++++++++- 3 files changed, 476 insertions(+), 147 deletions(-) diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 21af7a981060d..22e5fa0c5570f 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -36,7 +36,7 @@ import re import textwrap import time -from typing import List, Tuple +from typing import List, Set, Tuple from urllib import parse from flask import g @@ -160,7 +160,9 @@ def fetch_data(cls, cursor, limit): return cursor.fetchall() @classmethod - def expand_data(cls, columns, data): 
+ def expand_data(cls, + columns: List[dict], + data: List[dict]) -> Tuple[List[dict], List[dict], List[dict]]: return columns, data, [] @classmethod @@ -831,20 +833,16 @@ def get_view_names(cls, inspector, schema): return [] @classmethod - def _create_column_info(cls, name: str, data_type: str, column: RowProxy = None) -> dict: + def _create_column_info(cls, name: str, data_type: str) -> dict: """ Create column info object - :param column: column object :param name: column name :param data_type: column data type :return: column info object """ return { 'name': name, - 'type': data_type, - # newer Presto no longer includes this column - 'nullable': getattr(column, 'Null', True) if column else True, - 'default': None, + 'type': '{}'.format(data_type), } @classmethod @@ -883,10 +881,9 @@ def _split_data_type(cls, data_type: str, delimiter: str) -> List[str]: r'{}(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'.format(delimiter), data_type) @classmethod - def _parse_structural_column(cls, full_data_type: str, result: List[dict], column: RowProxy = None) -> None: + def _parse_structural_column(cls, full_data_type: str, result: List[dict]) -> None: """ Parse a row or array column - :param column: column :param result: list tracking the results """ # split on open parenthesis ( to get the structural @@ -919,13 +916,12 @@ def _parse_structural_column(cls, full_data_type: str, result: List[dict], colum full_parent_path = cls._get_full_name(stack) result.append(cls._create_column_info( full_parent_path, - presto_type_map[field_info[1]](), - column)) + presto_type_map[field_info[1]]())) else: # otherwise this field is a basic data type full_parent_path = cls._get_full_name(stack) column_name = '{}.{}'.format(full_parent_path, field_info[0]) result.append(cls._create_column_info( - column_name, presto_type_map[field_info[1]](), column)) + column_name, presto_type_map[field_info[1]]())) # If the component type ends with a structural data type, do not pop # the stack. 
We have run across a structural data type within the # overall structural data type. Otherwise, we have completely parsed @@ -978,7 +974,11 @@ def get_columns( # parse column if it is a row or array if 'array' in column.Type or 'row' in column.Type: full_data_type = '{} {}'.format(column.Column, column.Type) - cls._parse_structural_column(full_data_type, result, column) + structural_column_index = len(result) + cls._parse_structural_column(full_data_type, result) + result[structural_column_index]['nullable'] = getattr( + column, 'Null', True) + result[structural_column_index]['default'] = None continue else: # otherwise column is a basic data type column_type = presto_type_map[column.Type]() @@ -986,7 +986,10 @@ def get_columns( logging.info('Did not recognize type {} of column {}'.format( column.Type, column.Column)) column_type = types.NullType - result.append(cls._create_column_info(column.Column, column_type, column)) + column_info = cls._create_column_info(column.Column, column_type) + column_info['nullable'] = getattr(column, 'Null', True) + column_info['default'] = None + result.append(column_info) return result @classmethod @@ -1031,18 +1034,12 @@ def _get_fields(cls, cols: List[dict]) -> List[ColumnClause]: return column_clauses @classmethod - def _filter_presto_cols(cls, cols: List[dict]) -> Tuple[List[dict], List[dict]]: + def _filter_out_array_nested_cols( + cls, cols: List[dict]) -> Tuple[List[dict], List[dict]]: """ - We want to filter out columns that correspond to array content because expanding - arrays would require us to use unnest and join. This can lead to a large, - complicated, and slow query. - - Example: select array_content - from TABLE - cross join UNNEST(array_column) as t(array_content); - - We know which columns to skip because cols is a list provided to us in a specific - order where a structural column is positioned right before its content. + Filter out columns that correspond to array content. 
We know which columns to + skip because cols is a list provided to us in a specific order where a structural + column is positioned right before its content. Example: Column Name: ColA, Column Data Type: array(row(nest_obj int)) cols = [ ..., ColA, ColA.nest_obj, ... ] @@ -1050,7 +1047,7 @@ def _filter_presto_cols(cls, cols: List[dict]) -> Tuple[List[dict], List[dict]]: When we run across an array, check if subsequent column names start with the array name and skip them. :param cols: columns - :return: filtered list of columns + :return: filtered list of columns and list of array columns and its nested fields """ filtered_cols = [] array_cols = [] @@ -1082,7 +1079,6 @@ def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None, """ presto_cols = cols if show_cols: - # presto_cols = cls._filter_presto_cols(cols) dot_regex = r'\.(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)' presto_cols = [ col for col in presto_cols if re.search(dot_regex, col['name']) is None] @@ -1139,37 +1135,104 @@ def fetch_result_sets(cls, db, datasource_type): return result_sets @classmethod - def create_column_hierarchy(cls, columns, column_hierarchy, parent_columns): + def _build_column_hierarchy(cls, + columns: List[dict], + parent_column_types: List[str], + column_hierarchy: dict) -> None: + """ + Build a graph where the root node represents a column whose data type is in + parent_column_types. 
A node's children represent that column's nested fields + :param columns: list of columns + :param parent_column_types: list of data types that decide what columns can + be root nodes + :param column_hierarchy: dictionary representing the graph + """ if len(columns) == 0: return - # get root column name - root_column = columns.pop(0) - # empty list - nested_columns = {'type': root_column['type'], 'children': []} - column_hierarchy[root_column['name']] = nested_columns - # while there are columns to explore + root = columns.pop(0) + root_info = {'type': root['type'], 'children': []} + column_hierarchy[root['name']] = root_info while columns: - column_head = columns[0] - if not column_head['name'].startswith('{}.'.format(root_column['name'])): + column = columns[0] + # If the column name does not start with the root's name, + # then this column is not a nested field + if not column['name'].startswith('{}.'.format(root['name'])): break - # if column is of parent type, recursive call - if str(column_head['type']) in parent_columns: - cls.create_column_hierarchy(columns, column_hierarchy, parent_columns) - nested_columns['children'].append(column_head['name']) + # If the column's data type is one of the parent types, + # then this column may have nested fields + if str(column['type']) in parent_column_types: + cls._build_column_hierarchy(columns, parent_column_types, + column_hierarchy) + root_info['children'].append(column['name']) continue - # assign column to list - else: - nested_columns['children'].append(column_head['name']) - # remove column + else: # The column is a nested field + root_info['children'].append(column['name']) columns.pop(0) @classmethod - def expand_row_data(cls, datum, column, column_hierarchy): + def _create_row_and_array_hierarchy( + cls, selected_columns: List[dict]) -> Tuple[dict, dict, List[dict]]: + """ + Build graphs where the root node represents a row or array and its children + are that column's nested fields + :param selected_columns: 
columns selected in a query + :return: graph representing a row, graph representing an array, and a list + of all the nested fields + """ + row_column_hierarchy: OrderedDict = OrderedDict() + array_column_hierarchy: OrderedDict = OrderedDict() + expanded_columns: List[dict] = [] + for column in selected_columns: + if column['type'].startswith('ROW'): + parsed_row_columns: List[dict] = [] + full_data_type = '{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, parsed_row_columns) + expanded_columns = expanded_columns + parsed_row_columns[1:] + filtered_row_columns, array_columns = cls._filter_out_array_nested_cols( + parsed_row_columns) + cls._build_column_hierarchy(filtered_row_columns, + ['ROW'], + row_column_hierarchy) + cls._build_column_hierarchy(array_columns, + ['ROW', 'ARRAY'], + array_column_hierarchy) + elif column['type'].startswith('ARRAY'): + parsed_array_columns: List[dict] = [] + full_data_type = '{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, parsed_array_columns) + expanded_columns = expanded_columns + parsed_array_columns[1:] + cls._build_column_hierarchy(parsed_array_columns, + ['ROW', 'ARRAY'], + array_column_hierarchy) + return row_column_hierarchy, array_column_hierarchy, expanded_columns + + @classmethod + def _create_empty_row_of_data(cls, columns: List[dict]) -> dict: + """ + Create an empty row of data + :param columns: list of columns + :return: dictionary representing an empty row of data + """ + empty_data = {} + for column in columns: + empty_data[column['name']] = '' + return empty_data + + @classmethod + def _expand_row_data(cls, datum: dict, column: str, column_hierarchy: dict) -> None: + """ + Separate out nested fields and its value in a row of data + :param datum: row of data + :param column: row column name + :param column_hierarchy: dictionary tracking structural columns and its + nested fields + """ if column in datum: row_data 
= datum[column]
             row_children = column_hierarchy[column]['children']
             if row_data and len(row_data) != len(row_children):
-                raise Exception("mismatched arrays")
+                raise Exception('The number of data values and number of nested '
+                                'fields are not equal')
             elif row_data:
                 for index, data_value in enumerate(row_data):
                     datum[row_children[index]] = data_value
@@ -1178,9 +1241,231 @@ def expand_row_data(cls, datum, column, column_hierarchy):
                 datum[row_child] = ''
 
     @classmethod
-    def expand_data(cls, columns, data):
-        all_columns = []
-        expanded_columns = []
+    def _split_array_columns_by_process_state(
+            cls, array_columns: List[str],
+            array_column_hierarchy: dict,
+            datum: dict) -> Tuple[List[str], Set[str]]:
+        """
+        Take a list of array columns and split them according to whether or not we are
+        ready to process them from a data set
+        :param array_columns: list of array columns
+        :param array_column_hierarchy: graph representing array columns
+        :param datum: row of data
+        :return: list of array columns ready to be processed and set of array columns
+        not ready to be processed
+        """
+        array_columns_to_process = []
+        unprocessed_array_columns = set()
+        child_array = ''
+        for array_column in array_columns:
+            if array_column in datum:
+                array_columns_to_process.append(array_column)
+            elif str(array_column_hierarchy[array_column]['type']) == 'ARRAY':
+                child_array = array_column
+                unprocessed_array_columns.add(child_array)
+            elif child_array and array_column.startswith(child_array):
+                unprocessed_array_columns.add(array_column)
+        return array_columns_to_process, unprocessed_array_columns
+
+    @classmethod
+    def _convert_data_list_to_array_data_dict(
+            cls, data: List[dict], array_columns_to_process: List[str]) -> dict:
+        """
+        Pull out array data from rows of data into a dictionary where the key represents
+        the index in the data list and the value is the array data values
+        Example:
+            data = [
+                {'ColumnA': [1, 2], 'ColumnB': 3},
+                {'ColumnA': [11, 22], 'ColumnB': 3}
+            ]
+            data
dictionary = {
+                0: [{'ColumnA': [1, 2]}],
+                1: [{'ColumnA': [11, 22]}]
+            }
+        :param data: rows of data
+        :param array_columns_to_process: array columns we want to pull out
+        :return: data dictionary
+        """
+        array_data_dict = {}
+        for data_index, datum in enumerate(data):
+            all_array_datum = {}
+            for array_column in array_columns_to_process:
+                all_array_datum[array_column] = datum[array_column]
+            array_data_dict[data_index] = [all_array_datum]
+        return array_data_dict
+
+    @classmethod
+    def _process_array_data(cls,
+                            data: List[dict],
+                            all_columns: List[dict],
+                            array_column_hierarchy: dict) -> dict:
+        """
+        Pull out array data that is ready to be processed into a dictionary.
+        The key refers to the index in the original data set. The value is
+        a list of data values. Initially this list will contain just one value,
+        the row of data that corresponds to the index in the original data set.
+        As we process arrays, we will pull out array values into separate rows
+        and append them to the list of data values.
+        Example:
+            Original data set = [
+                {'ColumnA': [1, 2], 'ColumnB': [3]},
+                {'ColumnA': [11, 22], 'ColumnB': [33]}
+            ]
+            all_array_data (initially) = {
+                0: [{'ColumnA': [1, 2], 'ColumnB': [3]}],
+                1: [{'ColumnA': [11, 22], 'ColumnB': [33]}]
+            }
+            all_array_data (after processing) = {
+                0: [
+                    {'ColumnA': 1, 'ColumnB': 3},
+                    {'ColumnA': 2, 'ColumnB': ''},
+                ],
+                1: [
+                    {'ColumnA': 11, 'ColumnB': 33},
+                    {'ColumnA': 22, 'ColumnB': ''},
+                ],
+            }
+        :param data: rows of data
+        :param all_columns: list of columns
+        :param array_column_hierarchy: graph representing array columns
+        :return: dictionary representing processed array data
+        """
+        array_columns = list(array_column_hierarchy.keys())
+        # Determine what columns are ready to be processed. This is necessary for
+        # array columns that contain rows with nested arrays. We first process
+        # the outer arrays before processing inner arrays.
+ array_columns_to_process, \ + unprocessed_array_columns = cls._split_array_columns_by_process_state( + array_columns, array_column_hierarchy, data[0]) + + # Pull out array data that is ready to be processed into a dictionary. + all_array_data = cls._convert_data_list_to_array_data_dict( + data, array_columns_to_process) + + for original_data_index, expanded_array_data in all_array_data.items(): + for array_column in array_columns: + if array_column in unprocessed_array_columns: + continue + # Expand array values that are rows + if str(array_column_hierarchy[array_column]['type']) == 'ROW': + for array_value in expanded_array_data: + cls._expand_row_data(array_value, + array_column, + array_column_hierarchy) + continue + array_data = expanded_array_data[0][array_column] + array_children = array_column_hierarchy[array_column] + # This is an empty array of primitive data type + if not array_data and not array_children['children']: + continue + # Pull out complex array values into its own row of data + elif array_data and array_children['children']: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + empty_data = cls._create_empty_row_of_data(all_columns) + expanded_array_data.append(empty_data) + for index, datum_value in enumerate(data_value): + array_child = array_children['children'][index] + expanded_array_data[array_index][array_child] = datum_value + # Pull out primitive array values into its own row of data + elif array_data: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + empty_data = cls._create_empty_row_of_data(all_columns) + expanded_array_data.append(empty_data) + expanded_array_data[array_index][array_column] = data_value + # This is an empty array with nested fields + else: + for index, array_child in enumerate(array_children['children']): + for array_value in expanded_array_data: + array_value[array_child] = '' + return all_array_data + + 
@classmethod + def _consolidate_array_data_into_data(cls, + data: List[dict], + array_data: dict) -> None: + """ + Consolidate data given a list representing rows of data and a dictionary + representing expanded array data + Example: + Original data set = [ + {'ColumnA': [1, 2], 'ColumnB': [3]}, + {'ColumnA': [11, 22], 'ColumnB': [33]} + ] + array_data = { + 0: [ + {'ColumnA': 1, 'ColumnB': 3}, + {'ColumnA': 2, 'ColumnB': ''}, + ], + 1: [ + {'ColumnA': 11, 'ColumnB': 33}, + {'ColumnA': 22, 'ColumnB': ''}, + ], + } + Final data set = [ + {'ColumnA': 1, 'ColumnB': 3}, + {'ColumnA': 2, 'ColumnB': ''}, + {'ColumnA': 11, 'ColumnB': 33}, + {'ColumnA': 22, 'ColumnB': ''}, + ] + :param data: list representing rows of data + :param array_data: dictionary representing expanded array data + :return: list where data and array_data are combined + """ + data_index = 0 + original_data_index = 0 + while data_index < len(data): + data[data_index].update(array_data[original_data_index][0]) + array_data[original_data_index].pop(0) + data[data_index + 1:data_index + 1] = array_data[original_data_index] + data_index = data_index + len(array_data[original_data_index]) + 1 + original_data_index = original_data_index + 1 + + @classmethod + def _remove_processed_array_columns(cls, + array_columns: List[str], + unprocessed_array_columns: Set[str], + array_column_hierarchy: dict) -> None: + """ + Remove keys representing array columns that have already been processed + :param array_columns: full list of array columns + :param unprocessed_array_columns: list of unprocessed array columns + :param array_column_hierarchy: graph representing array columns + """ + for array_column in array_columns: + if array_column in unprocessed_array_columns: + continue + else: + del array_column_hierarchy[array_column] + + @classmethod + def expand_data(cls, + columns: List[dict], + data: List[dict]) -> Tuple[List[dict], List[dict], List[dict]]: + """ + We do not immediately display rows and arrays clearly in 
the data grid. This + method separates out nested fields and data values to help clearly display + structural columns. + + Example: ColumnA is a row(nested_obj varchar) and ColumnB is an array(int) + Original data set = [ + {'ColumnA': ['a1'], 'ColumnB': [1, 2]}, + {'ColumnA': ['a2'], 'ColumnB': [3, 4]}, + ] + Expanded data set = [ + {'ColumnA': ['a1'], 'ColumnA.nested_obj': 'a1', 'ColumnB': 1}, + {'ColumnA': '', 'ColumnA.nested_obj': '', 'ColumnB': 2}, + {'ColumnA': ['a2'], 'ColumnA.nested_obj': 'a2', 'ColumnB': 3}, + {'ColumnA': '', 'ColumnA.nested_obj': '', 'ColumnB': 4}, + ] + :param columns: columns selected in the query + :param data: original data set + :return: list of all columns(selected columns and their nested fields), + expanded data set, listed of nested fields + """ + all_columns: List[dict] = [] + # Get the list of all columns (selected fields and their nested fields) for column in columns: if column['type'].startswith('ARRAY') or column['type'].startswith('ROW'): full_data_type = '{} {}'.format(column['name'], column['type'].lower()) @@ -1188,104 +1473,33 @@ def expand_data(cls, columns, data): else: all_columns.append(column) - row_column_hierarchy = OrderedDict() - array_column_hierarchy = OrderedDict() - expanded_array_columns = [] - for column in columns: - if column['type'].startswith('ROW'): - parsed_row_columns = [] - full_data_type = '{} {}'.format(column['name'], column['type'].lower()) - cls._parse_structural_column(full_data_type, parsed_row_columns) - expanded_columns = expanded_columns + parsed_row_columns[1:] - filtered_row_columns, array_columns = cls._filter_presto_cols(parsed_row_columns) - expanded_array_columns = expanded_array_columns + array_columns - cls.create_column_hierarchy(filtered_row_columns, row_column_hierarchy, ['ROW']) - cls.create_column_hierarchy(array_columns, array_column_hierarchy, ['ROW', 'ARRAY']) - elif column['type'].startswith('ARRAY'): - parsed_array_columns = [] - full_data_type = '{} 
{}'.format(column['name'], column['type'].lower()) - cls._parse_structural_column(full_data_type, parsed_array_columns) - expanded_columns = expanded_columns + parsed_array_columns[1:] - expanded_array_columns = expanded_array_columns + parsed_array_columns - cls.create_column_hierarchy(parsed_array_columns, array_column_hierarchy, ['ROW', 'ARRAY']) + # Build graphs where the root node is a row or array and its children are that + # column's nested fields + row_column_hierarchy,\ + array_column_hierarchy,\ + expanded_columns = cls._create_row_and_array_hierarchy(columns) + # Pull out a row's nested fields and their values into separate columns ordered_row_columns = row_column_hierarchy.keys() for data_index, datum in enumerate(data): for row_column in ordered_row_columns: - cls.expand_row_data(datum, row_column, row_column_hierarchy) + cls._expand_row_data(datum, row_column, row_column_hierarchy) while array_column_hierarchy: - ready_array_columns = [] - child_arrays = set() - unprocessed_array_columns = set() - ordered_array_columns = list(array_column_hierarchy.keys()) - child_array = '' - for array_column in ordered_array_columns: - if array_column in data[0]: - ready_array_columns.append(array_column) - elif str(array_column_hierarchy[array_column]['type']) == 'ARRAY': - child_array = array_column - child_arrays.add(array_column) - unprocessed_array_columns.add(child_array) - elif child_array and array_column.startswith(child_array): - unprocessed_array_columns.add(array_column) - - filtered_array_data = {} - for data_index, datum in enumerate(data): - filtered_array_datum = {} - for array_column in ready_array_columns: - filtered_array_datum[array_column] = datum[array_column] - filtered_array_data[data_index] = [filtered_array_datum] - - for org_data_index, expanded_array_data in filtered_array_data.items(): - for array_column in ordered_array_columns: - if array_column in unprocessed_array_columns: - continue - if 
str(array_column_hierarchy[array_column]['type']) == 'ROW': - for expanded_array_datum in expanded_array_data: - cls.expand_row_data(expanded_array_datum, array_column, array_column_hierarchy) - continue - array_data = expanded_array_data[0][array_column] - array_children = array_column_hierarchy[array_column] - if not array_data and not array_children['children']: - continue - elif array_data and array_children['children']: - for array_index, data_value in enumerate(array_data): - if array_index >= len(expanded_array_data): - empty_dict = {} - for expanded_column in all_columns: - empty_dict[expanded_column['name']] = '' - expanded_array_data.append(empty_dict) - for index, datum_value in enumerate(data_value): - expanded_array_data[array_index][array_children['children'][index]] = datum_value - elif array_data: - for array_index, data_value in enumerate(array_data): - if array_index >= len(expanded_array_data): - empty_dict = {} - for expanded_column in all_columns: - empty_dict[expanded_column['name']] = '' - expanded_array_data.append(empty_dict) - expanded_array_data[array_index][array_column] = data_value - else: - for index, array_child in enumerate(array_children['children']): - for expanded_array_datum in expanded_array_data: - expanded_array_datum[array_child] = '' - - data_index = 0 - org_data_index = 0 - while data_index < len(data): - data[data_index].update(filtered_array_data[org_data_index][0]) - filtered_array_data[org_data_index].pop(0) - data[data_index + 1:data_index + 1] = filtered_array_data[org_data_index] - data_index = data_index + len(filtered_array_data[org_data_index]) + 1 - org_data_index = org_data_index + 1 - - next_processed_arrays = unprocessed_array_columns - for array_column in ordered_array_columns: - if array_column in next_processed_arrays: - continue - else: - del array_column_hierarchy[array_column] + array_columns = list(array_column_hierarchy.keys()) + # Determine what columns are ready to be processed. 
+ array_columns_to_process,\ + unprocessed_array_columns = cls._split_array_columns_by_process_state( + array_columns, array_column_hierarchy, data[0]) + all_array_data = cls._process_array_data(data, + all_columns, + array_column_hierarchy) + # Consolidate the original data set and the expanded array data + cls._consolidate_array_data_into_data(data, all_array_data) + # Remove processed array columns from the graph + cls._remove_processed_array_columns(array_columns, + unprocessed_array_columns, + array_column_hierarchy) return all_columns, data, expanded_columns diff --git a/superset/sql_lab.py b/superset/sql_lab.py index b153246a98b54..2e5a95c32bce3 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -281,7 +281,8 @@ def execute_sql_statements( selected_columns = cdf.columns if cdf.columns else [] data = cdf.data if cdf.data else [] - all_columns, data, expanded_columns = db_engine_spec.expand_data(selected_columns, data) + all_columns, data, expanded_columns = db_engine_spec.expand_data( + selected_columns, data) payload.update({ 'status': query.status, diff --git a/tests/db_engine_specs_test.py b/tests/db_engine_specs_test.py index e0d914f0b51e4..10408dbd64ca3 100644 --- a/tests/db_engine_specs_test.py +++ b/tests/db_engine_specs_test.py @@ -398,13 +398,127 @@ def test_presto_get_fields(self): self.assertEqual(actual_result.element.name, expected_result['name']) self.assertEqual(actual_result.name, expected_result['label']) - def test_presto_filter_presto_cols(self): + def test_presto_filter_out_array_nested_cols(self): cols = [ {'name': 'column', 'type': 'ARRAY'}, {'name': 'column.nested_obj', 'type': 'FLOAT'}] - actual_results = PrestoEngineSpec._filter_presto_cols(cols) - expected_results = [cols[0]] - self.assertEqual(actual_results, expected_results) + actual_filtered_cols,\ + actual_array_cols = PrestoEngineSpec._filter_out_array_nested_cols(cols) + expected_filtered_cols = [{'name': 'column', 'type': 'ARRAY'}] + 
self.assertEqual(actual_filtered_cols, expected_filtered_cols) + self.assertEqual(actual_array_cols, cols) + + def test_presto_expand_data_with_simple_structural_columns(self): + cols = [ + {'name': 'row_column', 'type': 'ROW(NESTED_OBJ VARCHAR)'}, + {'name': 'array_column', 'type': 'ARRAY(BIGINT)'}] + data = [ + {'row_column': ['a'], 'array_column': [1, 2, 3]}, + {'row_column': ['b'], 'array_column': [4, 5, 6]}] + actual_cols, actual_data, actual_expanded_cols = PrestoEngineSpec.expand_data( + cols, data) + expected_cols = [ + {'name': 'row_column', 'type': 'ROW'}, + {'name': 'row_column.nested_obj', 'type': 'VARCHAR'}, + {'name': 'array_column', 'type': 'ARRAY'}] + expected_data = [ + {'row_column': ['a'], 'row_column.nested_obj': 'a', 'array_column': 1}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 2}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 3}, + {'row_column': ['b'], 'row_column.nested_obj': 'b', 'array_column': 4}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 5}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 6}] + expected_expanded_cols = [ + {'name': 'row_column.nested_obj', 'type': 'VARCHAR'}] + self.assertEqual(actual_cols, expected_cols) + self.assertEqual(actual_data, expected_data) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) + + def test_presto_expand_data_with_complex_row_columns(self): + cols = [ + {'name': 'row_column', + 'type': 'ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 VARCHAR)'}] + data = [ + {'row_column': ['a1', ['a2']]}, + {'row_column': ['b1', ['b2']]}] + actual_cols, actual_data, actual_expanded_cols = PrestoEngineSpec.expand_data( + cols, data) + expected_cols = [ + {'name': 'row_column', 'type': 'ROW'}, + {'name': 'row_column.nested_obj1', 'type': 'VARCHAR'}, + {'name': 'row_column.nested_row', 'type': 'ROW'}, + {'name': 'row_column.nested_row.nested_obj2', 'type': 'VARCHAR'}] + expected_data = [ + {'row_column': ['a1', 
['a2']], + 'row_column.nested_obj1': 'a1', + 'row_column.nested_row': ['a2'], + 'row_column.nested_row.nested_obj2': 'a2'}, + {'row_column': ['b1', ['b2']], + 'row_column.nested_obj1': 'b1', + 'row_column.nested_row': ['b2'], + 'row_column.nested_row.nested_obj2': 'b2'}] + expected_expanded_cols = [ + {'name': 'row_column.nested_obj1', 'type': 'VARCHAR'}, + {'name': 'row_column.nested_row', 'type': 'ROW'}, + {'name': 'row_column.nested_row.nested_obj2', 'type': 'VARCHAR'}] + self.assertEqual(actual_cols, expected_cols) + self.assertEqual(actual_data, expected_data) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) + + def test_presto_expand_data_with_complex_array_columns(self): + cols = [ + {'name': 'int_column', 'type': 'BIGINT'}, + {'name': 'array_column', + 'type': 'ARRAY(ROW(NESTED_ARRAY ARRAY(ROW(NESTED_OBJ VARCHAR))))'}] + data = [ + {'int_column': 1, 'array_column': [[[['a'], ['b']]], [[['c'], ['d']]]]}, + {'int_column': 2, 'array_column': [[[['e'], ['f']]], [[['g'], ['h']]]]}] + actual_cols, actual_data, actual_expanded_cols = PrestoEngineSpec.expand_data( + cols, data) + expected_cols = [ + {'name': 'int_column', 'type': 'BIGINT'}, + {'name': 'array_column', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array.nested_obj', 'type': 'VARCHAR'}] + expected_data = [ + {'int_column': 1, + 'array_column': [[[['a'], ['b']]], [[['c'], ['d']]]], + 'array_column.nested_array': [['a'], ['b']], + 'array_column.nested_array.nested_obj': 'a'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'b'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': [['c'], ['d']], + 'array_column.nested_array.nested_obj': 'c'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'd'}, + {'int_column': 2, + 'array_column': [[[['e'], ['f']]], [[['g'], 
['h']]]], + 'array_column.nested_array': [['e'], ['f']], + 'array_column.nested_array.nested_obj': 'e'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'f'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': [['g'], ['h']], + 'array_column.nested_array.nested_obj': 'g'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'h'}] + expected_expanded_cols = [ + {'name': 'array_column.nested_array', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array.nested_obj', 'type': 'VARCHAR'}] + self.assertEqual(actual_cols, expected_cols) + self.assertEqual(actual_data, expected_data) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) def test_hive_get_view_names_return_empty_list(self): self.assertEquals([], HiveEngineSpec.get_view_names(mock.ANY, mock.ANY)) From ccaddce3f2095dfe7bfc60f072856d2ee91bb01a Mon Sep 17 00:00:00 2001 From: khtruong Date: Thu, 30 May 2019 15:13:36 -0700 Subject: [PATCH 5/6] fix: only allow exploration of selected columns --- .../assets/src/SqlLab/components/ExploreResultsButton.jsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx b/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx index 2394c6713a942..5ac1e2570df1a 100644 --- a/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx +++ b/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx @@ -85,8 +85,8 @@ class ExploreResultsButton extends React.PureComponent { } getColumns() { const props = this.props; - if (props.query && props.query.results && props.query.results.columns) { - return props.query.results.columns; + if (props.query && props.query.results && props.query.results.selected_columns) { + return props.query.results.selected_columns; } return []; } @@ -97,7 +97,7 @@ class ExploreResultsButton 
extends React.PureComponent { const re1 = /^[A-Za-z_]\w*$/; // starts with char or _, then only alphanum const re2 = /__\d+$/; // does not finish with __ and then a number which screams dup col name - return this.props.query.results.columns.map(col => col.name) + return this.props.query.results.selected_columns.map(col => col.name) .filter(col => !re1.test(col) || re2.test(col)); } datasourceName() { From 13c3e6e9510e273a25de8a167ac73520423732d2 Mon Sep 17 00:00:00 2001 From: khtruong Date: Fri, 31 May 2019 10:00:25 -0700 Subject: [PATCH 6/6] fix: address Beto's comments and add more unit tests --- .../spec/javascripts/sqllab/fixtures.js | 16 +- superset/db_engine_specs.py | 29 ++-- superset/sql_lab.py | 4 +- tests/db_engine_specs_test.py | 159 ++++++++++++++++++ 4 files changed, 188 insertions(+), 20 deletions(-) diff --git a/superset/assets/spec/javascripts/sqllab/fixtures.js b/superset/assets/spec/javascripts/sqllab/fixtures.js index 6bd090ecaf841..99e740c338237 100644 --- a/superset/assets/spec/javascripts/sqllab/fixtures.js +++ b/superset/assets/spec/javascripts/sqllab/fixtures.js @@ -223,6 +223,20 @@ export const queries = [ type: 'STRING', }, ], + selected_columns: [ + { + is_date: true, + is_dim: false, + name: 'ds', + type: 'STRING', + }, + { + is_date: false, + is_dim: true, + name: 'gender', + type: 'STRING', + }, + ], data: [{ col1: 0, col2: 1 }, { col1: 2, col2: 3 }], }, }, @@ -264,7 +278,7 @@ export const queryWithBadColumns = { ...queries[0], results: { data: queries[0].results.data, - columns: [ + selected_columns: [ { is_date: true, is_dim: false, diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 6016866e2e032..00a9310b2acf4 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -906,7 +906,7 @@ def _create_column_info(cls, name: str, data_type: str) -> dict: """ return { 'name': name, - 'type': '{}'.format(data_type), + 'type': f'{data_type}', } @classmethod @@ -1115,8 +1115,8 @@ def 
_filter_out_array_nested_cols( """ filtered_cols = [] array_cols = [] - curr_array_col_name = '' - for index, col in enumerate(cols): + curr_array_col_name = None + for col in cols: # col corresponds to an array's content and should be skipped if curr_array_col_name and col['name'].startswith(curr_array_col_name): array_cols.append(col) @@ -1128,7 +1128,7 @@ def _filter_out_array_nested_cols( array_cols.append(col) filtered_cols.append(col) else: - curr_array_col_name = '' + curr_array_col_name = None filtered_cols.append(col) return filtered_cols, array_cols @@ -1145,7 +1145,7 @@ def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None, if show_cols: dot_regex = r'\.(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)' presto_cols = [ - col for col in presto_cols if re.search(dot_regex, col['name']) is None] + col for col in presto_cols if not re.search(dot_regex, col['name'])] return super(PrestoEngineSpec, cls).select_star( my_db, table_name, engine, schema, limit, show_cols, indent, latest_partition, presto_cols, @@ -1215,7 +1215,7 @@ def _build_column_hierarchy(cls, column = columns[0] # If the column name does not start with the root's name, # then this column is not a nested field - if not column['name'].startswith('{}.'.format(root['name'])): + if not column['name'].startswith(f"{root['name']}."): break # If the column's data type is one of the parent types, # then this column may have nested fields @@ -1272,10 +1272,7 @@ def _create_empty_row_of_data(cls, columns: List[dict]) -> dict: :param columns: list of columns :return: dictionary representing an empty row of data """ - empty_data = {} - for column in columns: - empty_data[column['name']] = '' - return empty_data + return {column['name']: '' for column in columns} @classmethod def _expand_row_data(cls, datum: dict, column: str, column_hierarchy: dict) -> None: @@ -1296,7 +1293,7 @@ def _expand_row_data(cls, datum: dict, column: str, column_hierarchy: dict) -> N for index, data_value in 
enumerate(row_data): datum[row_children[index]] = data_value else: - for index, row_child in enumerate(row_children): + for row_child in row_children: datum[row_child] = '' @classmethod @@ -1315,7 +1312,7 @@ def _split_array_columns_by_process_state( """ array_columns_to_process = [] unprocessed_array_columns = set() - child_array = '' + child_array = None for array_column in array_columns: if array_column in datum: array_columns_to_process.append(array_column) @@ -1483,15 +1480,14 @@ def _consolidate_array_data_into_data(cls, @classmethod def _remove_processed_array_columns(cls, - array_columns: List[str], unprocessed_array_columns: Set[str], array_column_hierarchy: dict) -> None: """ Remove keys representing array columns that have already been processed - :param array_columns: full list of array columns :param unprocessed_array_columns: list of unprocessed array columns :param array_column_hierarchy: graph representing array columns """ + array_columns = list(array_column_hierarchy.keys()) for array_column in array_columns: if array_column in unprocessed_array_columns: continue @@ -1540,7 +1536,7 @@ def expand_data(cls, # Pull out a row's nested fields and their values into separate columns ordered_row_columns = row_column_hierarchy.keys() - for data_index, datum in enumerate(data): + for datum in data: for row_column in ordered_row_columns: cls._expand_row_data(datum, row_column, row_column_hierarchy) @@ -1556,8 +1552,7 @@ def expand_data(cls, # Consolidate the original data set and the expanded array data cls._consolidate_array_data_into_data(data, all_array_data) # Remove processed array columns from the graph - cls._remove_processed_array_columns(array_columns, - unprocessed_array_columns, + cls._remove_processed_array_columns(unprocessed_array_columns, array_column_hierarchy) return all_columns, data, expanded_columns diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 2e5a95c32bce3..86e171ba44d77 100644 --- a/superset/sql_lab.py +++ 
b/superset/sql_lab.py @@ -279,8 +279,8 @@ def execute_sql_statements( latest_partition=False) query.end_time = now_as_float() - selected_columns = cdf.columns if cdf.columns else [] - data = cdf.data if cdf.data else [] + selected_columns = cdf.columns or [] + data = cdf.data or [] all_columns, data, expanded_columns = db_engine_spec.expand_data( selected_columns, data) diff --git a/tests/db_engine_specs_test.py b/tests/db_engine_specs_test.py index a7325358c937a..02dbbae8bcfed 100644 --- a/tests/db_engine_specs_test.py +++ b/tests/db_engine_specs_test.py @@ -409,6 +409,165 @@ def test_presto_filter_out_array_nested_cols(self): self.assertEqual(actual_filtered_cols, expected_filtered_cols) self.assertEqual(actual_array_cols, cols) + def test_presto_create_row_and_array_hierarchy(self): + cols = [ + {'name': 'row_column', + 'type': 'ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 VARCHAR)'}, + {'name': 'array_column', + 'type': 'ARRAY(ROW(NESTED_ARRAY ARRAY(ROW(NESTED_OBJ VARCHAR))))'}] + actual_row_col_hierarchy,\ + actual_array_col_hierarchy,\ + actual_expanded_cols = PrestoEngineSpec._create_row_and_array_hierarchy(cols) + expected_row_col_hierarchy = { + 'row_column': { + 'type': 'ROW', + 'children': ['row_column.nested_obj1', 'row_column.nested_row'], + }, + 'row_column.nested_row': { + 'type': 'ROW', + 'children': ['row_column.nested_row.nested_obj2']}, + } + expected_array_col_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array'], + }, + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array.nested_obj']}, + } + expected_expanded_cols = [ + {'name': 'row_column.nested_obj1', 'type': 'VARCHAR'}, + {'name': 'row_column.nested_row', 'type': 'ROW'}, + {'name': 'row_column.nested_row.nested_obj2', 'type': 'VARCHAR'}, + {'name': 'array_column.nested_array', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array.nested_obj', 'type': 'VARCHAR'}] + 
self.assertEqual(actual_row_col_hierarchy, expected_row_col_hierarchy) + self.assertEqual(actual_array_col_hierarchy, expected_array_col_hierarchy) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) + + def test_presto_expand_row_data(self): + datum = {'row_col': [1, 'a']} + row_column = 'row_col' + row_col_hierarchy = { + 'row_col': { + 'type': 'ROW', + 'children': ['row_col.nested_int', 'row_col.nested_str'], + }, + } + PrestoEngineSpec._expand_row_data(datum, row_column, row_col_hierarchy) + expected_datum = { + 'row_col': [1, 'a'], 'row_col.nested_int': 1, 'row_col.nested_str': 'a', + } + self.assertEqual(datum, expected_datum) + + def test_split_array_columns_by_process_state(self): + array_cols = ['array_column', 'array_column.nested_array'] + array_col_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array'], + }, + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array.nested_obj']}, + } + datum = {'array_column': [[[1], [2]]]} + actual_array_cols_to_process, actual_unprocessed_array_cols = \ + PrestoEngineSpec._split_array_columns_by_process_state( + array_cols, array_col_hierarchy, datum) + expected_array_cols_to_process = ['array_column'] + expected_unprocessed_array_cols = {'array_column.nested_array'} + self.assertEqual(actual_array_cols_to_process, expected_array_cols_to_process) + self.assertEqual(actual_unprocessed_array_cols, expected_unprocessed_array_cols) + + def test_presto_convert_data_list_to_array_data_dict(self): + data = [ + {'array_column': [1, 2], 'int_column': 3}, + {'array_column': [11, 22], 'int_column': 33}, + ] + array_columns_to_process = ['array_column'] + actual_array_data_dict = PrestoEngineSpec._convert_data_list_to_array_data_dict( + data, array_columns_to_process) + expected_array_data_dict = { + 0: [{'array_column': [1, 2]}], + 1: [{'array_column': [11, 22]}]} + self.assertEqual(actual_array_data_dict, expected_array_data_dict) + + def 
test_presto_process_array_data(self): + data = [ + {'array_column': [[1], [2]], 'int_column': 3}, + {'array_column': [[11], [22]], 'int_column': 33}, + ] + all_columns = [ + {'name': 'array_column', 'type': 'ARRAY'}, + {'name': 'array_column.nested_row', 'type': 'BIGINT'}, + {'name': 'int_column', 'type': 'BIGINT'}, + ] + array_column_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_row'], + }, + } + actual_array_data = PrestoEngineSpec._process_array_data( + data, all_columns, array_column_hierarchy) + expected_array_data = { + 0: [ + {'array_column': [[1], [2]], 'array_column.nested_row': 1}, + {'array_column': '', 'array_column.nested_row': 2, 'int_column': ''}, + ], + 1: [ + {'array_column': [[11], [22]], 'array_column.nested_row': 11}, + {'array_column': '', 'array_column.nested_row': 22, 'int_column': ''}, + ], + } + self.assertEqual(actual_array_data, expected_array_data) + + def test_presto_consolidate_array_data_into_data(self): + data = [ + {'arr_col': [[1], [2]], 'int_col': 3}, + {'arr_col': [[11], [22]], 'int_col': 33}, + ] + array_data = { + 0: [ + {'arr_col': [[1], [2]], 'arr_col.nested_row': 1}, + {'arr_col': '', 'arr_col.nested_row': 2, 'int_col': ''}, + ], + 1: [ + {'arr_col': [[11], [22]], 'arr_col.nested_row': 11}, + {'arr_col': '', 'arr_col.nested_row': 22, 'int_col': ''}, + ], + } + PrestoEngineSpec._consolidate_array_data_into_data(data, array_data) + expected_data = [ + {'arr_col': [[1], [2]], 'arr_col.nested_row': 1, 'int_col': 3}, + {'arr_col': '', 'arr_col.nested_row': 2, 'int_col': ''}, + {'arr_col': [[11], [22]], 'arr_col.nested_row': 11, 'int_col': 33}, + {'arr_col': '', 'arr_col.nested_row': 22, 'int_col': ''}, + ] + self.assertEqual(data, expected_data) + + def test_presto_remove_processed_array_columns(self): + array_col_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array'], + }, + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': 
['array_column.nested_array.nested_obj']}, + } + unprocessed_array_cols = {'array_column.nested_array'} + PrestoEngineSpec._remove_processed_array_columns( + unprocessed_array_cols, array_col_hierarchy) + expected_array_col_hierarchy = { + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array.nested_obj']}, + } + self.assertEqual(array_col_hierarchy, expected_array_col_hierarchy) + def test_presto_expand_data_with_simple_structural_columns(self): cols = [ {'name': 'row_column', 'type': 'ROW(NESTED_OBJ VARCHAR)'},