diff --git a/datahub/server/lib/engine_status_checker/connection_checker.py b/datahub/server/lib/engine_status_checker/connection_checker.py index 16ff70666..2c6fa0d1c 100644 --- a/datahub/server/lib/engine_status_checker/connection_checker.py +++ b/datahub/server/lib/engine_status_checker/connection_checker.py @@ -19,10 +19,11 @@ def NAME(cls) -> str: def _perform_check(cls, engine_id: int) -> EngineStatus: with DBSession() as session: engine = get_query_engine_by_id(engine_id, session=session) - executor_name = engine.executor executor_params = engine.get_engine_params() - return check_connection(get_executor_class(executor_name), executor_params) + return check_connection( + get_executor_class(engine.language, engine.executor), executor_params + ) def check_connection( diff --git a/datahub/server/lib/engine_status_checker/select_one_checker.py b/datahub/server/lib/engine_status_checker/select_one_checker.py index a653f683e..dff227409 100644 --- a/datahub/server/lib/engine_status_checker/select_one_checker.py +++ b/datahub/server/lib/engine_status_checker/select_one_checker.py @@ -20,10 +20,11 @@ def NAME(cls) -> str: def _perform_check(cls, engine_id: int) -> EngineStatus: with DBSession() as session: engine = get_query_engine_by_id(engine_id, session=session) - executor_name = engine.executor executor_params = engine.get_engine_params() - return check_select_one(get_executor_class(executor_name), executor_params) + return check_select_one( + get_executor_class(engine.language, engine.executor), executor_params + ) class WrongSelectOneException(Exception): diff --git a/datahub/server/lib/query_executor/all_executors.py b/datahub/server/lib/query_executor/all_executors.py index f8519579e..0a7597366 100644 --- a/datahub/server/lib/query_executor/all_executors.py +++ b/datahub/server/lib/query_executor/all_executors.py @@ -26,12 +26,15 @@ ] + ALL_PLUGIN_EXECUTORS -def get_executor_class(name: str): +def get_executor_class(language: str, name: str): for executor in ALL_EXECUTORS: - if executor.EXECUTOR_NAME() == name: + if ( + executor.EXECUTOR_LANGUAGE() == language + and executor.EXECUTOR_NAME() == name + ): return executor - raise ValueError(f"Unknown executor name {name}") + raise ValueError(f"Unknown executor {name} with language {language}") # Re-export parse_exception diff --git a/datahub/server/lib/query_executor/base_executor.py b/datahub/server/lib/query_executor/base_executor.py index 27d3dbd5a..2dcbd8fc7 100644 --- a/datahub/server/lib/query_executor/base_executor.py +++ b/datahub/server/lib/query_executor/base_executor.py @@ -444,7 +444,8 @@ def EXECUTOR_LANGUAGE(cls) -> str: @abstractclassmethod def EXECUTOR_NAME(cls) -> str: - """Distinct name for the executor + """Distinct name for the executor. + Must be distinct under for the same language. """ raise NotImplementedError diff --git a/datahub/server/lib/query_executor/executors/sqlalchemy.py b/datahub/server/lib/query_executor/executors/sqlalchemy.py index 16a6821a9..2b09f1d84 100644 --- a/datahub/server/lib/query_executor/executors/sqlalchemy.py +++ b/datahub/server/lib/query_executor/executors/sqlalchemy.py @@ -1,5 +1,6 @@ import re from sqlalchemy.exc import SQLAlchemyError, DBAPIError +from snowflake.connector import errors as sf_errors from const.query_execution import QueryExecutionErrorType from lib.query_executor.base_executor import QueryExecutorBaseClass @@ -48,7 +49,7 @@ def EXECUTOR_LANGUAGE(cls): class DruidQueryExecutor(SqlAlchemyQueryExecutor): @classmethod def EXECUTOR_NAME(cls): - return "druid-sqlalchemy" + return "sqlalchemy" @classmethod def EXECUTOR_LANGUAGE(cls): @@ -73,3 +74,16 @@ def EXECUTOR_NAME(cls): @classmethod def EXECUTOR_LANGUAGE(cls): return "snowflake" + + def _parse_exception(self, e): + if isinstance(e, SQLAlchemyError): + orig_error = getattr(e, "orig", None) + + if isinstance(orig_error, sf_errors.ProgrammingError): + message = orig_error.msg + match = re.search(r"error line (\d+) at position (\d+)", message) + if match is not None: + return get_parsed_syntax_error( + message, int(match.group(1)) - 1, int(match.group(2)) + ) + return super(SnowflakeQueryExecutor, self)._parse_exception(e) diff --git a/datahub/server/lib/utils/execute_query.py b/datahub/server/lib/utils/execute_query.py index 34e11f365..3f199e52d 100644 --- a/datahub/server/lib/utils/execute_query.py +++ b/datahub/server/lib/utils/execute_query.py @@ -37,7 +37,7 @@ def __call__( user = get_user_by_id(uid, session=session) client_settings["proxy_user"] = user.username - executor = get_executor_class(engine.executor) + executor = get_executor_class(engine.language, engine.executor) if executor.SINGLE_QUERY_QUERY_ENGINE(): statements = [query] diff --git a/datahub/server/tasks/run_query.py b/datahub/server/tasks/run_query.py index 41f7917d4..a0c0bd744 100644 --- a/datahub/server/tasks/run_query.py +++ b/datahub/server/tasks/run_query.py @@ -43,7 +43,6 @@ def run_query_task(self, query_execution_id): user = user_logic.get_user_by_id(uid) engine = admin_logic.get_query_engine_by_id(engine_id) - executor_type = engine.executor executor_params = { "query_execution_id": query_execution_id, @@ -56,7 +55,9 @@ def run_query_task(self, query_execution_id): }, } - executor = get_executor_class(executor_type)(**executor_params) + executor = get_executor_class(engine.language, engine.executor)( + **executor_params + ) while True: if self.is_aborted():