-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add broker selection and client termination [KIP-714] #4382
Conversation
src/rdkafka_telemetry.c
Outdated
rd_kafka_dbg(rk, TELEMETRY, "TELTERM", | ||
"Awaiting termination of telemetry."); | ||
mtx_lock(&rk->rk_telemetry.lock); | ||
cnd_wait(&rk->rk_telemetry.termination_cnd, &rk->rk_telemetry.lock); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will it make sense to do a timed_wait if for some case RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SCHEDULED gets blocked somewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed
How much should we wait for? 1s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm not sure, but I think 1s should be good. We can probably tweak it later if the terminating push takes more time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, 1s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, LGTM!
Main changes:
To accommodate this:
Makes changes so that the entire state machine runs on the main thread.
Communication with the broker thread (when preferred broker has to be set) and with the app thread (during termination) is done via ops or conditional variables.
Also updates some documentation comments.