Skip to content

Commit

Permalink
tests: mitigate flaky snippets tests (#432)
Browse files Browse the repository at this point in the history
* test: add additional debug info to snippet test

* Reduce the sleep timeout in message callback

* Auto retry flaky delete schema test

* More flaky tests whack-a-mole
  • Loading branch information
plamut authored Jun 23, 2021
1 parent 2446818 commit 05e3f2f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 13 deletions.
2 changes: 2 additions & 0 deletions samples/snippets/schema_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import os
import uuid

from flaky import flaky
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient, SchemaServiceClient, SubscriberClient
from google.pubsub_v1.types import Encoding
Expand Down Expand Up @@ -251,6 +252,7 @@ def test_subscribe_with_proto_schema(
assert "Received a binary-encoded message" in out


@flaky(max_runs=3, min_passes=1)
def test_delete_schema(proto_schema, capsys):
schema.delete_schema(PROJECT_ID, PROTO_SCHEMA_ID)
out, _ = capsys.readouterr()
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ def receive_messages_with_blocking_shutdown(project_id, subscription_id, timeout

def callback(message):
print(f"Received {message.data}.")
time.sleep(timeout + 5.0) # Pocess longer than streaming pull future timeout.
time.sleep(timeout + 3.0) # Pocess longer than streaming pull future timeout.
message.ack()
print(f"Done processing the message {message.data}.")

Expand Down
30 changes: 18 additions & 12 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,21 +425,26 @@ def test_receive_with_blocking_shutdown(
if re.search(r".*done waiting.*stream shutdown.*", line, flags=re.IGNORECASE)
]

assert "Listening" in out
assert subscription_async in out
try:
assert "Listening" in out
assert subscription_async in out

assert len(stream_canceled_lines) == 1
assert len(shutdown_done_waiting_lines) == 1
assert len(msg_received_lines) == 3
assert len(msg_done_lines) == 3
assert len(stream_canceled_lines) == 1
assert len(shutdown_done_waiting_lines) == 1
assert len(msg_received_lines) == 3
assert len(msg_done_lines) == 3

# The stream should have been canceled *after* receiving messages, but before
# message processing was done.
assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0]
# The stream should have been canceled *after* receiving messages, but before
# message processing was done.
assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0]

# Yet, waiting on the stream shutdown should have completed *after* the processing
# of received messages has ended.
assert msg_done_lines[-1] < shutdown_done_waiting_lines[0]
# Yet, waiting on the stream shutdown should have completed *after*
# the processing of received messages has ended.
assert msg_done_lines[-1] < shutdown_done_waiting_lines[0]
except AssertionError: # pragma: NO COVER
from pprint import pprint
pprint(out_lines) # To make possible flakiness debugging easier.
raise


def test_listen_for_errors(publisher_client, topic, subscription_async, capsys):
Expand All @@ -464,6 +469,7 @@ def test_receive_synchronously(publisher_client, topic, subscription_sync, capsy
assert f"{subscription_sync}" in out


@flaky(max_runs=3, min_passes=1)
def test_receive_synchronously_with_lease(
publisher_client, topic, subscription_sync, capsys
):
Expand Down

0 comments on commit 05e3f2f

Please sign in to comment.