diff --git a/atmo/jobs/migrations/0034_add_log_uri.py b/atmo/jobs/migrations/0034_add_log_uri.py
new file mode 100644
index 00000000..134cfe55
--- /dev/null
+++ b/atmo/jobs/migrations/0034_add_log_uri.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.1 on 2017-05-24 23:41
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('jobs', '0033_rename_scheduled_date'),
+    ]
+
+    operations = [
+        migrations.AlterModelOptions(
+            name='sparkjobrun',
+            options={'get_latest_by': 'created_at', 'ordering': ['-created_at']},
+        ),
+        migrations.AddField(
+            model_name='sparkjobrun',
+            name='log_uri',
+            field=models.CharField(blank=True, default='', max_length=255),
+        ),
+    ]
diff --git a/atmo/jobs/models.py b/atmo/jobs/models.py
index 703d60af..21212e5f 100644
--- a/atmo/jobs/models.py
+++ b/atmo/jobs/models.py
@@ -223,7 +223,7 @@ def run(self):
         # if the job ran before and is still running, don't start it again
         if not self.is_runnable:
             return
-        jobflow_id = self.provisioner.run(
+        jobflow_id, job_flow_params = self.provisioner.run(
             user_username=self.created_by.username,
             user_email=self.created_by.email,
             identifier=self.identifier,
@@ -237,6 +237,7 @@ def run(self):
         run = self.runs.create(
             spark_job=self,
             jobflow_id=jobflow_id,
+            log_uri=job_flow_params.get('LogUri', ''),
             scheduled_at=timezone.now(),
             emr_release_version=self.emr_release.version,
             size=self.size,
@@ -327,6 +328,11 @@ class SparkJobRun(EditedAtModel):
         blank=True,
         null=True,
     )
+    log_uri = models.CharField(
+        max_length=255,
+        blank=True,
+        default='',
+    )
     status = models.CharField(
         max_length=50,
         blank=True,
diff --git a/atmo/jobs/provisioners.py b/atmo/jobs/provisioners.py
index 22b5b60c..3fefa64c 100644
--- a/atmo/jobs/provisioners.py
+++ b/atmo/jobs/provisioners.py
@@ -79,7 +79,7 @@ def run(self, user_username, user_email, identifier, emr_release, size,
         })
 
         cluster = self.emr.run_job_flow(**job_flow_params)
-        return cluster['JobFlowId']
+        return (cluster['JobFlowId'], job_flow_params)
 
     def results(self, identifier, is_public):
         if is_public:
diff --git a/tests/conftest.py b/tests/conftest.py
index aa1cb663..9689a02a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -166,7 +166,8 @@ def sparkjob_provisioner_mocks(mocker):
         ),
         'run': mocker.patch(
             'atmo.jobs.provisioners.SparkJobProvisioner.run',
-            return_value='12345',
+            return_value=('12345',
+                          {'LogUri': 's3://log-bucket/path/to/logs.tar.gz'}),
         ),
         'remove': mocker.patch(
             'atmo.jobs.provisioners.SparkJobProvisioner.remove',
diff --git a/tests/jobs/test_models.py b/tests/jobs/test_models.py
index 0a5bff4a..b23b83ad 100644
--- a/tests/jobs/test_models.py
+++ b/tests/jobs/test_models.py
@@ -349,3 +349,14 @@ def test_expires(mocker, now, spark_job_factory, cluster_provisioner_mocks):
     spark_job.save()
     spark_job.refresh_from_db()
     assert not spark_job.expired_date
+
+
+@freeze_time('2016-04-05 13:25:47')
+def test_run_stores_info_latest_run(spark_job, sparkjob_provisioner_mocks):
+    spark_job.run()
+
+    assert spark_job.latest_run.jobflow_id == '12345'
+    assert spark_job.latest_run.log_uri == 's3://log-bucket/path/to/logs.tar.gz'
+    assert spark_job.latest_run.emr_release_version == spark_job.emr_release.version
+    assert spark_job.latest_run.size == spark_job.size
+    assert spark_job.latest_run.scheduled_at == timezone.now()
diff --git a/tests/jobs/test_provisioners.py b/tests/jobs/test_provisioners.py
index 5fda8b24..9be094b9 100644
--- a/tests/jobs/test_provisioners.py
+++ b/tests/jobs/test_provisioners.py
@@ -258,7 +258,7 @@ def test_spark_job_run(mocker, is_public, spark_job_provisioner, user):
     stubber.add_response('run_job_flow', response, expected_params)
 
     with stubber:
-        jobflow_id = spark_job_provisioner.run(
+        jobflow_id, params = spark_job_provisioner.run(
             user_username=user.username,
             user_email=user.email,
             identifier=identifier,
@@ -269,3 +269,8 @@ def test_spark_job_run(mocker, is_public, spark_job_provisioner, user):
             job_timeout=job_timeout,
         )
     assert jobflow_id == '12345'
+    assert (
+        params['LogUri'] ==
+        's3://log-bucket/%s/%s/2017-02-03T13:48:09+00:00' %
+        (spark_job_provisioner.log_dir, identifier)
+    )