Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addressing the issue of not terminating unconfirmed-sends due to the termination of link with no error #24141

Merged
merged 2 commits into from
Sep 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
Expand Down Expand Up @@ -37,6 +38,7 @@
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -50,37 +52,53 @@

/**
* Represents a bidirectional link between the message broker and the client. Allows client to send a request to the
* broker and receive the associated response.
* broker and receive the associated response. The {@link RequestResponseChannel} composes a proton-j {@link Sender}
* link and {@link Receiver} link.
*/
public class RequestResponseChannel implements AsyncCloseable {
private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends =
new ConcurrentSkipListMap<>();
private final AtomicBoolean hasError = new AtomicBoolean();
private final Sinks.Many<AmqpEndpointState> endpointStates = Sinks.many().multicast().onBackpressureBuffer();
private final ClientLogger logger = new ClientLogger(RequestResponseChannel.class);

// The request response channel is closed when both the receive and send link component are disposed of.
private final AtomicInteger pendingDisposes = new AtomicInteger(2);
private final Sinks.One<Void> closeMono = Sinks.one();

private final Sender sendLink;
private final Receiver receiveLink;
private final String replyTo;
private final MessageSerializer messageSerializer;
private final AmqpRetryOptions retryOptions;
private final ReactorProvider provider;
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AtomicLong requestId = new AtomicLong(0);
private final SendLinkHandler sendLinkHandler;
private final ReceiveLinkHandler receiveLinkHandler;
private final Disposable.Composite subscriptions;
private final SenderSettleMode senderSettleMode;
private final String linkName;
// The request-response-channel endpoint states derived from the latest state of the send and receive links.
private final Sinks.Many<AmqpEndpointState> endpointStates = Sinks.many().multicast().onBackpressureBuffer();
// The latest state of the send and receive links.
private volatile AmqpEndpointState sendLinkState;
private volatile AmqpEndpointState receiveLinkState;
// Generates unique Id for each message send over the request-response-channel.
private final AtomicLong requestId = new AtomicLong(0);
// Tracks the sends that are not yet acknowledged by the broker. Map key is the unique Id
// of the send and value is the MonoSink to notify upon broker acknowledgment.
private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends =
new ConcurrentSkipListMap<>();

// Tracks the count of links that is not terminated yet. Once both the receive and send links
// are terminated (i.e. pendingLinkTerminations is zero), the request-response-channel is
// considered as terminated.
private final AtomicInteger pendingLinkTerminations = new AtomicInteger(2);
// The Mono that completes once the request-response-channel is terminated.
private final Sinks.One<Void> closeMono = Sinks.one();
// A flag indicating that an error in either of the links caused link to terminate.
private final AtomicBoolean hasError = new AtomicBoolean();
// A flag indicating that the request-response-channel is closed (after the call to closeAsync()).
private final AtomicBoolean isDisposed = new AtomicBoolean();
// Tracks all subscriptions listening for events from various endpoints (sender, receiver & connection),
// those subscriptions should be disposed when the request-response-channel terminates.
private final Disposable.Composite subscriptions;

private final String connectionId;
private final String linkName;
private final AmqpRetryOptions retryOptions;
private final String replyTo;
private final String activeEndpointTimeoutMessage;

private volatile AmqpEndpointState sendLinkEndpoint;
private volatile AmqpEndpointState receiveLinkEndpoint;
private final MessageSerializer messageSerializer;
// The API calls on proton-j entities (e.g., Sender, Receiver) must happen in the non-blocking thread
// (aka ReactorThread) assigned to the connection's org.apache.qpid.proton.reactor.Reactor object.
// The provider exposes ReactorDispatcher that can schedule such calls on the ReactorThread.
private final ReactorProvider provider;

/**
* Creates a new instance of {@link RequestResponseChannel} to send and receive responses from the {@code
Expand Down Expand Up @@ -113,22 +131,24 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio

this.replyTo = entityPath.replace("$", "") + "-client-reply-to";
this.messageSerializer = messageSerializer;

// Setup send (request) link.
this.sendLink = session.sender(linkName + ":sender");
final Target target = new Target();
target.setAddress(entityPath);
this.sendLink.setTarget(target);
sendLink.setSource(new Source());
final Target senderTarget = new Target();
senderTarget.setAddress(entityPath);
this.sendLink.setTarget(senderTarget);
this.sendLink.setSource(new Source());
this.sendLink.setSenderSettleMode(senderSettleMode);

this.sendLinkHandler = handlerProvider.createSendLinkHandler(connectionId, fullyQualifiedNamespace, linkName,
entityPath);

BaseHandler.setHandler(sendLink, sendLinkHandler);

// Setup receive (response) link.
this.receiveLink = session.receiver(linkName + ":receiver");
final Source source = new Source();
source.setAddress(entityPath);
this.receiveLink.setSource(source);
final Source receiverSource = new Source();
receiverSource.setAddress(entityPath);
this.receiveLink.setSource(receiverSource);

final Target receiverTarget = new Target();
receiverTarget.setAddress(replyTo);
Expand All @@ -138,8 +158,10 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio

this.receiveLinkHandler = handlerProvider.createReceiveLinkHandler(connectionId, fullyQualifiedNamespace,
linkName, entityPath);
BaseHandler.setHandler(this.receiveLink, receiveLinkHandler);
BaseHandler.setHandler(receiveLink, receiveLinkHandler);

// Subscribe to the events from endpoints (Sender, Receiver & Connection) and track the subscriptions.
//
//@formatter:off
this.subscriptions = Disposables.composite(
receiveLinkHandler.getDeliveredMessages()
Expand Down Expand Up @@ -180,12 +202,13 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
);
//@formatter:on

// If we try to do proton-j API calls such as opening/closing/sending on AMQP links, it may
// encounter a race condition. So, we are forced to use the dispatcher.
// Open send and receive links.
//
// Schedule API calls on proton-j entities on the ReactorThread associated with the connection.
try {
provider.getReactorDispatcher().invoke(() -> {
sendLink.open();
receiveLink.open();
this.provider.getReactorDispatcher().invoke(() -> {
this.sendLink.open();
this.receiveLink.open();
});
} catch (IOException e) {
throw logger.logExceptionAsError(new RuntimeException(String.format(
Expand All @@ -194,9 +217,9 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
}

/**
* Gets the endpoint states for the request/response channel.
* Gets the endpoint states for the request-response-channel.
*
* @return The endpoint states for the request/response channel.
* @return The endpoint states for the request-response-channel.
*/
public Flux<AmqpEndpointState> getEndpointStates() {
return endpointStates.asFlux();
Expand Down Expand Up @@ -227,6 +250,7 @@ public Mono<Void> closeAsync() {

return Mono.fromRunnable(() -> {
try {
// Schedule API calls on proton-j entities on the ReactorThread associated with the connection.
provider.getReactorDispatcher().invoke(() -> {
logger.verbose("connectionId[{}] linkName[{}] Closing send link and receive link.",
connectionId, linkName);
Expand Down Expand Up @@ -286,19 +310,18 @@ public Mono<Message> sendWithAck(final Message message, DeliveryState deliverySt
message.setMessageId(messageId);
message.setReplyTo(replyTo);

final Mono<Void> activeEndpoints = Mono.when(
final Mono<Void> onActiveEndpoints = Mono.when(
sendLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE),
receiveLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE));

return RetryUtil.withRetry(activeEndpoints, retryOptions, activeEndpointTimeoutMessage)
return RetryUtil.withRetry(onActiveEndpoints, retryOptions, activeEndpointTimeoutMessage)
.then(Mono.create(sink -> {
try {
logger.verbose("connectionId[{}], linkName[{}]: Scheduling on dispatcher. MessageId[{}]",
connectionId, linkName, messageId);
unconfirmedSends.putIfAbsent(messageId, sink);

// If we try to do proton-j API calls such as sending on AMQP links, it may encounter a race
// condition. So, we are forced to use the dispatcher.
// Schedule API calls on proton-j entities on the ReactorThread associated with the connection.
provider.getReactorDispatcher().invoke(() -> {
final Delivery delivery = sendLink.delivery(UUID.randomUUID().toString()
.replace("-", "").getBytes(UTF_8));
Expand Down Expand Up @@ -379,25 +402,28 @@ private void handleError(Throwable error, String message) {
return false;
});

unconfirmedSends.forEach((key, value) -> value.error(error));
unconfirmedSends.clear();
terminateUnconfirmedSends(error);

closeAsync().subscribe();
}

private void onTerminalState(String handlerName) {
if (pendingDisposes.get() == 0) {
if (pendingLinkTerminations.get() <= 0) {
logger.verbose("connectionId[{}] linkName[{}]: Already disposed send/receive links.");
return;
}

final int remaining = pendingDisposes.decrementAndGet();
final int remaining = pendingLinkTerminations.decrementAndGet();
logger.verbose("connectionId[{}] linkName[{}]: {} disposed. Remaining: {}",
connectionId, linkName, handlerName, remaining);

if (remaining == 0) {
subscriptions.dispose();

terminateUnconfirmedSends(new AmqpException(true,
"The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.",
null));

endpointStates.emitComplete(((signalType, emitResult) -> onEmitSinkFailure(signalType, emitResult,
"Could not emit complete signal.")));

Expand All @@ -413,18 +439,36 @@ private boolean onEmitSinkFailure(SignalType signalType, Sinks.EmitResult emitRe
return false;
}

private synchronized void updateEndpointState(AmqpEndpointState newSendLink, AmqpEndpointState newReceiveLink) {
if (newSendLink != null) {
sendLinkEndpoint = newSendLink;
} else if (newReceiveLink != null) {
receiveLinkEndpoint = newReceiveLink;
// Derive and emits the endpoint state for this RequestResponseChannel from the current endpoint state
// of send and receive links.
private synchronized void updateEndpointState(AmqpEndpointState sendLinkState, AmqpEndpointState receiveLinkState) {
if (sendLinkState != null) {
this.sendLinkState = sendLinkState;
} else if (receiveLinkState != null) {
this.receiveLinkState = receiveLinkState;
}

logger.verbose("connectionId[{}] linkName[{}] sendState[{}] receiveState[{}] Updating endpoint states.",
connectionId, linkName, sendLinkEndpoint, receiveLinkEndpoint);
connectionId, linkName, this.sendLinkState, this.receiveLinkState);

if (this.sendLinkState == this.receiveLinkState) {
this.endpointStates.emitNext(this.sendLinkState, Sinks.EmitFailureHandler.FAIL_FAST);
}
}

if (sendLinkEndpoint == receiveLinkEndpoint) {
endpointStates.emitNext(sendLinkEndpoint, Sinks.EmitFailureHandler.FAIL_FAST);
// Terminate the unconfirmed MonoSinks by notifying the given error.
private void terminateUnconfirmedSends(Throwable error) {
logger.verbose("connectionId[{}] linkName[{}] terminating {} unconfirmed sends (reason: {}).",
connectionId, linkName, unconfirmedSends.size(), error.getMessage());
Map.Entry<UnsignedLong, MonoSink<Message>> next;
int count = 0;
while ((next = unconfirmedSends.pollFirstEntry()) != null) {
// pollFirstEntry: atomic retrieve and remove of each entry.
next.getValue().error(error);
count++;
}
// The below log can also help debug if the external code that error() calls into never return.
logger.verbose("connectionId[{}] linkName[{}] completed the termination of {} unconfirmed sends (reason: {}).",
connectionId, linkName, count, error.getMessage());
}
}