Fix delivery & error callbacks and polling #13

Merged · 6 commits · May 18, 2021

Conversation

@kschzt (Contributor) commented May 16, 2021

  • Fix delivery cb and implement error callback
  • Add @async polling Task if callbacks are used
  • Add @test for error callback
  • Document callbacks and seek()
  • Re-export produce()

kschzt added 2 commits May 17, 2021 00:54
  • document callbacks and seeking
  • add test for error callback
@kschzt changed the title from "Delivery & error callbacks" to "Fix delivery & error callbacks and polling" on May 16, 2021
@dfdx mentioned this pull request May 17, 2021
@dfdx (Owner) commented May 17, 2021

I added this as an inline comment previously, but I'm not sure you can see it. Sorry if I'm repeating it.

@async while polling
    kafka_poll(rk, 0)
    sleep(1)    
end

@async runs a task in the same thread, making it subject to cooperative concurrency. For example, if we run a task with @async in the REPL, the REPL will switch to other tasks every now and then. However, if one of the tasks runs non-cooperatively, holding the thread for a long time, all other tasks will be frozen. To see this, try the following:

# run a loop in a separate task
# sleep() lets the scheduler switch to other tasks
@async while true
    println("working")
    sleep(1)
end
# run a blocking loop in the current task
for i = 1:1_000_000_000
    rand(1000)
end

While the current task runs the computation, the additional task doesn't print anything.

I never had a chance to try it, but Threads.@spawn should behave exactly like @async except that it can run the new task on any available thread. This might not be critical given the IO-bound nature of the producer/consumer, but I think it's still better to be on the safe side.

(Note that my previous code using the Timer was also exposed to this issue).
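For illustration, the spawned version might look roughly like this (a minimal sketch, not tested; it reuses the rk handle and polling flag from the snippet above):

# run the poll loop on a worker thread instead of cooperatively on the current one
Threads.@spawn while polling
    kafka_poll(rk, 0)   # serve delivery/error callbacks
    sleep(1)            # yield between polls
end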

@kschzt (Contributor, Author) commented May 18, 2021

I've switched it to use Threads.@spawn instead of @async. There are no locks around rk or polling, so I'm left with a small doubt about thread safety, but it does work in a consumer + transformer + producer setting, so ...

@kschzt (Contributor, Author) commented May 18, 2021

That said, using rk from both threads works and is documented to be thread-safe in librdkafka, and polling is only changed in the finalizer, so I'm happy.

@kschzt (Contributor, Author) commented May 18, 2021

@attdona Thanks for the great point about exceptions in #14. It led me to try to catch an error thrown in the thread, which I haven't been able to do (so far). Catching errors from threads doesn't seem possible at the moment (!), so I'm reverting this to an @async Task. At least it will poll, if somewhat irregularly, which should be fine as long as session.timeout.ms isn't exceeded. The Task is only activated if either callback is set. Doing this with a thread needs JuliaLang/julia#33248, from what I can tell.
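Roughly, the activation looks like this (a simplified sketch of the idea; exact variable names in the package may differ):

# poll in the background only when a delivery or error callback was registered
polling = dr_cb != nothing || err_cb != nothing
if polling
    @async while polling
        kafka_poll(rk, 0)   # dispatch delivery/error callbacks
        sleep(1)
    end
end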

@dfdx merged commit e0fc8b4 into dfdx:master on May 18, 2021
@kschzt (Contributor, Author) commented May 18, 2021

@dfdx I did find a way to catch the error from the thread like so:

    @async try
        t = Threads.@spawn while polling
            kafka_poll(rk, 0)
            sleep(1)
            error("foo")   # simulate a failure inside the spawned thread
        end
        wait(t)            # rethrows the thread's failure as a TaskFailedException
    catch e
        bt = stacktrace(catch_backtrace())
        showerror(stderr, e, bt)
        rethrow(e)
    end

i.e. a Task that waits for the thread to complete. But to catch the exception and crash the app, the application would also have to wrap the KafkaConsumer/Producer in an @async try block. I think going with a Task is best for now.

@dfdx (Owner) commented May 18, 2021

Yeah, until we run into a specific issue, I believe the single-threaded version is the safest one.

Thank you for driving this change!

@attdona commented May 19, 2021

I think it is now harder to end up with an uncatchable exception, but it is still not completely safe ...

As far as I know, @async tasks and timer callbacks suffer from the same problem: when they terminate, the internal Julia "kernel" keeps them around in a "NOT RUNNABLE" state:

If they are the last active task (the last task run by the scheduler), SIGINT is delivered to that not-runnable task, and this causes a fatal: error thrown and no exception handler available.

You should be able to reproduce this issue with the snippet below. It's a contrived example just to expose the problem:

using RDKafka

Base.exit_on_sigint(false)

function report(msg)
    @info "report got: $msg"
end 

consumer = KafkaConsumer("<server>:9092", "my-consumer-group", err_cb=report)
try
    parlist = [("<topic>", 0)]
    subscribe(consumer, parlist)
    timeout_ms = 100
    while true
        msg = poll(String, String, consumer, timeout_ms)
        @show(msg)
        sleep(200)
    end
catch e
    # just trace InterruptException
    @info e
end

Result (press Ctrl-C about a second after the msg = nothing trace appears):

[ Info: Activating project in /home/sint/dev/youman
  Activating environment at `~/dev/youman/Project.toml`
[ Info: Importing Revise
[ Info: Importing OhMyREPL
msg = nothing
^C^Cfatal: error thrown and no exception handler available.
InterruptException()
jl_mutex_unlock at /buildworker/worker/package_linux64/build/src/locks.h:134 [inlined]
jl_task_get_next at /buildworker/worker/package_linux64/build/src/partr.c:475
poptask at ./task.jl:760
wait at ./task.jl:768 [inlined]
task_done_hook at ./task.jl:494
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
jl_finish_task at /buildworker/worker/package_linux64/build/src/task.c:208
start_task at /buildworker/worker/package_linux64/build/src/task.c:850
unknown function (ip: (nil))

@dfdx (Owner) commented May 19, 2021

But if we start the polling in the KafkaClient and stop it in a dedicated close(::KafkaClient) method (instead of a finalizer), as in #14, there will be no tasks left in the NOT RUNNABLE state, and thus SIGINT won't lead to this error, right?

Also note that due to polling = dr_cb != nothing || err_cb != nothing, kafka_poll will only be invoked when a user provides the callbacks. If we document this properly, a user will at least have a choice: use callbacks, or keep the program properly interruptible with Ctrl+C (which should be fine, e.g., for consumers).
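For illustration, a close-based shutdown could look roughly like this (a sketch only; the KafkaClient field names and the remaining cleanup are assumptions):

# hypothetical sketch: stop background polling explicitly instead of in a finalizer
function Base.close(kc::KafkaClient)
    kc.polling = false   # the polling loop checks this flag and exits on its next iteration
    # ...plus whatever handle cleanup the finalizer currently performs
end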


As a side note, here's the relevant issue in the Julia repo: JuliaLang/julia#34184

@attdona commented May 20, 2021

I agree. I think a close method is much better than a finalizer, mainly because the user has more control over the Kafka handle.

To avoid the "error thrown and no exception handler available" problem, the only option seems to be removing the @async task.

For a consumer app, I would reconsider @kschzt's idea of using rd_kafka_poll_set_consumer (#14 (comment)), because as far as I understand, librdkafka will block delivery of kafka_poll callbacks while a blocking rd_kafka_consumer_poll() is in progress.

Since the consumer spends most of its time polling for messages, I suppose the reason rd_kafka_poll_set_consumer exists is to deliver error and info messages to the user as soon as possible (poll() in the consumer works, but is suboptimal?).

@dfdx (Owner) commented May 20, 2021

rd_kafka_poll_set_consumer() is marked as experimental but may work for a consumer. For a producer, we need to run kafka_poll() at regular intervals anyway, so I don't see how we can avoid running it in either a task or a thread.
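If we do try it for a consumer, the wiring would presumably be a single call on the consumer handle right after creation. A rough sketch with an assumed ccall binding (here librdkafka stands for however the package loads the shared library, and rk for the consumer's rd_kafka_t pointer):

# hedged sketch: route main-queue events to the consumer queue so that
# rd_kafka_consumer_poll() also serves error/stats callbacks (consumers only)
err = ccall((:rd_kafka_poll_set_consumer, librdkafka), Cint, (Ptr{Cvoid},), rk)
err == 0 || @warn "rd_kafka_poll_set_consumer returned error code $err"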
