Skip to content

Commit

Permalink
Merge pull request #2831 from ozangunalp/amqp_sender_retry
Browse files Browse the repository at this point in the history
AMQP connector sender reconnects the sender when retrying sends.
  • Loading branch information
ozangunalp authored Nov 26, 2024
2 parents 7948ad7 + 510f8dc commit 0f2e4fd
Show file tree
Hide file tree
Showing 8 changed files with 700 additions and 113 deletions.
14 changes: 10 additions & 4 deletions documentation/src/main/docs/amqp/sending-amqp-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,16 @@ cannot be sent successfully), the message is nacked.

## Back Pressure and Credits

The back-pressure is handled by AMQP *credits*. The outbound connector
only requests the amount of allowed credits. When the amount of credits
reaches 0, it waits (in a non-blocking fashion) until the broker grants
more credits to the AMQP sender.
The back-pressure is handled by the `max-inflight-messages` attribute and AMQP *credits*.
The outbound connector requests messages minimum between `max-inflight-messages` and credits allowed by the broker.
When the amount of credits reaches 0, it waits (in a non-blocking fashion) until the broker grants more credits to the AMQP sender.

When `max-inflight-messages` is set to 0, only AMQP credits apply to limit the requests.

Note that if an AMQP message send fails, it is retried until `reconnect-attempts` is reached.
If the client reconnects to the broker during the retry, failing messages are sent again but the message order is not preserved.

To preserve the message order in this case you can set `max-inflight-messages` to

