Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reactive messaging - broker configuration per channel is buggy with kafka healthcheck #15347

Closed
geniusit opened this issue Feb 26, 2021 · 2 comments · Fixed by #15368
Closed

Comments

@geniusit
Copy link

geniusit commented Feb 26, 2021

The kafka healthcheck requires global broker configuration. This is not good.
The healthcheck should work by channel and the global broker configuration has no reason to be when there is a per channel configuration.

Expected behavior
No need to set the global broker configuration if I ve already a by channel configuration.

Actual behavior
When I only configure the broker by channel (no kafka.bootstrap.server), the healthcheck fails with this exception :

mp.messaging.incoming.channel1.connector=smallrye-kafka
mp.messaging.incoming.channel1.bootstrap.servers=broker1
mp.messaging.incoming.channel1.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.channel1.group.id=mybuggygroupid

mp.messaging.outgoing.channel2.connector=smallrye-kafka
mp.messaging.outgoing.channel2.bootstrap.servers=broker2
mp.messaging.outgoing.channel2.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
2021-02-26 09:36:30,493 ERROR [io.sma.health] (vert.x-worker-thread-1) SRHCK01000: Error processing Health Checks: org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:477)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:142)
        at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:216)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:71)
        at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
        at io.quarkus.kafka.client.health.KafkaHealthCheck.init(KafkaHealthCheck.java:34)
        at io.quarkus.kafka.client.health.KafkaHealthCheck_Bean.create(KafkaHealthCheck_Bean.zig:279)
        at io.quarkus.kafka.client.health.KafkaHealthCheck_Bean.create(KafkaHealthCheck_Bean.zig:295)
        at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:96)
        at io.quarkus.arc.impl.AbstractSharedContext.access$000(AbstractSharedContext.java:14)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:29)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:26)
        at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
        at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
        at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:26)
        at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:17)
        at io.quarkus.kafka.client.health.KafkaHealthCheck_ClientProxy.arc$delegate(KafkaHealthCheck_ClientProxy.zig:67)
        at io.quarkus.kafka.client.health.KafkaHealthCheck_ClientProxy.call(KafkaHealthCheck_ClientProxy.zig:124)
        at io.smallrye.mutiny.operators.uni.builders.SuppliedtemUni.subscribing(SuppliedtemUni.java:28)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$1(SmallRyeThreadContext.java:530)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.operators.UniOnFailureFlatMap.subscribing(UniOnFailureFlatMap.java:30)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$1(SmallRyeThreadContext.java:530)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.operators.UniOnItemTransform.subscribing(UniOnItemTransform.java:20)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$1(SmallRyeThreadContext.java:530)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$1(SmallRyeThreadContext.java:530)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.operators.UniAndCombination$UniHandler.subscribe(UniAndCombination.java:201)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
        at io.smallrye.mutiny.operators.UniAndCombination$AndSupervisor.run(UniAndCombination.java:69)
        at io.smallrye.mutiny.operators.UniAndCombination$AndSupervisor.access$000(UniAndCombination.java:51)
        at io.smallrye.mutiny.operators.UniAndCombination.subscribing(UniAndCombination.java:48)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$1(SmallRyeThreadContext.java:530)
        at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:47)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:54)
        at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:49)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:30)
        at io.smallrye.mutiny.operators.UniBlockingAwait.await(UniBlockingAwait.java:53)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
        at io.smallrye.health.SmallRyeHealthReporter.getHealth(SmallRyeHealthReporter.java:262)
        at io.smallrye.health.SmallRyeHealthReporter.getHealth(SmallRyeHealthReporter.java:183)
        at io.smallrye.health.SmallRyeHealthReporter_ClientProxy.getHealth(SmallRyeHealthReporter_ClientProxy.zig:278)
        at io.quarkus.smallrye.health.runtime.SmallRyeHealthHandler.getHealth(SmallRyeHealthHandler.java:11)
        at io.quarkus.smallrye.health.runtime.SmallRyeHealthHandlerBase.doHandle(SmallRyeHealthHandlerBase.java:38)
        at io.quarkus.smallrye.health.runtime.SmallRyeHealthHandlerBase.handle(SmallRyeHealthHandlerBase.java:29)
        at io.quarkus.smallrye.health.runtime.SmallRyeHealthHandler.handle(SmallRyeHealthHandler.java:7)
        at io.quarkus.smallrye.health.runtime.SmallRyeHealthHandlerBase.handle(SmallRyeHealthHandlerBase.java:17)
        at io.vertx.ext.web.impl.BlockingHandlerDecorator.lambda$handle$0(BlockingHandlerDecorator.java:48)
        at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

So I change my configuration like this :

Configuration

mp.messaging.incoming.channel1.connector=smallrye-kafka
mp.messaging.incoming.channel1.bootstrap.servers=broker1
mp.messaging.incoming.channel1.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.channel1.group.id=mybuggygroupid

mp.messaging.outgoing.channel2.connector=smallrye-kafka
mp.messaging.outgoing.channel2.bootstrap.servers=broker2
mp.messaging.outgoing.channel2.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

kafka.bootstrap.servers=bootstrap.servers=broker2

mybuggygroupid will belong to the broker2! But it should belong to the broker1. This is because of the global broker configuration.

I set broker2 has global because I prefer to health the broker2 than the broker1. But I would like to health both !

So the workaround is to disable the kafka healthcheck and remove the global configuration
quarkus.kafka.health.enabled=false

@quarkus-bot
Copy link

quarkus-bot bot commented Feb 26, 2021

@cescoffier
Copy link
Member

There a 2 different health checks:

  1. the health check provided by the quarkus-kafka extension, which can be enabled or disabled using quarkus.kafka.health.enabled=false
  2. the health checks coming from reactive messaging, which can be enabled / disabled / configured by channel

If I'm not mistaken, in your case you want to disable (1) and have (2) configured per channel.

I guess we need to extend the documentation about that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants