Skip to content

Commit

Permalink
Sb track2 schedule multiple message validate batch size (Azure#16767)
Browse files Browse the repository at this point in the history
added validation in schedule.
  • Loading branch information
hemanttanwar authored Oct 29, 2020
1 parent 8be885f commit 1de0f26
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime sche
* Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is
* enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Message to be sent to the Service Bus Queue.
* @param messages Messages to be sent to the Service Bus Queue.
* @param scheduledEnqueueTime OffsetDateTime at which the message should appear in the Service Bus queue or topic.
*
* @return The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
Expand All @@ -353,7 +353,7 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
* Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is
* enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Message to be sent to the Service Bus Queue.
* @param messages Messages to be sent to the Service Bus Queue.
* @param scheduledEnqueueTime Instant at which the message should appear in the Service Bus queue or topic.
* @param transactionContext to be set on batch message before scheduling them on Service Bus.
*
Expand All @@ -371,13 +371,26 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
return fluxError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
}

return createMessageBatch().flatMapMany(messageBatch -> {
messages.forEach(message -> messageBatch.tryAddMessage(message));
return getSendLink().flatMapMany(link -> connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime,
messageBatch.getMaxSizeInBytes(), link.getLinkName(), transactionContext)));
});
return getSendLink().flatMapMany(link -> createMessageBatch()
.flatMapMany(messageBatch -> {
int index = 0;
for (ServiceBusMessage message : messages) {
if (!messageBatch.tryAddMessage(message)) {
final String error = String.format(Locale.US,
"Messages exceed max allowed size for all the messages together. "
+ "Failed to add message at index '%s'.", index);
throw logger.logExceptionAsError(new AmqpException(false,
AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, error, link.getErrorContext()));
}
++index;
}

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(),
scheduledEnqueueTime, messageBatch.getMaxSizeInBytes(), link.getLinkName(),
transactionContext));
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -288,6 +290,31 @@ void createsMessageBatchWithSize() {
.verifyComplete();
}

@Test
void scheduleMessageSizeTooBig() {
// Arrange
int maxLinkSize = 1024;
int batchSize = maxLinkSize + 10;

OffsetDateTime instant = mock(OffsetDateTime.class);
final List<ServiceBusMessage> messages = TestUtils.getServiceBusMessages(batchSize, UUID.randomUUID().toString());

final AmqpSendLink link = mock(AmqpSendLink.class);
when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));

when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), any(AmqpRetryOptions.class), isNull()))
.thenReturn(Mono.just(link));
when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));

// Act & Assert
StepVerifier.create(sender.scheduleMessages(messages, instant))
.verifyErrorMatches(throwable -> {
assertTrue(throwable instanceof AmqpException);
assertSame(((AmqpException) throwable).getErrorCondition(), AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED);
return true;
});
}

/**
* Verifies that sending multiple message will result in calling sender.send(MessageBatch, transaction).
*/
Expand Down

0 comments on commit 1de0f26

Please sign in to comment.