From a15104c92912777a575d4d046437082859479db9 Mon Sep 17 00:00:00 2001 From: jaakko manninen Date: Mon, 17 May 2021 01:48:44 +0300 Subject: [PATCH] Cleanup polling --- src/client.jl | 12 +++++++++++- src/producer.jl | 8 -------- src/wrapper.jl | 3 --- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/client.jl b/src/client.jl index 9583a4b..95e297e 100644 --- a/src/client.jl +++ b/src/client.jl @@ -47,8 +47,12 @@ function KafkaClient(typ::Integer, conf::Dict=Dict(); dr_cb=nothing, err_cb=noth end rk = kafka_new(c_conf, Cint(typ)) client = KafkaClient(conf, typ, rk) + polling = true # seems like `kafka_destroy` also destroys its config, so we don't attempt it twice - finalizer(client -> kafka_destroy(rk), client) + finalizer(client -> begin + polling = false + kafka_destroy(rk) + end, client) if dr_cb != nothing # set Julia callback after rk is created DELIVERY_CALLBACKS[rk] = dr_cb @@ -56,6 +60,12 @@ function KafkaClient(typ::Integer, conf::Dict=Dict(); dr_cb=nothing, err_cb=noth if err_cb != nothing ERROR_CALLBACKS[rk] = err_cb end + if dr_cb != nothing || err_cb != nothing + @async while polling + kafka_poll(rk, 1000) + sleep(1) + end + end return client end diff --git a/src/producer.jl b/src/producer.jl index 4ec7994..21f135f 100644 --- a/src/producer.jl +++ b/src/producer.jl @@ -8,14 +8,6 @@ end function KafkaProducer(conf::Dict; dr_cb=nothing, err_cb=nothing) kc = KafkaClient(KAFKA_TYPE_PRODUCER, conf; dr_cb=dr_cb, err_cb=err_cb) - if dr_cb != nothing || err_cb != nothing - polling = true - @async while polling - kafka_poll(kc.rk, 1000) - sleep(1) - end - finalizer(kc -> polling = false, kc) - end return KafkaProducer(kc, Dict()) end diff --git a/src/wrapper.jl b/src/wrapper.jl index bba59bc..d26f422 100644 --- a/src/wrapper.jl +++ b/src/wrapper.jl @@ -201,9 +201,6 @@ function kafka_subscribe(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList if errcode != 0 error("Subscription failed with error $errcode") end - # since we use rd_kafka_consumer_poll, redirect the rd_kafka_poll() queue to the consumer queue. - ccall((:rd_kafka_poll_set_consumer, librdkafka), Cint, - (Ptr{Cvoid},), rk) end