Skip to content

Commit

Permalink
[SB] Remove references to internal streaming method (#29750)
Browse files Browse the repository at this point in the history
* remove streaming

* update stress
  • Loading branch information
l0lawrence authored Apr 17, 2023
1 parent db738d9 commit c48b84e
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,8 @@ def _receive(self, receiver, end_time):
max_wait_time=self.max_wait_time,
)
elif self.receive_type == ReceiveType.push:
batch = receiver._get_streaming_message_iter(
max_wait_time=self.max_wait_time
)
else:
batch = []
receiver.max_wait_time = self.max_wait_time
batch = receiver

for message in batch:
self.on_receive(self._state, message, receiver)
Expand Down Expand Up @@ -428,9 +425,8 @@ async def _receive_async(self, receiver, end_time):
message, receiver, end_time
)
elif self.receive_type == ReceiveType.push:
batch = receiver._get_streaming_message_iter(
max_wait_time=self.max_wait_time
)
receiver.max_wait_time = self.max_wait_time
batch = receiver
async for message in batch:
if end_time <= datetime.utcnow():
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel
with pytest.raises(ValueError):
await receiver.receive_messages(max_wait_time=0)

with pytest.raises(ValueError):
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
print_message(_logger, message)
Expand Down Expand Up @@ -1480,7 +1477,7 @@ async def test_async_queue_schedule_multiple_messages(self, uamqp_transport, *,
servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
messages = []
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20, max_wait_time=5)
sender = sb_client.get_queue_sender(servicebus_queue.name)
async with sender, receiver:
content = str(uuid.uuid4())
Expand All @@ -1494,7 +1491,7 @@ async def test_async_queue_schedule_multiple_messages(self, uamqp_transport, *,
await sender.send_messages([message_a, message_b])

received_messages = []
async for message in receiver._get_streaming_message_iter(max_wait_time=5):
async for message in receiver:
received_messages.append(message)
await receiver.complete_message(message)

Expand Down Expand Up @@ -1827,11 +1824,11 @@ async def test_async_queue_receiver_alive_after_timeout(self, uamqp_transport, *
messages = []
async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10) as receiver:

async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)
break

async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)

for message in messages:
Expand All @@ -1845,9 +1842,9 @@ async def test_async_queue_receiver_alive_after_timeout(self, uamqp_transport, *
message_3 = ServiceBusMessage("3")
await sender.send_messages([message_2, message_3])

async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)
async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)

assert len(messages) == 4
Expand Down Expand Up @@ -1897,60 +1894,6 @@ async def test_queue_receive_keep_conn_alive_async(self, uamqp_transport, *, ser

assert len(messages) == 0 # make sure messages are removed from the queue
assert receiver_handler == receiver._handler # make sure no reconnection happened

# @pytest.mark.skip(reason="TODO: _counter doesnt exist in pyamqp")
@pytest.mark.asyncio
@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasserAsync()
async def test_async_queue_receiver_respects_max_wait_time_overrides(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = ServiceBusMessage("0")
await sender.send_messages(message)

messages = []
def get_time():
return time.time()

async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

time_1 = get_time()
async for message in receiver._get_streaming_message_iter(max_wait_time=10):
messages.append(message)
await receiver.complete_message(message)

time_2 = get_time()
async for message in receiver._get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = get_time()
assert timedelta(seconds=.5) < timedelta(seconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = get_time()
assert timedelta(seconds=8) < timedelta(seconds=(time_4 - time_3)) <= timedelta(seconds=11)

async for message in receiver._get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = get_time()
assert timedelta(seconds=1) < timedelta(seconds=(time_5 - time_4)) <= timedelta(seconds=4)

async for message in receiver:
messages.append(message)
time_6 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_6 - time_5)) <= timedelta(seconds=6)

async for message in receiver._get_streaming_message_iter():
messages.append(message)
time_7 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_7 - time_6)) <= timedelta(seconds=6)
assert len(messages) == 1


