Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rd_kafka_assign() hangs forever when closing consumer #4884

Open
1 of 7 tasks
aiquestion opened this issue Oct 25, 2024 · 0 comments
Open
1 of 7 tasks

rd_kafka_assign() hangs forever when closing consumer #4884

aiquestion opened this issue Oct 25, 2024 · 0 comments

Comments

@aiquestion
Copy link
Contributor

Description

we enconter a issue when closing the consumer in Rust SDK, it hangs calling rd_kafka_assign() in rebalance_cb.
code is like (using C code)

rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE|RD_KAFKA_EVENT_OFFSET_COMMIT|RD_KAFKA_EVENT_ERROR|RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH);
rd_kafka_poll_set_consumer(rk);
rd_kafka_queue_t * queue = rd_kafka_queue_get_consumer(rk);
// should poll here, but we don't do poll to reproduce the bug

// close consumer
rd_kafka_consumer_close_queue(rk, queue);
while (!rd_kafka_consumer_closed(rk)) {
                rd_kafka_event_t *rkev;
                rkev = rd_kafka_queue_poll(queue, 60 * 1000);
                fprintf(stderr, "%% Get event %s \n", rd_kafka_event_name(rkev));
                if (rkev) {
                        if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_REBALANCE) {
                                rebalance_cb_event(rk, rkev);
                        }
                }
}

static void rebalance_cb_event(rd_kafka_t *rk,
                               rd_kafka_event_t* rkev) {
        rd_kafka_error_t *error     = NULL;
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        switch (rd_kafka_event_error(rkev)) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                print_partition_list(stderr, rd_kafka_topic_partition_list_new(0));
                if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
                        error = rd_kafka_incremental_assign(
                            rk, rd_kafka_topic_partition_list_new(0));
                }
                else {
                        ret_err = rd_kafka_assign(
                            rk, rd_kafka_topic_partition_list_new(0));
                        printf(stderr, "assign ret: %s\n",
                               rd_kafka_err2str(ret_err));
                        sleep(3);
                }
                break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                print_partition_list(stderr, rd_kafka_topic_partition_list_new(0));
                if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
                        error = rd_kafka_incremental_unassign(rk, rd_kafka_topic_partition_list_new(0));
                } else {
                        ret_err  = rd_kafka_assign(rk, NULL);
                        wait_eof = 0;
                }
                break;
        default:
                rd_kafka_assign(rk, NULL);
                break;
        }
        if (error) {
                fprintf(stderr, "incremental assign failure: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        } else if (ret_err) {
                fprintf(stderr, "assign failure: %s\n",
                        rd_kafka_err2str(ret_err));
        }
}

In some cases the program will never exist, and the call stack hangs in rd_kafka_assign

  * frame #0: 0x00000001916019ec libsystem_kernel.dylib`__psynch_cvwait + 8
    frame #1: 0x000000019163f55c libsystem_pthread.dylib`_pthread_cond_wait + 1228
    frame #2: 0x00000001003ee258 rdkafka_complex_consumer_example`cnd_wait + 12
    frame #3: 0x0000000100397c14 rdkafka_complex_consumer_example`rd_kafka_q_pop_serve + 620
    frame #4: 0x000000010039a764 rdkafka_complex_consumer_example`rd_kafka_op_req + 148
    frame #5: 0x00000001003ce394 rdkafka_complex_consumer_example`rd_kafka_assign + 80
    frame #6: 0x00000001003686f0 rdkafka_complex_consumer_example`main [inlined] rebalance_cb_event(rk=0x000000014e010a00, rkev=<unavailable>) at rdkafka_complex_consumer_example.c:175:35 [opt]
    frame #7: 0x0000000100368634 rdkafka_complex_consumer_example`main(argc=<unavailable>, argv=<unavailable>) at rdkafka_complex_consumer_example.c:619:33 [opt]

After investigation, it seems that:

  • consumer start and join/sync group success.
  • tigger a rebalance event, since user didn't poll, it will not be handled
  • rd_kafka_consumer_close_queue to trigger a RD_KAFKA_OP_TERMINATE in cgrp
  • when closing rd_kafka_queue_poll will be called, and rebalance event handled
  • RD_KAFKA_OP_ASSIGN send into cgrp_op queue
  • RD_KAFKA_OP_TERMINATE handled, and exit the rd_kafka_q_serve
  • rd_kafka_cgrp_serve called rd_kafka_q_purge(rkcg->rkcg_ops); to purge the RD_KAFKA_OP_ASSIGN event
  • main thread never get reply from RD_KAFKA_OP_ASSIGN, so it hangs forever
// in rdkafka.c thread_main:
                int cnt = rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
                if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
                        rd_kafka_cgrp_serve(rk->rk_cgrp);

a proper fix is to handle all rkcg->rkcg_ops before rd_kafka_q_purge(rkcg->rkcg_ops);
PR

How to reproduce

It's a random case, i managed to re-produce it after i modify many code:

  • change heart beat response to always error ( so we are not in the group)
  • change rd_kafka_q_serve in main thread to handle 1 op at a time
  • disable rd_kafka_cgrp_serve before RD_KAFKA_OP_GET_REBALANCE_PROTOCOL called so RD_KAFKA_OP_ASSIGN can be in queue first and handle after rd_kafka_cgrp_serve called.
    aiquestion@1bfc1e4

build rdkafka_complex_consumer_example.c and run it. it will hang. after apply the fix in PR, it can close successfully.

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/confluentinc/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): <REPLACE with e.g., v0.10.5 or a git sha. NOT "latest" or "current">
  • Apache Kafka version: <REPLACE with e.g., 0.10.2.3>
  • librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
  • Operating system: <REPLACE with e.g., Centos 5 (x64)>
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant