Skip to content

Commit

Permalink
Clarify MessageProducer write future behavior (eclipse-vertx#4935)
Browse files Browse the repository at this point in the history
See eclipse-vertx#4922

When publishing, the OutboundDeliveryContext (promise) can be completed or failed several times. This leads to confusing exceptions in application logs.

The promise should be completed with tryFail/tryComplete to avoid redundant exceptions.

Besides, the MessageProducer write future behavior should be clarified.
The returned Future completion depends on the producer type:

- send or request: the future is completed successfully if the message has been written; otherwise, the future is failed
-  publish: the future is failed if there is no recipient; otherwise, the future is completed successfully

In any case, a successfully completed Future is not a delivery guarantee.

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont authored Nov 9, 2023
1 parent 24ca7d4 commit 4fbfe00
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 16 deletions.
15 changes: 9 additions & 6 deletions src/main/java/io/vertx/core/eventbus/MessageProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.streams.WriteStream;

/**
* Represents a stream of message that can be written to.
Expand Down Expand Up @@ -45,9 +41,16 @@ public interface MessageProducer<T> {
/**
* Write a message to the event-bus, either sending or publishing.
*
* The returned {@link Future} completion depends on the producer type:
*
* <ul>
* <li>send or request: the future is completed successfully if the message has been written; otherwise, the future is failed</li>
* <li>publish: the future is failed if there is no recipient; otherwise, the future is completed successfully</li>
* </ul>
*
* In any case, a successfully completed {@link Future} is not a delivery guarantee.
*
* @param body the message body
* @return a future called when the message has been successfully or failed to be written, this is not a delivery
* guarantee
*/
Future<Void> write(T body);

Expand Down
17 changes: 15 additions & 2 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

package io.vertx.core.eventbus.impl;

import io.vertx.core.*;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
Expand Down Expand Up @@ -436,7 +439,17 @@ public <T> Future<Void> sendOrPubInternal(MessageImpl message, DeliveryOptions o
checkStarted();
OutboundDeliveryContext<T> ctx = newSendContext(message, options, handler);
sendOrPubInternal(ctx);
return ctx.writePromise.future();
Future<Void> future = ctx.writePromise.future();
if (message.send) {
return future;
}
return future.recover(throwable -> {
// For publish, we only care if there are no handlers
if (throwable instanceof ReplyException) {
return Future.failedFuture(throwable);
}
return Future.succeededFuture();
});
}

private Future<Void> unregisterAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
*/
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.ReplyException;
Expand Down Expand Up @@ -95,9 +93,9 @@ private void written(Throwable failure) {
// Notify promise finally
if (writePromise != null) {
if (failure == null) {
writePromise.complete();
writePromise.tryComplete();
} else {
writePromise.fail(failure);
writePromise.tryFail(failure);
}
}
}
Expand Down
37 changes: 33 additions & 4 deletions src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableObject;
import io.vertx.core.shareddata.AsyncMapTest.SomeSerializableObject;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.core.spi.cluster.WrappedClusterManager;
import io.vertx.core.spi.cluster.WrappedNodeSelector;
import io.vertx.core.spi.cluster.*;
import io.vertx.test.core.TestUtils;
import io.vertx.test.tls.Cert;
import org.junit.Test;
Expand Down Expand Up @@ -520,6 +517,38 @@ public void init(Vertx vertx, NodeSelector nodeSelector) {
assertFalse(nodeSelectorRef.get().wantsUpdatesFor(ADDRESS1));
}

@Test
public void testPublisherCanReceiveNoHandlersFailure() {
Supplier<VertxOptions> options = () -> getOptions().setClusterManager(new WrappedClusterManager(getClusterManager()) {
@Override
public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
promise.complete();
}
});
startNodes(options.get(), options.get());

MessageConsumer<Object> consumer0 = vertices[0].eventBus().consumer("foo", msg -> {
fail();
});
MessageConsumer<Object> consumer1 = vertices[1].eventBus().consumer("foo", msg -> {
fail();
});
consumer0.completion().compose(v -> consumer1.completion()).onComplete(onSuccess(regs -> {
consumer0.unregister().compose(v -> consumer1.unregister()).onComplete(onSuccess(unregs -> {
vertices[0].eventBus().publisher("foo").write("bar").onComplete(onFailure(t -> {
if (t instanceof ReplyException) {
ReplyException replyException = (ReplyException) t;
assertEquals(ReplyFailure.NO_HANDLERS, replyException.failureType());
testComplete();
} else {
fail();
}
}));
}));
}));
await();
}

@Test
public void testLocalConsumerNeverGetsMessagePublishedFromRemote() throws Exception {
startNodes(2);
Expand Down

0 comments on commit 4fbfe00

Please sign in to comment.