## Configuration Reference

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,16 @@
@ConnectorAttribute(name = "durable", direction = OUTGOING, description = "Whether sent AMQP messages are marked durable", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "ttl", direction = OUTGOING, description = "The time-to-live of the send AMQP messages. 0 to disable the TTL", type = "long", defaultValue = "0")
@ConnectorAttribute(name = "credit-retrieval-period", direction = OUTGOING, description = "The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits.", type = "int", defaultValue = "2000")
@ConnectorAttribute(name = "max-inflight-messages", type = "long", direction = OUTGOING, description = "The maximum number of messages to be written to the broker concurrently. The number of sent messages waiting to be acknowledged by the broker are limited by this value and credits granted by the broker. The default value `0` means only credits apply.", defaultValue = "0")
@ConnectorAttribute(name = "use-anonymous-sender", direction = OUTGOING, description = "Whether or not the connector should use an anonymous sender. Default value is `true` if the broker supports it, `false` otherwise. If not supported, it is not possible to dynamically change the destination address.", type = "boolean")
@ConnectorAttribute(name = "merge", direction = OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "cloud-events-source", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `source` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `source` attribute itself", alias = "cloud-events-default-source")
@ConnectorAttribute(name = "cloud-events-type", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `type` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `type` attribute itself", alias = "cloud-events-default-type")
@ConnectorAttribute(name = "cloud-events-subject", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `subject` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `subject` attribute itself", alias = "cloud-events-default-subject")
@ConnectorAttribute(name = "cloud-events-data-content-type", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `datacontenttype` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `datacontenttype` attribute itself", alias = "cloud-events-default-data-content-type")
@ConnectorAttribute(name = "cloud-events-data-schema", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configure the default `dataschema` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `dataschema` attribute itself", alias = "cloud-events-default-data-schema")
@ConnectorAttribute(name = "cloud-events-insert-timestamp", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether or not the connector should insert automatically the `time` attribute into the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `time` attribute itself", alias = "cloud-events-default-timestamp", defaultValue = "true")
@ConnectorAttribute(name = "cloud-events-mode", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "The Cloud Event mode (`structured` or `binary` (default)). Indicates how are written the cloud events in the outgoing record", defaultValue = "binary")
@ConnectorAttribute(name = "cloud-events-source", type = "string", direction = OUTGOING, description = "Configure the default `source` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `source` attribute itself", alias = "cloud-events-default-source")
@ConnectorAttribute(name = "cloud-events-type", type = "string", direction = OUTGOING, description = "Configure the default `type` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `type` attribute itself", alias = "cloud-events-default-type")
@ConnectorAttribute(name = "cloud-events-subject", type = "string", direction = OUTGOING, description = "Configure the default `subject` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `subject` attribute itself", alias = "cloud-events-default-subject")
@ConnectorAttribute(name = "cloud-events-data-content-type", type = "string", direction = OUTGOING, description = "Configure the default `datacontenttype` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `datacontenttype` attribute itself", alias = "cloud-events-default-data-content-type")
@ConnectorAttribute(name = "cloud-events-data-schema", type = "string", direction = OUTGOING, description = "Configure the default `dataschema` attribute of the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `dataschema` attribute itself", alias = "cloud-events-default-data-schema")
@ConnectorAttribute(name = "cloud-events-insert-timestamp", type = "boolean", direction = OUTGOING, description = "Whether or not the connector should insert automatically the `time` attribute into the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `time` attribute itself", alias = "cloud-events-default-timestamp", defaultValue = "true")
@ConnectorAttribute(name = "cloud-events-mode", type = "string", direction = OUTGOING, description = "The Cloud Event mode (`structured` or `binary` (default)). Indicates how are written the cloud events in the outgoing record", defaultValue = "binary")

public class AmqpConnector implements InboundConnector, OutboundConnector, HealthReporter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.inject.Instance;
Expand All @@ -21,7 +20,7 @@
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.mutiny.tuples.Tuple3;
import io.smallrye.reactive.messaging.amqp.ce.AmqpCloudEventHelper;
import io.smallrye.reactive.messaging.amqp.tracing.AmqpOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
Expand All @@ -32,7 +31,6 @@ public class AmqpCreditBasedSender implements Processor<Message<?>, Message<?>>,

private final ConnectionHolder holder;
private final Uni<AmqpSender> retrieveSender;
private final AtomicLong requested = new AtomicLong();
private final AmqpConnectorOutgoingConfiguration configuration;
private final AmqpConnector connector;

Expand All @@ -59,6 +57,8 @@ public class AmqpCreditBasedSender implements Processor<Message<?>, Message<?>>,
*/
private volatile boolean creditRetrievalInProgress = false;

private final long maxInflights;

public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
AmqpConnectorOutgoingConfiguration configuration, Uni<AmqpSender> retrieveSender,
Instance<OpenTelemetry> openTelemetryInstance) {
Expand All @@ -77,6 +77,7 @@ public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,

this.retryAttempts = configuration.getReconnectAttempts();
this.retryInterval = configuration.getReconnectInterval();
this.maxInflights = configuration.getMaxInflightMessages();

if (tracingEnabled) {
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender(openTelemetryInstance);
Expand All @@ -86,8 +87,7 @@ public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
}

@Override
public void subscribe(
Subscriber<? super Message<?>> subscriber) {
public void subscribe(Subscriber<? super Message<?>> subscriber) {
if (!downstream.compareAndSet(null, subscriber)) {
Subscriptions.fail(subscriber, ex.illegalStateOnlyOneSubscriberAllowed());
} else {
Expand Down Expand Up @@ -157,9 +157,10 @@ private long setCreditsAndRequest(AmqpSender sender) {
long credits = sender.remainingCredits();
Subscription subscription = upstream.get();
if (credits != 0L && subscription != Subscriptions.CANCELLED) {
requested.set(credits);
// Request upfront the sender remaining credits or the max inflights
long request = maxInflights > 0 ? Math.min(credits, maxInflights) : credits;
log.retrievedCreditsForChannel(configuration.getChannel(), credits);
subscription.request(credits);
subscription.request(request);
return credits;
}
if (credits == 0L && subscription != Subscriptions.CANCELLED) {
Expand All @@ -168,6 +169,13 @@ private long setCreditsAndRequest(AmqpSender sender) {
return 0L;
}

void requestUpstream() {
Subscription subscription = upstream.get();
if (subscription != null && subscription != Subscriptions.CANCELLED) {
subscription.request(1);
}
}

@Override
public void onNext(Message<?> message) {
if (isCancelled()) {
Expand All @@ -176,28 +184,27 @@ public void onNext(Message<?> message) {

Subscriber<? super Message<?>> subscriber = this.downstream.get();

retrieveSender
.onItem().transformToUni(sender -> {
try {
return send(sender, message, durable, ttl, configuredAddress, isAnonymous)
.onItem().transform(m -> Tuple2.of(sender, m));
} catch (Exception e) {
// Message can be sent - nacking and skipping.
message.nack(e);
log.serializationFailure(configuration.getChannel(), e);
return Uni.createFrom().nullItem();
}
})
.subscribe().with(
tuple -> {
if (tuple != null) { // Serialization issue
subscriber.onNext(tuple.getItem2());
if (requested.decrementAndGet() == 0) { // no more credit, request more
onNoMoreCredit(tuple.getItem1());
}
try {
send(message, durable, ttl, configuredAddress, isAnonymous)
.subscribe().with(tuple -> {
if (tuple != null) { // No serialization issue
subscriber.onNext(tuple.getItem1());
long remainingCredits = tuple.getItem3();
if (remainingCredits == 0) { // no more credit, request more
onNoMoreCredit(tuple.getItem2());
} else { // keep the request one more message
requestUpstream();
}
},
subscriber::onError);
} else {
requestUpstream();
}
}, subscriber::onError);
} catch (Exception e) {
// Message can be sent - nacking and skipping.
message.nack(e);
log.serializationFailure(configuration.getChannel(), e);
requestUpstream();
}
}

private void onNoMoreCredit(AmqpSender sender) {
Expand Down Expand Up @@ -255,11 +262,8 @@ public void request(long l) {
// Delay the retrieval of the sender and the request until we get a request.
if (!once.getAndSet(true)) {
getSenderAndCredits()
.onItem().ignore().andContinueWithNull()
.subscribe().with(s -> {
}, f -> {
downstream.get().onError(f);
});
}, f -> downstream.get().onError(f));
}
}

Expand All @@ -271,7 +275,29 @@ public void cancel() {
}
}

private Uni<Message<?>> send(AmqpSender sender, Message<?> msg, boolean durable, long ttl, String configuredAddress,
private Uni<Tuple3<Message<?>, AmqpSender, Long>> send(Message<?> msg, boolean durable, long ttl, String configuredAddress,
boolean isAnonymousSender) {
final io.vertx.mutiny.amqp.AmqpMessage amqp = getMessage(msg, durable, ttl, configuredAddress, isAnonymousSender);
if (amqp == null) {
return Uni.createFrom().nullItem();
}
if (tracingEnabled) {
amqpInstrumenter.traceOutgoing(msg, new AmqpMessage<>(amqp, null, null, false, true));
}
return retrieveSender.onItem().transformToUni(s -> s.sendWithAck(amqp)
// We are on Vert.x context that created the client, we can access the remaining credits and update it.
.replaceWith(() -> Tuple3.<Message<?>, AmqpSender, Long> of(msg, s, s.remainingCredits())))
.onFailure().retry().withBackOff(ofSeconds(1), ofSeconds(retryInterval)).atMost(retryAttempts)
.onItemOrFailure().call((s, failure) -> {
if (failure != null) {
return Uni.createFrom().completionStage(msg.nack(failure));
} else {
return Uni.createFrom().completionStage(msg.ack());
}
});
}

private io.vertx.mutiny.amqp.AmqpMessage getMessage(Message<?> msg, boolean durable, long ttl, String configuredAddress,
boolean isAnonymousSender) {
io.vertx.mutiny.amqp.AmqpMessage amqp;
OutgoingCloudEventMetadata<?> ceMetadata = msg.getMetadata(OutgoingCloudEventMetadata.class)
Expand Down Expand Up @@ -306,28 +332,15 @@ private Uni<Message<?>> send(AmqpSender sender, Message<?> msg, boolean durable,
String actualAddress = getActualAddress(msg, amqp, configuredAddress, isAnonymousSender);
if (connector.getClients().isEmpty()) {
log.messageNoSend(actualAddress);
return Uni.createFrom().item(msg);
return null;
}

if (!actualAddress.equals(amqp.address())) {
amqp.getDelegate().unwrap().setAddress(actualAddress);
}

if (tracingEnabled) {
amqpInstrumenter.traceOutgoing(msg, new AmqpMessage<>(amqp, null, null, false, true));
}

log.sendingMessageToAddress(actualAddress);
return sender.sendWithAck(amqp)
.onFailure().retry().withBackOff(ofSeconds(1), ofSeconds(retryInterval)).atMost(retryAttempts)
.onItemOrFailure().transformToUni((success, failure) -> {
if (failure != null) {
return Uni.createFrom().completionStage(msg.nack(failure));
} else {
return Uni.createFrom().completionStage(msg.ack());
}
})
.onItem().transform(x -> msg);
return amqp;
}

private String getActualAddress(Message<?> message, io.vertx.mutiny.amqp.AmqpMessage amqp, String configuredAddress,
Expand Down
Loading

0 comments on commit 0f2e4fd

Please sign in to comment.