Skip to content

Commit

Permalink
Cleanup polling
Browse files Browse the repository at this point in the history
  • Loading branch information
kschzt committed May 16, 2021
1 parent 5452e0a commit a15104c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
12 changes: 11 additions & 1 deletion src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,25 @@ function KafkaClient(typ::Integer, conf::Dict=Dict(); dr_cb=nothing, err_cb=noth
end
rk = kafka_new(c_conf, Cint(typ))
client = KafkaClient(conf, typ, rk)
polling = true
# seems like `kafka_destroy` also destroys its config, so we don't attempt it twice
finalizer(client -> kafka_destroy(rk), client)
finalizer(client -> begin
polling = false
kafka_destroy(rk)
end, client)
if dr_cb != nothing
# set Julia callback after rk is created
DELIVERY_CALLBACKS[rk] = dr_cb
end
if err_cb != nothing
ERROR_CALLBACKS[rk] = err_cb
end
if dr_cb != nothing || err_cb != nothing
@async while polling
kafka_poll(rk, 1000)
sleep(1)
end
end
return client
end

Expand Down
8 changes: 0 additions & 8 deletions src/producer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@ end

function KafkaProducer(conf::Dict; dr_cb=nothing, err_cb=nothing)
kc = KafkaClient(KAFKA_TYPE_PRODUCER, conf; dr_cb=dr_cb, err_cb=err_cb)
if dr_cb != nothing || err_cb != nothing
polling = true
@async while polling
kafka_poll(kc.rk, 1000)
sleep(1)
end
finalizer(kc -> polling = false, kc)
end
return KafkaProducer(kc, Dict())
end

Expand Down
3 changes: 0 additions & 3 deletions src/wrapper.jl
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ function kafka_subscribe(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList
if errcode != 0
error("Subscription failed with error $errcode")
end
# since we use rd_kafka_consumer_poll, redirect the rd_kafka_poll() queue to the consumer queue.
ccall((:rd_kafka_poll_set_consumer, librdkafka), Cint,
(Ptr{Cvoid},), rk)
end


Expand Down

0 comments on commit a15104c

Please sign in to comment.