diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py b/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py index a815659f46e4..4eda15b66126 100644 --- a/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py +++ b/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py @@ -240,11 +240,8 @@ def _receive(self, receiver, end_time): max_wait_time=self.max_wait_time, ) elif self.receive_type == ReceiveType.push: - batch = receiver._get_streaming_message_iter( - max_wait_time=self.max_wait_time - ) - else: - batch = [] + receiver.max_wait_time = self.max_wait_time + batch = receiver for message in batch: self.on_receive(self._state, message, receiver) @@ -428,9 +425,8 @@ async def _receive_async(self, receiver, end_time): message, receiver, end_time ) elif self.receive_type == ReceiveType.push: - batch = receiver._get_streaming_message_iter( - max_wait_time=self.max_wait_time - ) + receiver.max_wait_time = self.max_wait_time + batch = receiver async for message in batch: if end_time <= datetime.utcnow(): break diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 78f6044296ad..6bd636075377 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -123,9 +123,6 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel with pytest.raises(ValueError): await receiver.receive_messages(max_wait_time=0) - with pytest.raises(ValueError): - await receiver._get_streaming_message_iter(max_wait_time=0) - count = 0 async for message in receiver: print_message(_logger, message) @@ -1480,7 +1477,7 @@ async def test_async_queue_schedule_multiple_messages(self, uamqp_transport, *, servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client: scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) messages = [] - receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20) + receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20, max_wait_time=5) sender = sb_client.get_queue_sender(servicebus_queue.name) async with sender, receiver: content = str(uuid.uuid4()) @@ -1494,7 +1491,7 @@ async def test_async_queue_schedule_multiple_messages(self, uamqp_transport, *, await sender.send_messages([message_a, message_b]) received_messages = [] - async for message in receiver._get_streaming_message_iter(max_wait_time=5): + async for message in receiver: received_messages.append(message) await receiver.complete_message(message) @@ -1827,11 +1824,11 @@ async def test_async_queue_receiver_alive_after_timeout(self, uamqp_transport, * messages = [] async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10) as receiver: - async for message in receiver._get_streaming_message_iter(): + async for message in receiver: messages.append(message) break - async for message in receiver._get_streaming_message_iter(): + async for message in receiver: messages.append(message) for message in messages: @@ -1845,9 +1842,9 @@ async def test_async_queue_receiver_alive_after_timeout(self, uamqp_transport, * message_3 = ServiceBusMessage("3") await sender.send_messages([message_2, message_3]) - async for message in receiver._get_streaming_message_iter(): + async for message in receiver: messages.append(message) - async for message in receiver._get_streaming_message_iter(): + async for message in receiver: messages.append(message) assert len(messages) == 4 @@ -1897,60 +1894,6 @@ async def test_queue_receive_keep_conn_alive_async(self, uamqp_transport, *, ser assert len(messages) == 0 # make sure messages are removed from the queue assert receiver_handler == receiver._handler # make sure no reconnection happened - - # @pytest.mark.skip(reason="TODO: _counter doesnt exist in pyamqp") - @pytest.mark.asyncio - @pytest.mark.liveTest - @pytest.mark.live_test_only - @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest') - @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') - @ServiceBusQueuePreparer(name_prefix='servicebustest') - @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids) - @ArgPasserAsync() - async def test_async_queue_receiver_respects_max_wait_time_overrides(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs): - async with ServiceBusClient.from_connection_string( - servicebus_namespace_connection_string, - logging_enable=False, uamqp_transport=uamqp_transport) as sb_client: - - async with sb_client.get_queue_sender(servicebus_queue.name) as sender: - message = ServiceBusMessage("0") - await sender.send_messages(message) - - messages = [] - def get_time(): - return time.time() - - async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: - - time_1 = get_time() - async for message in receiver._get_streaming_message_iter(max_wait_time=10): - messages.append(message) - await receiver.complete_message(message) - - time_2 = get_time() - async for message in receiver._get_streaming_message_iter(max_wait_time=1): - messages.append(message) - time_3 = get_time() - assert timedelta(seconds=.5) < timedelta(seconds=(time_3 - time_2)) <= timedelta(seconds=2) - time_4 = get_time() - assert timedelta(seconds=8) < timedelta(seconds=(time_4 - time_3)) <= timedelta(seconds=11) - - async for message in receiver._get_streaming_message_iter(max_wait_time=3): - messages.append(message) - time_5 = get_time() - assert timedelta(seconds=1) < timedelta(seconds=(time_5 - time_4)) <= timedelta(seconds=4) - - async for message in receiver: - messages.append(message) - time_6 = get_time() - assert timedelta(seconds=3) < timedelta(seconds=(time_6 - time_5)) <= timedelta(seconds=6) - - async for message in receiver._get_streaming_message_iter(): - messages.append(message) - time_7 = get_time() - assert timedelta(seconds=3) < timedelta(seconds=(time_7 - time_6)) <= timedelta(seconds=6) - assert len(messages) == 1 - @pytest.mark.asyncio @pytest.mark.liveTest diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py index 339f2dc1769e..d3dcbc8956d0 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_subscriptions_async.py @@ -72,9 +72,6 @@ async def test_subscription_by_subscription_client_conn_str_receive_basic(self, with pytest.raises(ValueError): await receiver.receive_messages(max_wait_time=-1) - with pytest.raises(ValueError): - await receiver._get_streaming_message_iter(max_wait_time=0) - count = 0 async for message in receiver: count += 1 diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 8724d595cf57..cdad7c39dc61 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -187,9 +187,6 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, uamqp_tra with pytest.raises(ValueError): receiver.receive_messages(max_wait_time=0) - with pytest.raises(ValueError): - receiver._get_streaming_message_iter(max_wait_time=0) - count = 0 for message in receiver: print_message(_logger, message) @@ -1691,7 +1688,7 @@ def test_queue_schedule_multiple_messages(self, uamqp_transport, *, servicebus_n scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) sender = sb_client.get_queue_sender(servicebus_queue.name) - receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20) + receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20, max_wait_time=5) with sender, receiver: content = str(uuid.uuid4()) @@ -1713,7 +1710,7 @@ def test_queue_schedule_multiple_messages(self, uamqp_transport, *, servicebus_n sender.send_messages(message_arry) received_messages = [] - for message in receiver._get_streaming_message_iter(max_wait_time=5): + for message in receiver: received_messages.append(message) receiver.complete_message(message) @@ -2073,7 +2070,7 @@ def message_content(): ) sender = sb_client.get_queue_sender(servicebus_queue.name) - receiver = sb_client.get_queue_receiver(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10) with sender, receiver: message = ServiceBusMessageBatch() @@ -2086,7 +2083,7 @@ def message_content(): message_2nd_received_cnt = 0 while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20: messages = [] - for message in receiver._get_streaming_message_iter(max_wait_time=10): + for message in receiver: messages.append(message) if not messages: break @@ -2136,11 +2133,11 @@ def test_queue_receiver_alive_after_timeout(self, uamqp_transport, *, servicebus messages = [] with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: - for message in receiver._get_streaming_message_iter(): + for message in receiver: messages.append(message) break - for message in receiver._get_streaming_message_iter(): + for message in receiver: messages.append(message) for message in messages: @@ -2154,9 +2151,9 @@ def test_queue_receiver_alive_after_timeout(self, uamqp_transport, *, servicebus message_3 = ServiceBusMessage("3") sender.send_messages([message_2, message_3]) - for message in receiver._get_streaming_message_iter(): + for message in receiver: messages.append(message) - for message in receiver._get_streaming_message_iter(): + for message in receiver: messages.append(message) assert len(messages) == 4 @@ -2232,62 +2229,9 @@ def test_queue_receiver_sender_resume_after_link_timeout(self, uamqp_transport, messages = [] with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: - for message in receiver._get_streaming_message_iter(): - messages.append(message) - assert len(messages) == 2 - - @pytest.mark.liveTest - @pytest.mark.live_test_only - @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest') - @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') - @ServiceBusQueuePreparer(name_prefix='servicebustest') - @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids) - @ArgPasser() - def test_queue_receiver_respects_max_wait_time_overrides(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs): - with ServiceBusClient.from_connection_string( - servicebus_namespace_connection_string, - logging_enable=False, uamqp_transport=uamqp_transport) as sb_client: - - with sb_client.get_queue_sender(servicebus_queue.name) as sender: - message = ServiceBusMessage("0") - sender.send_messages(message) - - messages = [] - def get_time(): - return time.time() - - with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: - - time_1 = get_time() - time_3 = time_1 # In case inner loop isn't hit, fail sanely. - for message in receiver._get_streaming_message_iter(max_wait_time=10): - messages.append(message) - receiver.complete_message(message) - - time_2 = get_time() - for message in receiver._get_streaming_message_iter(max_wait_time=1): - messages.append(message) - time_3 = get_time() - assert timedelta(seconds=.5) < timedelta(seconds=(time_3 - time_2)) <= timedelta(seconds=2) - time_4 = get_time() - assert timedelta(seconds=8) < timedelta(seconds=(time_4 - time_3)) <= timedelta(seconds=11) - - for message in receiver._get_streaming_message_iter(max_wait_time=3): - messages.append(message) - time_5 = get_time() - assert timedelta(seconds=1) < timedelta(seconds=(time_5 - time_4)) <= timedelta(seconds=4) - for message in receiver: messages.append(message) - time_6 = get_time() - assert timedelta(seconds=3) < timedelta(seconds=(time_6 - time_5)) <= timedelta(seconds=6) - - for message in receiver._get_streaming_message_iter(): - messages.append(message) - time_7 = get_time() - assert timedelta(seconds=3) < timedelta(seconds=(time_7 - time_6)) <= timedelta(seconds=6) - assert len(messages) == 1 - + assert len(messages) == 2 @pytest.mark.liveTest @pytest.mark.live_test_only diff --git a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py index fa1e03ec8ef4..af6613cf1c79 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_subscriptions.py @@ -70,9 +70,6 @@ def test_subscription_by_subscription_client_conn_str_receive_basic(self, uamqp_ with pytest.raises(ValueError): receiver.receive_messages(max_wait_time=-1) - with pytest.raises(ValueError): - receiver._get_streaming_message_iter(max_wait_time=0) - count = 0 for message in receiver: count += 1