Skip to content

Commit

Permalink
Don't let potential failures in message consumers cause dev mode shut…
Browse files Browse the repository at this point in the history
…down to be blocked permanently
  • Loading branch information
jaikiran committed Apr 8, 2021
1 parent 15b9b65 commit 3834312
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ void registerMessageConsumers(Map<String, ConsumeEvent> messageConsumerConfigura
if (!messageConsumerConfigurations.isEmpty()) {
EventBus eventBus = vertx.eventBus();
CountDownLatch latch = new CountDownLatch(messageConsumerConfigurations.size());
final List<Throwable> registrationFailures = new ArrayList();
for (Entry<String, ConsumeEvent> entry : messageConsumerConfigurations.entrySet()) {
EventConsumerInvoker invoker = createInvoker(entry.getKey());
String address = entry.getValue().value();
Expand Down Expand Up @@ -117,8 +118,9 @@ public void handle(Promise<Object> event) {

@Override
public void handle(AsyncResult<Void> ar) {
if (ar.succeeded()) {
latch.countDown();
latch.countDown();
if (ar.failed()) {
registrationFailures.add(ar.cause());
}
}
});
Expand All @@ -130,6 +132,10 @@ public void handle(AsyncResult<Void> ar) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Unable to register all message consumer methods", e);
}
if (!registrationFailures.isEmpty()) {
// just log/raise the first failure
throw new RuntimeException("Registration of one or more message consumers failed", registrationFailures.get(0));
}
}
}

Expand All @@ -147,8 +153,9 @@ void unregisterMessageConsumers() {
CountDownLatch latch = new CountDownLatch(messageConsumers.size());
for (MessageConsumer<?> messageConsumer : messageConsumers) {
messageConsumer.unregister(ar -> {
if (ar.succeeded()) {
latch.countDown();
latch.countDown();
if (ar.failed()) {
LOGGER.warn("Message consumer unregistration failed", ar.cause());
}
});
}
Expand Down

0 comments on commit 3834312

Please sign in to comment.