diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 77d5de40514c8..3332ced69344d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -338,7 +338,7 @@ public Mono scheduleMessage(ServiceBusMessage message, OffsetDateTime sche * Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is * enqueued and made available to receivers only at the scheduled enqueue time. * - * @param messages Message to be sent to the Service Bus Queue. + * @param messages Messages to be sent to the Service Bus Queue. * @param scheduledEnqueueTime OffsetDateTime at which the message should appear in the Service Bus queue or topic. * * @return The sequence number of the scheduled message which can be used to cancel the scheduling of the message. @@ -353,7 +353,7 @@ public Flux scheduleMessages(Iterable messages, OffsetD * Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is * enqueued and made available to receivers only at the scheduled enqueue time. * - * @param messages Message to be sent to the Service Bus Queue. + * @param messages Messages to be sent to the Service Bus Queue. * @param scheduledEnqueueTime Instant at which the message should appear in the Service Bus queue or topic. * @param transactionContext to be set on batch message before scheduling them on Service Bus. * @@ -371,13 +371,26 @@ public Flux scheduleMessages(Iterable messages, OffsetD return fluxError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null.")); } - return createMessageBatch().flatMapMany(messageBatch -> { - messages.forEach(message -> messageBatch.tryAddMessage(message)); - return getSendLink().flatMapMany(link -> connectionProcessor - .flatMap(connection -> connection.getManagementNode(entityName, entityType)) - .flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime, - messageBatch.getMaxSizeInBytes(), link.getLinkName(), transactionContext))); - }); + return getSendLink().flatMapMany(link -> createMessageBatch() + .flatMapMany(messageBatch -> { + int index = 0; + for (ServiceBusMessage message : messages) { + if (!messageBatch.tryAddMessage(message)) { + final String error = String.format(Locale.US, + "Messages exceed max allowed size for all the messages together. " + + "Failed to add message at index '%s'.", index); + throw logger.logExceptionAsError(new AmqpException(false, + AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, error, link.getErrorContext())); + } + ++index; + } + + return connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityName, entityType)) + .flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), + scheduledEnqueueTime, messageBatch.getMaxSizeInBytes(), link.getLinkName(), + transactionContext)); + })); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 203b5f8a77ca0..1f832c0702116 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -70,6 +70,8 @@ import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -288,6 +290,31 @@ void createsMessageBatchWithSize() { .verifyComplete(); } + @Test + void scheduleMessageSizeTooBig() { + // Arrange + int maxLinkSize = 1024; + int batchSize = maxLinkSize + 10; + + OffsetDateTime instant = mock(OffsetDateTime.class); + final List messages = TestUtils.getServiceBusMessages(batchSize, UUID.randomUUID().toString()); + + final AmqpSendLink link = mock(AmqpSendLink.class); + when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull())) + .thenReturn(Mono.just(link)); + when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize)); + + // Act & Assert + StepVerifier.create(sender.scheduleMessages(messages, instant)) + .verifyErrorMatches(throwable -> { + assertTrue(throwable instanceof AmqpException); + assertSame(((AmqpException) throwable).getErrorCondition(), AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED); + return true; + }); + } + /** * Verifies that sending multiple message will result in calling sender.send(MessageBatch, transaction). */