Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: PubSub incompatibility with api-core 1.17.0+ #103

Merged
merged 10 commits into from
Jun 9, 2020
5 changes: 5 additions & 0 deletions google/cloud/pubsub_v1/gapic/subscriber_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,11 @@ def streaming_pull(
client_info=self._client_info,
)

# Wrappers in api-core should not automatically pre-fetch the first
# stream result, as this breaks the stream when re-opening it.
# https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
pradn marked this conversation as resolved.
Show resolved Hide resolved
self.transport.streaming_pull._prefetch_first_result_ = False

return self._inner_api_calls["streaming_pull"](
requests, retry=retry, timeout=timeout, metadata=metadata
)
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
# 'Development Status :: 5 - Production/Stable'
release_status = "Development Status :: 5 - Production/Stable"
dependencies = [
# google-api-core[grpc] 1.17.0 causes problems, thus restricting its
# version until the issue gets fixed.
# google-api-core[grpc] 1.17.0 up to 1.19.1 causes problems with stream
# recovery, thus those versions should not be used.
# https://github.com/googleapis/python-pubsub/issues/74
"google-api-core[grpc] >= 1.14.0, < 1.17.0",
"google-api-core[grpc] >= 1.14.0, != 1.17.*, != 1.18.*, != 1.19.*",
"grpc-google-iam-v1 >= 0.12.3, < 0.13dev",
'enum34; python_version < "3.4"',
]
Expand Down
14 changes: 14 additions & 0 deletions synth.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,20 @@ def _merge_dict(d1, d2):
"from google.iam.v1 import iam_policy_pb2_grpc as iam_policy_pb2",
)

# Monkey patch the streaming_pull() GAPIC method to disable pre-fetching stream
# results.
s.replace(
"google/cloud/pubsub_v1/gapic/subscriber_client.py",
r"return self\._inner_api_calls\['streaming_pull'\]\(.*",
"""
# Wrappers in api-core should not automatically pre-fetch the first
# stream result, as this breaks the stream when re-opening it.
# https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
self.transport.streaming_pull._prefetch_first_result_ = False

\g<0>"""
)

# Add missing blank line before Attributes: in generated docstrings
# https://github.com/googleapis/protoc-docs-plugin/pull/31
s.replace(
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,15 @@ def test_closes_channel_as_context_manager():
pass

mock_transport.channel.close.assert_called()


def test_streaming_pull_gapic_monkeypatch():
transport = mock.NonCallableMock(spec=["streaming_pull"])
transport.streaming_pull = mock.Mock(spec=[])
client = subscriber.Client(transport=transport)

client.streaming_pull(requests=iter([]))

assert client.api.transport is transport
assert hasattr(transport.streaming_pull, "_prefetch_first_result_")
assert not transport.streaming_pull._prefetch_first_result_