Skip to content

Commit

Permalink
Review comments, phase 1.
Browse files Browse the repository at this point in the history
Factors out the validation process to a separate class which
does validation inline, rather than passing it through the
existing query flow implicitly.

This is meant to address Dave's feedback requesting that
the validation flow not be explicitly tied to a query transform,
since that's uniquely a Presto-ism.

Next up in this stack: unit tests.
  • Loading branch information
Alex Berghage committed May 2, 2019
1 parent d076e0b commit ed9ee8d
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 41 deletions.
3 changes: 3 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ class CeleryConfig(object):
# Timeout duration for SQL Lab synchronous queries
SQLLAB_TIMEOUT = 30

# Timeout duration, in seconds, for SQL Lab query validation requests
SQLLAB_VALIDATION_TIMEOUT = 10

# SQLLAB_DEFAULT_DBID
SQLLAB_DEFAULT_DBID = None

Expand Down
33 changes: 0 additions & 33 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class BaseEngineSpec(object):
force_column_alias_quotes = False
arraysize = None
max_column_name_length = None
supports_validation_queries = False

@classmethod
def get_time_expr(cls, expr, pdf, time_grain, grain):
Expand Down Expand Up @@ -480,15 +479,6 @@ def get_timestamp_column(expression, column_name):
can be overridden."""
return expression or column_name

@classmethod
def make_validation_query(cls, sql):
    """
    Rewrite ``sql`` so the database validates the query rather than
    running it, when the underlying engine supports that.

    This implementation never returns: it always raises an ``Exception``
    naming ``cls.engine`` as lacking validation-query support.
    """
    engine_name = cls.engine
    raise Exception(
        f'Database engine {engine_name} does not support validation queries')


class PostgresBaseEngineSpec(BaseEngineSpec):
""" Abstract class for Postgres 'like' databases """
Expand Down Expand Up @@ -795,7 +785,6 @@ def extract_error_message(cls, e):

class PrestoEngineSpec(BaseEngineSpec):
engine = 'presto'
supports_validation_queries = True

time_grain_functions = {
None: '{col}',
Expand Down Expand Up @@ -1092,25 +1081,13 @@ def latest_sub_partition(cls, table_name, schema, database, **kwargs):
return ''
return df.to_dict()[field_to_return][0]

@classmethod
def make_validation_query(cls, sql):
    """
    Wrap ``sql`` in Presto's validate-only EXPLAIN form.

    For example, "SELECT 1 FROM default.mytable" becomes
    "EXPLAIN (TYPE VALIDATE) SELECT 1 FROM default.mytable".
    """
    validation_sql = f'EXPLAIN (TYPE VALIDATE) {sql}'
    return validation_sql


class HiveEngineSpec(PrestoEngineSpec):

"""Reuses PrestoEngineSpec functionality."""

engine = 'hive'
max_column_name_length = 767
supports_validation_queries = False

# Scoping regex at class level to avoid recompiling
# 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5
Expand Down Expand Up @@ -1415,16 +1392,6 @@ def execute(cursor, query, async_=False):
cursor.execute(query, **kwargs)


@classmethod
def make_validation_query(cls, sql):
    """
    Reject validation queries for Hive.

    Hive doesn't support query-validation queries, so this override
    disables the handler inherited from the Presto engine spec by always
    raising an ``Exception`` that names ``cls.engine``.
    """
    unsupported_msg = (
        f'Database engine {cls.engine} does not support validation queries')
    raise Exception(unsupported_msg)


class MssqlEngineSpec(BaseEngineSpec):
engine = 'mssql'
epoch_to_dttm = "dateadd(S, {col}, '1970-01-01')"
Expand Down
23 changes: 23 additions & 0 deletions superset/sql_validators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from . import base # noqa
from . import presto_db # noqa

# Registry of SQL validator classes, keyed by the database engine name
# (e.g. 'presto'). Engines without an entry have no validator.
# TODO: Move this to a config setting
SQL_VALIDATORS_BY_ENGINE = {
    'presto': presto_db.PrestoDBSQLValidator,
}
61 changes: 61 additions & 0 deletions superset/sql_validators/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import (
Any,
List,
Optional
)

class SQLValidationAnnotation:
    """Represents a single annotation (error/warning) in an SQL querytext.

    The location fields are optional because a validator may not be able to
    say where in the query text the problem lies; they default to ``None``.
    """

    def __init__(
            self,
            message: str,
            line_number: Optional[int] = None,
            start_column: Optional[int] = None,
            end_column: Optional[int] = None,
    ):
        """Store the annotation text and its (optional) location.

        :param message: human-readable description of the problem
        :param line_number: line of the query text the annotation refers to
        :param start_column: column where the annotated span starts
        :param end_column: column where the annotated span ends
        """
        self.message = message
        self.line_number = line_number
        self.start_column = start_column
        self.end_column = end_column

    def __repr__(self) -> str:
        # Makes annotations readable in logs and debugger output.
        return (
            f'{type(self).__name__}('
            f'message={self.message!r}, '
            f'line_number={self.line_number!r}, '
            f'start_column={self.start_column!r}, '
            f'end_column={self.end_column!r})'
        )

    def to_dict(self) -> dict:
        """Return the annotation as a plain dict of its four fields."""
        return {
            'line_number': self.line_number,
            'start_column': self.start_column,
            'end_column': self.end_column,
            'message': self.message,
        }


class BaseSQLValidator:
    """Interface for classes that check whether a given sql query is valid
    for a given database engine.

    Subclasses are expected to override ``validate``; the implementation
    here only raises ``NotImplementedError``."""

    name = 'BaseSQLValidator'

    @classmethod
    def validate(
            cls, sql: str, schema: str, database: Any,
    ) -> List['SQLValidationAnnotation']:
        """Validate the SQL querystring ``sql`` against the given engine.

        Returns a list of annotations describing the problems found.
        """
        raise NotImplementedError
151 changes: 151 additions & 0 deletions superset/sql_validators/presto_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import logging
from contextlib import closing
from flask import g
from pyhive.exc import DatabaseError
from typing import (
Any,
List,
Optional
)

from superset import app, security_manager
from superset.utils.core import sources
from superset.sql_parse import ParsedQuery
from superset.sql_validators.base import (
BaseSQLValidator,
SQLValidationAnnotation,
)

# Row limit handed to the engine spec's fetch_data call when reading back
# the result of a validation query
MAX_ERROR_ROWS = 10

# Module-level shortcut to the superset app's configuration
config = app.config

class PrestoSQLValidationError(Exception):
    """Raised when the process of asking Presto to validate SQL querytext
    itself fails."""

class PrestoDBSQLValidator(BaseSQLValidator):
    """Validate SQL queries using Presto's built-in EXPLAIN subtype"""

    name = 'PrestoDBSQLValidator'

    @classmethod
    def validate_statement(
            cls,
            statement: str,
            database: Any,
            cursor: Any,
            user_name: Optional[str],
    ) -> Optional[SQLValidationAnnotation]:
        """Ask Presto to validate a single SQL statement.

        :param statement: one SQL statement from the user's query text
        :param database: database object whose ``db_engine_spec`` is used
            to execute the validation query
        :param cursor: open DB-API cursor to run the query on
        :param user_name: name passed to the optional SQL_QUERY_MUTATOR hook
        :returns: ``None`` when the statement validates cleanly, otherwise
            an annotation describing the error Presto reported
        :raises PrestoSQLValidationError: when the database error carries
            no payload this method can interpret
        """
        db_engine_spec = database.db_engine_spec
        sql = ParsedQuery(statement).stripped()

        # Hook to allow environment-specific mutation (usually comments) to
        # the SQL
        sql_query_mutator = config.get('SQL_QUERY_MUTATOR')
        if sql_query_mutator:
            sql = sql_query_mutator(sql, user_name, security_manager, database)

        # Transform the final statement to an explain call before sending it
        # on to presto to validate
        sql = f'EXPLAIN (TYPE VALIDATE) {sql}'

        # Invoke the query against presto. NB this deliberately doesn't use
        # the engine spec's handle_cursor implementation since we don't
        # record these EXPLAIN queries done in validation as proper Query
        # objects in the superset ORM.
        try:
            db_engine_spec.execute(cursor, sql)
            polled = cursor.poll()
            while polled:
                logging.info('polling presto for validation progress')
                stats = polled.get('stats', {})
                if stats and stats.get('state') == 'FINISHED':
                    break
                polled = cursor.poll()
            db_engine_spec.fetch_data(cursor, MAX_ERROR_ROWS)
            return None
        except DatabaseError as db_error:
            # NOTE(review): this assumes pyhive passes the Presto error
            # payload as a dict in args[0] -- confirm against pyhive. Anything
            # else is reported as unparseable instead of failing later with
            # an AttributeError on `.get`.
            if not db_error.args or not isinstance(db_error.args[0], dict):
                raise PrestoSQLValidationError(
                    "Presto (via pyhive) returned unparseable error text"
                ) from db_error
            error_payload = db_error.args[0]

            message = error_payload.get('message', "unknown prestodb error")
            # `or {}` also covers an explicit null errorLocation
            err_loc = error_payload.get('errorLocation') or {}
            line_number = err_loc.get('lineNumber', None)
            # Only a single column is read from the payload, so the
            # annotation starts and ends at the same column
            column_number = err_loc.get('columnNumber', None)

            return SQLValidationAnnotation(
                message=message,
                line_number=line_number,
                start_column=column_number,
                end_column=column_number,
            )
        except Exception as e:
            logging.exception(f'Error running validation query: {e}')
            # bare raise keeps the original traceback intact
            raise

    @classmethod
    def validate(
            cls,
            sql: str,
            schema: str,
            database: Any,
    ) -> List[SQLValidationAnnotation]:
        """
        Presto supports query-validation queries by running them with a
        prepended explain.
        For example, "SELECT 1 FROM default.mytable" becomes "EXPLAIN (TYPE
        VALIDATE) SELECT 1 FROM default.mytable".

        :param sql: query text, possibly containing several statements
        :param schema: schema used when building the SQLAlchemy engine
        :param database: database object to validate against
        :returns: one annotation per statement that failed validation
        """
        user_name = g.user.username if g.user else None
        # Parse once and reuse the statement list for both the log line and
        # the validation loop
        statements = ParsedQuery(sql).get_statements()

        logging.debug('Validating %d statement(s)', len(statements))
        engine = database.get_sqla_engine(
            schema=schema,
            nullpool=True,
            user_name=user_name,
            source=sources.get('sql_lab', None),
        )
        # Sharing a single connection and cursor across the
        # execution of all statements (if many)
        annotations: List[SQLValidationAnnotation] = []
        with closing(engine.raw_connection()) as conn:
            with closing(conn.cursor()) as cursor:
                for statement in statements:
                    annotation = cls.validate_statement(
                        statement,
                        database,
                        cursor,
                        user_name,
                    )
                    if annotation:
                        annotations.append(annotation)
        logging.debug('Validation found %d error(s)', len(annotations))

        return annotations
