diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index cf00da98e..0a6311308 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -28,6 +28,7 @@ import publisher +# This uuid is shared across tests which run in parallel. UUID = uuid.uuid4().hex PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] TOPIC_ID = "publisher-test-topic-" + UUID @@ -35,6 +36,10 @@ # Allow 60s for tests to finish. MAX_TIME = 60 +# These tests run in parallel if pytest-parallel is installed. +# Avoid modifying resources that are shared across tests, +# as this results in test flake. + if typing.TYPE_CHECKING: from unittest.mock import AsyncMock, MagicMock diff --git a/samples/snippets/schema_test.py b/samples/snippets/schema_test.py index 2cdf4bfb6..7780bebc1 100644 --- a/samples/snippets/schema_test.py +++ b/samples/snippets/schema_test.py @@ -31,6 +31,7 @@ import schema +# This uuid is shared across tests which run in parallel. UUID = uuid.uuid4().hex try: PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] @@ -45,6 +46,10 @@ AVSC_FILE = "resources/us-states.avsc" PROTO_FILE = "resources/us-states.proto" +# These tests run in parallel if pytest-parallel is installed. +# Avoid modifying resources that are shared across tests, +# as this results in test flake. + @pytest.fixture(scope="module") def schema_client() -> Generator[pubsub_v1.SchemaServiceClient, None, None]: diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index d656c6ce4..e07aba775 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -22,14 +22,13 @@ from _pytest.capture import CaptureFixture import backoff from flaky import flaky -from google.api_core.exceptions import InternalServerError from google.api_core.exceptions import NotFound -from google.api_core.exceptions import Unknown from google.cloud import pubsub_v1 import pytest import subscriber +# This uuid is shared across tests which run in parallel. UUID = uuid.uuid4().hex PY_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}" PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] @@ -37,15 +36,6 @@ DEAD_LETTER_TOPIC = f"subscription-test-dead-letter-topic-{PY_VERSION}-{UUID}" EOD_TOPIC = f"subscription-test-eod-topic-{PY_VERSION}-{UUID}" SUBSCRIPTION_ADMIN = f"subscription-test-subscription-admin-{PY_VERSION}-{UUID}" -SUBSCRIPTION_ASYNC = f"subscription-test-subscription-async-{PY_VERSION}-{UUID}" -SUBSCRIPTION_SYNC = f"subscription-test-subscription-sync-{PY_VERSION}-{UUID}" -SUBSCRIPTION_DLQ = f"subscription-test-subscription-dlq-{PY_VERSION}-{UUID}" -SUBSCRIPTION_EOD_FOR_CREATE = ( - f"subscription-test-subscription-eod-for-create-{PY_VERSION}-{UUID}" -) -SUBSCRIPTION_EOD_FOR_RECEIVE = ( - f"subscription-test-subscription-eod-for-receive-{PY_VERSION}-{UUID}" -) ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push" NEW_ENDPOINT = f"https://{PROJECT_ID}.appspot.com/push2" REGIONAL_ENDPOINT = "us-east1-pubsub.googleapis.com:443" @@ -57,6 +47,10 @@ typed_flaky = cast(Callable[[C], C], flaky(max_runs=3, min_passes=1)) +# These tests run in parallel if pytest-parallel is installed. +# Avoid modifying resources that are shared across tests, +# as this results in test flake. + @pytest.fixture(scope="module") def publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: @@ -70,6 +64,26 @@ def regional_publisher_client() -> Generator[pubsub_v1.PublisherClient, None, No yield publisher +@pytest.fixture(scope="module") +def subscription_admin( + subscriber_client: pubsub_v1.SubscriberClient, topic: str +) -> Generator[str, None, None]: + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, SUBSCRIPTION_ADMIN + ) + + try: + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + subscription = subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) + + yield subscription.name + + @pytest.fixture(scope="module") def topic(publisher_client: pubsub_v1.PublisherClient) -> Generator[str, None, None]: topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC) @@ -123,171 +137,6 @@ def subscriber_client() -> Generator[pubsub_v1.SubscriberClient, None, None]: subscriber_client.close() -@pytest.fixture(scope="module") -def subscription_admin( - subscriber_client: pubsub_v1.SubscriberClient, topic: str -) -> Generator[str, None, None]: - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ADMIN - ) - - try: - subscription = subscriber_client.get_subscription( - request={"subscription": subscription_path} - ) - except NotFound: - subscription = subscriber_client.create_subscription( - request={"name": subscription_path, "topic": topic} - ) - - yield subscription.name - - -@pytest.fixture(scope="module") -def subscription_sync( - subscriber_client: pubsub_v1.SubscriberClient, topic: str -) -> Generator[str, None, None]: - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_SYNC - ) - - try: - subscription = subscriber_client.get_subscription( - request={"subscription": subscription_path} - ) - except NotFound: - subscription = subscriber_client.create_subscription( - request={"name": subscription_path, "topic": topic} - ) - - yield subscription.name - - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=300), - ) - - @typed_backoff - def delete_subscription() -> None: - try: - subscriber_client.delete_subscription( - request={"subscription": subscription.name} - ) - except NotFound: - print( - "When Unknown error happens, the server might have" - " successfully deleted the subscription under the cover, so" - " we ignore NotFound" - ) - - delete_subscription() - - -@pytest.fixture(scope="module") -def subscription_async( - subscriber_client: pubsub_v1.SubscriberClient, topic: str -) -> Generator[str, None, None]: - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ASYNC - ) - - try: - subscription = subscriber_client.get_subscription( - request={"subscription": subscription_path} - ) - except NotFound: - subscription = subscriber_client.create_subscription( - request={"name": subscription_path, "topic": topic} - ) - - yield subscription.name - - subscriber_client.delete_subscription(request={"subscription": subscription.name}) - - -@pytest.fixture(scope="module") -def subscription_dlq( - subscriber_client: pubsub_v1.SubscriberClient, topic: str, dead_letter_topic: str -) -> Generator[str, None, None]: - from google.cloud.pubsub_v1.types import DeadLetterPolicy - - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_DLQ - ) - - try: - subscription = subscriber_client.get_subscription( - request={"subscription": subscription_path} - ) - except NotFound: - request = { - "name": subscription_path, - "topic": topic, - "dead_letter_policy": DeadLetterPolicy( - dead_letter_topic=dead_letter_topic, max_delivery_attempts=10 - ), - } - subscription = subscriber_client.create_subscription(request) - - yield subscription.name - - subscriber_client.delete_subscription(request={"subscription": subscription.name}) - - -@pytest.fixture(scope="module") -def subscription_eod_for_receive( - subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str -) -> Generator[str, None, None]: - - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_EOD_FOR_RECEIVE - ) - - try: - subscription = subscriber_client.get_subscription( - request={"subscription": subscription_path} - ) - except NotFound: - subscription = subscriber_client.create_subscription( - request={ - "name": subscription_path, - "topic": exactly_once_delivery_topic, - "enable_exactly_once_delivery": True, - } - ) - - yield subscription.name - - subscriber_client.delete_subscription(request={"subscription": subscription.name}) - - -@pytest.fixture(scope="module") -def subscription_eod_for_create( - subscriber_client: pubsub_v1.SubscriberClient, exactly_once_delivery_topic: str -) -> Generator[str, None, None]: - - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE - ) - - try: - subscription = subscriber_client.get_subscription( - request={"subscription": subscription_path} - ) - except NotFound: - subscription = subscriber_client.create_subscription( - request={ - "name": subscription_path, - "topic": exactly_once_delivery_topic, - "enable_exactly_once_delivery": True, - } - ) - - yield subscription.name - - subscriber_client.delete_subscription(request={"subscription": subscription.name}) - - def _publish_messages( publisher_client: pubsub_v1.PublisherClient, topic: str, @@ -334,11 +183,16 @@ def eventually_consistent_test() -> None: def test_create_subscription( subscriber_client: pubsub_v1.SubscriberClient, - subscription_admin: str, + topic: str, capsys: CaptureFixture[str], ) -> None: + + subscription_for_create_name = ( + f"subscription-test-subscription-for-create-{PY_VERSION}-{UUID}" + ) + subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ADMIN + PROJECT_ID, subscription_for_create_name ) try: @@ -348,116 +202,198 @@ def test_create_subscription( except NotFound: pass - subscriber.create_subscription(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN) + subscriber.create_subscription(PROJECT_ID, TOPIC, subscription_for_create_name) out, _ = capsys.readouterr() - assert f"{subscription_admin}" in out + assert f"{subscription_for_create_name}" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_create_subscription_with_dead_letter_policy( subscriber_client: pubsub_v1.SubscriberClient, - subscription_dlq: str, dead_letter_topic: str, capsys: CaptureFixture[str], ) -> None: + + subscription_dlq_name = ( + f"subscription-test-subscription-dlq-for-create-{PY_VERSION}-{UUID}" + ) + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_dlq_name + ) + try: subscriber_client.delete_subscription( - request={"subscription": subscription_dlq} + request={"subscription": subscription_path} ) except NotFound: pass subscriber.create_subscription_with_dead_letter_topic( - PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + PROJECT_ID, TOPIC, subscription_dlq_name, DEAD_LETTER_TOPIC ) out, _ = capsys.readouterr() - assert f"Subscription created: {subscription_dlq}" in out + assert f"Subscription created: {subscription_path}" in out assert f"It will forward dead letter messages to: {dead_letter_topic}" in out assert f"After {DEFAULT_MAX_DELIVERY_ATTEMPTS} delivery attempts." in out + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + -@typed_flaky def test_receive_with_delivery_attempts( + subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, topic: str, dead_letter_topic: str, - subscription_dlq: str, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, (Unknown, NotFound), max_time=120), + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + subscription_dlq_for_receive_name = ( + f"subscription-test-subscription-dlq-for-receive-{PY_VERSION}-{UUID}" ) - # The dlq subscription raises 404 before it's ready. - # We keep retrying up to 10 minutes for mitigating the flakiness. - @typed_backoff - def run_sample() -> None: - _ = _publish_messages(publisher_client, topic) + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_dlq_for_receive_name + ) - subscriber.receive_messages_with_delivery_attempts( - PROJECT_ID, SUBSCRIPTION_DLQ, 90 + try: + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} ) + except NotFound: + request = { + "name": subscription_path, + "topic": topic, + "dead_letter_policy": DeadLetterPolicy( + dead_letter_topic=dead_letter_topic, max_delivery_attempts=10 + ), + } + subscription = subscriber_client.create_subscription(request) - run_sample() + subscription_dlq = subscription.name + + _ = _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_delivery_attempts( + PROJECT_ID, subscription_dlq_for_receive_name, 90 + ) out, _ = capsys.readouterr() assert f"Listening for messages on {subscription_dlq}.." in out assert "With delivery attempts: " in out + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + -@typed_flaky def test_update_dead_letter_policy( - subscription_dlq: str, dead_letter_topic: str, capsys: CaptureFixture[str] + subscriber_client: pubsub_v1.SubscriberClient, + topic: str, + dead_letter_topic: str, + capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, (Unknown, InternalServerError), max_time=60), + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + subscription_dlq_for_update_name = ( + f"subscription-test-subscription-dlq-for-update-{PY_VERSION}-{UUID}" ) - # We saw internal server error that suggests to retry. + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_dlq_for_update_name + ) - @typed_backoff - def run_sample() -> None: - subscriber.update_subscription_with_dead_letter_policy( - PROJECT_ID, - TOPIC, - SUBSCRIPTION_DLQ, - DEAD_LETTER_TOPIC, - UPDATED_MAX_DELIVERY_ATTEMPTS, + try: + subscription = subscriber_client.get_subscription( + request={"subscription": subscription_path} ) + except NotFound: + request = { + "name": subscription_path, + "topic": topic, + "dead_letter_policy": DeadLetterPolicy( + dead_letter_topic=dead_letter_topic, max_delivery_attempts=10 + ), + } + subscription = subscriber_client.create_subscription(request) - run_sample() + subscription_dlq = subscription.name + + subscriber.update_subscription_with_dead_letter_policy( + PROJECT_ID, + TOPIC, + subscription_dlq_for_update_name, + DEAD_LETTER_TOPIC, + UPDATED_MAX_DELIVERY_ATTEMPTS, + ) out, _ = capsys.readouterr() assert dead_letter_topic in out assert subscription_dlq in out assert f"max_delivery_attempts: {UPDATED_MAX_DELIVERY_ATTEMPTS}" in out + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + -@typed_flaky def test_remove_dead_letter_policy( - subscription_dlq: str, capsys: CaptureFixture[str] + subscriber_client: pubsub_v1.SubscriberClient, + topic: str, + dead_letter_topic: str, + capsys: CaptureFixture[str], ) -> None: + + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + subscription_dlq_for_remove_name = ( + f"subscription-test-subscription-dlq-for-remove-{PY_VERSION}-{UUID}" + ) + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_dlq_for_remove_name + ) + + request = { + "name": subscription_path, + "topic": topic, + "dead_letter_policy": DeadLetterPolicy( + dead_letter_topic=dead_letter_topic, max_delivery_attempts=10 + ), + } + subscription = subscriber_client.create_subscription(request) + + subscription_dlq = subscription.name + subscription_after_update = subscriber.remove_dead_letter_policy( - PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ + PROJECT_ID, TOPIC, subscription_dlq_for_remove_name ) out, _ = capsys.readouterr() assert subscription_dlq in out assert subscription_after_update.dead_letter_policy.dead_letter_topic == "" + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + def test_create_subscription_with_ordering( subscriber_client: pubsub_v1.SubscriberClient, - subscription_admin: str, + topic: str, capsys: CaptureFixture[str], ) -> None: + subscription_with_ordering_name = ( + f"subscription-test-subscription-with-ordering-{PY_VERSION}-{UUID}" + ) + subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ADMIN + PROJECT_ID, subscription_with_ordering_name ) try: subscriber_client.delete_subscription( @@ -466,21 +402,31 @@ def test_create_subscription_with_ordering( except NotFound: pass - subscriber.create_subscription_with_ordering(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN) + subscriber.create_subscription_with_ordering( + PROJECT_ID, TOPIC, subscription_with_ordering_name + ) out, _ = capsys.readouterr() assert "Created subscription with ordering" in out - assert f"{subscription_admin}" in out + assert f"{subscription_with_ordering_name}" in out assert "enable_message_ordering: true" in out + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + def test_create_subscription_with_filtering( subscriber_client: pubsub_v1.SubscriberClient, - subscription_admin: str, + topic: str, capsys: CaptureFixture[str], ) -> None: + + subscription_with_filtering_name = ( + f"subscription-test-subscription-with-filtering-{PY_VERSION}-{UUID}" + ) + subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ADMIN + PROJECT_ID, subscription_with_filtering_name ) try: subscriber_client.delete_subscription( @@ -490,23 +436,32 @@ def test_create_subscription_with_filtering( pass subscriber.create_subscription_with_filtering( - PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, FILTER + PROJECT_ID, TOPIC, subscription_with_filtering_name, FILTER ) out, _ = capsys.readouterr() assert "Created subscription with filtering enabled" in out - assert f"{subscription_admin}" in out + assert f"{subscription_with_filtering_name}" in out assert '"attributes.author=\\"unknown\\""' in out + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + def test_create_subscription_with_exactly_once_delivery( subscriber_client: pubsub_v1.SubscriberClient, - subscription_eod_for_create: str, + exactly_once_delivery_topic: str, capsys: CaptureFixture[str], ) -> None: + + subscription_eod_for_create_name = ( + f"subscription-test-subscription-eod-for-create-{PY_VERSION}-{UUID}" + ) + subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_EOD_FOR_CREATE + PROJECT_ID, subscription_eod_for_create_name ) + try: subscriber_client.delete_subscription( request={"subscription": subscription_path} @@ -515,327 +470,440 @@ def test_create_subscription_with_exactly_once_delivery( pass subscriber.create_subscription_with_exactly_once_delivery( - PROJECT_ID, EOD_TOPIC, SUBSCRIPTION_EOD_FOR_CREATE + PROJECT_ID, EOD_TOPIC, subscription_eod_for_create_name ) out, _ = capsys.readouterr() assert "Created subscription with exactly once delivery enabled" in out - assert f"{subscription_eod_for_create}" in out + assert f"{subscription_eod_for_create_name}" in out assert "enable_exactly_once_delivery: true" in out + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + def test_create_push_subscription( subscriber_client: pubsub_v1.SubscriberClient, - subscription_admin: str, + topic: str, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=60), + + push_subscription_for_create_name = ( + f"subscription-test-subscription-push-for-create-{PY_VERSION}-{UUID}" ) - # The scope of `subscription_path` is limited to this function. - @typed_backoff - def eventually_consistent_test() -> None: - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ADMIN - ) - try: - subscriber_client.delete_subscription( - request={"subscription": subscription_path} - ) - except NotFound: - pass - - subscriber.create_push_subscription( - PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, push_subscription_for_create_name + ) + try: + subscriber_client.delete_subscription( + request={"subscription": subscription_path} ) + except NotFound: + pass - out, _ = capsys.readouterr() - assert f"{subscription_admin}" in out + subscriber.create_push_subscription( + PROJECT_ID, TOPIC, push_subscription_for_create_name, ENDPOINT + ) - eventually_consistent_test() + out, _ = capsys.readouterr() + assert f"{push_subscription_for_create_name}" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) -def test_update_push_suscription( - subscription_admin: str, +def test_update_push_subscription( + subscriber_client: pubsub_v1.SubscriberClient, + topic: str, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=60), + push_subscription_for_update_name = ( + f"subscription-test-subscription-push-for-create-{PY_VERSION}-{UUID}" + ) + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, push_subscription_for_update_name ) - @typed_backoff - def eventually_consistent_test() -> None: - subscriber.update_push_subscription( - PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} ) - out, _ = capsys.readouterr() - assert "Subscription updated" in out - assert f"{subscription_admin}" in out + subscriber.update_push_subscription( + PROJECT_ID, TOPIC, push_subscription_for_update_name, NEW_ENDPOINT + ) - eventually_consistent_test() + out, _ = capsys.readouterr() + assert "Subscription updated" in out + assert f"{push_subscription_for_update_name}" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_delete_subscription( - subscriber_client: pubsub_v1.SubscriberClient, subscription_admin: str + subscriber_client: pubsub_v1.SubscriberClient, + topic: str, ) -> None: - subscriber.delete_subscription(PROJECT_ID, SUBSCRIPTION_ADMIN) - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=60), + subscription_for_delete_name = ( + f"subscription-test-subscription-for-delete-{PY_VERSION}-{UUID}" ) - @typed_backoff - def eventually_consistent_test() -> None: - with pytest.raises(Exception): - subscriber_client.get_subscription( - request={"subscription": subscription_admin} - ) + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_for_delete_name + ) - eventually_consistent_test() + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) + + subscriber.delete_subscription(PROJECT_ID, subscription_for_delete_name) + + with pytest.raises(Exception): + subscriber_client.get_subscription( + request={"subscription": subscription_for_delete_name} + ) + + # No clean up required. def test_receive( - publisher_client: pubsub_v1.PublisherClient, + subscriber_client: pubsub_v1.SubscriberClient, topic: str, - subscription_async: str, + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=60), + subscription_async_for_receive_name = ( + f"subscription-test-subscription-async-for-receive-{PY_VERSION}-{UUID}" ) - @typed_backoff - def eventually_consistent_test() -> None: - _ = _publish_messages(publisher_client, topic) + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_async_for_receive_name + ) + + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) - subscriber.receive_messages(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + _ = _publish_messages(publisher_client, topic) - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out + subscriber.receive_messages(PROJECT_ID, subscription_async_for_receive_name, 5) - eventually_consistent_test() + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async_for_receive_name in out + assert "message" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_receive_with_custom_attributes( + subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, topic: str, - subscription_async: str, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=60), + subscription_async_receive_with_custom_name = ( + f"subscription-test-subscription-async-receive-with-custom-{PY_VERSION}-{UUID}" ) - @typed_backoff - def eventually_consistent_test() -> None: - _ = _publish_messages(publisher_client, topic, origin="python-sample") + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_async_receive_with_custom_name + ) - subscriber.receive_messages_with_custom_attributes( - PROJECT_ID, SUBSCRIPTION_ASYNC, 5 + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} ) - out, _ = capsys.readouterr() - assert subscription_async in out - assert "message" in out - assert "origin" in out - assert "python-sample" in out + _ = _publish_messages(publisher_client, topic, origin="python-sample") - eventually_consistent_test() + subscriber.receive_messages_with_custom_attributes( + PROJECT_ID, subscription_async_receive_with_custom_name, 5 + ) + + out, _ = capsys.readouterr() + assert subscription_async_receive_with_custom_name in out + assert "message" in out + assert "origin" in out + assert "python-sample" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_receive_with_flow_control( + subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, topic: str, - subscription_async: str, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=300), + subscription_async_receive_with_flow_control_name = f"subscription-test-subscription-async-receive-with-flow-control-{PY_VERSION}-{UUID}" + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_async_receive_with_flow_control_name ) - @typed_backoff - def eventually_consistent_test() -> None: - _ = _publish_messages(publisher_client, topic) + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) - subscriber.receive_messages_with_flow_control(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + _ = _publish_messages(publisher_client, topic) - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out + subscriber.receive_messages_with_flow_control( + PROJECT_ID, subscription_async_receive_with_flow_control_name, 5 + ) - eventually_consistent_test() + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async_receive_with_flow_control_name in out + assert "message" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_receive_with_blocking_shutdown( + subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, topic: str, - subscription_async: str, capsys: CaptureFixture[str], ) -> None: + subscription_async_receive_with_blocking_name = f"subscription-test-subscription-async-receive-with-blocking-shutdown-{PY_VERSION}-{UUID}" + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_async_receive_with_blocking_name + ) + + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) + _received = re.compile(r".*received.*message.*", flags=re.IGNORECASE) _done = re.compile(r".*done processing.*message.*", flags=re.IGNORECASE) _canceled = re.compile(r".*streaming pull future canceled.*", flags=re.IGNORECASE) _shut_down = re.compile(r".*done waiting.*stream shutdown.*", flags=re.IGNORECASE) - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=300), + _ = _publish_messages(publisher_client, topic, message_num=3) + + subscriber.receive_messages_with_blocking_shutdown( + PROJECT_ID, subscription_async_receive_with_blocking_name, timeout=5.0 ) - @typed_backoff - def eventually_consistent_test() -> None: - _ = _publish_messages(publisher_client, topic, message_num=3) + out, _ = capsys.readouterr() + out_lines = out.splitlines() + + msg_received_lines = [ + i for i, line in enumerate(out_lines) if _received.search(line) + ] + msg_done_lines = [i for i, line in enumerate(out_lines) if _done.search(line)] + stream_canceled_lines = [ + i for i, line in enumerate(out_lines) if _canceled.search(line) + ] + shutdown_done_waiting_lines = [ + i for i, line in enumerate(out_lines) if _shut_down.search(line) + ] - subscriber.receive_messages_with_blocking_shutdown( - PROJECT_ID, SUBSCRIPTION_ASYNC, timeout=5.0 - ) + try: + assert "Listening" in out + assert subscription_async_receive_with_blocking_name in out - out, _ = capsys.readouterr() - out_lines = out.splitlines() - - msg_received_lines = [ - i for i, line in enumerate(out_lines) if _received.search(line) - ] - msg_done_lines = [i for i, line in enumerate(out_lines) if _done.search(line)] - stream_canceled_lines = [ - i for i, line in enumerate(out_lines) if _canceled.search(line) - ] - shutdown_done_waiting_lines = [ - i for i, line in enumerate(out_lines) if _shut_down.search(line) - ] - - try: - assert "Listening" in out - assert subscription_async in out - - assert len(stream_canceled_lines) == 1 - assert len(shutdown_done_waiting_lines) == 1 - assert len(msg_received_lines) == 3 - assert len(msg_done_lines) == 3 - - # The stream should have been canceled *after* receiving messages, but before - # message processing was done. - assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0] - - # Yet, waiting on the stream shutdown should have completed *after* - # the processing of received messages has ended. - assert msg_done_lines[-1] < shutdown_done_waiting_lines[0] - except AssertionError: # pragma: NO COVER - from pprint import pprint - - pprint(out_lines) # To make possible flakiness debugging easier. - raise + assert len(stream_canceled_lines) == 1 + assert len(shutdown_done_waiting_lines) == 1 + assert len(msg_received_lines) == 3 + assert len(msg_done_lines) == 3 - eventually_consistent_test() + # The stream should have been canceled *after* receiving messages, but before + # message processing was done. + assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0] + + # Yet, waiting on the stream shutdown should have completed *after* + # the processing of received messages has ended. + assert msg_done_lines[-1] < shutdown_done_waiting_lines[0] + except AssertionError: # pragma: NO COVER + from pprint import pprint + + pprint(out_lines) # To make possible flakiness debugging easier. + raise + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_receive_messages_with_exactly_once_delivery_enabled( + subscriber_client: pubsub_v1.SubscriberClient, regional_publisher_client: pubsub_v1.PublisherClient, exactly_once_delivery_topic: str, - subscription_eod_for_receive: str, capsys: CaptureFixture[str], ) -> None: + subscription_eod_for_receive_name = ( + f"subscription-test-subscription-eod-for-receive-{PY_VERSION}-{UUID}" + ) + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_eod_for_receive_name + ) + + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={ + "name": subscription_path, + "topic": exactly_once_delivery_topic, + "enable_exactly_once_delivery": True, + } + ) + message_ids = _publish_messages( regional_publisher_client, exactly_once_delivery_topic ) subscriber.receive_messages_with_exactly_once_delivery_enabled( - PROJECT_ID, SUBSCRIPTION_EOD_FOR_RECEIVE, 30 + PROJECT_ID, subscription_eod_for_receive_name, 200 ) out, _ = capsys.readouterr() - assert subscription_eod_for_receive in out + assert subscription_eod_for_receive_name in out for message_id in message_ids: assert message_id in out + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + def test_listen_for_errors( + subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, topic: str, - subscription_async: str, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=60), + subscription_async_listen = ( + f"subscription-test-subscription-async-listen-{PY_VERSION}-{UUID}" ) - @typed_backoff - def eventually_consistent_test() -> None: - _ = _publish_messages(publisher_client, topic) + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_async_listen + ) + + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) - subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + _ = _publish_messages(publisher_client, topic) - out, _ = capsys.readouterr() - assert subscription_async in out - assert "threw an exception" in out + subscriber.listen_for_errors(PROJECT_ID, subscription_async_listen, 5) - eventually_consistent_test() + out, _ = capsys.readouterr() + assert subscription_path in out + assert "threw an exception" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) def test_receive_synchronously( + subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, topic: str, - subscription_sync: str, capsys: CaptureFixture[str], ) -> None: + + subscription_sync_for_receive_name = ( + f"subscription-test-subscription-sync-for-receive-{PY_VERSION}-{UUID}" + ) + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_sync_for_receive_name + ) + + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) + _ = _publish_messages(publisher_client, topic) - subscriber.synchronous_pull(PROJECT_ID, SUBSCRIPTION_SYNC) + subscriber.synchronous_pull(PROJECT_ID, subscription_sync_for_receive_name) out, _ = capsys.readouterr() assert "Received" in out - assert f"{subscription_sync}" in out + assert f"{subscription_sync_for_receive_name}" in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) @typed_flaky def test_receive_synchronously_with_lease( + subscriber_client: pubsub_v1.SubscriberClient, publisher_client: pubsub_v1.PublisherClient, topic: str, - subscription_sync: str, capsys: CaptureFixture[str], ) -> None: - typed_backoff = cast( - Callable[[C], C], - backoff.on_exception(backoff.expo, Unknown, max_time=300), + subscription_sync_for_receive_with_lease_name = f"subscription-test-subscription-sync-for-receive-with-lease-{PY_VERSION}-{UUID}" + + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, subscription_sync_for_receive_with_lease_name ) - @typed_backoff - def run_sample() -> None: - _ = _publish_messages(publisher_client, topic, message_num=10) - # Pausing 10s to allow the subscriber to establish the connection - # because sync pull often returns fewer messages than requested. - # The intention is to fix flaky tests reporting errors like - # `google.api_core.exceptions.Unknown: None Stream removed` as - # in https://github.com/googleapis/python-pubsub/issues/341. - time.sleep(10) - subscriber.synchronous_pull_with_lease_management(PROJECT_ID, SUBSCRIPTION_SYNC) - - run_sample() + try: + subscriber_client.get_subscription(request={"subscription": subscription_path}) + except NotFound: + subscriber_client.create_subscription( + request={"name": subscription_path, "topic": topic} + ) + + _ = _publish_messages(publisher_client, topic, message_num=10) + # Pausing 10s to allow the subscriber to establish the connection + # because sync pull often returns fewer messages than requested. + # The intention is to fix flaky tests reporting errors like + # `google.api_core.exceptions.Unknown: None Stream removed` as + # in https://github.com/googleapis/python-pubsub/issues/341. + time.sleep(10) + subscriber.synchronous_pull_with_lease_management( + PROJECT_ID, subscription_sync_for_receive_with_lease_name + ) out, _ = capsys.readouterr() # Sometimes the subscriber only gets 1 or 2 messages and test fails. # I think it's ok to consider those cases as passing. assert "Received and acknowledged" in out - assert f"messages from {subscription_sync}." in out + assert f"messages from {subscription_path}." in out + + # Clean up. + subscriber_client.delete_subscription(request={"subscription": subscription_path})