From 4fbfe008cb2d9ee27eb6a29318f7f728b52c1065 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Thu, 9 Nov 2023 10:13:38 +0100 Subject: [PATCH] Clarify MessageProducer write future behavior (#4935) See #4922 When publishing, the OutboundDeliveryContext (promise) can be completed or failed several times. This leads to confusing exceptions in application logs. The promise should be completed with tryFail/tryComplete to avoid redundant exceptions. Besides, the MessageProducer write future behavior should be clarified. The returned Future completion depends on the producer type: - send or request: the future is completed successfully if the message has been written; otherwise, the future is failed - publish: the future is failed if there is no recipient; otherwise, the future is completed successfully In any case, a successfully completed Future is not a delivery guarantee. Signed-off-by: Thomas Segismont --- .../vertx/core/eventbus/MessageProducer.java | 15 +++++--- .../core/eventbus/impl/EventBusImpl.java | 17 ++++++++- .../impl/OutboundDeliveryContext.java | 6 +-- .../core/eventbus/ClusteredEventBusTest.java | 37 +++++++++++++++++-- 4 files changed, 59 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/MessageProducer.java b/src/main/java/io/vertx/core/eventbus/MessageProducer.java index 56eb097ee21..812b3c16b55 100644 --- a/src/main/java/io/vertx/core/eventbus/MessageProducer.java +++ b/src/main/java/io/vertx/core/eventbus/MessageProducer.java @@ -13,11 +13,7 @@ import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.http.RequestOptions; -import io.vertx.core.streams.WriteStream; /** * Represents a stream of message that can be written to. @@ -45,9 +41,16 @@ public interface MessageProducer { /** * Write a message to the event-bus, either sending or publishing. * + * The returned {@link Future} completion depends on the producer type: + * + *
    + *
  • send or request: the future is completed successfully if the message has been written; otherwise, the future is failed
  • + *
  • publish: the future is failed if there is no recipient; otherwise, the future is completed successfully
  • + *
+ * + * In any case, a successfully completed {@link Future} is not a delivery guarantee. + * * @param body the message body - * @return a future called when the message has been successfully or failed to be written, this is not a delivery - * guarantee */ Future write(T body); diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index c9427329d5a..a32e218ecf1 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -11,7 +11,10 @@ package io.vertx.core.eventbus.impl; -import io.vertx.core.*; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; import io.vertx.core.eventbus.*; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; @@ -436,7 +439,17 @@ public Future sendOrPubInternal(MessageImpl message, DeliveryOptions o checkStarted(); OutboundDeliveryContext ctx = newSendContext(message, options, handler); sendOrPubInternal(ctx); - return ctx.writePromise.future(); + Future future = ctx.writePromise.future(); + if (message.send) { + return future; + } + return future.recover(throwable -> { + // For publish, we only care if there are no handlers + if (throwable instanceof ReplyException) { + return Future.failedFuture(throwable); + } + return Future.succeededFuture(); + }); } private Future unregisterAll() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index 29c26f40803..495eeda5cbd 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -10,9 +10,7 @@ */ package io.vertx.core.eventbus.impl; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.ReplyException; @@ -95,9 +93,9 @@ private void written(Throwable failure) { // Notify promise finally if (writePromise != null) { if (failure == null) { - writePromise.complete(); + writePromise.tryComplete(); } else { - writePromise.fail(failure); + writePromise.tryFail(failure); } } } diff --git a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java index 4063e6c5e9c..2793b3afa85 100644 --- a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java @@ -15,10 +15,7 @@ import io.vertx.core.impl.VertxInternal; import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableObject; import io.vertx.core.shareddata.AsyncMapTest.SomeSerializableObject; -import io.vertx.core.spi.cluster.NodeSelector; -import io.vertx.core.spi.cluster.RegistrationUpdateEvent; -import io.vertx.core.spi.cluster.WrappedClusterManager; -import io.vertx.core.spi.cluster.WrappedNodeSelector; +import io.vertx.core.spi.cluster.*; import io.vertx.test.core.TestUtils; import io.vertx.test.tls.Cert; import org.junit.Test; @@ -520,6 +517,38 @@ public void init(Vertx vertx, NodeSelector nodeSelector) { assertFalse(nodeSelectorRef.get().wantsUpdatesFor(ADDRESS1)); } + @Test + public void testPublisherCanReceiveNoHandlersFailure() { + Supplier options = () -> getOptions().setClusterManager(new WrappedClusterManager(getClusterManager()) { + @Override + public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise promise) { + promise.complete(); + } + }); + startNodes(options.get(), options.get()); + + MessageConsumer consumer0 = vertices[0].eventBus().consumer("foo", msg -> { + fail(); + }); + MessageConsumer consumer1 = vertices[1].eventBus().consumer("foo", msg -> { + fail(); + }); + consumer0.completion().compose(v -> consumer1.completion()).onComplete(onSuccess(regs -> { + consumer0.unregister().compose(v -> consumer1.unregister()).onComplete(onSuccess(unregs -> { + vertices[0].eventBus().publisher("foo").write("bar").onComplete(onFailure(t -> { + if (t instanceof ReplyException) { + ReplyException replyException = (ReplyException) t; + assertEquals(ReplyFailure.NO_HANDLERS, replyException.failureType()); + testComplete(); + } else { + fail(); + } + })); + })); + })); + await(); + } + @Test public void testLocalConsumerNeverGetsMessagePublishedFromRemote() throws Exception { startNodes(2);