
Skeleton for remote query execution using celery. #908

Merged
merged 14 commits on Aug 10, 2016
2 changes: 2 additions & 0 deletions .gitignore
@@ -18,6 +18,8 @@ dist
caravel.egg-info/
app.db
*.bak
.idea
*.sqllite

# Node.js, webpack artifacts
*.entry.js
22 changes: 21 additions & 1 deletion caravel/bin/caravel
@@ -5,9 +5,10 @@ from __future__ import print_function
from __future__ import unicode_literals

import logging
import celery
from celery.bin import worker as celery_worker
from datetime import datetime
from subprocess import Popen
import textwrap

from flask_migrate import MigrateCommand
from flask_script import Manager
@@ -127,5 +128,24 @@ def refresh_druid():
session.commit()


@manager.command
def worker():
"""Starts a Caravel worker for async SQL query execution."""
# celery -A tasks worker --loglevel=info
print("Starting SQL Celery worker.")
if config.get('CELERY_CONFIG'):
print("Celery broker url: ")
print(config.get('CELERY_CONFIG').BROKER_URL)

application = celery.current_app._get_current_object()
c_worker = celery_worker.worker(app=application)
options = {
'broker': config.get('CELERY_CONFIG').BROKER_URL,
'loglevel': 'INFO',
'traceback': True,
}
c_worker.run(**options)


if __name__ == "__main__":
manager.run()
16 changes: 16 additions & 0 deletions caravel/config.py
@@ -179,7 +179,22 @@

# Set this API key to enable Mapbox visualizations
MAPBOX_API_KEY = ""
# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000

# The default Celery config uses SQLA as a broker; in a production setting
# you'll want to use a proper broker as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
"""
# Example:
class CeleryConfig(object):
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
CELERY_IMPORTS = ('caravel.tasks', )
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERY_CONFIG = CeleryConfig
"""
CELERY_CONFIG = None

try:
from caravel_config import * # noqa
@@ -188,3 +203,4 @@

if not CACHE_DEFAULT_TIMEOUT:
CACHE_DEFAULT_TIMEOUT = CACHE_CONFIG.get('CACHE_DEFAULT_TIMEOUT')
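
For reference, a production-style caravel_config.py could define CELERY_CONFIG along the lines of the commented example above. The sketch below is illustrative only and assumes a local Redis instance for both the broker and the result backend; the hostnames, ports and annotation values are not part of this PR.

# caravel_config.py -- illustrative override, assuming Redis is available locally
class CeleryConfig(object):
    # Redis broker and result backend (illustrative URLs)
    BROKER_URL = 'redis://localhost:6379/0'
    CELERY_IMPORTS = ('caravel.tasks',)
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
    CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

CELERY_CONFIG = CeleryConfig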

39 changes: 39 additions & 0 deletions caravel/migrations/versions/ad82a75afd82_add_query_model.py
@@ -0,0 +1,39 @@
"""Update models to support storing the queries.

Revision ID: ad82a75afd82
Revises: f162a1dea4c4
Create Date: 2016-07-25 17:48:12.771103

"""

# revision identifiers, used by Alembic.
revision = 'ad82a75afd82'
down_revision = 'f162a1dea4c4'

from alembic import op
import sqlalchemy as sa

def upgrade():
op.create_table('query',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('database_id', sa.Integer(), nullable=False),
sa.Column('tmp_table_name', sa.String(length=64), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=16), nullable=True),
sa.Column('name', sa.String(length=64), nullable=True),
sa.Column('sql', sa.Text, nullable=True),
sa.Column('limit', sa.Integer(), nullable=True),
sa.Column('progress', sa.Integer(), nullable=True),
sa.Column('start_time', sa.DateTime(), nullable=True),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['database_id'], [u'dbs.id'], ),
sa.ForeignKeyConstraint(['user_id'], [u'ab_user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.add_column('dbs', sa.Column('select_as_create_table_as', sa.Boolean(),
nullable=True))


def downgrade():
op.drop_table('query')
op.drop_column('dbs', 'select_as_create_table_as')
42 changes: 40 additions & 2 deletions caravel/models.py
@@ -32,8 +32,9 @@
from pydruid.utils.having import Aggregation
from six import string_types
from sqlalchemy import (
Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date,
Table, create_engine, MetaData, desc, asc, select, and_, func)
Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date, Table,
create_engine, MetaData, desc, asc, select, and_, func
)
from sqlalchemy.engine import reflection
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship
@@ -378,6 +379,7 @@ class Database(Model, AuditMixinNullable):
sqlalchemy_uri = Column(String(1024))
password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
cache_timeout = Column(Integer)
select_as_create_table_as = Column(Boolean, default=True)
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
@@ -1701,3 +1703,39 @@ class FavStar(Model):
class_name = Column(String(50))
obj_id = Column(Integer)
dttm = Column(DateTime, default=func.now())


class QueryStatus:
SCHEDULED = 'SCHEDULED'
CANCELLED = 'CANCELLED'
IN_PROGRESS = 'IN_PROGRESS'
FINISHED = 'FINISHED'
TIMED_OUT = 'TIMED_OUT'
FAILED = 'FAILED'


class Query(Model):

"""ORM model for SQL query"""

__tablename__ = 'query'
id = Column(Integer, primary_key=True)

database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)

# Store the tmp table into the DB only if the user asks for it.
tmp_table_name = Column(String(64))
user_id = Column(Integer, ForeignKey('ab_user.id'), nullable=True)

# models.QueryStatus
status = Column(String(16))

name = Column(String(64))
sql = Column(Text)
# Could be configured in the caravel config
limit = Column(Integer)

# 1..100
progress = Column(Integer)
start_time = Column(DateTime)
end_time = Column(DateTime)
219 changes: 219 additions & 0 deletions caravel/tasks.py
@@ -0,0 +1,219 @@
import celery
from caravel import models, app, utils
from datetime import datetime
import logging
from sqlalchemy import create_engine, select, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.sql.expression import TextAsFrom
import sqlparse
import pandas as pd

celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG'))


def is_query_select(sql):
try:
return sqlparse.parse(sql)[0].get_type() == 'SELECT'
# Capture sqlparse exceptions, worker shouldn't fail here.
except Exception:
# TODO(bkyryliuk): add logging here.
return False
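
# For illustration: is_query_select("SELECT 1") evaluates to True, while
# is_query_select("CREATE TABLE t AS SELECT 1") evaluates to False, since
# sqlparse reports the statement type as 'CREATE' rather than 'SELECT'.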


# sqlparse provides the stream of tokens but doesn't provide an API
# to access the table names; more on this:
# https://groups.google.com/forum/#!topic/sqlparse/sL2aAi6dSJU
# https://github.com/andialbrecht/sqlparse/blob/master/examples/
# extract_table_names.py
#
# Another approach would be to run the EXPLAIN on the sql statement:
# https://prestodb.io/docs/current/sql/explain.html
# https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain
def get_tables():
"""Retrieves the query names from the query."""
# TODO(bkyryliuk): implement parsing the sql statement.
pass


def add_limit_to_the_query(sql, limit, eng):
# Treat as single sql statement in case of failure.
sql_statements = [sql]
try:
sql_statements = [s for s in sqlparse.split(sql) if s]
except Exception as e:
logging.info(
"Statement " + sql + "failed to be transformed to have the limit "
"with the exception" + e.message)
return sql
if len(sql_statements) == 1 and is_query_select(sql):
qry = select('*').select_from(
TextAsFrom(text(sql_statements[0]), ['*']).alias(
'inner_qry')).limit(limit)
sql_statement = str(qry.compile(
eng, compile_kwargs={"literal_binds": True}))
return sql_statement
return sql
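
# For illustration, assuming SQL_MAX_ROW is 1000: a single-statement query
# such as "SELECT * FROM some_table" is wrapped roughly into
# "SELECT * FROM (SELECT * FROM some_table) AS inner_qry LIMIT 1000",
# while multi-statement or non-SELECT input is returned unchanged.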


# CREATE TABLE AS works only for a single statement.
def create_table_as(sql, table_name, override=False):
"""Reformats the query into the create table as query.

Works only for single SELECT SQL statements; in all other cases
the sql query is not modified.
:param sql: string, sql query that will be executed
:param table_name: string, will contain the results of the query execution
:param override: boolean, the table table_name will be dropped if true
:return: string, create table as query
"""
# TODO(bkyryliuk): drop table if allowed, check the namespace and
# the permissions.
# Treat as single sql statement in case of failure.
sql_statements = [sql]
try:
# Filter out empty statements.
sql_statements = [s for s in sqlparse.split(sql) if s]
except Exception as e:
logging.info(
"Statement " + sql + "failed to be transformed as create table as "
"with the exception" + e.message)
return sql
if len(sql_statements) == 1 and is_query_select(sql):
updated_sql = ''
# TODO(bkyryliuk): use sqlalchemy statements for the
# the drop and create operations.
if override:
updated_sql = 'DROP TABLE IF EXISTS {};\n'.format(table_name)
updated_sql += "CREATE TABLE %s AS %s" % (
table_name, sql_statements[0])
return updated_sql
return sql
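
# For illustration: create_table_as("SELECT a FROM t", "tmp_t", override=True)
# returns roughly:
#   DROP TABLE IF EXISTS tmp_t;
#   CREATE TABLE tmp_t AS SELECT a FROM t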


def get_session():
"""Creates new SQLAlchemy scoped_session."""
engine = create_engine(
app.config.get('SQLALCHEMY_DATABASE_URI'), convert_unicode=True)
return scoped_session(sessionmaker(
autocommit=False, autoflush=False, bind=engine))


@celery_app.task
def get_sql_results(database_id, sql, user_id, tmp_table_name="", schema=None):
"""Executes the sql query returns the results.

:param database_id: integer
:param sql: string, query that will be executed
:param user_id: integer
:param tmp_table_name: name of the table for CTA
:param schema: string, name of the schema (used in presto)
:return: dict, query result and status
"""
# Create a separate session; reusing the db.session leads to
# concurrency issues.
session = get_session()
try:
db_to_query = (
session.query(models.Database).filter_by(id=database_id).first()
)
except Exception as e:
return {
'error': utils.error_msg_from_exception(e),
'success': False,
}
if not db_to_query:
return {
'error': "Database with id {0} is missing.".format(database_id),
'success': False,
}

# TODO(bkyryliuk): provide a way for the user to name the query.
# TODO(bkyryliuk): run explain query to derive the tables and fill in the
# table_ids
# TODO(bkyryliuk): check the user permissions
# TODO(bkyryliuk): store the tab name in the query model
limit = app.config.get('SQL_MAX_ROW', None)
start_time = datetime.now()
if not tmp_table_name:
tmp_table_name = 'tmp.{}_table_{}'.format(user_id, start_time)
query = models.Query(
user_id=user_id,
database_id=database_id,
limit=limit,
name='{}'.format(start_time),
sql=sql,
start_time=start_time,
tmp_table_name=tmp_table_name,
status=models.QueryStatus.IN_PROGRESS,
)
session.add(query)
session.commit()
query_result = get_sql_results_as_dict(
db_to_query, sql, query.tmp_table_name, schema=schema)
query.end_time = datetime.now()
if query_result['success']:
query.status = models.QueryStatus.FINISHED
else:
query.status = models.QueryStatus.FAILED
session.commit()
# TODO(bkyryliuk): return the tmp table / query_id
return query_result


# TODO(bkyryliuk): merge the changes made in the carapal first
# before merging this PR.
def get_sql_results_as_dict(db_to_query, sql, tmp_table_name, schema=None):
"""Get the SQL query results from the give session and db connection.

:param sql: string, query that will be executed
:param db_to_query: models.Database to query, cannot be None
:param tmp_table_name: name of the table for CTA
:param schema: string, name of the schema (used in presto)
:return: dict, results and the success status
"""
eng = db_to_query.get_sqla_engine(schema=schema)
sql = sql.strip().strip(';')
# TODO(bkyryliuk): fix this case for multiple statements
if app.config.get('SQL_MAX_ROW'):
sql = add_limit_to_the_query(
sql, app.config.get("SQL_MAX_ROW"), eng)

cta_used = False
if (app.config.get('SQL_SELECT_AS_CTA') and
db_to_query.select_as_create_table_as and is_query_select(sql)):
# TODO(bkyryliuk): figure out if the query is select query.
sql = create_table_as(sql, tmp_table_name)
cta_used = True

if cta_used:
try:
eng.execute(sql)
return {
'tmp_table': tmp_table_name,
'success': True,
}
except Exception as e:
return {
'error': utils.error_msg_from_exception(e),
'success': False,
}

# otherwise run regular SQL query.
# TODO(bkyryliuk): rewrite into eng.execute as queries different from
# select should be permitted too.
try:
df = db_to_query.get_df(sql, schema)
df = df.fillna(0)
return {
'columns': [c for c in df.columns],
'data': df.to_dict(orient='records'),
'success': True,
}

except Exception as e:
return {
'error': utils.error_msg_from_exception(e),
'success': False,
}
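
For context, the task above would be dispatched from the web tier through standard Celery calls. A minimal sketch, assuming a configured broker and result backend; the argument values below are illustrative and not taken from this PR:

from caravel import tasks

# Queue the query on a worker; .delay() is Celery's shortcut for
# .apply_async() with the given arguments.
async_result = tasks.get_sql_results.delay(
    database_id=1, sql='SELECT 1', user_id=1)

# Block until the worker finishes and fetch the dict returned by the task
# (requires a CELERY_RESULT_BACKEND to be configured).
query_result = async_result.get(timeout=60)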

