Skip to content

Commit

Permalink
Fix: emr_conn_id should be optional in EmrCreateJobFlowOperator (#…
Browse files Browse the repository at this point in the history
…24306)

Closes: #24318
  • Loading branch information
pankajastro authored Jun 10, 2022
1 parent 4daf51a commit 99d9833
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
17 changes: 8 additions & 9 deletions airflow/providers/amazon/aws/hooks/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from botocore.exceptions import ClientError

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


Expand All @@ -41,8 +41,8 @@ class EmrHook(AwsBaseHook):
conn_type = 'emr'
hook_name = 'Amazon Elastic MapReduce'

def __init__(self, emr_conn_id: Optional[str] = default_conn_name, *args, **kwargs) -> None:
self.emr_conn_id = emr_conn_id
def __init__(self, emr_conn_id: str = default_conn_name, *args, **kwargs) -> None:
self.emr_conn_id: str = emr_conn_id
kwargs["client_type"] = "emr"
super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -78,12 +78,11 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]:
run_job_flow method.
Overrides for this config may be passed as the job_flow_overrides.
"""
if not self.emr_conn_id:
raise AirflowException('emr_conn_id must be present to use create_job_flow')

emr_conn = self.get_connection(self.emr_conn_id)

config = emr_conn.extra_dejson.copy()
try:
emr_conn = self.get_connection(self.emr_conn_id)
config = emr_conn.extra_dejson.copy()
except AirflowNotFoundException:
config = {}
config.update(job_flow_overrides)

response = self.get_conn().run_job_flow(**config)
Expand Down
9 changes: 7 additions & 2 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,13 @@ class EmrCreateJobFlowOperator(BaseOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:EmrCreateJobFlowOperator`
:param aws_conn_id: aws connection to uses
:param emr_conn_id: emr connection to use
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is None or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
empty, then default boto3 configuration would be used (and must be
maintained on each worker node)
:param emr_conn_id: emr connection to use for run_job_flow request body.
This will be overridden by the job_flow_overrides param
:param job_flow_overrides: boto3 style arguments or reference to an arguments file
(must be '.json') to override emr_connection extra. (templated)
:param region_name: Region named passed to EmrHook
Expand Down

0 comments on commit 99d9833

Please sign in to comment.