diff --git a/airflow/providers/google/CHANGELOG.rst b/airflow/providers/google/CHANGELOG.rst index ad85c5d662e11..9cb29093ec936 100644 --- a/airflow/providers/google/CHANGELOG.rst +++ b/airflow/providers/google/CHANGELOG.rst @@ -42,6 +42,10 @@ Breaking changes * ``GCSObjectsWtihPrefixExistenceSensor`` removed. Please use ``GCSObjectsWithPrefixExistenceSensor``. +* ``PubSubPullSensor``: Remove ``project``. Please use ``project_id`` + +* ``PubSubPullSensor``: Remove ``return_immediately`` + 6.8.0 ..... diff --git a/airflow/providers/google/cloud/sensors/pubsub.py b/airflow/providers/google/cloud/sensors/pubsub.py index 11abeca4dd1be..86081de72f4a0 100644 --- a/airflow/providers/google/cloud/sensors/pubsub.py +++ b/airflow/providers/google/cloud/sensors/pubsub.py @@ -16,7 +16,6 @@ # specific language governing permissions and limitations # under the License. """This module contains a Google PubSub sensor.""" -import warnings from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union from google.cloud.pubsub_v1.types import ReceivedMessage @@ -49,23 +48,18 @@ class PubSubPullSensor(BaseSensorOperator): acknowledged before being returned, otherwise, downstream tasks will be responsible for acknowledging them. - ``project`` and ``subscription`` are templated so you can use + If you want a non-blocking task that does not to wait for messages, please use + :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator` + instead. + + ``project_id`` and ``subscription`` are templated so you can use variables in them. - :param project: the Google Cloud project ID for the subscription (templated) + :param project_id: the Google Cloud project ID for the subscription (templated) :param subscription: the Pub/Sub subscription name. Do not include the full subscription path. :param max_messages: The maximum number of messages to retrieve per PubSub pull request - :param return_immediately: - (Deprecated) This is an underlying PubSub API implementation detail. - It has no real effect on Sensor behaviour other than some internal wait time before retrying - on empty queue. - The Sensor task will (by definition) always wait for a message, regardless of this argument value. - - If you want a non-blocking task that does not to wait for messages, please use - :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator` - instead. :param ack_messages: If True, each message will be acknowledged immediately rather than by any downstream tasks :param gcp_conn_id: The connection ID to use connecting to @@ -101,36 +95,13 @@ def __init__( project_id: str, subscription: str, max_messages: int = 5, - return_immediately: bool = True, ack_messages: bool = False, gcp_conn_id: str = 'google_cloud_default', messages_callback: Optional[Callable[[List[ReceivedMessage], "Context"], Any]] = None, delegate_to: Optional[str] = None, - project: Optional[str] = None, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, **kwargs, ) -> None: - # To preserve backward compatibility - # TODO: remove one day - if project: - warnings.warn( - "The project parameter has been deprecated. You should pass the project_id parameter.", - DeprecationWarning, - stacklevel=2, - ) - project_id = project - - if not return_immediately: - warnings.warn( - "The return_immediately parameter is deprecated.\n" - " It exposes what is really just an implementation detail of underlying PubSub API.\n" - " It has no effect on PubSubPullSensor behaviour.\n" - " It should be left as default value of True.\n" - " If is here only because of backwards compatibility.\n" - " If may be removed in the future.\n", - DeprecationWarning, - stacklevel=2, - ) super().__init__(**kwargs) self.gcp_conn_id = gcp_conn_id @@ -138,7 +109,6 @@ def __init__( self.project_id = project_id self.subscription = subscription self.max_messages = max_messages - self.return_immediately = return_immediately self.ack_messages = ack_messages self.messages_callback = messages_callback self.impersonation_chain = impersonation_chain @@ -161,7 +131,7 @@ def poke(self, context: "Context") -> bool: project_id=self.project_id, subscription=self.subscription, max_messages=self.max_messages, - return_immediately=self.return_immediately, + return_immediately=True, ) handle_messages = self.messages_callback or self._default_message_callback