Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SB] Remove references to internal streaming method #29750

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,8 @@ def _receive(self, receiver, end_time):
max_wait_time=self.max_wait_time,
)
elif self.receive_type == ReceiveType.push:
batch = receiver._get_streaming_message_iter(
max_wait_time=self.max_wait_time
)
else:
batch = []
receiver.max_wait_time = self.max_wait_time
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved
batch = receiver

for message in batch:
self.on_receive(self._state, message, receiver)
Expand Down Expand Up @@ -428,9 +425,8 @@ async def _receive_async(self, receiver, end_time):
message, receiver, end_time
)
elif self.receive_type == ReceiveType.push:
batch = receiver._get_streaming_message_iter(
max_wait_time=self.max_wait_time
)
receiver.max_wait_time = self.max_wait_time
batch = receiver
async for message in batch:
if end_time <= datetime.utcnow():
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel
with pytest.raises(ValueError):
await receiver.receive_messages(max_wait_time=0)

with pytest.raises(ValueError):
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
print_message(_logger, message)
Expand Down Expand Up @@ -1480,7 +1477,7 @@ async def test_async_queue_schedule_multiple_messages(self, uamqp_transport, *,
servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
messages = []
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20, max_wait_time=5)
sender = sb_client.get_queue_sender(servicebus_queue.name)
async with sender, receiver:
content = str(uuid.uuid4())
Expand All @@ -1494,7 +1491,7 @@ async def test_async_queue_schedule_multiple_messages(self, uamqp_transport, *,
await sender.send_messages([message_a, message_b])

received_messages = []
async for message in receiver._get_streaming_message_iter(max_wait_time=5):
async for message in receiver:
received_messages.append(message)
await receiver.complete_message(message)

Expand Down Expand Up @@ -1827,11 +1824,11 @@ async def test_async_queue_receiver_alive_after_timeout(self, uamqp_transport, *
messages = []
async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10) as receiver:

async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)
break

async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)

for message in messages:
Expand All @@ -1845,9 +1842,9 @@ async def test_async_queue_receiver_alive_after_timeout(self, uamqp_transport, *
message_3 = ServiceBusMessage("3")
await sender.send_messages([message_2, message_3])

async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)
async for message in receiver._get_streaming_message_iter():
async for message in receiver:
messages.append(message)

assert len(messages) == 4
Expand Down Expand Up @@ -1897,60 +1894,6 @@ async def test_queue_receive_keep_conn_alive_async(self, uamqp_transport, *, ser

assert len(messages) == 0 # make sure messages are removed from the queue
assert receiver_handler == receiver._handler # make sure no reconnection happened

# @pytest.mark.skip(reason="TODO: _counter doesnt exist in pyamqp")
@pytest.mark.asyncio
@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasserAsync()
async def test_async_queue_receiver_respects_max_wait_time_overrides(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = ServiceBusMessage("0")
await sender.send_messages(message)

messages = []
def get_time():
return time.time()

async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

time_1 = get_time()
async for message in receiver._get_streaming_message_iter(max_wait_time=10):
messages.append(message)
await receiver.complete_message(message)

time_2 = get_time()
async for message in receiver._get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = get_time()
assert timedelta(seconds=.5) < timedelta(seconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = get_time()
assert timedelta(seconds=8) < timedelta(seconds=(time_4 - time_3)) <= timedelta(seconds=11)

async for message in receiver._get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = get_time()
assert timedelta(seconds=1) < timedelta(seconds=(time_5 - time_4)) <= timedelta(seconds=4)

async for message in receiver:
messages.append(message)
time_6 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_6 - time_5)) <= timedelta(seconds=6)

async for message in receiver._get_streaming_message_iter():
messages.append(message)
time_7 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_7 - time_6)) <= timedelta(seconds=6)
assert len(messages) == 1


