[bugfix] fix merge conflict that broke Hive support
mistercrunch committed Jul 28, 2017 (parent e584a96, commit b2066da)
Showing 2 changed files with 28 additions and 13 deletions.
superset/config.py (6 changes: 5 additions & 1 deletion)
@@ -308,8 +308,12 @@ class CeleryConfig(object):
# configuration. These blueprints will get integrated in the app
BLUEPRINTS = []

try:
# Provide a callable that receives a tracking_url and returns another
# URL. This is used to translate internal Hadoop job tracker URL
# into a proxied one
TRACKING_URL_TRANSFORMER = lambda x: x

try:
if CONFIG_PATH_ENV_VAR in os.environ:
# Explicitly import config module that is not in pythonpath; useful
# for case where app is being executed via pex.
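For context on how this hook is meant to be used: a deployment that fronts its Hadoop resource manager with a proxy can override the callable in its own superset_config.py. The hostnames below are hypothetical; this is only a sketch of the intended usage, not code from the commit.

# superset_config.py (deployment-specific sketch; hostnames are made up)
def TRACKING_URL_TRANSFORMER(url):
    # Rewrite the internal YARN / job-tracker address into one that is
    # reachable from the user's browser.
    return url.replace(
        'http://internal-resourcemanager:8088',
        'https://hadoop-proxy.example.com',
    )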
superset/db_engine_specs.py (35 changes: 23 additions & 12 deletions)
@@ -31,8 +31,9 @@

from superset.utils import SupersetTemplateException
from superset.utils import QueryStatus
from superset import utils
from superset import cache_util
from superset import conf, cache_util, utils

tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')

Grain = namedtuple('Grain', 'name label function')

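With an unmodified superset/config.py, the module-level tracking_url_trans above resolves to the identity lambda added in the config.py hunk, so tracking URLs pass through untouched unless a deployment overrides the setting. A minimal sketch of that default behavior (the URL is made up):

# Sketch: emulate the default wiring shown in the two hunks above.
TRACKING_URL_TRANSFORMER = lambda x: x  # default from superset/config.py
tracking_url_trans = TRACKING_URL_TRANSFORMER  # what conf.get(...) returns by default

url = 'http://resourcemanager:8088/proxy/application_1501234567890_0042/'
assert tracking_url_trans(url) == url  # identity: no transformation applied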
@@ -683,7 +684,7 @@ def adjust_database_uri(cls, uri, selected_schema=None):
@classmethod
def progress(cls, log_lines):
total_jobs = 1 # assuming there's at least 1 job
current_job = None
current_job = 1
stages = {}
for line in log_lines:
match = cls.jobs_stats_r.match(line)
@@ -692,6 +693,7 @@ def progress(cls, log_lines):
match = cls.launching_job_r.match(line)
if match:
current_job = int(match.groupdict()['job_number'])
total_jobs = int(match.groupdict()['max_jobs']) or 1
stages = {}
match = cls.stage_progress_r.match(line)
if match:
@@ -701,10 +703,9 @@ def progress(cls, log_lines):
stages[stage_number] = (map_progress + reduce_progress) / 2
logging.info(
"Progress detail: {}, "
"total jobs: {}".format(stages, total_jobs))
"current job {}, "
"total jobs: {}".format(stages, current_job, total_jobs))

if not total_jobs or not current_job:
return 0
stage_progress = sum(
stages.values()) / len(stages.values()) if stages else 0

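The parser above relies on three class-level regexes (cls.jobs_stats_r, cls.launching_job_r, cls.stage_progress_r) that are not shown in this hunk; the snippet below reproduces the same per-stage averaging over a made-up Hive log excerpt, using approximate regexes with the named groups the diff reads from groupdict():

import re

# Approximations of the regexes referenced above; only the named groups matter here.
launching_job_r = re.compile(
    r'.*Launching Job (?P<job_number>[0-9]+) out of (?P<max_jobs>[0-9]+)')
stage_progress_r = re.compile(
    r'.*Stage-(?P<stage_number>[0-9]+).*'
    r'map = (?P<map_progress>[0-9]+)%.*reduce = (?P<reduce_progress>[0-9]+)%')

# Hypothetical Hive log excerpt
log_lines = [
    'Launching Job 1 out of 2',
    'Stage-1 map = 100%,  reduce = 100%',
    'Launching Job 2 out of 2',
    'Stage-2 map = 50%,  reduce = 0%',
]

total_jobs, current_job, stages = 1, 1, {}
for line in log_lines:
    match = launching_job_r.match(line)
    if match:
        current_job = int(match.groupdict()['job_number'])
        total_jobs = int(match.groupdict()['max_jobs']) or 1
        stages = {}  # a new job resets the per-stage progress
    match = stage_progress_r.match(line)
    if match:
        d = match.groupdict()
        stages[int(d['stage_number'])] = (
            int(d['map_progress']) + int(d['reduce_progress'])) / 2.0

stage_progress = sum(stages.values()) / len(stages.values()) if stages else 0
print(current_job, total_jobs, stage_progress)  # -> 2 2 25.0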
@@ -731,18 +732,16 @@ def handle_cursor(cls, cursor, query, session):
polled = cursor.poll()
last_log_line = 0
tracking_url = None
job_id = None
while polled.operationState in unfinished_states:
query = session.query(type(query)).filter_by(id=query.id).one()
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

resp = cursor.fetch_logs()
if resp and resp.log:
log = resp.log or ''
log_lines = resp.log.splitlines()
logging.info("\n".join(log_lines[last_log_line:]))
last_log_line = len(log_lines) - 1
log = cursor.fetch_logs() or ''
if log:
log_lines = log.splitlines()
progress = cls.progress(log_lines)
logging.info("Progress total: {}".format(progress))
needs_commit = False
@@ -754,8 +753,20 @@ def handle_cursor(cls, cursor, query, session):
if tracking_url:
logging.info(
"Found the tracking url: {}".format(tracking_url))
tracking_url = tracking_url_trans(tracking_url)
logging.info(
"Transformation applied: {}".format(tracking_url))
query.tracking_url = tracking_url
job_id = tracking_url.split('/')[-2]
logging.info("Job id: {}".format(job_id))
needs_commit = True
if job_id and len(log_lines) > last_log_line:
# Wait for job id before logging things out
# this allows for prefixing all log lines and becoming
# searchable in something like Kibana
for l in log_lines[last_log_line:]:
logging.info("[{}] {}".format(job_id, l))
last_log_line = len(log_lines)
if needs_commit:
session.commit()
time.sleep(5)
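The job id stored above is simply the second-to-last path segment of the (possibly transformed) tracking URL, and every newly fetched log line is then prefixed with it so the output becomes searchable in a log aggregator such as Kibana. A small sketch with a made-up YARN proxy URL (the trailing slash is assumed, and is what makes [-2] land on the application id):

tracking_url = 'https://hadoop-proxy.example.com/proxy/application_1501234567890_0042/'
job_id = tracking_url.split('/')[-2]
print(job_id)  # -> application_1501234567890_0042

# Prefix each newly fetched log line with the job id, as the hunk above does
new_log_lines = ['Launching Job 1 out of 2', 'Stage-1 map = 0%,  reduce = 0%']
for l in new_log_lines:
    print('[{}] {}'.format(job_id, l))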
