WakeupException thrown by Consumer.poll() is wrapped in UndeclaredThrowableException by KafkaTelemetry.wrap() #7450

Closed
cschieb opened this issue Dec 19, 2022 · 1 comment · Fixed by #7452

cschieb commented Dec 19, 2022

Relevant Library Versions

JDK 17
opentelemetry-java-instrumentation:opentelemetry-kafka-clients-2.6:1.18.0-alpha
kafka-clients:3.2.0
reactor-kafka:1.3.12

Issue Description

KafkaConsumer.poll() can throw a WakeupException (unchecked) if wakeup() is called from another thread.

KafkaConsumer.java

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            do {
                // WakeupException can be thrown from here
                client.maybeTriggerWakeup();
                // ... (rest of the method omitted from this excerpt)

ConsumerNetworkClient.java

    public void maybeTriggerWakeup() {
        if (!wakeupDisabled.get() && wakeup.get()) {
            log.debug("Raising WakeupException in response to user wakeup");
            wakeup.set(false);
            throw new WakeupException();
        }
    }
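
The mechanism above boils down to a flag that one thread sets and the polling thread checks. Here is a simplified, JDK-only model of it, not Kafka's actual code: WakeupFlagSketch and FakeWakeupException are hypothetical names, and the wakeupDisabled check is left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class WakeupFlagSketch {

  /** Hypothetical stand-in for WakeupException (unchecked). */
  static final class FakeWakeupException extends RuntimeException {}

  private final AtomicBoolean wakeup = new AtomicBoolean(false);

  /** Safe to call from any thread; it only records the request. */
  void wakeup() {
    wakeup.set(true);
  }

  /** Called by the polling thread; consumes a pending request and throws. */
  void maybeTriggerWakeup() {
    if (wakeup.compareAndSet(true, false)) {
      throw new FakeWakeupException();
    }
  }

  /** Returns true if the next check throws the wakeup exception. */
  static boolean triggers(WakeupFlagSketch client) {
    try {
      client.maybeTriggerWakeup();
      return false;
    } catch (FakeWakeupException e) {
      return true;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    WakeupFlagSketch client = new WakeupFlagSketch();
    Thread other = new Thread(client::wakeup); // request the wakeup from another thread
    other.start();
    other.join();
    System.out.println("first check throws:  " + triggers(client));
    System.out.println("second check throws: " + triggers(client));
  }
}
```

The point of the model is that the exception surfaces on the polling thread, inside poll(), rather than on the thread that called wakeup().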

ConsumerEventLoop from reactor-kafka explicitly catches WakeupException: it logs a debug message and continues with an empty record set.

ConsumerEventLoop.java#run()

          ConsumerRecords<K, V> records;
          try {
              records = consumer.poll(pollTimeout);
          } catch (WakeupException e) {
              log.debug("Consumer woken");
              records = ConsumerRecords.empty();
          }

We have a custom Consumer implementation that does some up-front metrics configuration and then just defers all Consumer method calls to KafkaConsumer. We wrap this Consumer implementation using KafkaTelemetry.wrap(Consumer<K,V>) which, from what I can tell, sets up method proxies to handle span creation/propagation.

KafkaTelemetry.java

  public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
    return (Consumer<K, V>)
        Proxy.newProxyInstance(
            KafkaTelemetry.class.getClassLoader(),
            new Class<?>[] {Consumer.class},
            (proxy, method, args) -> {
              Object result = method.invoke(consumer, args);
              // ConsumerRecords<K, V> poll(long timeout)
              // ConsumerRecords<K, V> poll(Duration duration)
              if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
                buildAndFinishSpan((ConsumerRecords) result);
              }
              return result;
            });
  }

The issue occurs when poll() throws a WakeupException. The expectation is that the exception bubbles up to ConsumerEventLoop, where it is caught and handled. Instead, Method.invoke() wraps the WakeupException in an InvocationTargetException (which is correct per its documentation). The invocation handler lets that InvocationTargetException propagate, and because it is a checked exception that poll() does not declare, the proxy instance wraps it in an UndeclaredThrowableException. The end result is that the catch for WakeupException in ConsumerEventLoop no longer matches, and the UndeclaredThrowableException bubbles up to our application code unexpectedly.
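
The wrapping can be reproduced without Kafka. The following is a minimal sketch using only the JDK. Poller and FakeWakeupException are hypothetical stand-ins for Consumer and WakeupException, and the handler mirrors the bare method.invoke(...) call in the wrap() excerpt above.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class ProxyWrappingDemo {

  /** Hypothetical stand-in for Consumer: poll() declares no checked exceptions. */
  interface Poller {
    String poll();
  }

  /** Hypothetical stand-in for WakeupException (unchecked). */
  static final class FakeWakeupException extends RuntimeException {}

  /** Proxies the delegate with no exception handling around Method.invoke(). */
  static Poller wrapNaively(Poller delegate) {
    InvocationHandler handler = (proxy, method, args) -> method.invoke(delegate, args);
    return (Poller)
        Proxy.newProxyInstance(
            Poller.class.getClassLoader(), new Class<?>[] {Poller.class}, handler);
  }

  /** Calls poll() on the naive proxy and returns whatever it throws (null if nothing). */
  static Throwable thrownByNaiveProxy() {
    Poller delegate = () -> {
      throw new FakeWakeupException();
    };
    try {
      wrapNaively(delegate).poll();
      return null;
    } catch (Throwable t) {
      return t;
    }
  }

  public static void main(String[] args) {
    // Print the exception chain, outermost first.
    for (Throwable t = thrownByNaiveProxy(); t != null; t = t.getCause()) {
      System.out.println(t.getClass().getName());
    }
  }
}
```

Running main prints the chain outermost first: UndeclaredThrowableException, then InvocationTargetException, then the stand-in exception. A caller that only catches the innermost type never sees it.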

Method.java

     * @throws    InvocationTargetException if the underlying method
     *              throws an exception.
     * @throws    NullPointerException      if the specified object is null
     *              and the method is an instance method.
     * @throws    ExceptionInInitializerError if the initialization
     * provoked by this method fails.
     */
    @CallerSensitive
    @ForceInline // to ensure Reflection.getCallerClass optimization
    @IntrinsicCandidate
    public Object invoke(Object obj, Object... args)
        throws IllegalAccessException, IllegalArgumentException,
           InvocationTargetException
    {

InvocationHandler.java

     * @throws  Throwable the exception to throw from the method
     * invocation on the proxy instance.  The exception's type must be
     * assignable either to any of the exception types declared in the
     * {@code throws} clause of the interface method or to the
     * unchecked exception types {@code java.lang.RuntimeException}
     * or {@code java.lang.Error}.  If a checked exception is
     * thrown by this method that is not assignable to any of the
     * exception types declared in the {@code throws} clause of
     * the interface method, then an
     * {@link UndeclaredThrowableException} containing the
     * exception that was thrown by this method will be thrown by the
     * method invocation on the proxy instance.
     *
     * @see     UndeclaredThrowableException
     */
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;

Stacktrace

java.lang.reflect.UndeclaredThrowableException: null
        at jdk.proxy3/jdk.proxy3.$Proxy212.poll(Unknown Source)
        at myproject.MyCustomConsumer.poll(MyCustomConsumer.java) -- Redacted actual classname, this line just calls KafkaConsumer.poll()
        at reactor.kafka.receiver.internals.ConsumerEventLoop$PollEvent.run(ConsumerEventLoop.java:331)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.reflect.InvocationTargetException: null
        at jdk.internal.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry.lambda$wrap$1(KafkaTelemetry.java:111)
        ... 10 common frames omitted
Caused by: org.apache.kafka.common.errors.WakeupException: null
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1306)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1242)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215)
        ... 14 common frames omitted
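
The three-level chain in this trace can also be inspected programmatically. As a stopgap in application code (not a substitute for fixing the proxy), a small helper can search the cause chain for a given type. This is only a sketch: CauseChain and findCause are hypothetical names, and IllegalStateException stands in for WakeupException so the example needs nothing beyond the JDK.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;

final class CauseChain {

  /** Returns the first throwable in the cause chain that is an instance of type, or null. */
  static <T extends Throwable> T findCause(Throwable start, Class<T> type) {
    // Bound the walk so a cyclic cause chain cannot loop forever.
    int depth = 0;
    for (Throwable t = start; t != null && depth < 32; t = t.getCause(), depth++) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Rebuild the same nesting as in the trace, with a stand-in root cause.
    Throwable wrapped =
        new UndeclaredThrowableException(
            new InvocationTargetException(new IllegalStateException("woken")));
    IllegalStateException root = findCause(wrapped, IllegalStateException.class);
    System.out.println(root == null ? "not found" : "found: " + root.getMessage());
  }
}
```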

Proposed Fix

The proposed fix is to catch the InvocationTargetException in the proxy definition within KafkaTelemetry.java and rethrow its cause, obtained via getCause(). That way, any unchecked exception (and any checked exception declared on the proxied method) would bubble up unchanged, with a clearer stack trace. Any checked exception not declared on the proxied method would still be wrapped in an UndeclaredThrowableException, which seems appropriate.
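
A minimal sketch of that idea, using only the JDK. Resource, FailingResource, and FakeWakeupException are hypothetical stand-ins, and this is not necessarily how the eventual fix is written.

```java
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

final class UnwrappingProxyDemo {

  /** Hypothetical stand-in for Consumer; close() declares a checked exception for illustration. */
  interface Resource {
    String poll();

    void close() throws IOException;
  }

  /** Hypothetical stand-in for WakeupException (unchecked). */
  static final class FakeWakeupException extends RuntimeException {}

  /** A delegate whose methods always fail, to exercise both exception paths. */
  static final class FailingResource implements Resource {
    @Override
    public String poll() {
      throw new FakeWakeupException();
    }

    @Override
    public void close() throws IOException {
      throw new IOException("close failed");
    }
  }

  /** Proxies the delegate and rethrows whatever the delegate itself threw. */
  static Resource wrap(Resource delegate) {
    InvocationHandler handler =
        (proxy, method, args) -> {
          try {
            return method.invoke(delegate, args);
          } catch (InvocationTargetException e) {
            // Unwrap so callers see the delegate's own exception type.
            throw e.getCause();
          }
        };
    return (Resource)
        Proxy.newProxyInstance(
            Resource.class.getClassLoader(), new Class<?>[] {Resource.class}, handler);
  }

  public static void main(String[] args) {
    Resource wrapped = wrap(new FailingResource());
    try {
      wrapped.poll();
    } catch (FakeWakeupException e) {
      System.out.println("poll: caught the unchecked exception directly");
    }
    try {
      wrapped.close();
    } catch (IOException e) {
      System.out.println("close: caught the declared checked exception: " + e.getMessage());
    }
  }
}
```

With the unwrapping in place, an existing catch block for the specific exception type (such as the one in ConsumerEventLoop) matches again without any change on the caller's side.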

I have not contributed to this project yet, but would be happy to take this on if needed.

cschieb commented Dec 22, 2022

Thanks @laurit for the quick turnaround on this!
