diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 8fd54266e7dd2..9e71777a523f3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Removed timeout from blocking wait in `EventHubProducerClient` in `createBatch`, `getEventHubProperties`, and `getPartitionProperties`. ([#38229](https://github.com/Azure/azure-sdk-for-java/pull/38229)) + ### Other Changes ## 5.17.1 (2023-12-07) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java index e281c448a8e97..506b23368da14 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java @@ -69,7 +69,7 @@ PartitionProperties getPartitionProperties(String partitionId) { */ EventHubProducerClient createProducer() { final EventHubProducerAsyncClient producer = client.createProducer(); - return new EventHubProducerClient(producer, retry.getTryTimeout()); + return new EventHubProducerClient(producer); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java index 1c8eb1ed3dc40..11b8307942582 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java @@ -12,7 +12,6 @@ import com.azure.messaging.eventhubs.models.SendOptions; import java.io.Closeable; -import java.time.Duration; import java.util.Objects; /** @@ -188,16 +187,14 @@ @ServiceClient(builder = EventHubClientBuilder.class) public class EventHubProducerClient implements Closeable { private final EventHubProducerAsyncClient producer; - private final Duration tryTimeout; /** * Creates a new instance of {@link EventHubProducerClient} that sends messages to an Azure Event Hub. * * @throws NullPointerException if {@code producer} or {@code tryTimeout} is null. */ - EventHubProducerClient(EventHubProducerAsyncClient producer, Duration tryTimeout) { + EventHubProducerClient(EventHubProducerAsyncClient producer) { this.producer = Objects.requireNonNull(producer, "'producer' cannot be null."); - this.tryTimeout = Objects.requireNonNull(tryTimeout, "'tryTimeout' cannot be null."); } /** @@ -226,7 +223,7 @@ public String getFullyQualifiedNamespace() { */ @ServiceMethod(returns = ReturnType.SINGLE) public EventHubProperties getEventHubProperties() { - return producer.getEventHubProperties().block(tryTimeout); + return producer.getEventHubProperties().block(); } /** @@ -249,7 +246,7 @@ public IterableStream getPartitionIds() { */ @ServiceMethod(returns = ReturnType.SINGLE) public PartitionProperties getPartitionProperties(String partitionId) { - return producer.getPartitionProperties(partitionId).block(tryTimeout); + return producer.getPartitionProperties(partitionId).block(); } /** @@ -259,7 +256,7 @@ public PartitionProperties getPartitionProperties(String partitionId) { */ @ServiceMethod(returns = ReturnType.SINGLE) public EventDataBatch createBatch() { - return producer.createBatch().block(tryTimeout); + return producer.createBatch().block(); } /** @@ -273,7 +270,7 @@ public EventDataBatch createBatch() { */ @ServiceMethod(returns = ReturnType.SINGLE) public EventDataBatch createBatch(CreateBatchOptions options) { - return producer.createBatch(options).block(tryTimeout); + return producer.createBatch(options).block(); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java index c83bf38a8932e..af3a939115984 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java @@ -43,6 +43,7 @@ import java.time.Duration; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; @@ -78,7 +79,9 @@ public class EventHubProducerClientTest { private static final String EVENT_HUB_NAME = "my-event-hub-name"; private static final String CLIENT_IDENTIFIER = "my-client-identifier"; private static final EventHubsProducerInstrumentation DEFAULT_INSTRUMENTATION = new EventHubsProducerInstrumentation(null, null, HOSTNAME, EVENT_HUB_NAME); - private final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30)); + private final AmqpRetryOptions retryOptions = new AmqpRetryOptions() + .setTryTimeout(Duration.ofSeconds(30)) + .setDelay(Duration.ofSeconds(1)); private final MessageSerializer messageSerializer = new EventHubMessageSerializer(); @Mock private AmqpSendLink sendLink; @@ -136,7 +139,7 @@ public void teardown() { @Test public void sendSingleMessage() { // Arrange - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)); // EC is the prefix they use when creating a link that sends to the service round-robin. @@ -172,7 +175,7 @@ public void sendStartSpanSingleMessage() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, instrumentation); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)); // EC is the prefix they use when creating a link that sends to the service round-robin. @@ -238,7 +241,7 @@ public void sendMessageRetrySpanTest() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, instrumentation); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); final EventData eventData = new EventData("hello-world".getBytes(UTF_8)); eventData.getProperties().put("traceparent", "traceparent"); @@ -288,7 +291,7 @@ public void sendEventsExceedsBatchSize() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_INSTRUMENTATION); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); //Act & Assert final Iterable tooManyEvents = Flux.range(0, 1024).map(number -> { @@ -315,7 +318,7 @@ public void sendMultipleMessages() { final String partitionId = "partition-id-1"; final SendOptions options = new SendOptions().setPartitionId(partitionId); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); // EC is the prefix they use when creating a link that sends to the service round-robin. when(connection.createSendLink(argThat(name -> name.endsWith(partitionId)), @@ -365,7 +368,7 @@ public void createsEventDataBatch() { // This event will be 1025 bytes when serialized. final EventData tooLargeEvent = new EventData(new byte[maxEventPayload + 1]); - final EventHubProducerClient hubProducer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient hubProducer = new EventHubProducerClient(asyncProducer); // Act final EventDataBatch batch = hubProducer.createBatch(); @@ -392,7 +395,7 @@ public void startsMessageSpanOnEventBatch() { final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, instrumentation); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); final AmqpSendLink link = mock(AmqpSendLink.class); when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES)); @@ -473,7 +476,7 @@ public void createsEventDataBatchWithPartitionKey() { final CreateBatchOptions options = new CreateBatchOptions() .setPartitionKey("some-key") .setMaximumSizeInBytes(maxBatchSize); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); // Act final EventDataBatch batch = producer.createBatch(options); @@ -509,7 +512,7 @@ public void createsEventDataBatchWithPartitionId() { final CreateBatchOptions options = new CreateBatchOptions() .setPartitionId(partitionId) .setMaximumSizeInBytes(maxBatchSize); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); // Act final EventDataBatch batch = producer.createBatch(options); @@ -542,7 +545,7 @@ public void payloadTooLarge() { // No idea what the overhead for adding partition key is. But we know this will be smaller than the max size. final CreateBatchOptions options = new CreateBatchOptions() .setMaximumSizeInBytes(maxBatchSize); - final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); final EventDataBatch batch = producer.createBatch(options); // Act & Assert @@ -553,6 +556,36 @@ public void payloadTooLarge() { } } + /** + * Verifies can create a batch on a second try if first fails with transient error. + */ + @Test + public void createBatchWithRetry() { + // Arrange + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer); + + AtomicInteger tries = new AtomicInteger(); + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), eq(CLIENT_IDENTIFIER))) + .thenAnswer(invocation -> { + if (tries.incrementAndGet() == 1) { + return Mono.error(new AmqpException(true, "something bad", new AmqpErrorContext("test-namespace"))); + } else { + return Mono.just(sendLink); + } + }); + + // Act + try { + producer.createBatch(); + } finally { + producer.close(); + } + + // Assert + verify(sendLink, times(1)).getLinkSize(); + assertEquals(2, tries.get()); + } + private void assertStartOptions(StartSpanOptions startOpts, SpanKind kind, int linkCount) { assertEquals(kind, startOpts.getSpanKind()); assertEquals(EVENT_HUB_NAME, startOpts.getAttributes().get(ENTITY_PATH_KEY));