EventHubs: Remove timeout in sync producer - it should rely on async retry config #38229
Conversation
API change check: API changes are not detected in this pull request.
Thank you~!
@@ -226,7 +223,7 @@ public String getFullyQualifiedNamespace() {
      */
     @ServiceMethod(returns = ReturnType.SINGLE)
     public EventHubProperties getEventHubProperties() {
-        return producer.getEventHubProperties().block(tryTimeout);
+        return producer.getEventHubProperties().block();
To make sure I understand correctly, inside the async client, there is already a tryTimeout applied when performing this operation, so it's duplicated?
Yes, the async client already applies the retry policy (and enforces timeouts).
I can't validate all possible branches, but getEventHubProperties uses the management channel:

Line 161 in c74bb20
    return channelMono.flatMap(channel -> channel.sendWithAck(request)

which uses RetryUtil.withRetry underneath:

Line 340 in ac66fd5
    return RetryUtil.withRetry(onActiveEndpoints, retryOptions, activeEndpointTimeoutMessage)
Lines 68 to 73 in ac66fd5
    public static <T> Mono<T> withRetry(Mono<T> source, AmqpRetryOptions retryOptions, String errorMessage,
        boolean allowsLongOperation) {
        if (!allowsLongOperation) {
            source = source.timeout(retryOptions.getTryTimeout());
        }
        return source.retryWhen(createRetry(retryOptions))
Getting the management channel, creating the session, etc. is also guarded with timeouts, so if a timeout happens, the operation is retried according to the retry policy. (A minimal sketch of this per-attempt composition is shown below.)
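To illustrate the shape of that composition, here is a minimal Reactor sketch. It is not the SDK's actual RetryUtil/createRetry code; the helper name and parameters are hypothetical. The point is that the timeout guards each individual attempt, and a failed attempt (including a timed-out one) is resubscribed according to the retry policy:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public final class PerAttemptTimeoutSketch {

    // Hypothetical helper mirroring the shape of withRetry above: the timeout applies to each
    // attempt (not to the whole chain), and retryWhen resubscribes to the source on failure,
    // which re-issues the request.
    static <T> Mono<T> guardEachAttempt(Mono<T> attempt, Duration tryTimeout, int maxRetries, Duration delay) {
        return attempt
            .timeout(tryTimeout)                          // per-attempt timeout
            .retryWhen(Retry.backoff(maxRetries, delay)); // TimeoutException is an error, so it is retried
    }
}
```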
For example:
- the first sendWithAck times out after 30 seconds
- the next attempt succeeds

Because the sync client applied its own timeout, the second attempt never had a chance to run: the operation was cancelled without applying any retries.
So it's not a duplication; the extra sync-side timeout prevents the retry policy from handling transient timeouts. The sketch below shows the difference.
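Here is a minimal, self-contained sketch of that failure mode (timings shortened, class and variable names hypothetical): with an outer block(tryTimeout) the chain is cancelled as soon as the first attempt times out, while a plain block() lets the retry policy recover.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class OuterTimeoutCancelsRetry {
    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        Duration tryTimeout = Duration.ofSeconds(2); // shortened stand-in for the 30-second try timeout

        // First attempt hangs (e.g. the response is lost); the second attempt succeeds.
        Mono<String> operation = Mono.defer(() ->
            attempts.incrementAndGet() == 1 ? Mono.<String>never() : Mono.just("event hub properties"));

        // Async-client style: per-attempt timeout plus retry, matching the quoted withRetry shape.
        Mono<String> withRetry = operation
            .timeout(tryTimeout)
            .retryWhen(Retry.fixedDelay(2, Duration.ofMillis(100)));

        // Old sync-client behavior: block(tryTimeout) cancels the whole chain as soon as the
        // first attempt times out, so the successful second attempt never runs:
        //     withRetry.block(tryTimeout); // fails after ~2s, before any retry can complete
        //
        // New sync-client behavior: block() without an extra timeout lets the retry policy finish.
        String result = withRetry.block(); // returns after ~2.1s, once the second attempt succeeds
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```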
The sync client applied a timeout (the try timeout) to the blocking calls that create a batch or get Event Hub/partition properties.
As a result, the underlying async client did not get a chance to finish its retries according to its configuration (in the case of timeouts or other transient issues).
The sync client should not apply an additional timeout and should fully rely on the retry configuration of the async client.
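With the sync-side timeout gone, how long a blocking call can take is bounded only by the async client's retry configuration. A hedged sketch of configuring it, assuming the builder exposes a retryOptions setter and using placeholder connection values:

```java
import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import java.time.Duration;

public class RetryConfigExample {
    public static void main(String[] args) {
        // These options now fully determine how long the blocking calls
        // (getEventHubProperties, createBatch, ...) can take: roughly
        // (maxRetries + 1) attempts x tryTimeout, plus the retry delays.
        AmqpRetryOptions retryOptions = new AmqpRetryOptions()
            .setTryTimeout(Duration.ofSeconds(30))
            .setMaxRetries(3)
            .setDelay(Duration.ofMillis(800))
            .setMode(AmqpRetryMode.EXPONENTIAL);

        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString("<<connection-string>>", "<<event-hub-name>>") // placeholders
            .retryOptions(retryOptions)
            .buildProducerClient();

        System.out.println(producer.getEventHubProperties().getName());
        producer.close();
    }
}
```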