Skip to content

Commit

Permalink
PubSubPullSensor: Remove project and return_immediately (#23231)
Browse files Browse the repository at this point in the history
* `PubSubPullSensor`: Remove `project` and `return_immediately`
  • Loading branch information
eladkal authored Apr 26, 2022
1 parent 06dfc25 commit 1416ac4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 37 deletions.
4 changes: 4 additions & 0 deletions airflow/providers/google/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ Breaking changes

* ``GCSObjectsWtihPrefixExistenceSensor`` removed. Please use ``GCSObjectsWithPrefixExistenceSensor``.

* ``PubSubPullSensor``: Remove ``project``. Please use ``project_id``

* ``PubSubPullSensor``: Remove ``return_immediately``

6.8.0
.....

Expand Down
44 changes: 7 additions & 37 deletions airflow/providers/google/cloud/sensors/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google PubSub sensor."""
import warnings
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union

from google.cloud.pubsub_v1.types import ReceivedMessage
Expand Down Expand Up @@ -49,23 +48,18 @@ class PubSubPullSensor(BaseSensorOperator):
acknowledged before being returned, otherwise, downstream tasks will be
responsible for acknowledging them.
``project`` and ``subscription`` are templated so you can use
If you want a non-blocking task that does not to wait for messages, please use
:class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator`
instead.
``project_id`` and ``subscription`` are templated so you can use
variables in them.
:param project: the Google Cloud project ID for the subscription (templated)
:param project_id: the Google Cloud project ID for the subscription (templated)
:param subscription: the Pub/Sub subscription name. Do not include the
full subscription path.
:param max_messages: The maximum number of messages to retrieve per
PubSub pull request
:param return_immediately:
(Deprecated) This is an underlying PubSub API implementation detail.
It has no real effect on Sensor behaviour other than some internal wait time before retrying
on empty queue.
The Sensor task will (by definition) always wait for a message, regardless of this argument value.
If you want a non-blocking task that does not to wait for messages, please use
:class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator`
instead.
:param ack_messages: If True, each message will be acknowledged
immediately rather than by any downstream tasks
:param gcp_conn_id: The connection ID to use connecting to
Expand Down Expand Up @@ -101,44 +95,20 @@ def __init__(
project_id: str,
subscription: str,
max_messages: int = 5,
return_immediately: bool = True,
ack_messages: bool = False,
gcp_conn_id: str = 'google_cloud_default',
messages_callback: Optional[Callable[[List[ReceivedMessage], "Context"], Any]] = None,
delegate_to: Optional[str] = None,
project: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:
# To preserve backward compatibility
# TODO: remove one day
if project:
warnings.warn(
"The project parameter has been deprecated. You should pass the project_id parameter.",
DeprecationWarning,
stacklevel=2,
)
project_id = project

if not return_immediately:
warnings.warn(
"The return_immediately parameter is deprecated.\n"
" It exposes what is really just an implementation detail of underlying PubSub API.\n"
" It has no effect on PubSubPullSensor behaviour.\n"
" It should be left as default value of True.\n"
" If is here only because of backwards compatibility.\n"
" If may be removed in the future.\n",
DeprecationWarning,
stacklevel=2,
)

super().__init__(**kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.project_id = project_id
self.subscription = subscription
self.max_messages = max_messages
self.return_immediately = return_immediately
self.ack_messages = ack_messages
self.messages_callback = messages_callback
self.impersonation_chain = impersonation_chain
Expand All @@ -161,7 +131,7 @@ def poke(self, context: "Context") -> bool:
project_id=self.project_id,
subscription=self.subscription,
max_messages=self.max_messages,
return_immediately=self.return_immediately,
return_immediately=True,
)

handle_messages = self.messages_callback or self._default_message_callback
Expand Down

0 comments on commit 1416ac4

Please sign in to comment.