@pytest.mark.asyncio
@pytest.mark.liveTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ async def test_subscription_by_subscription_client_conn_str_receive_basic(self,
with pytest.raises(ValueError):
await receiver.receive_messages(max_wait_time=-1)

with pytest.raises(ValueError):
await receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
async for message in receiver:
count += 1
Expand Down
74 changes: 9 additions & 65 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, uamqp_tra
with pytest.raises(ValueError):
receiver.receive_messages(max_wait_time=0)

with pytest.raises(ValueError):
receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
for message in receiver:
print_message(_logger, message)
Expand Down Expand Up @@ -1691,7 +1688,7 @@ def test_queue_schedule_multiple_messages(self, uamqp_transport, *, servicebus_n

scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch_count=20, max_wait_time=5)

with sender, receiver:
content = str(uuid.uuid4())
Expand All @@ -1713,7 +1710,7 @@ def test_queue_schedule_multiple_messages(self, uamqp_transport, *, servicebus_n
sender.send_messages(message_arry)

received_messages = []
for message in receiver._get_streaming_message_iter(max_wait_time=5):
for message in receiver:
received_messages.append(message)
receiver.complete_message(message)

Expand Down Expand Up @@ -2073,7 +2070,7 @@ def message_content():
)

sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=10)

with sender, receiver:
message = ServiceBusMessageBatch()
Expand All @@ -2086,7 +2083,7 @@ def message_content():
message_2nd_received_cnt = 0
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
messages = []
for message in receiver._get_streaming_message_iter(max_wait_time=10):
for message in receiver:
messages.append(message)
if not messages:
break
Expand Down Expand Up @@ -2136,11 +2133,11 @@ def test_queue_receiver_alive_after_timeout(self, uamqp_transport, *, servicebus
messages = []
with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)
break

for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)

for message in messages:
Expand All @@ -2154,9 +2151,9 @@ def test_queue_receiver_alive_after_timeout(self, uamqp_transport, *, servicebus
message_3 = ServiceBusMessage("3")
sender.send_messages([message_2, message_3])

for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)
for message in receiver._get_streaming_message_iter():
for message in receiver:
messages.append(message)

assert len(messages) == 4
Expand Down Expand Up @@ -2232,62 +2229,9 @@ def test_queue_receiver_sender_resume_after_link_timeout(self, uamqp_transport,
messages = []
with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

for message in receiver._get_streaming_message_iter():
messages.append(message)
assert len(messages) == 2

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasser()
def test_queue_receiver_respects_max_wait_time_overrides(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = ServiceBusMessage("0")
sender.send_messages(message)

messages = []
def get_time():
return time.time()

with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

time_1 = get_time()
time_3 = time_1 # In case inner loop isn't hit, fail sanely.
for message in receiver._get_streaming_message_iter(max_wait_time=10):
messages.append(message)
receiver.complete_message(message)

time_2 = get_time()
for message in receiver._get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = get_time()
assert timedelta(seconds=.5) < timedelta(seconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = get_time()
assert timedelta(seconds=8) < timedelta(seconds=(time_4 - time_3)) <= timedelta(seconds=11)

for message in receiver._get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = get_time()
assert timedelta(seconds=1) < timedelta(seconds=(time_5 - time_4)) <= timedelta(seconds=4)

for message in receiver:
messages.append(message)
time_6 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_6 - time_5)) <= timedelta(seconds=6)

for message in receiver._get_streaming_message_iter():
messages.append(message)
time_7 = get_time()
assert timedelta(seconds=3) < timedelta(seconds=(time_7 - time_6)) <= timedelta(seconds=6)
assert len(messages) == 1

assert len(messages) == 2

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down
3 changes: 0 additions & 3 deletions sdk/servicebus/azure-servicebus/tests/test_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ def test_subscription_by_subscription_client_conn_str_receive_basic(self, uamqp_
with pytest.raises(ValueError):
receiver.receive_messages(max_wait_time=-1)

with pytest.raises(ValueError):
receiver._get_streaming_message_iter(max_wait_time=0)

count = 0
for message in receiver:
count += 1
Expand Down