Skip to content

Commit

Permalink
EventHubs: Remove timeout in sync producer - it should rely on async …
Browse files Browse the repository at this point in the history
…retry config (#38229)

* Remove timeout in sync producer - it should rely on async retry config
  • Loading branch information
lmolkova authored Jan 9, 2024
1 parent 458b2da commit b879ce9
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 20 deletions.
2 changes: 2 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Removed timeout from blocking wait in `EventHubProducerClient` in `createBatch`, `getEventHubProperties`, and `getPartitionProperties`. ([#38229](https://github.com/Azure/azure-sdk-for-java/pull/38229))

### Other Changes

## 5.17.1 (2023-12-07)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ PartitionProperties getPartitionProperties(String partitionId) {
*/
EventHubProducerClient createProducer() {
final EventHubProducerAsyncClient producer = client.createProducer();
return new EventHubProducerClient(producer, retry.getTryTimeout());
return new EventHubProducerClient(producer);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.azure.messaging.eventhubs.models.SendOptions;

import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;

/**
Expand Down Expand Up @@ -188,16 +187,14 @@
@ServiceClient(builder = EventHubClientBuilder.class)
public class EventHubProducerClient implements Closeable {
private final EventHubProducerAsyncClient producer;
private final Duration tryTimeout;

/**
* Creates a new instance of {@link EventHubProducerClient} that sends messages to an Azure Event Hub.
*
* @throws NullPointerException if {@code producer} or {@code tryTimeout} is null.
*/
EventHubProducerClient(EventHubProducerAsyncClient producer, Duration tryTimeout) {
EventHubProducerClient(EventHubProducerAsyncClient producer) {
this.producer = Objects.requireNonNull(producer, "'producer' cannot be null.");
this.tryTimeout = Objects.requireNonNull(tryTimeout, "'tryTimeout' cannot be null.");
}

/**
Expand Down Expand Up @@ -226,7 +223,7 @@ public String getFullyQualifiedNamespace() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public EventHubProperties getEventHubProperties() {
return producer.getEventHubProperties().block(tryTimeout);
return producer.getEventHubProperties().block();
}

/**
Expand All @@ -249,7 +246,7 @@ public IterableStream<String> getPartitionIds() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public PartitionProperties getPartitionProperties(String partitionId) {
return producer.getPartitionProperties(partitionId).block(tryTimeout);
return producer.getPartitionProperties(partitionId).block();
}

/**
Expand All @@ -259,7 +256,7 @@ public PartitionProperties getPartitionProperties(String partitionId) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public EventDataBatch createBatch() {
return producer.createBatch().block(tryTimeout);
return producer.createBatch().block();
}

/**
Expand All @@ -273,7 +270,7 @@ public EventDataBatch createBatch() {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public EventDataBatch createBatch(CreateBatchOptions options) {
return producer.createBatch(options).block(tryTimeout);
return producer.createBatch(options).block();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand Down Expand Up @@ -78,7 +79,9 @@ public class EventHubProducerClientTest {
private static final String EVENT_HUB_NAME = "my-event-hub-name";
private static final String CLIENT_IDENTIFIER = "my-client-identifier";
private static final EventHubsProducerInstrumentation DEFAULT_INSTRUMENTATION = new EventHubsProducerInstrumentation(null, null, HOSTNAME, EVENT_HUB_NAME);
private final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30));
private final AmqpRetryOptions retryOptions = new AmqpRetryOptions()
.setTryTimeout(Duration.ofSeconds(30))
.setDelay(Duration.ofSeconds(1));
private final MessageSerializer messageSerializer = new EventHubMessageSerializer();
@Mock
private AmqpSendLink sendLink;
Expand Down Expand Up @@ -136,7 +139,7 @@ public void teardown() {
@Test
public void sendSingleMessage() {
// Arrange
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
final EventData eventData = new EventData("hello-world".getBytes(UTF_8));

// EC is the prefix they use when creating a link that sends to the service round-robin.
Expand Down Expand Up @@ -172,7 +175,7 @@ public void sendStartSpanSingleMessage() {
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
false, onClientClosed, CLIENT_IDENTIFIER, instrumentation);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
final EventData eventData = new EventData("hello-world".getBytes(UTF_8));

// EC is the prefix they use when creating a link that sends to the service round-robin.
Expand Down Expand Up @@ -238,7 +241,7 @@ public void sendMessageRetrySpanTest() {
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
false, onClientClosed, CLIENT_IDENTIFIER, instrumentation);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
final EventData eventData = new EventData("hello-world".getBytes(UTF_8));
eventData.getProperties().put("traceparent", "traceparent");

Expand Down Expand Up @@ -288,7 +291,7 @@ public void sendEventsExceedsBatchSize() {
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_INSTRUMENTATION);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

//Act & Assert
final Iterable<EventData> tooManyEvents = Flux.range(0, 1024).map(number -> {
Expand All @@ -315,7 +318,7 @@ public void sendMultipleMessages() {

final String partitionId = "partition-id-1";
final SendOptions options = new SendOptions().setPartitionId(partitionId);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(connection.createSendLink(argThat(name -> name.endsWith(partitionId)),
Expand Down Expand Up @@ -365,7 +368,7 @@ public void createsEventDataBatch() {

// This event will be 1025 bytes when serialized.
final EventData tooLargeEvent = new EventData(new byte[maxEventPayload + 1]);
final EventHubProducerClient hubProducer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient hubProducer = new EventHubProducerClient(asyncProducer);

// Act
final EventDataBatch batch = hubProducer.createBatch();
Expand All @@ -392,7 +395,7 @@ public void startsMessageSpanOnEventBatch() {
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
false, onClientClosed, CLIENT_IDENTIFIER, instrumentation);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

final AmqpSendLink link = mock(AmqpSendLink.class);
when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
Expand Down Expand Up @@ -473,7 +476,7 @@ public void createsEventDataBatchWithPartitionKey() {
final CreateBatchOptions options = new CreateBatchOptions()
.setPartitionKey("some-key")
.setMaximumSizeInBytes(maxBatchSize);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

// Act
final EventDataBatch batch = producer.createBatch(options);
Expand Down Expand Up @@ -509,7 +512,7 @@ public void createsEventDataBatchWithPartitionId() {
final CreateBatchOptions options = new CreateBatchOptions()
.setPartitionId(partitionId)
.setMaximumSizeInBytes(maxBatchSize);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

// Act
final EventDataBatch batch = producer.createBatch(options);
Expand Down Expand Up @@ -542,7 +545,7 @@ public void payloadTooLarge() {
// No idea what the overhead for adding partition key is. But we know this will be smaller than the max size.
final CreateBatchOptions options = new CreateBatchOptions()
.setMaximumSizeInBytes(maxBatchSize);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
final EventDataBatch batch = producer.createBatch(options);

// Act & Assert
Expand All @@ -553,6 +556,36 @@ public void payloadTooLarge() {
}
}

/**
* Verifies can create a batch on a second try if first fails with transient error.
*/
@Test
public void createBatchWithRetry() {
// Arrange
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

AtomicInteger tries = new AtomicInteger();
when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), eq(CLIENT_IDENTIFIER)))
.thenAnswer(invocation -> {
if (tries.incrementAndGet() == 1) {
return Mono.error(new AmqpException(true, "something bad", new AmqpErrorContext("test-namespace")));
} else {
return Mono.just(sendLink);
}
});

// Act
try {
producer.createBatch();
} finally {
producer.close();
}

// Assert
verify(sendLink, times(1)).getLinkSize();
assertEquals(2, tries.get());
}

private void assertStartOptions(StartSpanOptions startOpts, SpanKind kind, int linkCount) {
assertEquals(kind, startOpts.getSpanKind());
assertEquals(EVENT_HUB_NAME, startOpts.getAttributes().get(ENTITY_PATH_KEY));
Expand Down

0 comments on commit b879ce9

Please sign in to comment.