-
Notifications
You must be signed in to change notification settings - Fork 13.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Skeleton for remote query execution using celery. (#908)
* Carapal react mockup This is really just a mock up written in React to try different components. It could become scaffolding to build a prototype, or not. * Preliminary commit for Celery backend * Move the SQL query execution to the celery worker. * React scetch * Refactor SQL execution to use the celery if configured. * Refactor SQL execution to use the celery if configured. * Add query model * Remove QueryResult. Query has a tmp_table_name field that has all the data. * Add create table as wrapper. * Create table as * Address the comments. * Add trailing commas * Remove the init_query test. * Handle 'undefined' schema case
- Loading branch information
1 parent
c6d9202
commit 72a01ae
Showing
13 changed files
with
843 additions
and
76 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ dist | |
caravel.egg-info/ | ||
app.db | ||
*.bak | ||
.idea | ||
*.sqllite | ||
|
||
# Node.js, webpack artifacts | ||
*.entry.js | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
39 changes: 39 additions & 0 deletions
39
caravel/migrations/versions/ad82a75afd82_add_query_model.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
"""Update models to support storing the queries. | ||
Revision ID: ad82a75afd82 | ||
Revises: f162a1dea4c4 | ||
Create Date: 2016-07-25 17:48:12.771103 | ||
""" | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = 'ad82a75afd82' | ||
down_revision = 'f162a1dea4c4' | ||
|
||
from alembic import op | ||
import sqlalchemy as sa | ||
|
||
def upgrade(): | ||
op.create_table('query', | ||
sa.Column('id', sa.Integer(), nullable=False), | ||
sa.Column('database_id', sa.Integer(), nullable=False), | ||
sa.Column('tmp_table_name', sa.String(length=64), nullable=True), | ||
sa.Column('user_id', sa.Integer(), nullable=True), | ||
sa.Column('status', sa.String(length=16), nullable=True), | ||
sa.Column('name', sa.String(length=64), nullable=True), | ||
sa.Column('sql', sa.Text, nullable=True), | ||
sa.Column('limit', sa.Integer(), nullable=True), | ||
sa.Column('progress', sa.Integer(), nullable=True), | ||
sa.Column('start_time', sa.DateTime(), nullable=True), | ||
sa.Column('end_time', sa.DateTime(), nullable=True), | ||
sa.ForeignKeyConstraint(['database_id'], [u'dbs.id'], ), | ||
sa.ForeignKeyConstraint(['user_id'], [u'ab_user.id'], ), | ||
sa.PrimaryKeyConstraint('id') | ||
) | ||
op.add_column('dbs', sa.Column('select_as_create_table_as', sa.Boolean(), | ||
nullable=True)) | ||
|
||
|
||
def downgrade(): | ||
op.drop_table('query') | ||
op.drop_column('dbs', 'select_as_create_table_as') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
import celery | ||
from caravel import models, app, utils | ||
from datetime import datetime | ||
import logging | ||
from sqlalchemy import create_engine, select, text | ||
from sqlalchemy.orm import scoped_session, sessionmaker | ||
from sqlalchemy.sql.expression import TextAsFrom | ||
import sqlparse | ||
import pandas as pd | ||
|
||
celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG')) | ||
|
||
|
||
def is_query_select(sql): | ||
try: | ||
return sqlparse.parse(sql)[0].get_type() == 'SELECT' | ||
# Capture sqlparse exceptions, worker shouldn't fail here. | ||
except Exception: | ||
# TODO(bkyryliuk): add logging here. | ||
return False | ||
|
||
|
||
# if sqlparse provides the stream of tokens but don't provide the API | ||
# to access the table names, more on it: | ||
# https://groups.google.com/forum/#!topic/sqlparse/sL2aAi6dSJU | ||
# https://github.com/andialbrecht/sqlparse/blob/master/examples/ | ||
# extract_table_names.py | ||
# | ||
# Another approach would be to run the EXPLAIN on the sql statement: | ||
# https://prestodb.io/docs/current/sql/explain.html | ||
# https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain | ||
def get_tables(): | ||
"""Retrieves the query names from the query.""" | ||
# TODO(bkyryliuk): implement parsing the sql statement. | ||
pass | ||
|
||
|
||
def add_limit_to_the_query(sql, limit, eng): | ||
# Treat as single sql statement in case of failure. | ||
sql_statements = [sql] | ||
try: | ||
sql_statements = [s for s in sqlparse.split(sql) if s] | ||
except Exception as e: | ||
logging.info( | ||
"Statement " + sql + "failed to be transformed to have the limit " | ||
"with the exception" + e.message) | ||
return sql | ||
if len(sql_statements) == 1 and is_query_select(sql): | ||
qry = select('*').select_from( | ||
TextAsFrom(text(sql_statements[0]), ['*']).alias( | ||
'inner_qry')).limit(limit) | ||
sql_statement = str(qry.compile( | ||
eng, compile_kwargs={"literal_binds": True})) | ||
return sql_statement | ||
return sql | ||
|
||
|
||
# create table works only for the single statement. | ||
def create_table_as(sql, table_name, override=False): | ||
"""Reformats the query into the create table as query. | ||
Works only for the single select SQL statements, in all other cases | ||
the sql query is not modified. | ||
:param sql: string, sql query that will be executed | ||
:param table_name: string, will contain the results of the query execution | ||
:param override, boolean, table table_name will be dropped if true | ||
:return: string, create table as query | ||
""" | ||
# TODO(bkyryliuk): drop table if allowed, check the namespace and | ||
# the permissions. | ||
# Treat as single sql statement in case of failure. | ||
sql_statements = [sql] | ||
try: | ||
# Filter out empty statements. | ||
sql_statements = [s for s in sqlparse.split(sql) if s] | ||
except Exception as e: | ||
logging.info( | ||
"Statement " + sql + "failed to be transformed as create table as " | ||
"with the exception" + e.message) | ||
return sql | ||
if len(sql_statements) == 1 and is_query_select(sql): | ||
updated_sql = '' | ||
# TODO(bkyryliuk): use sqlalchemy statements for the | ||
# the drop and create operations. | ||
if override: | ||
updated_sql = 'DROP TABLE IF EXISTS {};\n'.format(table_name) | ||
updated_sql += "CREATE TABLE %s AS %s" % ( | ||
table_name, sql_statements[0]) | ||
return updated_sql | ||
return sql | ||
|
||
|
||
def get_session(): | ||
"""Creates new SQLAlchemy scoped_session.""" | ||
engine = create_engine( | ||
app.config.get('SQLALCHEMY_DATABASE_URI'), convert_unicode=True) | ||
return scoped_session(sessionmaker( | ||
autocommit=False, autoflush=False, bind=engine)) | ||
|
||
|
||
@celery_app.task | ||
def get_sql_results(database_id, sql, user_id, tmp_table_name="", schema=None): | ||
"""Executes the sql query returns the results. | ||
:param database_id: integer | ||
:param sql: string, query that will be executed | ||
:param user_id: integer | ||
:param tmp_table_name: name of the table for CTA | ||
:param schema: string, name of the schema (used in presto) | ||
:return: dataframe, query result | ||
""" | ||
# Create a separate session, reusing the db.session leads to the | ||
# concurrency issues. | ||
session = get_session() | ||
try: | ||
db_to_query = ( | ||
session.query(models.Database).filter_by(id=database_id).first() | ||
) | ||
except Exception as e: | ||
return { | ||
'error': utils.error_msg_from_exception(e), | ||
'success': False, | ||
} | ||
if not db_to_query: | ||
return { | ||
'error': "Database with id {0} is missing.".format(database_id), | ||
'success': False, | ||
} | ||
|
||
# TODO(bkyryliuk): provide a way for the user to name the query. | ||
# TODO(bkyryliuk): run explain query to derive the tables and fill in the | ||
# table_ids | ||
# TODO(bkyryliuk): check the user permissions | ||
# TODO(bkyryliuk): store the tab name in the query model | ||
limit = app.config.get('SQL_MAX_ROW', None) | ||
start_time = datetime.now() | ||
if not tmp_table_name: | ||
tmp_table_name = 'tmp.{}_table_{}'.format(user_id, start_time) | ||
query = models.Query( | ||
user_id=user_id, | ||
database_id=database_id, | ||
limit=limit, | ||
name='{}'.format(start_time), | ||
sql=sql, | ||
start_time=start_time, | ||
tmp_table_name=tmp_table_name, | ||
status=models.QueryStatus.IN_PROGRESS, | ||
) | ||
session.add(query) | ||
session.commit() | ||
query_result = get_sql_results_as_dict( | ||
db_to_query, sql, query.tmp_table_name, schema=schema) | ||
query.end_time = datetime.now() | ||
if query_result['success']: | ||
query.status = models.QueryStatus.FINISHED | ||
else: | ||
query.status = models.QueryStatus.FAILED | ||
session.commit() | ||
# TODO(bkyryliuk): return the tmp table / query_id | ||
return query_result | ||
|
||
|
||
# TODO(bkyryliuk): merge the changes made in the carapal first | ||
# before merging this PR. | ||
def get_sql_results_as_dict(db_to_query, sql, tmp_table_name, schema=None): | ||
"""Get the SQL query results from the give session and db connection. | ||
:param sql: string, query that will be executed | ||
:param db_to_query: models.Database to query, cannot be None | ||
:param tmp_table_name: name of the table for CTA | ||
:param schema: string, name of the schema (used in presto) | ||
:return: (dataframe, boolean), results and the status | ||
""" | ||
eng = db_to_query.get_sqla_engine(schema=schema) | ||
sql = sql.strip().strip(';') | ||
# TODO(bkyryliuk): fix this case for multiple statements | ||
if app.config.get('SQL_MAX_ROW'): | ||
sql = add_limit_to_the_query( | ||
sql, app.config.get("SQL_MAX_ROW"), eng) | ||
|
||
cta_used = False | ||
if (app.config.get('SQL_SELECT_AS_CTA') and | ||
db_to_query.select_as_create_table_as and is_query_select(sql)): | ||
# TODO(bkyryliuk): figure out if the query is select query. | ||
sql = create_table_as(sql, tmp_table_name) | ||
cta_used = True | ||
|
||
if cta_used: | ||
try: | ||
eng.execute(sql) | ||
return { | ||
'tmp_table': tmp_table_name, | ||
'success': True, | ||
} | ||
except Exception as e: | ||
return { | ||
'error': utils.error_msg_from_exception(e), | ||
'success': False, | ||
} | ||
|
||
# otherwise run regular SQL query. | ||
# TODO(bkyryliuk): rewrite into eng.execute as queries different from | ||
# select should be permitted too. | ||
try: | ||
df = db_to_query.get_df(sql, schema) | ||
df = df.fillna(0) | ||
return { | ||
'columns': [c for c in df.columns], | ||
'data': df.to_dict(orient='records'), | ||
'success': True, | ||
} | ||
|
||
except Exception as e: | ||
return { | ||
'error': utils.error_msg_from_exception(e), | ||
'success': False, | ||
} | ||
|
||
|
Oops, something went wrong.