Adding exporting functionality to scheduled data docs (#60)
Added gspread exporter plugin
Made the gspread exporter acquire Google Sheets user permission before exporting
Added a new data doc schedule config that allows exporting any cell's result to any exporter plugin (see the configuration sketch below)
Added support for optional exporter plugin parameters
Added a notification feature for data doc schedules
Made sure data doc schedule success/failure is correctly reflected in logs
Added the ability to manually run a scheduled data doc
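
As a hedged illustration of the new schedule config, the kwargs below use the field names accepted by validate_datadoc_schedule_kwargs in the datasources/datadoc.py diff further down; the concrete values (cell id, exporter name, notification settings) are invented for this sketch.

# Illustrative only: field names come from validate_datadoc_schedule_kwargs below;
# the values are made up for this sketch.
schedule_kwargs = {
    "exporter_cell_id": 3,        # which query cell's result to export
    "exporter_name": "gspread",   # assumed name of the new gspread exporter plugin
    "exporter_params": {},        # validated against the exporter's export_form
    "notify_with": "email",       # assumed channel value for the new notification feature
    "notify_on": 1,               # NotifyOn.ON_FAILURE, per const/schedule.py below
}
# create_datadoc_schedule then merges in user_id and doc_id before saving the task schedule.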
czgu authored May 20, 2020
1 parent 804f6d7 commit 8e067f2
Showing 53 changed files with 1,298 additions and 276 deletions.
16 changes: 16 additions & 0 deletions datahub/email_templates/datadoc_completion_notification.html
@@ -0,0 +1,16 @@
<!DOCTYPE html>
<html lang="en">
  <head>
    <style type="text/css">
      p {
        font-size: 16px;
      }
    </style>
  </head>

  <body>
    <p>
      {{ message }}
    </p>
  </body>
</html>
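
For context, a minimal sketch of how this template could be rendered and mailed. It uses plain Jinja2 rather than the project's render_html/simple_email helpers (whose signatures are not shown in this diff), so the loader path and message text are assumptions.

from jinja2 import Environment, FileSystemLoader

# Standalone illustration; the real flow presumably goes through lib.notification.
env = Environment(loader=FileSystemLoader("datahub/email_templates"))
template = env.get_template("datadoc_completion_notification.html")
html = template.render(message="DataDoc run_data_doc_1 finished successfully.")
# `html` would then be used as the body of the completion email.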
13 changes: 5 additions & 8 deletions datahub/server/app/auth/google_auth.py
@@ -1,25 +1,22 @@
import requests
from app.auth.oauth_auth import OAuthLoginManager, OAUTH_CALLBACK_PATH
from env import DataHubSettings

GOOGLE_AUTH_CONFIG = "https://accounts.google.com/.well-known/openid-configuration"
from clients.google_client import get_google_oauth_config


class GoogleLoginManager(OAuthLoginManager):
@property
def oauth_config(self):
if not hasattr(self, "_cached_google_config"):
self._cached_google_config = requests.get(GOOGLE_AUTH_CONFIG).json()
google_config = get_google_oauth_config()

return {
"callback_url": "{}{}".format(
DataHubSettings.PUBLIC_URL, OAUTH_CALLBACK_PATH
),
"client_id": DataHubSettings.OAUTH_CLIENT_ID,
"client_secret": DataHubSettings.OAUTH_CLIENT_SECRET,
"authorization_url": self._cached_google_config["authorization_endpoint"],
"token_url": self._cached_google_config["token_endpoint"],
"profile_url": self._cached_google_config["userinfo_endpoint"],
"authorization_url": google_config["authorization_endpoint"],
"token_url": google_config["token_endpoint"],
"profile_url": google_config["userinfo_endpoint"],
"scope": [
"https://www.googleapis.com/auth/userinfo.email",
"openid",
21 changes: 10 additions & 11 deletions datahub/server/app/db.py
@@ -77,22 +77,21 @@ def func(*args, **kwargs):
session = get_session()()
kwargs["session"] = session

try:
return fn(*args, **kwargs)
except SQLAlchemyError as e:
if session:
if session is not None:
try:
return fn(*args, **kwargs)
except SQLAlchemyError as e:
session.rollback()

# TODO: Log the sqlalchemy error?
import traceback

LOG.error(traceback.format_exc())
else:
raise e
finally:
# If we created the session, close it.
if session:
finally:
# Since we created the session, close it.
get_session().remove()
else:
return fn(*args, **kwargs)

return func

@@ -110,13 +109,13 @@ def DBSession():
session = get_session()()
try:
yield session
except SQLAlchemyError:
except SQLAlchemyError as e:
session.rollback()

# TODO: Log the sqlalchemy error?
import traceback

LOG.error(traceback.format_exc())
raise e
finally:
get_session().remove()

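A minimal usage sketch of the DBSession context manager whose error handling changes above; the model import path is an assumption, but the rollback/re-raise/remove behavior follows the new code.

from app.db import DBSession
from models.datadoc import DataDoc  # assumed model path, for illustration only

with DBSession() as session:
    # Any SQLAlchemyError raised here is rolled back, logged, and now re-raised
    # (the added `raise e`), and the session is removed in the finally block.
    doc = session.query(DataDoc).get(1)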
8 changes: 4 additions & 4 deletions datahub/server/app/flask_app.py
@@ -1,7 +1,7 @@
import sys

from celery import Celery
from flask import Flask, Blueprint, json as flask_json, has_app_context
from flask import Flask, Blueprint, json as flask_json, has_request_context
from flask_socketio import SocketIO
from flask_login import current_user
from flask_limiter import Limiter
@@ -77,9 +77,9 @@ class ContextTask(TaskBase):
abstract = True

def __call__(self, *args, **kwargs):
# If app context is already present then call the function
# app context is provided if the task run sychronously
if has_app_context():
# If request context is already present, then the celery task is called
# synchronously in a request, so no need to generate a new app context
if has_request_context():
return TaskBase.__call__(self, *args, **kwargs)
# Otherwise in worker, we create the context and run
with app.app_context():
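A sketch of the two call paths the switch from has_app_context to has_request_context distinguishes; the task arguments are assumptions based on the schedule kwargs elsewhere in this commit.

from tasks.run_datadoc import run_datadoc  # module path appears in this commit

# Dispatched to a Celery worker: no Flask request context exists there,
# so ContextTask wraps the call in app.app_context() before running the body.
run_datadoc.apply_async(kwargs={"doc_id": 1, "user_id": 1})

# Called synchronously inside a Flask request handler: has_request_context()
# is already True, so the task body runs directly in the current context.
run_datadoc(doc_id=1, user_id=1)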
13 changes: 13 additions & 0 deletions datahub/server/clients/google_client.py
@@ -3,6 +3,8 @@
from datetime import datetime
from urllib.parse import quote

import requests

from env import DataHubSettings
from .common import ChunkReader, FileDoesNotExist
from lib.utils.utils import DATETIME_TO_UTC
@@ -26,6 +28,17 @@ def get_google_credentials(creds_info=None):
return credentials


GOOGLE_AUTH_CONFIG = "https://accounts.google.com/.well-known/openid-configuration"
_cached_google_oauth_config = None


def get_google_oauth_config():
global _cached_google_oauth_config
if _cached_google_oauth_config is None:
_cached_google_oauth_config = requests.get(GOOGLE_AUTH_CONFIG).json()
return _cached_google_oauth_config


# Reference used: https://dev.to/sethmlarson/python-data-streaming-to-google-cloud-storage-with-resumable-uploads-458h
class GoogleUploadClient(object):
def __init__(
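Usage of the new helper is straightforward; because of the module-level cache, the well-known OpenID configuration is fetched over HTTP at most once per process.

from clients.google_client import get_google_oauth_config

config = get_google_oauth_config()   # first call performs the HTTP request
config = get_google_oauth_config()   # later calls return the cached dict
token_url = config["token_endpoint"]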
3 changes: 1 addition & 2 deletions datahub/server/const/data_doc.py
@@ -1,8 +1,7 @@
from enum import Enum

# KEEP IT CONSISTENT AS config/datadoc.yaml


# KEEP IT CONSISTENT AS config/datadoc.yaml
class DataCellType(Enum):
query = 0
text = 1
6 changes: 6 additions & 0 deletions datahub/server/const/schedule.py
@@ -5,3 +5,9 @@ class TaskRunStatus(Enum):
    RUNNING = 0
    SUCCESS = 1
    FAILURE = 2


class NotifyOn(Enum):
    ALL = 0
    ON_FAILURE = 1
    ON_SUCCESS = 2
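
A hedged sketch of how the scheduled-run task might use this enum to decide whether to send the completion notification; the actual gating lives in the data doc run task, which is not shown in this excerpt.

from const.schedule import NotifyOn

# Illustrative only; the real decision is made inside the run task.
def should_notify(notify_on: NotifyOn, succeeded: bool) -> bool:
    if notify_on == NotifyOn.ALL:
        return True
    if notify_on == NotifyOn.ON_SUCCESS:
        return succeeded
    return not succeeded  # NotifyOn.ON_FAILURE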
99 changes: 67 additions & 32 deletions datahub/server/datasources/datadoc.py
@@ -13,6 +13,8 @@
from env import DataHubSettings

from lib.celery.cron import validate_cron
from lib.form import validate_form
from lib.export.all_exporters import get_exporter
from lib.notification import simple_email, render_html
from lib.logger import get_logger

@@ -24,8 +26,8 @@
)
from logic.datadoc_permission import assert_can_read, assert_can_write
from logic.query_execution import get_query_execution_by_id
from logic.schedule import run_and_log_scheduled_task
from models.environment import Environment
from tasks.run_datadoc import run_datadoc

LOG = get_logger(__file__)

@@ -243,27 +245,6 @@ def delete_favorite_data_doc(
logic.unfavorite_data_doc(data_doc_id=data_doc_id, uid=uid)


@register("/datadoc/<int:id>/run/", methods=["GET"])
def get_datadoc_schedule_run(id):
with DBSession() as session:
assert_can_read(id, session=session)
verify_data_doc_permission(id, session=session)

runs, _ = schedule_logic.get_task_run_record_run_by_name(
name=get_data_doc_schedule_name(id), session=session
)
return runs


@register("/datadoc/<int:id>/run/", methods=["POST"])
def run_data_doc(id):
with DBSession() as session:
assert_can_write(id, session=session)
verify_data_doc_permission(id, session=session)

run_datadoc.apply_async(args=[id])


def get_data_doc_schedule_name(id: int):
return f"run_data_doc_{id}"

@@ -278,38 +259,61 @@ def get_datadoc_schedule(id):
return schedule_logic.get_task_schedule_by_name(schedule_name, session=session)


def validate_datadoc_schedule_kwargs(kwargs):
allowed_keys = [
"notify_with",
"notify_on",
"exporter_cell_id",
"exporter_name",
"exporter_params",
]
for key in kwargs.keys():
api_assert(key in allowed_keys, "Invalid field {}".format(key))
if "exporter_name" in kwargs:
exporter_name = kwargs["exporter_name"]
exporter = get_exporter(exporter_name)
api_assert(exporter is not None, "Invalid exporter {}".format(exporter_name))

exporter_params = kwargs.get("exporter_params", {})
exporter_form = exporter.export_form
if not (exporter_form is None and not exporter_params):
valid, reason = validate_form(exporter_form, exporter_params)
api_assert(valid, "Invalid exporter params, reason: " + reason)


@register("/datadoc/<int:id>/schedule/", methods=["POST"])
def create_datadoc_schedule(
id, cron,
id, cron, kwargs,
):
schedule_name = get_data_doc_schedule_name(id)
validate_datadoc_schedule_kwargs(kwargs)
api_assert(validate_cron(cron), "Invalid cron expression")

schedule_name = get_data_doc_schedule_name(id)
with DBSession() as session:
assert_can_write(id, session=session)
data_doc = logic.get_data_doc_by_id(id, session=session)
verify_environment_permission([data_doc.environment_id])

api_assert(validate_cron(cron), "Invalid cron expression")

return schedule_logic.create_task_schedule(
schedule_name,
"tasks.run_datadoc.run_datadoc",
cron=cron,
kwargs={"doc_id": id},
kwargs={**kwargs, "user_id": current_user.id, "doc_id": id},
task_type="user",
session=session,
)


@register("/datadoc/<int:id>/schedule/", methods=["PUT"])
def update_datadoc_schedule(
id, cron=None, enabled=None,
):
def update_datadoc_schedule(id, cron=None, enabled=None, kwargs=None):
if kwargs is not None:
validate_datadoc_schedule_kwargs(kwargs)
if cron is not None:
api_assert(validate_cron(cron), "Invalid cron expression")

schedule_name = get_data_doc_schedule_name(id)
with DBSession() as session:
assert_can_write(id, session=session)
if cron is not None:
api_assert(validate_cron(cron), "Invalid cron expression")

schedule = schedule_logic.get_task_schedule_by_name(
schedule_name, session=session
@@ -322,6 +326,12 @@ def update_datadoc_schedule(
updated_fields["cron"] = cron
if enabled is not None:
updated_fields["enabled"] = enabled
if kwargs is not None:
updated_fields["kwargs"] = {
**kwargs,
"user_id": current_user.id,
"doc_id": id,
}

return schedule_logic.update_task_schedule(
schedule.id, session=session, **updated_fields,
@@ -342,6 +352,31 @@ def delete_datadoc_schedule(id):
schedule_logic.delete_task_schedule(schedule.id, session=session)


@register("/datadoc/<int:id>/schedule/logs/", methods=["GET"])
def get_datadoc_schedule_run(id):
with DBSession() as session:
assert_can_read(id, session=session)
verify_data_doc_permission(id, session=session)

runs, _ = schedule_logic.get_task_run_record_run_by_name(
name=get_data_doc_schedule_name(id), session=session
)
return runs


@register("/datadoc/<int:id>/schedule/run/", methods=["POST"])
def run_data_doc(id):
schedule_name = get_data_doc_schedule_name(id)
with DBSession() as session:
assert_can_write(id, session=session)
verify_data_doc_permission(id, session=session)
schedule = schedule_logic.get_task_schedule_by_name(
schedule_name, session=session
)
api_assert(schedule, "Schedule does not exist")
run_and_log_scheduled_task(schedule.id, session=session)


@register("/datadoc/<int:doc_id>/editor/", methods=["GET"])
def get_datadoc_editors(doc_id):
return logic.get_data_doc_editors_by_doc_id(doc_id)
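A hedged client-side sketch of the reorganized schedule endpoints registered above; the host, payload shape, and authentication handling are assumptions for illustration.

import requests

BASE = "https://datahub.example.com/ds"  # hypothetical API root

# Create the schedule; kwargs follow validate_datadoc_schedule_kwargs (values illustrative).
requests.post(f"{BASE}/datadoc/1/schedule/", json={
    "cron": "0 6 * * *",
    "kwargs": {"exporter_cell_id": 3, "exporter_name": "gspread", "notify_on": 1},
})

# Trigger the schedule manually (the new run endpoint) ...
requests.post(f"{BASE}/datadoc/1/schedule/run/")

# ... and read its run records from the relocated logs endpoint.
requests.get(f"{BASE}/datadoc/1/schedule/logs/")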
25 changes: 18 additions & 7 deletions datahub/server/datasources/query_execution.py
@@ -16,7 +16,7 @@
verify_query_engine_permission,
)
from clients.s3_client import FileDoesNotExist
from lib.export.all_exporters import ALL_EXPORTERS, get_exporter_class
from lib.export.all_exporters import ALL_EXPORTERS, get_exporter
from lib.result_store import GenericReader
from lib.query_analysis.templating import (
render_templated_query,
@@ -344,13 +344,24 @@ def delete_query_execution_notification(
)


@register("/statement_execution_exporter/", methods=["GET"], require_auth=True)
def get_all_statement_execution_exporters():
@register("/query_execution_exporter/", methods=["GET"], require_auth=True)
def get_all_query_result_exporters():
return ALL_EXPORTERS


@register(
"/statement_execution/<int:statement_execution_id>/export/",
"/query_execution_exporter/auth/", methods=["GET"],
)
def export_statement_execution_acquire_auth(export_name):
exporter = get_exporter(export_name)
api_assert(exporter is not None, f"Invalid export name {export_name}")
if not exporter.requires_auth:
return None
return exporter.acquire_auth(current_user.id)


@register(
"/query_execution_exporter/statement_execution/<int:statement_execution_id>/",
methods=["GET"],
require_auth=True,
)
@@ -366,9 +377,9 @@ def export_statement_execution_result(statement_execution_id, export_name):
statement_execution.query_execution_id, session=session
)

exporter_class = get_exporter_class(export_name)
api_assert(exporter_class is not None, f"Invalid export name {export_name}")
return exporter_class.export(statement_execution_id, current_user.id)
exporter = get_exporter(export_name)
api_assert(exporter is not None, f"Invalid export name {export_name}")
return exporter.export(statement_execution_id, current_user.id)


@register("/query_execution/templated_query/", methods=["POST"], require_auth=True)
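Similarly, a hedged sketch of the renamed exporter endpoints; passing export_name as a query parameter is an assumption about how the route decorator maps request arguments onto the handler's parameters.

import requests

BASE = "https://datahub.example.com/ds"  # hypothetical API root

# List every registered exporter plugin.
exporters = requests.get(f"{BASE}/query_execution_exporter/").json()

# Acquire exporter auth first (the handler returns None when requires_auth is False).
requests.get(f"{BASE}/query_execution_exporter/auth/", params={"export_name": "gspread"})

# Export one statement execution's result through the chosen exporter.
requests.get(
    f"{BASE}/query_execution_exporter/statement_execution/42/",
    params={"export_name": "gspread"},
)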
4 changes: 2 additions & 2 deletions datahub/server/lib/export/all_exporters.py
@@ -6,8 +6,8 @@
ALL_EXPORTERS = ALL_PLUGIN_EXPORTERS


def get_exporter_class(name: str):
def get_exporter(name: str):
for exporter in ALL_EXPORTERS:
if exporter.EXPORTER_NAME() == name:
if exporter.exporter_name == name:
return exporter
raise ValueError(f"Unknown exporter name {name}")
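
Pieced together from the call sites in this commit (exporter_name, export_form, requires_auth, acquire_auth, export), a rough sketch of the interface an exporter plugin now exposes; the class itself and anything beyond those names are assumptions.

# Hypothetical plugin written against the renamed interface; only the attribute
# and method names are taken from this commit's call sites.
class GSpreadExporter:
    @property
    def exporter_name(self) -> str:
        return "gspread"

    @property
    def requires_auth(self) -> bool:
        return True  # checked by the /query_execution_exporter/auth/ endpoint

    @property
    def export_form(self):
        return None  # or a form validated through lib.form.validate_form

    def acquire_auth(self, uid: int):
        # e.g. return an OAuth URL so the user can grant Google Sheets access
        ...

    def export(self, statement_execution_id: int, uid: int):
        # write the statement execution's result somewhere and return a link to it
        ...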