From 749c08dabd8f21d59b1ea779efd9d75f0718643f Mon Sep 17 00:00:00 2001
From: Albert Okiri
Date: Sun, 3 Mar 2024 11:00:07 +0300
Subject: [PATCH 1/4] remove commented sections

---
 .pre-commit-config.yaml                            |  8 --------
 .../amazon/aws/transfers/redshift_to_s3.py         | 13 +++++++++----
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 7f5fca84ae858..a5a8efe06aef9 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -330,14 +330,6 @@ repos:
         pass_filenames: true
         files: ^airflow/providers/.*/(operators|transfers|sensors)/.*\.py$
         additional_dependencies: [ 'rich>=12.4.4' ]
-        # TODO: Handle the provider-specific exclusions and remove them from the list, see:
-        # https://github.com/apache/airflow/issues/36484
-        exclude: |
-          (?x)^(
-          ^.*__init__\.py$|
-          ^airflow\/providers\/amazon\/aws\/transfers\/redshift_to_s3\.py$|
-          ^airflow\/providers\/amazon\/aws\/operators\/emr\.py$|
-          )$
       - id: ruff
         name: Run 'ruff' for extremely fast Python linting
         description: "Run 'ruff' for extremely fast Python linting"
diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index e14db14b7e6ab..3dbbf22db6571 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -109,13 +109,17 @@ def __init__(
     ) -> None:
         super().__init__(**kwargs)
         self.s3_bucket = s3_bucket
-        self.s3_key = f"{s3_key}/{table}_" if (table and table_as_file_name) else s3_key
+        if table and table_as_file_name:
+            s3_key = f"{s3_key}/{table}_"
+        else:
+            pass
+        self.s3_key = s3_key
         self.schema = schema
         self.table = table
         self.redshift_conn_id = redshift_conn_id
         self.aws_conn_id = aws_conn_id
         self.verify = verify
-        self.unload_options: list = unload_options or []
+        self.unload_options = unload_options or []
         self.autocommit = autocommit
         self.include_header = include_header
         self.parameters = parameters
@@ -123,13 +127,14 @@ def __init__(
         self.redshift_data_api_kwargs = redshift_data_api_kwargs or {}
 
         if select_query:
-            self.select_query = select_query
+            pass
         elif self.schema and self.table:
-            self.select_query = f"SELECT * FROM {self.schema}.{self.table}"
+            select_query = f"SELECT * FROM {self.schema}.{self.table}"
         else:
             raise ValueError(
                 "Please provide both `schema` and `table` params or `select_query` to fetch the data."
             )
+        self.select_query = select_query
 
         if self.include_header and "HEADER" not in [uo.upper().strip() for uo in self.unload_options]:
             self.unload_options = [*self.unload_options, "HEADER"]

From f5298124e8732e0c3b6df0de1f982651721b1ebc Mon Sep 17 00:00:00 2001
From: Albert Olweny <41315252+okirialbert@users.noreply.github.com>
Date: Sun, 3 Mar 2024 12:17:52 +0300
Subject: [PATCH 2/4] Update airflow/providers/amazon/aws/transfers/redshift_to_s3.py

Co-authored-by: Andrey Anshin
---
 airflow/providers/amazon/aws/transfers/redshift_to_s3.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index 3dbbf22db6571..d5f3e8a1043f7 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -111,8 +111,6 @@ def __init__(
         self.s3_bucket = s3_bucket
         if table and table_as_file_name:
             s3_key = f"{s3_key}/{table}_"
-        else:
-            pass
         self.s3_key = s3_key
         self.schema = schema
         self.table = table

From c77c176b2858f7b1003c6570572c6c6ca55d0673 Mon Sep 17 00:00:00 2001
From: Albert Okiri
Date: Sun, 3 Mar 2024 17:18:03 +0300
Subject: [PATCH 3/4] add checks in execute

---
 .../amazon/aws/transfers/redshift_to_s3.py         | 35 +++++++++----------
 .../aws/transfers/test_redshift_to_s3.py           |  3 +-
 2 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index d5f3e8a1043f7..e30bf03d33466 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -109,8 +109,6 @@ def __init__(
     ) -> None:
         super().__init__(**kwargs)
         self.s3_bucket = s3_bucket
-        if table and table_as_file_name:
-            s3_key = f"{s3_key}/{table}_"
         self.s3_key = s3_key
         self.schema = schema
         self.table = table
@@ -123,24 +121,8 @@ def __init__(
         self.parameters = parameters
         self.table_as_file_name = table_as_file_name
         self.redshift_data_api_kwargs = redshift_data_api_kwargs or {}
-
-        if select_query:
-            pass
-        elif self.schema and self.table:
-            select_query = f"SELECT * FROM {self.schema}.{self.table}"
-        else:
-            raise ValueError(
-                "Please provide both `schema` and `table` params or `select_query` to fetch the data."
-            )
         self.select_query = select_query
 
-        if self.include_header and "HEADER" not in [uo.upper().strip() for uo in self.unload_options]:
-            self.unload_options = [*self.unload_options, "HEADER"]
-
-        if self.redshift_data_api_kwargs:
-            for arg in ["sql", "parameters"]:
-                if arg in self.redshift_data_api_kwargs:
-                    raise AirflowException(f"Cannot include param '{arg}' in Redshift Data API kwargs")
 
     def _build_unload_query(
         self, credentials_block: str, select_query: str, s3_key: str, unload_options: str
@@ -156,9 +138,26 @@ def _build_unload_query(
         """
 
     def execute(self, context: Context) -> None:
+        if self.table and self.table_as_file_name:
+            self.s3_key = f"{self.s3_key}/{self.table}_"
+
+        if self.schema and self.table:
+            self.select_query = f"SELECT * FROM {self.schema}.{self.table}"
+
+        if self.select_query is None:
+            raise ValueError(
+                "Please provide both `schema` and `table` params or `select_query` to fetch the data."
+            )
+
+        if self.include_header and "HEADER" not in [uo.upper().strip() for uo in self.unload_options]:
+            self.unload_options = [*self.unload_options, "HEADER"]
+
         redshift_hook: RedshiftDataHook | RedshiftSQLHook
         if self.redshift_data_api_kwargs:
             redshift_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
+            for arg in ["sql", "parameters"]:
+                if arg in self.redshift_data_api_kwargs:
+                    raise AirflowException(f"Cannot include param '{arg}' in Redshift Data API kwargs")
         else:
             redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
         conn = S3Hook.get_connection(conn_id=self.aws_conn_id) if self.aws_conn_id else None
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index 977975211a7d3..b7876747f4874 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -375,15 +375,16 @@ def test_invalid_param_in_redshift_data_api_kwargs(self, param):
         """
         Test passing invalid param in RS Data API kwargs raises an error
         """
         with pytest.raises(AirflowException):
-            RedshiftToS3Operator(
+            redshift_operator = RedshiftToS3Operator(
                 s3_bucket="s3_bucket",
                 s3_key="s3_key",
                 select_query="select_query",
                 table="table",
                 dag=None,
                 redshift_data_api_kwargs={param: "param"},
             )
+            redshift_operator.execute(None)
 
     @pytest.mark.parametrize("table_as_file_name, expected_s3_key", [[True, "key/table_"], [False, "key"]])
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")

From a6cb59f4d1bb4f5a81cea1544610fb9a74e22492 Mon Sep 17 00:00:00 2001
From: Albert Okiri
Date: Sun, 3 Mar 2024 17:25:55 +0300
Subject: [PATCH 4/4] ruff format

---
 airflow/providers/amazon/aws/transfers/redshift_to_s3.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index e30bf03d33466..938a9663bf5be 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -123,7 +123,6 @@ def __init__(
         self.redshift_data_api_kwargs = redshift_data_api_kwargs or {}
         self.select_query = select_query
 
-
     def _build_unload_query(
         self, credentials_block: str, select_query: str, s3_key: str, unload_options: str
     ) -> str:
@@ -148,10 +147,10 @@ def execute(self, context: Context) -> None:
             raise ValueError(
                 "Please provide both `schema` and `table` params or `select_query` to fetch the data."
             )
-
+
         if self.include_header and "HEADER" not in [uo.upper().strip() for uo in self.unload_options]:
-            self.unload_options = [*self.unload_options, "HEADER"]
-
+            self.unload_options = [*self.unload_options, "HEADER"]
+
         redshift_hook: RedshiftDataHook | RedshiftSQLHook
         if self.redshift_data_api_kwargs:
             redshift_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
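
A note on the net effect of this series: the `schema`/`table`/`select_query` validation, the S3 key derivation, and the HEADER and Data API kwargs checks now run in `execute()` rather than in `__init__`, so they apply after templated fields have been rendered. A minimal usage sketch follows; the task id, bucket, key, schema, and table names are illustrative only:

    from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator

    # With table_as_file_name=True, execute() derives the S3 key "exports/my_table_"
    # and the default query "SELECT * FROM public.my_table". Omitting select_query
    # as well as schema/table now raises ValueError at execute() time, not at
    # operator construction.
    unload_task = RedshiftToS3Operator(
        task_id="unload_redshift_table_to_s3",
        s3_bucket="my-bucket",
        s3_key="exports",
        schema="public",
        table="my_table",
        table_as_file_name=True,
    )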