From 0d247e6b189409b4d57c95dbbbf3df3e0fac0fa2 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 14 Mar 2023 11:02:53 -0400 Subject: [PATCH] fix: set x-goog-request-params for streaming pull request (#884) * samples: schema evolution * Add command-line commands * Fix tag for rollback * Make formatting fixes * Formatting fixes * Fix exceptions * fix: Set x-goog-request-params for streaming pull request --- .../subscriber/_protocol/streaming_pull_manager.py | 4 ++++ .../pubsub_v1/subscriber/test_streaming_pull_manager.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 13974ebe4..2f5a31e49 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -279,6 +279,9 @@ def __init__( self._await_callbacks_on_shutdown = await_callbacks_on_shutdown self._ack_histogram = histogram.Histogram() self._last_histogram_size = 0 + self._stream_metadata = [ + ["x-goog-request-params", "subscription=" + subscription] + ] # If max_duration_per_lease_extension is the default # we set the stream_ack_deadline to the default of 60 @@ -845,6 +848,7 @@ def open( initial_request=get_initial_request, should_recover=self._should_recover, should_terminate=self._should_terminate, + metadata=self._stream_metadata, throttle_reopen=True, ) self._rpc.add_done_callback(self._on_rpc_done) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index e01299ef9..199aea256 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -91,6 +91,7 @@ def test__wrap_callback_errors_error(): def test_constructor_and_default_state(): + mock.sentinel.subscription = str() manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, mock.sentinel.subscription ) @@ -113,6 +114,7 @@ def test_constructor_and_default_state(): def test_constructor_with_default_options(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl() manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, @@ -128,6 +130,7 @@ def test_constructor_with_default_options(): def test_constructor_with_min_and_max_duration_per_lease_extension_(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl( min_duration_per_lease_extension=15, max_duration_per_lease_extension=20 ) @@ -142,6 +145,7 @@ def test_constructor_with_min_and_max_duration_per_lease_extension_(): def test_constructor_with_min_duration_per_lease_extension_too_low(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl( min_duration_per_lease_extension=9, max_duration_per_lease_extension=9 ) @@ -156,6 +160,7 @@ def test_constructor_with_min_duration_per_lease_extension_too_low(): def test_constructor_with_max_duration_per_lease_extension_too_high(): + mock.sentinel.subscription = str() flow_control_ = types.FlowControl( max_duration_per_lease_extension=601, min_duration_per_lease_extension=601 ) @@ -1181,6 +1186,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi initial_request=mock.ANY, should_recover=manager._should_recover, should_terminate=manager._should_terminate, + metadata=manager._stream_metadata, throttle_reopen=True, ) initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]