Skip to content

Commit

Permalink
Merge pull request #54 from cabhishek/genie_headers
Browse files Browse the repository at this point in the history
Pipe kwargs from execute method all the down to the request call
  • Loading branch information
cabhishek authored Dec 9, 2019
2 parents b9fa0e1 + 622f596 commit 63d56ff
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pygenie/adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_adapter_for_version(version):
raise GenieAdapterError("no adapter for version '{}'".format(version))


def execute_job(job):
def execute_job(job, **kwargs):
"""
Take a job and convert it to a JSON payload based on the job's
configuration object's Genie version value and execute.
Expand All @@ -49,7 +49,7 @@ def execute_job(job):

if adapter is not None:
try:
adapter.submit_job(job)
adapter.submit_job(job, **kwargs)
except GenieHTTPError as err:
if err.response.status_code == 409:
logger.debug("reattaching to job id '%s'", job.get('job_id'))
Expand Down
2 changes: 1 addition & 1 deletion pygenie/adapter/genie_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def kill_job(self, job_id=None, kill_uri=None):
raise GenieJobNotFoundError("job not found at {}".format(url))
raise

def submit_job(self, job):
def submit_job(self, job, **kwargs):
"""Submit a job execution to the server."""

payload = {
Expand Down
4 changes: 2 additions & 2 deletions pygenie/jobs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def disable_archive(self):

return self.archive(False)

def execute(self, retry=False, force=False, catch_signal=False):
def execute(self, retry=False, force=False, catch_signal=False, **kwargs):
"""
Send the job to Genie and execute.
Expand Down Expand Up @@ -519,7 +519,7 @@ def sig_handler(signum, frame):
global execute_job # set in main __init__.py to avoid circular imports
# execute_job imports jobs, jobs need to import execute_job
# assigning to running_job variable for killing on signal
running_job = execute_job(self)
running_job = execute_job(self, **kwargs)
return running_job

@add_to_repr('overwrite')
Expand Down
19 changes: 19 additions & 0 deletions tests/job_tests/test_geniejob.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,25 @@ def test_job_execute_retry_force(self, exec_job, gen_job_id, reattach_job):
exec_job.assert_called_once_with(job)
assert_equals(new_job_id, job._job_id)

@patch('pygenie.jobs.core.reattach_job')
@patch('pygenie.jobs.core.generate_job_id')
@patch('pygenie.jobs.core.execute_job')
def test_job_execute_with_custom_headers(self, exec_job, gen_job_id, reattach_job):
"""Testing job execution with custom headers."""

headers = {'genie-force-agent-execution': 'true'}

job = pygenie.jobs.HiveJob() \
.job_id('exec') \
.genie_username('exectester') \
.script('select * from db.table')

job.execute(headers=headers)

gen_job_id.assert_not_called()
reattach_job.assert_not_called()
exec_job.assert_called_once_with(job, headers=headers)


@patch.dict('os.environ', {'GENIE_BYPASS_HOME_CONFIG': '1'})
class TestingSetJobId(unittest.TestCase):
Expand Down

0 comments on commit 63d56ff

Please sign in to comment.