take advantage of upcoming major version release to remove deprecated things #30755

Merged
merged 12 commits on Apr 21, 2023
15 changes: 0 additions & 15 deletions airflow/providers/amazon/aws/hooks/athena.py
@@ -24,7 +24,6 @@
"""
from __future__ import annotations

import warnings
from time import sleep
from typing import Any

@@ -224,29 +223,15 @@ def get_query_results_paginator(
def poll_query_status(
self,
query_execution_id: str,
max_tries: int | None = None,
max_polling_attempts: int | None = None,
) -> str | None:
"""
Poll the status of submitted athena query until query state reaches final state.
Returns one of the final states

:param query_execution_id: Id of submitted athena query
:param max_tries: Deprecated - Use max_polling_attempts instead
:param max_polling_attempts: Number of times to poll for query state before function exits
"""
if max_tries:
warnings.warn(
f"Passing 'max_tries' to {self.__class__.__name__}.poll_query_status is deprecated "
f"and will be removed in a future release. Please use 'max_polling_attempts' instead.",
DeprecationWarning,
stacklevel=2,
)
if max_polling_attempts and max_polling_attempts != max_tries:
raise Exception("max_polling_attempts must be the same value as max_tries")
else:
max_polling_attempts = max_tries

try_number = 1
final_query_state = None # Query state when query reaches final state or max_polling_attempts reached
while True:
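
The change above means callers of AthenaHook.poll_query_status can no longer pass max_tries. A minimal migration sketch, assuming an existing query execution ID and the default AWS connection (both placeholders, not values from this PR):

from airflow.providers.amazon.aws.hooks.athena import AthenaHook

hook = AthenaHook(aws_conn_id="aws_default")  # assumed connection ID
query_execution_id = "1234abcd-0000-0000-0000-000000000000"  # placeholder, e.g. returned by hook.run_query(...)

# Before this PR: hook.poll_query_status(query_execution_id, max_tries=10)
# After this PR: only `max_polling_attempts` is accepted.
final_state = hook.poll_query_status(query_execution_id, max_polling_attempts=10)
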
53 changes: 1 addition & 52 deletions airflow/providers/amazon/aws/hooks/base_aws.py
@@ -30,7 +30,6 @@
import logging
import os
import uuid
import warnings
from copy import deepcopy
from functools import wraps
from os import PathLike
@@ -54,7 +53,6 @@
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
AirflowNotFoundException,
)
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
@@ -362,34 +360,6 @@ def web_identity_token_loader():
def _strip_invalid_session_name_characters(self, role_session_name: str) -> str:
return slugify(role_session_name, regex_pattern=r"[^\w+=,.@-]+")

def _get_region_name(self) -> str | None:
warnings.warn(
"`BaseSessionFactory._get_region_name` method deprecated and will be removed "
"in a future releases. Please use `BaseSessionFactory.region_name` property instead.",
DeprecationWarning,
stacklevel=2,
)
return self.region_name

def _read_role_arn_from_extra_config(self) -> str | None:
warnings.warn(
"`BaseSessionFactory._read_role_arn_from_extra_config` method deprecated and will be removed "
"in a future releases. Please use `BaseSessionFactory.role_arn` property instead.",
DeprecationWarning,
stacklevel=2,
)
return self.role_arn

def _read_credentials_from_connection(self) -> tuple[str | None, str | None]:
warnings.warn(
"`BaseSessionFactory._read_credentials_from_connection` method deprecated and will be removed "
"in a future releases. Please use `BaseSessionFactory.conn.aws_access_key_id` and "
"`BaseSessionFactory.aws_secret_access_key` properties instead.",
DeprecationWarning,
stacklevel=2,
)
return self.conn.aws_access_key_id, self.conn.aws_secret_access_key


class AwsGenericHook(BaseHook, Generic[BaseAwsConnection]):
"""
@@ -528,17 +498,7 @@ def conn_config(self) -> AwsConnectionWrapper:
"""Get the Airflow Connection object and wrap it in helper (cached)."""
connection = None
if self.aws_conn_id:
try:
connection = self.get_connection(self.aws_conn_id)
except AirflowNotFoundException:
warnings.warn(
f"Unable to find AWS Connection ID '{self.aws_conn_id}', switching to empty. "
"This behaviour is deprecated and will be removed in a future releases. "
"Please provide existed AWS connection ID or if required boto3 credential strategy "
"explicit set AWS Connection ID to None.",
DeprecationWarning,
stacklevel=2,
)
connection = self.get_connection(self.aws_conn_id)

return AwsConnectionWrapper(
conn=connection, region_name=self._region_name, botocore_config=self._config, verify=self._verify
@@ -730,17 +690,6 @@ def decorator_f(self, *args, **kwargs):

return retry_decorator

def _get_credentials(self, region_name: str | None) -> tuple[boto3.session.Session, str | None]:
warnings.warn(
"`AwsGenericHook._get_credentials` method deprecated and will be removed in a future releases. "
"Please use `AwsGenericHook.get_session` method and "
"`AwsGenericHook.conn_config.endpoint_url` property instead.",
DeprecationWarning,
stacklevel=2,
)

return self.get_session(region_name=region_name), self.conn_config.endpoint_url

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom UI field behaviour for AWS Connection."""
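
The removed BaseSessionFactory helpers and AwsGenericHook._get_credentials each pointed to a public replacement in their warning text. A hedged sketch of those replacements, assuming the factory still accepts a connection wrapper and using a placeholder connection ID and region:

from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook, BaseSessionFactory

hook = AwsGenericHook(aws_conn_id="aws_default", client_type="s3")  # assumed connection ID and client type
factory = BaseSessionFactory(conn=hook.conn_config)

# Removed private helpers -> public properties:
region = factory.region_name                     # was _get_region_name()
role_arn = factory.role_arn                      # was _read_role_arn_from_extra_config()
access_key = factory.conn.aws_access_key_id      # was _read_credentials_from_connection()
secret_key = factory.conn.aws_secret_access_key

# Removed AwsGenericHook._get_credentials(region_name) -> session plus endpoint property:
session = hook.get_session(region_name="us-east-1")  # assumed region
endpoint_url = hook.conn_config.endpoint_url

Note also that conn_config no longer swallows AirflowNotFoundException: a non-existent aws_conn_id now raises instead of warning and falling back to empty credentials.
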
73 changes: 1 addition & 72 deletions airflow/providers/amazon/aws/hooks/emr.py
@@ -20,14 +20,13 @@
import json
import warnings
from time import sleep
from typing import Any, Callable
from typing import Any

from botocore.exceptions import ClientError

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils.waiter import get_state, waiter
from airflow.utils.helpers import prune_dict


@@ -259,62 +258,6 @@ def conn(self):
"""Get the underlying boto3 EmrServerlessAPIService client (cached)"""
return super().conn

# This method should be replaced with boto waiters which would implement timeouts and backoff nicely.
def waiter(
self,
get_state_callable: Callable,
get_state_args: dict,
parse_response: list,
desired_state: set,
failure_states: set,
object_type: str,
action: str,
countdown: int = 25 * 60,
check_interval_seconds: int = 60,
) -> None:
"""
Will run the sensor until it turns True.

:param get_state_callable: A callable to run until it returns True
:param get_state_args: Arguments to pass to get_state_callable
:param parse_response: Dictionary keys to extract state from response of get_state_callable
:param desired_state: Wait until the getter returns this value
:param failure_states: A set of states which indicate failure and should throw an
exception if any are reached before the desired_state
:param object_type: Used for the reporting string. What are you waiting for? (application, job, etc)
:param action: Used for the reporting string. What action are you waiting for? (created, deleted, etc)
:param countdown: Total amount of time the waiter should wait for the desired state
before timing out (in seconds). Defaults to 25 * 60 seconds.
:param check_interval_seconds: Number of seconds waiter should wait before attempting
to retry get_state_callable. Defaults to 60 seconds.
"""
warnings.warn(
"""This method is deprecated.
Please use `airflow.providers.amazon.aws.utils.waiter.waiter`.""",
DeprecationWarning,
stacklevel=2,
)
waiter(
get_state_callable=get_state_callable,
get_state_args=get_state_args,
parse_response=parse_response,
desired_state=desired_state,
failure_states=failure_states,
object_type=object_type,
action=action,
countdown=countdown,
check_interval_seconds=check_interval_seconds,
)

def get_state(self, response, keys) -> str:
warnings.warn(
"""This method is deprecated.
Please use `airflow.providers.amazon.aws.utils.waiter.get_state`.""",
DeprecationWarning,
stacklevel=2,
)
return get_state(response=response, keys=keys)


class EmrContainerHook(AwsBaseHook):
"""
@@ -483,7 +426,6 @@ def check_query_status(self, job_id: str) -> str | None:
def poll_query_status(
self,
job_id: str,
max_tries: int | None = None,
poll_interval: int = 30,
max_polling_attempts: int | None = None,
) -> str | None:
@@ -492,22 +434,9 @@
Returns one of the final states.

:param job_id: The ID of the job run request.
:param max_tries: Deprecated - Use max_polling_attempts instead
:param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
:param max_polling_attempts: Number of times to poll for query state before function exits
"""
if max_tries:
warnings.warn(
f"Method `{self.__class__.__name__}.max_tries` is deprecated and will be removed "
"in a future release. Please use method `max_polling_attempts` instead.",
DeprecationWarning,
stacklevel=2,
)
if max_polling_attempts and max_polling_attempts != max_tries:
raise Exception("max_polling_attempts must be the same value as max_tries")
else:
max_polling_attempts = max_tries

try_number = 1
final_query_state = None # Query state when query reaches final state or max_polling_attempts reached

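
The deleted EmrServerlessHook.waiter and get_state wrappers delegated to module-level helpers in airflow.providers.amazon.aws.utils.waiter, and EmrContainerHook.poll_query_status lost its max_tries alias. A hedged sketch of the replacements; the application ID, job ID, virtual cluster ID, and the chosen desired/failure states are placeholders, not values from this PR:

from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrServerlessHook
from airflow.providers.amazon.aws.utils.waiter import get_state, waiter

serverless_hook = EmrServerlessHook(aws_conn_id="aws_default")  # assumed connection ID
application_id = "00aaa0a0aaa0a0a0"  # placeholder EMR Serverless application ID

# Was serverless_hook.waiter(...); call the utils helper directly with the same arguments.
waiter(
    get_state_callable=serverless_hook.conn.get_application,
    get_state_args={"applicationId": application_id},
    parse_response=["application", "state"],
    desired_state={"STARTED"},      # assumed target state
    failure_states={"TERMINATED"},  # assumed failure state
    object_type="application",
    action="started",
    countdown=25 * 60,
    check_interval_seconds=60,
)

# Was serverless_hook.get_state(response, keys).
state = get_state(serverless_hook.conn.get_application(applicationId=application_id), ["application", "state"])

# EmrContainerHook.poll_query_status: pass `max_polling_attempts`; the removed `max_tries` is gone.
container_hook = EmrContainerHook(aws_conn_id="aws_default", virtual_cluster_id="placeholder-cluster-id")
final_state = container_hook.poll_query_status("placeholder-job-id", poll_interval=30, max_polling_attempts=10)
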
13 changes: 1 addition & 12 deletions airflow/providers/amazon/aws/hooks/redshift_cluster.py
@@ -17,7 +17,6 @@
from __future__ import annotations

import asyncio
import warnings
from typing import Any, Sequence

import botocore.exceptions
@@ -177,22 +176,12 @@ def create_cluster_snapshot(
)
return response["Snapshot"] if response["Snapshot"] else None

def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str | None = None):
def get_cluster_snapshot_status(self, snapshot_identifier: str):
"""
Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``

:param snapshot_identifier: A unique identifier for the snapshot that you are requesting
:param cluster_identifier: (deprecated) The unique identifier of the cluster
the snapshot was created from
"""
if cluster_identifier:
warnings.warn(
"Parameter `cluster_identifier` is deprecated."
"This option will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)

try:
response = self.get_conn().describe_cluster_snapshots(
SnapshotIdentifier=snapshot_identifier,
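
With cluster_identifier gone, the snapshot identifier alone drives the lookup. A minimal sketch, assuming the RedshiftHook class defined in this module and a placeholder snapshot name:

from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook

hook = RedshiftHook(aws_conn_id="aws_default")  # assumed connection ID

# Before: hook.get_cluster_snapshot_status("my-snapshot", cluster_identifier="my-cluster")
# After: only the snapshot identifier is accepted; returns None if the snapshot is not found.
status = hook.get_cluster_snapshot_status(snapshot_identifier="my-snapshot")  # placeholder identifier
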
16 changes: 0 additions & 16 deletions airflow/providers/amazon/aws/hooks/sagemaker.py
@@ -23,7 +23,6 @@
import tarfile
import tempfile
import time
import warnings
from collections import Counter
from datetime import datetime
from functools import partial
@@ -977,21 +976,6 @@ def _list_request(
else:
next_token = response["NextToken"]

def find_processing_job_by_name(self, processing_job_name: str) -> bool:
"""
Query processing job by name

This method is deprecated.
Please use `airflow.providers.amazon.aws.hooks.sagemaker.count_processing_jobs_by_name`.
"""
warnings.warn(
"This method is deprecated. "
"Please use `airflow.providers.amazon.aws.hooks.sagemaker.count_processing_jobs_by_name`.",
DeprecationWarning,
stacklevel=2,
)
return bool(self.count_processing_jobs_by_name(processing_job_name))

@staticmethod
def _name_matches_pattern(
processing_job_name: str,
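
The removed find_processing_job_by_name pointed callers at count_processing_jobs_by_name and itself only wrapped the count in bool(). A minimal sketch of the equivalent call, with a placeholder job name:

from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook

hook = SageMakerHook(aws_conn_id="aws_default")  # assumed connection ID

# Was: hook.find_processing_job_by_name("my-processing-job")
exists = bool(hook.count_processing_jobs_by_name("my-processing-job"))  # placeholder job name
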
15 changes: 0 additions & 15 deletions airflow/providers/amazon/aws/operators/athena.py
@@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Sequence

from airflow.compat.functools import cached_property
@@ -45,7 +44,6 @@ class AthenaOperator(BaseOperator):
:param query_execution_context: Context in which query need to be run
:param result_configuration: Dict with path to store results in and config related to encryption
:param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on Athena
:param max_tries: Deprecated - use max_polling_attempts instead.
:param max_polling_attempts: Number of times to poll for query state before function exits
To limit task execution time, use execution_timeout.
:param log_query: Whether to log athena query and other execution params when it's executed.
@@ -69,7 +67,6 @@ def __init__(
query_execution_context: dict[str, str] | None = None,
result_configuration: dict[str, Any] | None = None,
sleep_time: int = 30,
max_tries: int | None = None,
max_polling_attempts: int | None = None,
log_query: bool = True,
**kwargs: Any,
@@ -88,18 +85,6 @@ def __init__(
self.query_execution_id: str | None = None
self.log_query: bool = log_query

if max_tries:
warnings.warn(
f"Parameter `{self.__class__.__name__}.max_tries` is deprecated and will be removed "
"in a future release. Please use method `max_polling_attempts` instead.",
DeprecationWarning,
stacklevel=2,
)
if max_polling_attempts and max_polling_attempts != max_tries:
raise Exception("max_polling_attempts must be the same value as max_tries")
else:
self.max_polling_attempts = max_tries

@cached_property
def hook(self) -> AthenaHook:
"""Create and return an AthenaHook."""
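
For the operator, only max_polling_attempts survives. A hedged sketch of an AthenaOperator task after this change; the query, database, and output location are assumptions, not values from the PR:

from airflow.providers.amazon.aws.operators.athena import AthenaOperator

run_query = AthenaOperator(
    task_id="run_athena_query",
    query="SELECT 1",                                   # assumed query
    database="default",                                 # assumed database
    output_location="s3://my-bucket/athena-results/",   # assumed bucket
    sleep_time=30,
    max_polling_attempts=10,  # `max_tries` was removed; use this parameter instead
    log_query=True,
)
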
29 changes: 0 additions & 29 deletions airflow/providers/amazon/aws/operators/aws_lambda.py

This file was deleted.