46 changes: 38 additions & 8 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from superset.models.sql_lab import Query
from superset.models.user_attributes import UserAttribute
from superset.sql_parse import ParsedQuery
from superset.sql_validators import SQL_VALIDATORS_BY_ENGINE
from superset.utils import core as utils
from superset.utils import dashboard_import_export
from superset.utils.dates import now_as_float
Expand Down Expand Up @@ -2529,6 +2530,43 @@ def sql_json(self):
json_error_response(
'Database with id {} is missing.'.format(database_id))

# Validation request.
if validate_only:
if len(template_params) > 0:
# TODO: factor the Database object out of template rendering
# or provide it as mydb so we can render template params
# without having to also persist a Query ORM object.
return json_error_response(
'SQL validation does not support template parameters')

spec = mydb.db_engine_spec
if not spec.engine in SQL_VALIDATORS_BY_ENGINE:
return json_error_response(
'no SQL validator is configured for {}'.format(spec.engine))
validator = SQL_VALIDATORS_BY_ENGINE[spec.engine]

try:
timeout = config.get('SQLLAB_VALIDATION_TIMEOUT')
timeout_msg = (
f'The query exceeded the {timeout} seconds timeout.')
with utils.timeout(seconds=timeout,
error_message=timeout_msg):
errors = validator.validate(sql, schema, mydb)
payload = json.dumps(
[err.to_dict() for err in errors],
default=utils.pessimistic_json_iso_dttm_ser,
ignore_nan=True,
encoding=None,
)
return json_success(payload)
except Exception as e:
logging.exception(e)
msg = _(
'Failed to validate your SQL query text. Please check that '
f'you have configured the {validator.name} validator '
'correctly and that any services it depends on are up.')
return json_error_response(f'{msg}')

rejected_tables = security_manager.rejected_datasources(sql, mydb, schema)
if rejected_tables:
return json_error_response(
Expand Down Expand Up @@ -2581,14 +2619,6 @@ def sql_json(self):
limits = [mydb.db_engine_spec.get_limit_from_sql(rendered_query), limit]
query.limit = min(lim for lim in limits if lim is not None)

# apply validation transform last -- after template processing
if validate_only:
spec = mydb.db_engine_spec
if not spec.supports_validation_queries:
json_error_response('{} does not support validation queries'
.format(spec.engine))
rendered_query = spec.make_validation_query(rendered_query)

# Async request.
if async_:
logging.info('Running query on a Celery worker')
Expand Down

0 comments on commit ed9ee8d

Please sign in to comment.