@pytest.mark.asyncio
@pytest.mark.liveTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ async def test_subscription_by_subscription_client_conn_str_receive_basic(self,
with pytest.raises(ValueError):
await receiver.receive_messages(max_wait_time=-1)

with pytest.raises(ValueError):
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
count += 1
Expand Down
74 changes: 9 additions & 65 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, uamqp_tra
with pytest.raises(ValueError):
receiver.receive_messages(max_wait_time=0)

with pytest.raises(ValueError):
receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
for message in receiver:
print_message(_logger, message)
Expand Down Expand Up @@ -1691,7 +1688,7 @@ def test_queue_schedule_multiple_messages(self, uamqp_transport, *, servicebus_n

scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20, max_wait_time=5)

with sender, receiver:
content = str(uuid.uuid4())
Expand All @@ -1713,7 +1710,7 @@ def test_queue_schedule_multiple_messages(self, uamqp_transport, *, servicebus_n
sender.send_messages(message_arry)

received_messages = []
for message in receiver._get_streaming_message_iter(max_wait_time=5):
for message in receiver:
received_messages.append(message)
receiver.complete_message(message)

Expand Down Expand Up @@ -2073,7 +2070,7 @@ def message_content():
)

sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10)

with sender, receiver:
message = ServiceBusMessageBatch()
Expand All @@ -2086,7 +2083,7 @@ def message_content():
message_2nd_received_cnt = 0
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
messages = []
for message in receiver._get_streaming_message_iter(max_wait_time=10):
for message in receiver:
messages.append(message)
if not messages:
break
Expand Down Expand Up @@ -2136,11 +2133,11 @@ def test_queue_receiver_alive_after_timeout(self, uamqp_transport, *, servicebus
messages = []
with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)
break

for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)

for message in messages:
Expand All @@ -2154,9 +2151,9 @@ def test_queue_receiver_alive_after_timeout(self, uamqp_transport, *, servicebus
message_3 = ServiceBusMessage("3")
sender.send_messages([message_2, message_3])

for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)
for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)

assert len(messages) == 4
Expand Down Expand Up @@ -2232,62 +2229,9 @@ def test_queue_receiver_sender_resume_after_link_timeout(self, uamqp_transport,
messages = []
with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

for message in receiver._get_streaming_message_iter():
messages.append(message)
assert len(messages) == 2

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasser()
def test_queue_receiver_respects_max_wait_time_overrides(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = ServiceBusMessage("0")
sender.send_messages(message)

messages = []
def get_time():
return time.time()

with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

time_1 = get_time()
time_3 = time_1 # In case inner loop isn't hit, fail sanely.
for message in receiver._get_streaming_message_iter(max_wait_time=10):
messages.append(message)
receiver.complete_message(message)

time_2 = get_time()
for message in receiver._get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = get_time()
assert timedelta(seconds=.5) < timedelta(seconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = get_time()
assert timedelta(seconds=8) < timedelta(seconds=(time_4 - time_3)) <= timedelta(seconds=11)

for message in receiver._get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = get_time()
assert timedelta(seconds=1) < timedelta(seconds=(time_5 - time_4)) <= timedelta(seconds=4)

for message in receiver:
messages.append(message)
time_6 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_6 - time_5)) <= timedelta(seconds=6)

for message in receiver._get_streaming_message_iter():
messages.append(message)
time_7 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_7 - time_6)) <= timedelta(seconds=6)
assert len(messages) == 1

assert len(messages) == 2

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down
3 changes: 0 additions & 3 deletions sdk/servicebus/azure-servicebus/tests/test_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ def test_subscription_by_subscription_client_conn_str_receive_basic(self, uamqp_
with pytest.raises(ValueError):
receiver.receive_messages(max_wait_time=-1)

with pytest.raises(ValueError):
receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
for message in receiver:
count += 1
Expand Down

0 comments on commit c48b84e

Please sign in to comment.