Skip to content

Commit

Permalink
Add snowflake syntax error parsing (#323)
Browse files Browse the repository at this point in the history
Also allow duplicated executor name as long as the language is different
  • Loading branch information
czgu authored Nov 18, 2020
1 parent 1c3c551 commit f79701d
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ def NAME(cls) -> str:
def _perform_check(cls, engine_id: int) -> EngineStatus:
with DBSession() as session:
engine = get_query_engine_by_id(engine_id, session=session)
executor_name = engine.executor
executor_params = engine.get_engine_params()

return check_connection(get_executor_class(executor_name), executor_params)
return check_connection(
get_executor_class(engine.language, engine.executor), executor_params
)


def check_connection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ def NAME(cls) -> str:
def _perform_check(cls, engine_id: int) -> EngineStatus:
with DBSession() as session:
engine = get_query_engine_by_id(engine_id, session=session)
executor_name = engine.executor
executor_params = engine.get_engine_params()

return check_select_one(get_executor_class(executor_name), executor_params)
return check_select_one(
get_executor_class(engine.language, engine.executor), executor_params
)


class WrongSelectOneException(Exception):
Expand Down
9 changes: 6 additions & 3 deletions datahub/server/lib/query_executor/all_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
] + ALL_PLUGIN_EXECUTORS


def get_executor_class(name: str):
def get_executor_class(language: str, name: str):
for executor in ALL_EXECUTORS:
if executor.EXECUTOR_NAME() == name:
if (
executor.EXECUTOR_LANGUAGE() == language
and executor.EXECUTOR_NAME() == name
):
return executor

raise ValueError(f"Unknown executor name {name}")
raise ValueError(f"Unknown executor {name} with language {language}")


# Re-export parse_exception
Expand Down
3 changes: 2 additions & 1 deletion datahub/server/lib/query_executor/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ def EXECUTOR_LANGUAGE(cls) -> str:

@abstractclassmethod
def EXECUTOR_NAME(cls) -> str:
"""Distinct name for the executor
"""Distinct name for the executor.
Must be distinct under for the same language.
"""
raise NotImplementedError

Expand Down
16 changes: 15 additions & 1 deletion datahub/server/lib/query_executor/executors/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from sqlalchemy.exc import SQLAlchemyError, DBAPIError
from snowflake.connector import errors as sf_errors

from const.query_execution import QueryExecutionErrorType
from lib.query_executor.base_executor import QueryExecutorBaseClass
Expand Down Expand Up @@ -48,7 +49,7 @@ def EXECUTOR_LANGUAGE(cls):
class DruidQueryExecutor(SqlAlchemyQueryExecutor):
@classmethod
def EXECUTOR_NAME(cls):
return "druid-sqlalchemy"
return "sqlalchemy"

@classmethod
def EXECUTOR_LANGUAGE(cls):
Expand All @@ -73,3 +74,16 @@ def EXECUTOR_NAME(cls):
@classmethod
def EXECUTOR_LANGUAGE(cls):
return "snowflake"

def _parse_exception(self, e):
if isinstance(e, SQLAlchemyError):
orig_error = getattr(e, "orig", None)

if isinstance(orig_error, sf_errors.ProgrammingError):
message = orig_error.msg
match = re.search(r"error line (\d+) at position (\d+)", message)
if match is not None:
return get_parsed_syntax_error(
message, int(match.group(1)) - 1, int(match.group(2))
)
return super(SnowflakeQueryExecutor, self)._parse_exception(e)
2 changes: 1 addition & 1 deletion datahub/server/lib/utils/execute_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __call__(
user = get_user_by_id(uid, session=session)
client_settings["proxy_user"] = user.username

executor = get_executor_class(engine.executor)
executor = get_executor_class(engine.language, engine.executor)

if executor.SINGLE_QUERY_QUERY_ENGINE():
statements = [query]
Expand Down
5 changes: 3 additions & 2 deletions datahub/server/tasks/run_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def run_query_task(self, query_execution_id):

user = user_logic.get_user_by_id(uid)
engine = admin_logic.get_query_engine_by_id(engine_id)
executor_type = engine.executor

executor_params = {
"query_execution_id": query_execution_id,
Expand All @@ -56,7 +55,9 @@ def run_query_task(self, query_execution_id):
},
}

executor = get_executor_class(executor_type)(**executor_params)
executor = get_executor_class(engine.language, engine.executor)(
**executor_params
)

while True:
if self.is_aborted():
Expand Down

0 comments on commit f79701d

Please sign in to comment.