Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: emr_conn_id should be optional in EmrCreateJobFlowOperator #24306

Merged
merged 5 commits into from
Jun 10, 2022

Conversation

pankajastro
Copy link
Member

@pankajastro pankajastro commented Jun 7, 2022

#Closes: #24318


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragement file, named {pr_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:providers provider:amazon-aws AWS/Amazon - related issues labels Jun 7, 2022
@pankajastro pankajastro marked this pull request as ready for review June 8, 2022 09:46
@potiuk
Copy link
Member

potiuk commented Jun 8, 2022

Hey @pankajastro - I am about to release RC2 for providers for May and would like to include that one - would it be possibe to fix the failing tests quickly?

@pankajastro pankajastro force-pushed the fix_emr_create_job_flow_hook branch from 43ea8bd to acc8aa5 Compare June 8, 2022 10:10
@pankajastro
Copy link
Member Author

Hey @pankajastro - I am about to release RC2 for providers for May and would like to include that one - would it be possibe to fix the failing tests quickly?

Fixed it, the test passing on local. let's see how it goes in CI.

@pankajastro
Copy link
Member Author

@potiuk static check failing in this PR as well in other with error

Run pydocstyle.........................................................................Failed
- hook id: pydocstyle
- exit code: 1
airflow/www/security.py:627 in private method `_sync_dag_view_permissions`:
        D202: No blank lines allowed after function docstring (found 1)

@potiuk
Copy link
Member

potiuk commented Jun 8, 2022

Ah indeed - it looks like some cross-merged commits without rebase ....

@eladkal
Copy link
Contributor

eladkal commented Jun 8, 2022

Fix: #24322

@Taragolis
Copy link
Contributor

Is it actually an error? Possibly due to lack of documentation of this connection type

As far as I remember since Airflow 1.10 (and probably earlier) Amazon Elastic Map Reduce connection stored only kwargs for run_job_flow and aws_conn_id uses for actual boto3 emr client connection

@potiuk
Copy link
Member

potiuk commented Jun 8, 2022

Is it actually an error? Possibly due to lack of documentation of this connection type

I believe there were quite a lot of refactorings to AwsBaseHook since and I guess this is result of it - looks legit for me actually.

Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pankajastro - if you could comment on @Taragolis 's comment too - anyway I am happy to merge it soon and RC2 tests might be good for it.

@github-actions
Copy link

github-actions bot commented Jun 8, 2022

The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the okay to merge It's ok to merge this PR as it does not require more tests label Jun 8, 2022
@pankajastro
Copy link
Member Author

Is it actually an error? Possibly due to lack of documentation of this connection type

As far as I remember since Airflow 1.10 (and probably earlier) Amazon Elastic Map Reduce connection stored only kwargs for run_job_flow and aws_conn_id uses for actual boto3 emr client connection

emr_conn_id is used in only EmrCreateJobFlowOperator and I feel it expecting that emr_conn_id extra filed should contain JSON request body for run_job_flow API. But keeping the boto3 API request body in DAG looks more convenient to me than storing it in connection and if I'm passing job_flow_overrides param in the task in that case then emr_conn_id should not be mandatory. Here, I'm doing

  • check if aws_conn_id is available, use it for AWS credential
  • if aws_conn_id is not available assume that credential is in emr_conn_id
  • if emr_conn_id conn extra is containing request body override with job_flow_overrides request body
  • if emr_conn_id does not exist or contain AWS credentials then use job_flow_overrides as the request body
    After this change, if emr_conn_id is not available then it will use aws_conn_id for auth and job_flow_overrides for the request body earlier it was failing by error conn_id not found

@Taragolis
Copy link
Contributor

Taragolis commented Jun 8, 2022

I believe there were quite a lot of refactorings to AwsBaseHook since and I guess this is result of it - looks legit for me actually.

I've just check in 1.10.4 (I used to work with this version for about 1 year) and looks like it have the same behaviour https://github.com/apache/airflow/blob/1.10.4/airflow/contrib/hooks/emr_hook.py

If user stored some parameters in EMR Connection which not accepted by run_job_flow it would receive an error.

However after this changes:

I've never been happier with behaviour that I need provide emr connection in cases when I do not required it.

@eladkal
Copy link
Contributor

eladkal commented Jun 8, 2022

The hook states:

class EmrHook(AwsBaseHook):
"""
Interact with AWS EMR. emr_conn_id is only necessary for using the
create_job_flow method.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.

@pankajastro
Copy link
Member Author

If the user will store aws_access_key_id in emr_conn_id extra then I'm assuming that intention to use this connection as authentication rather than using it in building request body. If you want to throw an error like the earlier case I can do that but in current case you can store the AWS JSON in emr_conn_id extra then you do not need to pass aws_conn_id pram in task

@pankajastro
Copy link
Member Author

If user store other extra arguments from AWS Connection such as region_name, profile_name, config_kwargs, session_kwargs, and etc the error will raised

In this case the behaviour will depend on AWS API like earlier

@potiuk
Copy link
Member

potiuk commented Jun 8, 2022

If I am judging well - I think @eladkal also suggested that - I think it's not a "regression" - rather improvement so I am ok with merging it.

@Taragolis
Copy link
Contributor

Taragolis commented Jun 8, 2022

emr_conn_id is used in only EmrCreateJobFlowOperator and I feel it expecting that emr_conn_id extra filed should contain JSON request body for run_job_flow API. But keeping the boto3 API request body in DAG looks more convenient to me than storing it in connection and if I'm passing job_flow_overrides param in the task in that case then emr_conn_id should not be mandatory.

If only case case make emr_conn_id non mandatory than probably better just change receiving config

config = {}
if self.emr_conn_id:
    emr_conn = self.get_connection(self.emr_conn_id)
    config = emr_conn.extra_dejson.copy()

Assume that user provide AWS Connection rather than EMR Connection personally for me it is not good point.
I could pass aws_conn_id=None and assume that Airflow will use Instance Profile / Task Role

If the user will store aws_access_key_id in emr_conn_id extra then I'm assuming that intention to use this connection as authentication rather than using it in building request body.

If user deploy Airflow in AWS Cloud and follow AWS Best Practices he/she won't use aws_access_key_id and most probably use Instance Profile / Task Role

Examples:

  • User use role_arn in EMR Connection and expected that he/she switch to this role, however error raise
  • User use region_name in EMR Connection and expected that region changed, however error raise

For me it is not clear why we only need to check only aws_access_key_id probably better check connection type ?

@pankajastro
Copy link
Member Author

pankajastro commented Jun 8, 2022

For me it is not clear why we only need to check aws_access_key_id ?

I'm checking this because I do not want a user to use emr_conn_id to authenticate and aws_access_key_id is required if you want to keep the AWS auth param in conn. In case they have kept the auth param in emr_conn_id then ignore it. But I'm ok to remove that if block check if you want.

@Taragolis
Copy link
Contributor

Taragolis commented Jun 8, 2022

I'm just worry that docstring told that this argument for EMR Connection, but we assume that it could be AWS Connection

:param aws_conn_id: aws connection to uses
:param emr_conn_id: emr connection to use

I think if we want keep it simpler, allow user to skip set emr_conn_id and do not allow use other connection type by mistake we could simply do this

config = {}
if self.emr_conn_id:
    emr_conn = self.get_connection(self.emr_conn_id)
    if emr_conn.conn_type != 'emr':
        raise AirflowException(
            f"Expected 'emr' connection type for `emr_conn_id`={self.emr_conn_id} but got {emr_conn.conn_type!r}"
        )
    config = emr_conn.extra_dejson.copy()

WDYT @potiuk @eladkal @pankajastro

With this suggestion

  • explicit set aws_conn_id = None - use default boto3 behaviour (same as current implementation)
  • explicit set emr_conn_id = None - use empty config (improvement)
  • aws_conn_id not exists - raise an error (same as current implementation)
  • emr_conn_id not exists - raise an error (same as current implementation)
  • emr_conn_id use wrong type - raise an error (improvement??)
  • Store in EMR Connection some unwanted stuff - raise an error (same as current implementation)

@pankajastro
Copy link
Member Author

  • emr_conn_id not exists - raise an error (same as current implementation)

If you want to keep this behaviour then some users will have to create emr connection having no data, which I don't want. Basically, emr_conn_id should not mandatory.

@Taragolis
Copy link
Contributor

Basically, emr_conn_id should not mandatory.

I mean if user set emr_conn_id="this_connection_not_exists" this connection not exists and should raise an error rather than set to empty extras. And emr_conn_id=None - is not mandatory.

I think it also required make changes in operator
https://github.com/apache/airflow/blob/287fc4bdeb107fc81653d3a6721f9f5b87b90529/airflow/providers/amazon/aws/operators/emr.py#L304-L305

@eladkal
Copy link
Contributor

eladkal commented Jun 8, 2022

@vincbeck @ferruzzi @o-nikolas any thoughts on this one?

@pankajastro
Copy link
Member Author

pankajastro commented Jun 8, 2022

I mean if user set emr_conn_id="this_connection_not_exists" this connection not exists and should raise an error rather than set to empty extras. And emr_conn_id=None - is not mandatory.

I think it also required make changes in operator

https://github.com/apache/airflow/blob/287fc4bdeb107fc81653d3a6721f9f5b87b90529/airflow/providers/amazon/aws/operators/emr.py#L304-L305

Ok, So behaviour should be like

  • if aws_conn_id does not exist raise an error like earlier
  • if emr_conn_id contain auth param like aws_access_key_id etc raise error like earlier
  • if emr_conn_id does not exist then make config as empty dict that way emr_conn_id will not be mandatory

We can't keep aws_conn_id and emr_conn_id default values None because if the user do not pass aws_conn_id then we assume that the value is aws_default across the operator/sensor. Fixed here 5b0718450df7d0c47b644f498ce625ab661ec31d

@pankajastro pankajastro force-pushed the fix_emr_create_job_flow_hook branch from 9083a03 to 5b07184 Compare June 8, 2022 14:59
@pankajastro pankajastro changed the title Fix: Unable to locate credentials and Unknown parameter in input for EmrCreateJobFlowOperator Fix: emr_conn_id should if optional in EmrCreateJobFlowOperator Jun 8, 2022
@pankajastro pankajastro changed the title Fix: emr_conn_id should if optional in EmrCreateJobFlowOperator Fix: emr_conn_id should if optional in EmrCreateJobFlowOperator Jun 8, 2022
@pankajastro pankajastro changed the title Fix: emr_conn_id should if optional in EmrCreateJobFlowOperator Fix: emr_conn_id should be optional in EmrCreateJobFlowOperator Jun 8, 2022
@Taragolis
Copy link
Contributor

We can't keep aws_conn_id and emr_conn_id default values None because if the user do pass aws_conn_id then we accuse that the value is aws_default across the operator/sensor.

BTW, AwsBaseHook handle case if aws_conn_id=None and use default boto3 behaviour

def _get_credentials(self, region_name: Optional[str]) -> Tuple[boto3.session.Session, Optional[str]]:
if not self.aws_conn_id:
session = boto3.session.Session(region_name=region_name)
return session, None

So it doesn't raise any error now - only static checks warning in IDE

AIRFLOW_CTX_DAG_ID=example_emr_job_flow_manual_steps
AIRFLOW_CTX_TASK_ID=create_job_flow
AIRFLOW_CTX_EXECUTION_DATE=2022-06-07T10:28:15.046932+00:00
AIRFLOW_CTX_TRY_NUMBER=8
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-07T10:28:15.046932+00:00
[2022-06-08, 15:10:55 UTC] {emr.py:300} INFO - Creating JobFlow using aws-conn-id: None, emr-conn-id: emr_default
[2022-06-08, 15:10:55 UTC] {base.py:68} INFO - Using connection ID 'emr_default' for task execution.
[2022-06-08, 15:10:55 UTC] {credentials.py:1180} INFO - Found credentials in environment variables.
[2022-06-08, 15:10:56 UTC] {emr.py:314} INFO - JobFlow with id j-1Y1LXW4LWC1Y1 created

@pankajastro
Copy link
Member Author

BTW, AwsBaseHook handle case if aws_conn_id=None and use default boto3 behaviour

Yes, we can also set it here. but that too will be breaking change for someone not passing aws_conn_id in this operator param and have created a connection with the name aws_default https://github.com/apache/airflow/blob/287fc4bdeb107fc81653d3a6721f9f5b87b90529/airflow/providers/amazon/aws/operators/emr.py#L304

@Taragolis
Copy link
Contributor

Yes, we can also set it here. but that too will be breaking change for someone not passing aws_conn_id in this operator param and have created a connection with the name aws_default

Quite a lot of operators and sensors in amazon-provider uses different annotations and default value for aws_conn_id

  • aws_conn_id: str = 'aws_default'
  • aws_conn_id: Optional[str] = 'aws_default'
  • aws_conn_id: Optional[str] = None

Due to the fact all of them (or most) uses hooks based on AwsBaseHook so provide None is fine but in case it all workers use the same credentials

You could use aws_conn_id: Optional[str] = 'aws_default' and it won't break anything

if aws_conn_id does not exist raise an error like earlier
if emr_conn_id contain auth param like aws_access_key_id etc raise error like earlier
if emr_conn_id does not exist then make config as empty dict that way emr_conn_id will not be mandatory

My suggestion still the same

  1. Make it allow provide None for emr_conn_id
  2. Do not touch anything related to aws_conn_id
  3. Check type of emr_conn_id if it not None
  • Connection not emr - raise an error
  • Connection not exists - raise an error which is default for BaseHook.get_connection
  1. User provide in connection wrong arguments? boto3 will raise an error

@pankajastro
Copy link
Member Author

You could use aws_conn_id: Optional[str] = 'aws_default' and it won't break anything

Then the default value won't be None it will be aws_default, right?

@Taragolis
Copy link
Contributor

You could use aws_conn_id: Optional[str] = 'aws_default' and it won't break anything

Then the default value won't be None it will be aws_default, right?

yep

@pankajastro
Copy link
Member Author

yep

So, if I'll change the init method of the operator to

def __init__(
        self,
        *,
        aws_conn_id: str = 'aws_default',
        emr_conn_id: Optional[str] = 'emr_default',
        job_flow_overrides: Optional[Union[str, Dict[str, Any]]] = None,
        region_name: Optional[str] = None,
        **kwargs,
    ):

Then from the code execution point of view, this is effectively no change
and if I'll do

def __init__(
        self,
        *,
        aws_conn_id: str = 'aws_default',
        emr_conn_id: Optional[str] = None,
        job_flow_overrides: Optional[Union[str, Dict[str, Any]]] = None,
        region_name: Optional[str] = None,
        **kwargs,
    ):

Then this will be a breaking change if someone has created emr_conn_id with the name emr_default and not passing emr_conn_id param in operator, isn't it?

@Taragolis
Copy link
Contributor

Then this will be a breaking change if someone has created emr_conn_id with the name emr_default and not passing emr_conn_id param in operator, isn't it?

Yeah, you right, for example EMR example DAG doesn't set emr_conn_id, it probably won't hurt if it uses empty connection. But if user previously stored some values in emr_conn_id it will

job_flow_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
)

So it would be nice update docstring about behaviour of emr_conn_id as well as for aws_conn_id (it could be grabbed from AwsBaseHook)

:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is None or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
empty, then default boto3 configuration would be used (and must be
maintained on each worker node).

@pankajastro pankajastro force-pushed the fix_emr_create_job_flow_hook branch from f137427 to 140a26d Compare June 8, 2022 19:00
@pankajastro
Copy link
Member Author

Updated docstring here 140a26d

@o-nikolas
Copy link
Contributor

I unfortunately don't have the time to catch up on this issue today, but dropping in to CC @dacort (who is the AWS Developer Advocate for EMR)

@pankajastro
Copy link
Member Author

Hey guys, just wanted to check if I need to address anything more here?

@kaxil kaxil merged commit 99d9833 into apache:main Jun 10, 2022
@kaxil kaxil deleted the fix_emr_create_job_flow_hook branch June 10, 2022 13:25
@dacort
Copy link
Contributor

dacort commented Jun 20, 2022

I was also OOO, but looks like this is already taken care of. Feel free to ping me in the future for any EMR-related questions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers okay to merge It's ok to merge this PR as it does not require more tests provider:amazon-aws AWS/Amazon - related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

EmrCreateJobFlowOperator does not work if emr_conn_id param contain credential
7 participants