Skip to content

Commit

Permalink
Refactor run_query into multiple modules (#396)
Browse files Browse the repository at this point in the history
  • Loading branch information
czgu authored Jan 25, 2021
1 parent 5774ae6 commit 5c4b0af
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 194 deletions.
14 changes: 14 additions & 0 deletions querybook/server/lib/query_executor/exc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class QueryExecutorException(Exception):
    """Common base class for the exceptions defined by the query executor.

    AlreadyExecutedException and InvalidQueryExecution both derive from it,
    so catching this type catches either of them.
    """


class AlreadyExecutedException(QueryExecutorException):
    """Signals that a query execution was handed to a worker more than once.

    This is expected to happen because acks_late = True is turned on: when a
    worker crashes unexpectedly, its task gets reassigned, and the
    re-delivered task finds that the execution has already been started.
    """


class InvalidQueryExecution(QueryExecutorException):
    """Signals that a query execution cannot be run.

    Within this package it is raised when the requested execution does not
    exist, or when the query references a table that the metastore ACL
    does not allow.
    """
96 changes: 96 additions & 0 deletions querybook/server/lib/query_executor/executor_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from app.db import with_session
from const.query_execution import QueryExecutionStatus
from lib.logger import get_logger
from lib.query_analysis import get_statement_ranges
from lib.query_analysis.lineage import process_query
from logic import (
admin as admin_logic,
query_execution as qe_logic,
user as user_logic,
)
from .exc import AlreadyExecutedException, InvalidQueryExecution
from .all_executors import get_executor_class

LOG = get_logger(__file__)


@with_session
def create_executor_from_execution(query_execution_id, celery_task, session=None):
    """Build the executor instance that will run a query execution.

    The constructor arguments and the query engine are resolved by
    _get_executor_params_and_engine; the executor class is then selected
    from the engine's language and executor name.

    Args:
        query_execution_id: id of the query execution to run.
        celery_task: forwarded unchanged into the executor's constructor
            arguments.
        session: database session passed on to every lookup.

    Returns:
        A new instance of the executor class matching the engine.

    Raises:
        InvalidQueryExecution, AlreadyExecutedException: propagated from the
            execution lookup and validation.
    """
    params, query_engine = _get_executor_params_and_engine(
        query_execution_id, celery_task=celery_task, session=session
    )
    executor_cls = get_executor_class(query_engine.language, query_engine.executor)
    return executor_cls(**params)


@with_session
def _get_executor_params_and_engine(query_execution_id, celery_task, session=None):
    """Collect the executor constructor arguments and the query engine.

    Args:
        query_execution_id: id of the query execution to run.
        celery_task: stored as-is under the "celery_task" key.
        session: database session passed on to every lookup.

    Returns:
        A 2-tuple ``(executor_params, engine)``. ``executor_params`` is a
        dict with the keys "query_execution_id", "celery_task", "query",
        "statement_ranges" and "client_setting"; the latter holds the
        engine's own params plus "proxy_user" set to the username of the
        execution's user.
    """
    query, statement_ranges, uid, engine_id = _get_query_execution_info(
        query_execution_id, session=session
    )
    user = user_logic.get_user_by_id(uid, session=session)
    engine = admin_logic.get_query_engine_by_id(engine_id, session=session)

    # "proxy_user" is listed last so it wins over any same-named engine param.
    client_setting = {**engine.get_engine_params(), "proxy_user": user.username}
    executor_params = dict(
        query_execution_id=query_execution_id,
        celery_task=celery_task,
        query=query,
        statement_ranges=statement_ranges,
        client_setting=client_setting,
    )
    return executor_params, engine


@with_session
def _get_query_execution_info(query_execution_id, session=None):
    """Load a query execution and verify that it may be run.

    Args:
        query_execution_id: id of the query execution to load.
        session: database session passed on to every lookup.

    Returns:
        A 4-tuple ``(query, statement_ranges, uid, engine_id)`` taken from
        the stored execution; ``statement_ranges`` is computed from the
        query text with get_statement_ranges.

    Raises:
        InvalidQueryExecution: the execution does not exist, or the query
            fails the table ACL check.
        AlreadyExecutedException: the execution is no longer in the
            INITIALIZED status.
    """
    execution = qe_logic.get_query_execution_by_id(
        query_execution_id, session=session
    )
    if not execution:
        raise InvalidQueryExecution(f"Query {query_execution_id} does not exist")

    # The task could have been re-inserted after a celery worker failure,
    # so double check that the execution has not been started before.
    already_started = execution.status != QueryExecutionStatus.INITIALIZED
    if already_started:
        raise AlreadyExecutedException(
            f"Query {query_execution_id} is already executed. This is likely caused by a worker crash."
        )

    query_text = execution.query
    ranges = get_statement_ranges(query_text)
    owner_uid = execution.uid
    engine_id = execution.engine_id

    _assert_safe_query(query_text, engine_id, session=session)
    return query_text, ranges, owner_uid, engine_id


@with_session
def _assert_safe_query(query, engine_id, session=None):
    """Reject the query if it references a table the metastore ACL forbids.

    The check is best-effort: any failure while gathering the tables or
    building the ACL checker is logged and the query is let through. A table
    name that is not of the form ``schema.table`` is logged and skipped on
    its own, so that it no longer disables the check for the other tables
    of the query.

    Args:
        query: full query text to analyse.
        engine_id: id of the query engine; its metastore supplies the ACL.
        session: database session passed on to every lookup.

    Raises:
        InvalidQueryExecution: a referenced table is not allowed by the
            metastore ACL.
    """
    try:
        # Kept function-local and inside the try block as before, so an
        # import failure is also treated as best-effort.
        # NOTE(review): presumably local to avoid an import cycle - confirm.
        from lib.metastore.utils import MetastoreTableACLChecker

        LOG.debug("assert_safe_query")
        table_per_statement, _ = process_query(query)
        all_tables = [table for tables in table_per_statement for table in tables]

        query_engine = admin_logic.get_query_engine_by_id(engine_id, session=session)
        metastore = admin_logic.get_query_metastore_by_id(
            query_engine.metastore_id, session=session
        )
        acl_checker = MetastoreTableACLChecker(metastore.acl_control)

        for table in all_tables:
            try:
                schema_name, table_name = table.split(".")
            except ValueError:
                # Previously this escaped to the broad handler below and
                # silently skipped the ACL check for every remaining table.
                LOG.info(
                    f"Skipping ACL check for table {table}: not in schema.table form"
                )
                continue

            if not acl_checker.is_table_valid(schema_name, table_name):
                raise InvalidQueryExecution(
                    f"Table {table} is not allowed by metastore"
                )
    except InvalidQueryExecution:
        # The one failure that must reach the caller.
        raise
    except Exception as e:
        LOG.info(e)
61 changes: 61 additions & 0 deletions querybook/server/lib/query_executor/notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from app.db import with_session
from env import QuerybookSettings
from lib.notify.utils import notify_user
from logic import (
query_execution as qe_logic,
user as user_logic,
query_execution_permission as qe_perm_logic,
)


@with_session
def notifiy_on_execution_completion(query_execution_id, session=None):
    """Send the completion notification to every user subscribed to a query.

    Nothing is sent when the execution has no notifications, or when no
    default environment name can be resolved for the execution's user.

    Args:
        query_execution_id: id of the finished query execution.
        session: database session passed on to every lookup and to
            notify_user.
    """
    # NOTE(review): the function name is misspelled ("notifiy"); it is left
    # unchanged here because callers outside this view presumably use it.
    execution = qe_logic.get_query_execution_by_id(
        query_execution_id, session=session
    )

    pending = execution.notifications
    if not len(pending):
        return

    # Only the first cell attached to the execution (if any) is used for
    # the doc id, cell id and title in the notification.
    first_cell = next(iter(execution.cells), None)

    # TODO: this should be determined by the notification.user?
    # Come up with a more efficient way to determine env per user
    environment = qe_perm_logic.get_default_user_environment_by_execution_id(
        execution_id=query_execution_id,
        uid=execution.uid,
        session=session,
    )
    env_name = getattr(environment, "name", None)

    # If the query execution is not associated with any environment
    # then no notification can be done
    if not env_name:
        return

    for notification in pending:
        recipient = user_logic.get_user_by_id(notification.user, session=session)

        if first_cell is None:
            cell_id, doc_id, query_title = None, None, "Untitled"
        else:
            cell_id = first_cell.id
            doc_id = first_cell.doc.id
            query_title = first_cell.meta.get("title", "Untitled")

        notify_user(
            user=recipient,
            template_name="query_completion_notification",
            template_params={
                "query_execution": execution,
                "doc_id": doc_id,
                "cell_id": cell_id,
                "query_title": query_title,
                "public_url": QuerybookSettings.PUBLIC_URL,
                "env_name": env_name,
            },
            session=session,
        )
Loading

0 comments on commit 5c4b0af

Please sign in to comment.