EventHubs: Remove timeout in sync producer - it should rely on async retry config #38229

Merged
merged 3 commits into from
Jan 9, 2024
2 changes: 2 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
@@ -8,6 +8,8 @@

 ### Bugs Fixed

+- Removed timeout from blocking wait in `EventHubProducerClient` in `createBatch`, `getEventHubProperties`, and `getPartitionProperties`. ([#38229](https://github.com/Azure/azure-sdk-for-java/pull/38229))
+
 ### Other Changes

 ## 5.17.1 (2023-12-07)

@@ -69,7 +69,7 @@ PartitionProperties getPartitionProperties(String partitionId) {
      */
     EventHubProducerClient createProducer() {
         final EventHubProducerAsyncClient producer = client.createProducer();
-        return new EventHubProducerClient(producer, retry.getTryTimeout());
+        return new EventHubProducerClient(producer);
     }

     /**

@@ -12,7 +12,6 @@
 import com.azure.messaging.eventhubs.models.SendOptions;

 import java.io.Closeable;
-import java.time.Duration;
 import java.util.Objects;

 /**
@@ -188,16 +187,14 @@
 @ServiceClient(builder = EventHubClientBuilder.class)
 public class EventHubProducerClient implements Closeable {
     private final EventHubProducerAsyncClient producer;
-    private final Duration tryTimeout;

     /**
      * Creates a new instance of {@link EventHubProducerClient} that sends messages to an Azure Event Hub.
      *
      * @throws NullPointerException if {@code producer} or {@code tryTimeout} is null.
      */
-    EventHubProducerClient(EventHubProducerAsyncClient producer, Duration tryTimeout) {
+    EventHubProducerClient(EventHubProducerAsyncClient producer) {
         this.producer = Objects.requireNonNull(producer, "'producer' cannot be null.");
-        this.tryTimeout = Objects.requireNonNull(tryTimeout, "'tryTimeout' cannot be null.");
     }

     /**
@@ -226,7 +223,7 @@ public String getFullyQualifiedNamespace() {
      */
     @ServiceMethod(returns = ReturnType.SINGLE)
     public EventHubProperties getEventHubProperties() {
-        return producer.getEventHubProperties().block(tryTimeout);
+        return producer.getEventHubProperties().block();
Member commented:

To make sure I understand correctly: the async client already applies a tryTimeout when performing this operation, so the timeout here is duplicated?

Member Author replied:

Yes, the async client already applies the retry policy (and checks for timeouts).
I can't validate all possible branches, but getEventHubProperties uses the management channel, which uses RetryUtil.withRetry underneath:

return RetryUtil.withRetry(onActiveEndpoints, retryOptions, activeEndpointTimeoutMessage)

public static <T> Mono<T> withRetry(Mono<T> source, AmqpRetryOptions retryOptions, String errorMessage,
    boolean allowsLongOperation) {
    if (!allowsLongOperation) {
        source = source.timeout(retryOptions.getTryTimeout());
    }
    return source.retryWhen(createRetry(retryOptions))

Getting the management channel, creating a session, etc. are also guarded with timeouts, so if a timeout happens, the operation is retried according to the retry policy.

E.g.

  • first sendWithAck will time out after 30 sec
  • next one will succeed

Since we had a timeout on the sync client, the second try never had a chance to happen: the operation was cancelled without applying retries.

So it's not a duplication; the sync client's timeout prevents the retry policy from handling transient timeouts.
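
The interaction described above can be sketched without Event Hubs or Reactor. The following is a minimal, JDK-only illustration (Java 9+ for `orTimeout` and `delayedExecutor`), not the SDK's implementation: `RetryTimeoutSketch`, `withRetry`, `blockWithTimeout`, and `flakyOperation` are hypothetical names, and the 200 ms and 300 ms durations are stand-ins for the 30-second try timeout and the retry delay. The first try hangs until its per-try timeout fires and the second try succeeds. Blocking with an outer timeout equal to the try timeout gives up before the retry runs, while an unbounded wait leaves the retry policy in charge.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the async client's retry pipeline; none of this is Event Hubs code.
final class RetryTimeoutSketch {

    // Applies a per-try timeout and retries failed tries after a fixed delay,
    // loosely mirroring source.timeout(tryTimeout).retryWhen(...).
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> operation,
            Duration tryTimeout, Duration retryDelay, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, tryTimeout, retryDelay, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> operation, Duration tryTimeout,
            Duration retryDelay, int retriesLeft, CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // The caller already gave up (cancelled), so no further tries are made.
        }
        operation.get()
            .orTimeout(tryTimeout.toMillis(), TimeUnit.MILLISECONDS)
            .whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value);
                } else if (retriesLeft > 0) {
                    // Schedule the next try after the retry delay.
                    CompletableFuture.delayedExecutor(retryDelay.toMillis(), TimeUnit.MILLISECONDS)
                        .execute(() -> attempt(operation, tryTimeout, retryDelay, retriesLeft - 1, result));
                } else {
                    result.completeExceptionally(error);
                }
            });
    }

    // Old sync behavior: wait at most outerTimeout, then cancel the whole pipeline.
    static <T> T blockWithTimeout(CompletableFuture<T> future, Duration outerTimeout) throws Exception {
        try {
            return future.get(outerTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw e;
        }
    }

    // The first try hangs until its per-try timeout fires; later tries succeed immediately.
    static Supplier<CompletableFuture<String>> flakyOperation(AtomicInteger tries) {
        return () -> tries.incrementAndGet() == 1
            ? new CompletableFuture<String>()
            : CompletableFuture.completedFuture("ok");
    }

    public static void main(String[] args) throws Exception {
        Duration tryTimeout = Duration.ofMillis(200);
        Duration retryDelay = Duration.ofMillis(300);

        // Outer timeout equal to the try timeout: the wait ends before the retry is attempted.
        AtomicInteger boundedTries = new AtomicInteger();
        try {
            blockWithTimeout(withRetry(flakyOperation(boundedTries), tryTimeout, retryDelay, 3), tryTimeout);
        } catch (TimeoutException e) {
            System.out.println("outer timeout, tries so far: " + boundedTries.get());
        }

        // Unbounded wait: the retry policy decides when the operation ends.
        AtomicInteger unboundedTries = new AtomicInteger();
        String value = withRetry(flakyOperation(unboundedTries), tryTimeout, retryDelay, 3).get();
        System.out.println(value + ", tries: " + unboundedTries.get());
    }
}
```

With these values, the bounded wait is expected to end in a `TimeoutException` after a single try, and the unbounded wait should return after two tries; exact timings depend on the scheduler.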

     }

     /**
@@ -249,7 +246,7 @@ public IterableStream<String> getPartitionIds() {
      */
     @ServiceMethod(returns = ReturnType.SINGLE)
     public PartitionProperties getPartitionProperties(String partitionId) {
-        return producer.getPartitionProperties(partitionId).block(tryTimeout);
+        return producer.getPartitionProperties(partitionId).block();
     }

     /**
@@ -259,7 +256,7 @@ public PartitionProperties getPartitionProperties(String partitionId) {
      */
     @ServiceMethod(returns = ReturnType.SINGLE)
     public EventDataBatch createBatch() {
-        return producer.createBatch().block(tryTimeout);
+        return producer.createBatch().block();
     }

     /**
@@ -273,7 +270,7 @@ public EventDataBatch createBatch() {
      */
     @ServiceMethod(returns = ReturnType.SINGLE)
     public EventDataBatch createBatch(CreateBatchOptions options) {
-        return producer.createBatch(options).block(tryTimeout);
+        return producer.createBatch(options).block();
     }

     /**

@@ -43,6 +43,7 @@

 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -78,7 +79,9 @@ public class EventHubProducerClientTest {
     private static final String EVENT_HUB_NAME = "my-event-hub-name";
     private static final String CLIENT_IDENTIFIER = "my-client-identifier";
     private static final EventHubsProducerInstrumentation DEFAULT_INSTRUMENTATION = new EventHubsProducerInstrumentation(null, null, HOSTNAME, EVENT_HUB_NAME);
-    private final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30));
+    private final AmqpRetryOptions retryOptions = new AmqpRetryOptions()
+        .setTryTimeout(Duration.ofSeconds(30))
+        .setDelay(Duration.ofSeconds(1));
     private final MessageSerializer messageSerializer = new EventHubMessageSerializer();
     @Mock
     private AmqpSendLink sendLink;
@@ -136,7 +139,7 @@ public void teardown() {
     @Test
     public void sendSingleMessage() {
         // Arrange
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
         final EventData eventData = new EventData("hello-world".getBytes(UTF_8));

         // EC is the prefix they use when creating a link that sends to the service round-robin.
@@ -172,7 +175,7 @@ public void sendStartSpanSingleMessage() {
         final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
             connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
             false, onClientClosed, CLIENT_IDENTIFIER, instrumentation);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
         final EventData eventData = new EventData("hello-world".getBytes(UTF_8));

         // EC is the prefix they use when creating a link that sends to the service round-robin.
@@ -238,7 +241,7 @@ public void sendMessageRetrySpanTest() {
         final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
             connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
             false, onClientClosed, CLIENT_IDENTIFIER, instrumentation);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
         final EventData eventData = new EventData("hello-world".getBytes(UTF_8));
         eventData.getProperties().put("traceparent", "traceparent");

@@ -288,7 +291,7 @@ public void sendEventsExceedsBatchSize() {
         final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
             connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
             false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_INSTRUMENTATION);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

         //Act & Assert
         final Iterable<EventData> tooManyEvents = Flux.range(0, 1024).map(number -> {
@@ -315,7 +318,7 @@ public void sendMultipleMessages() {

         final String partitionId = "partition-id-1";
         final SendOptions options = new SendOptions().setPartitionId(partitionId);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

         // EC is the prefix they use when creating a link that sends to the service round-robin.
         when(connection.createSendLink(argThat(name -> name.endsWith(partitionId)),
@@ -365,7 +368,7 @@ public void createsEventDataBatch() {

         // This event will be 1025 bytes when serialized.
         final EventData tooLargeEvent = new EventData(new byte[maxEventPayload + 1]);
-        final EventHubProducerClient hubProducer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient hubProducer = new EventHubProducerClient(asyncProducer);

         // Act
         final EventDataBatch batch = hubProducer.createBatch();
@@ -392,7 +395,7 @@ public void startsMessageSpanOnEventBatch() {
         final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
             connectionProcessor, retryOptions, messageSerializer, Schedulers.parallel(),
             false, onClientClosed, CLIENT_IDENTIFIER, instrumentation);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

         final AmqpSendLink link = mock(AmqpSendLink.class);
         when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
@@ -473,7 +476,7 @@ public void createsEventDataBatchWithPartitionKey() {
         final CreateBatchOptions options = new CreateBatchOptions()
             .setPartitionKey("some-key")
             .setMaximumSizeInBytes(maxBatchSize);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

         // Act
         final EventDataBatch batch = producer.createBatch(options);
@@ -509,7 +512,7 @@ public void createsEventDataBatchWithPartitionId() {
         final CreateBatchOptions options = new CreateBatchOptions()
             .setPartitionId(partitionId)
             .setMaximumSizeInBytes(maxBatchSize);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);

         // Act
         final EventDataBatch batch = producer.createBatch(options);
@@ -542,7 +545,7 @@ public void payloadTooLarge() {
         // No idea what the overhead for adding partition key is. But we know this will be smaller than the max size.
         final CreateBatchOptions options = new CreateBatchOptions()
             .setMaximumSizeInBytes(maxBatchSize);
-        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
         final EventDataBatch batch = producer.createBatch(options);

         // Act & Assert
@@ -553,6 +556,36 @@ public void payloadTooLarge() {
         }
     }

+    /**
+     * Verifies can create a batch on a second try if first fails with transient error.
+     */
+    @Test
+    public void createBatchWithRetry() {
+        // Arrange
+        final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer);
+
+        AtomicInteger tries = new AtomicInteger();
+        when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any(), eq(CLIENT_IDENTIFIER)))
+            .thenAnswer(invocation -> {
+                if (tries.incrementAndGet() == 1) {
+                    return Mono.error(new AmqpException(true, "something bad", new AmqpErrorContext("test-namespace")));
+                } else {
+                    return Mono.just(sendLink);
+                }
+            });
+
+        // Act
+        try {
+            producer.createBatch();
+        } finally {
+            producer.close();
+        }
+
+        // Assert
+        verify(sendLink, times(1)).getLinkSize();
+        assertEquals(2, tries.get());
+    }
+
     private void assertStartOptions(StartSpanOptions startOpts, SpanKind kind, int linkCount) {
         assertEquals(kind, startOpts.getSpanKind());
         assertEquals(EVENT_HUB_NAME, startOpts.getAttributes().get(ENTITY_PATH_KEY));