This repository has been archived by the owner on Dec 5, 2019. It is now read-only.

Commit

Fix #477: Record LogUri with job history
robhudson committed May 24, 2017
1 parent fe6728f commit 5f31918
Showing 6 changed files with 51 additions and 4 deletions.
24 changes: 24 additions & 0 deletions atmo/jobs/migrations/0034_add_log_uri.py
@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.1 on 2017-05-24 23:41
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('jobs', '0033_rename_scheduled_date'),
]

operations = [
migrations.AlterModelOptions(
name='sparkjobrun',
options={'get_latest_by': 'created_at', 'ordering': ['-created_at']},
),
migrations.AddField(
model_name='sparkjobrun',
name='log_uri',
field=models.CharField(blank=True, default='', max_length=255),
),
]
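
The migration adds a plain CharField with an empty-string default and sets the SparkJobRun Meta options (ordering and get_latest_by on created_at), so it applies cleanly to existing data: runs recorded before this change simply have no log location yet. A minimal sketch of what that looks like from the ORM, assuming the SparkJobRun model defined in atmo/jobs/models.py below:

    from atmo.jobs.models import SparkJobRun

    # Runs created before this migration keep the empty-string default.
    runs_without_logs = SparkJobRun.objects.filter(log_uri='')
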
8 changes: 7 additions & 1 deletion atmo/jobs/models.py
@@ -223,7 +223,7 @@ def run(self):
# if the job ran before and is still running, don't start it again
if not self.is_runnable:
return
- jobflow_id = self.provisioner.run(
+ jobflow_id, job_flow_params = self.provisioner.run(
user_username=self.created_by.username,
user_email=self.created_by.email,
identifier=self.identifier,
@@ -237,6 +237,7 @@ def run(self):
run = self.runs.create(
spark_job=self,
jobflow_id=jobflow_id,
log_uri=job_flow_params.get('LogUri', ''),
scheduled_at=timezone.now(),
emr_release_version=self.emr_release.version,
size=self.size,
@@ -327,6 +328,11 @@ class SparkJobRun(EditedAtModel):
blank=True,
null=True,
)
log_uri = models.CharField(
max_length=255,
blank=True,
default='',
)
status = models.CharField(
max_length=50,
blank=True,
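
Taken together, the models.py changes mean every SparkJobRun now records the LogUri that was handed to EMR when the run started. A hedged sketch of reading it back, assuming the parent job model is SparkJob and using the latest_run attribute exercised in the tests below; the identifier is a placeholder:

    from atmo.jobs.models import SparkJob

    spark_job = SparkJob.objects.get(identifier='example-job')  # placeholder identifier
    latest = spark_job.latest_run
    if latest is not None and latest.log_uri:
        print('EMR logs for run %s at %s' % (latest.jobflow_id, latest.log_uri))
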
2 changes: 1 addition & 1 deletion atmo/jobs/provisioners.py
@@ -79,7 +79,7 @@ def run(self, user_username, user_email, identifier, emr_release, size,
})

cluster = self.emr.run_job_flow(**job_flow_params)
- return cluster['JobFlowId']
+ return (cluster['JobFlowId'], job_flow_params)

def results(self, identifier, is_public):
if is_public:
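
SparkJobProvisioner.run now returns a (jobflow_id, job_flow_params) tuple instead of a bare jobflow id, so callers unpack two values. A stand-in sketch of the new contract, with placeholder values; the real method builds job_flow_params and passes them to EMR's run_job_flow:

    def run_stub():
        # Mirrors only the shape of the provisioner's return value.
        job_flow_params = {'LogUri': 's3://log-bucket/example/logs'}
        return ('12345', job_flow_params)

    jobflow_id, job_flow_params = run_stub()
    log_uri = job_flow_params.get('LogUri', '')  # the same lookup models.py performs above
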
3 changes: 2 additions & 1 deletion tests/conftest.py
@@ -166,7 +166,8 @@ def sparkjob_provisioner_mocks(mocker):
),
'run': mocker.patch(
'atmo.jobs.provisioners.SparkJobProvisioner.run',
- return_value='12345',
+ return_value=('12345',
+     {'LogUri': 's3://log-bucket/path/to/logs.tar.gz'}),
),
'remove': mocker.patch(
'atmo.jobs.provisioners.SparkJobProvisioner.remove',
11 changes: 11 additions & 0 deletions tests/jobs/test_models.py
@@ -349,3 +349,14 @@ def test_expires(mocker, now, spark_job_factory, cluster_provisioner_mocks):
spark_job.save()
spark_job.refresh_from_db()
assert not spark_job.expired_date


@freeze_time('2016-04-05 13:25:47')
def test_run_stores_info_latest_run(spark_job, sparkjob_provisioner_mocks):
spark_job.run()

assert spark_job.latest_run.jobflow_id == '12345'
assert spark_job.latest_run.log_uri == 's3://log-bucket/path/to/logs.tar.gz'
assert spark_job.latest_run.emr_release_version == spark_job.emr_release.version
assert spark_job.latest_run.size == spark_job.size
assert spark_job.latest_run.scheduled_at == timezone.now()
7 changes: 6 additions & 1 deletion tests/jobs/test_provisioners.py
@@ -258,7 +258,7 @@ def test_spark_job_run(mocker, is_public, spark_job_provisioner, user):
stubber.add_response('run_job_flow', response, expected_params)

with stubber:
- jobflow_id = spark_job_provisioner.run(
+ jobflow_id, params = spark_job_provisioner.run(
user_username=user.username,
user_email=user.email,
identifier=identifier,
@@ -269,3 +269,8 @@ def test_spark_job_run(mocker, is_public, spark_job_provisioner, user):
job_timeout=job_timeout,
)
assert jobflow_id == '12345'
assert (
params['LogUri'] ==
's3://log-bucket/%s/%s/2017-02-03T13:48:09+00:00' %
(spark_job_provisioner.log_dir, identifier)
)
