From 122c1e41c03e15657cee9be8757af7798cc095f1 Mon Sep 17 00:00:00 2001 From: Milind L Date: Thu, 3 Aug 2023 14:49:13 +0530 Subject: [PATCH 1/2] Add broker selection and client termination --- src/rdkafka.c | 23 +++- src/rdkafka_broker.c | 117 +++++++++++------- src/rdkafka_broker.h | 25 ++++ src/rdkafka_int.h | 18 ++- src/rdkafka_op.c | 8 ++ src/rdkafka_op.h | 9 ++ src/rdkafka_request.c | 8 +- src/rdkafka_telemetry.c | 263 +++++++++++++++++++++++++++++++++++----- src/rdkafka_telemetry.h | 13 +- 9 files changed, 400 insertions(+), 84 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index fe3d456081..c403a0ced6 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -1079,6 +1079,11 @@ static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) { rd_kafka_consumer_close(rk); } + /* Await telemetry termination. This method blocks until the last + * PushTelemetry request is sent (if possible). */ + if (!(flags & RD_KAFKA_DESTROY_F_IMMEDIATE)) + rd_kafka_telemetry_await_termination(rk); + /* With the consumer closed, terminate the rest of librdkafka. 
*/ rd_atomic32_set(&rk->rk_terminate, flags | RD_KAFKA_DESTROY_F_TERMINATE); @@ -2112,9 +2117,11 @@ static int rd_kafka_thread_main(void *arg) { cnd_broadcast(&rk->rk_init_cnd); mtx_unlock(&rk->rk_init_lock); - while (likely(!rd_kafka_terminating(rk) || rd_kafka_q_len(rk->rk_ops) || - (rk->rk_cgrp && (rk->rk_cgrp->rkcg_state != - RD_KAFKA_CGRP_STATE_TERM)))) { + while ( + likely(!rd_kafka_terminating(rk) || rd_kafka_q_len(rk->rk_ops) || + (rk->rk_cgrp && + (rk->rk_cgrp->rkcg_state != RD_KAFKA_CGRP_STATE_TERM)) || + (rk->rk_telemetry.state != RD_KAFKA_TELEMETRY_TERMINATED))) { rd_ts_t sleeptime = rd_kafka_timers_next( &rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/); rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0, @@ -2250,6 +2257,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain); mtx_init(&rk->rk_telemetry.lock, mtx_plain); + cnd_init(&rk->rk_telemetry.termination_cnd); rd_atomic64_init(&rk->rk_ts_last_poll, rk->rk_ts_created); rd_atomic32_init(&rk->rk_flushing, 0); @@ -3989,6 +3997,15 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, rd_kafka_purge(rk, rko->rko_u.purge.flags); break; + case RD_KAFKA_OP_SET_TELEMETRY_BROKER: + rd_kafka_set_telemetry_broker_maybe( + rk, rko->rko_u.telemetry_broker.rkb); + break; + + case RD_KAFKA_OP_TERMINATE_TELEMETRY: + rd_kafka_telemetry_schedule_termination(rko->rko_rk); + break; + default: /* If op has a callback set (e.g., OAUTHBEARER_REFRESH), * call it. */ diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index ad4784d5a1..62a7033295 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -237,29 +237,28 @@ static void rd_kafka_broker_features_set(rd_kafka_broker_t *rkb, int features) { /** - * @brief Check and return supported ApiVersion for \p ApiKey. - * - * @returns the highest supported ApiVersion in the specified range (inclusive) - * or -1 if the ApiKey is not supported or no matching ApiVersion. 
- * The current feature set is also returned in \p featuresp - * @locks none - * @locality any + * @brief Implementation for rd_kafka_broker_ApiVersion_supported and + * rd_kafka_broker_ApiVersion_supported0. */ -int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb, - int16_t ApiKey, - int16_t minver, - int16_t maxver, - int *featuresp) { +static int16_t +rd_kafka_broker_ApiVersion_supported_implementation(rd_kafka_broker_t *rkb, + int16_t ApiKey, + int16_t minver, + int16_t maxver, + int *featuresp, + rd_bool_t do_lock) { struct rd_kafka_ApiVersion skel = {.ApiKey = ApiKey}; struct rd_kafka_ApiVersion ret = RD_ZERO_INIT, *retp; - rd_kafka_broker_lock(rkb); + if (do_lock) + rd_kafka_broker_lock(rkb); if (featuresp) *featuresp = rkb->rkb_features; if (rkb->rkb_features & RD_KAFKA_FEATURE_UNITTEST) { /* For unit tests let the broker support everything. */ - rd_kafka_broker_unlock(rkb); + if (do_lock) + rd_kafka_broker_unlock(rkb); return maxver; } @@ -268,7 +267,9 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb, sizeof(*rkb->rkb_ApiVersions), rd_kafka_ApiVersion_key_cmp); if (retp) ret = *retp; - rd_kafka_broker_unlock(rkb); + + if (do_lock) + rd_kafka_broker_unlock(rkb); if (!retp) return -1; @@ -285,6 +286,47 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb, } +/** + * @brief Check and return supported ApiVersion for \p ApiKey. + * + * @returns the highest supported ApiVersion in the specified range (inclusive) + * or -1 if the ApiKey is not supported or no matching ApiVersion. 
+ * The current feature set is also returned in \p featuresp + * @locks none + * @locks_acquired rd_kafka_broker_lock() + * @locality any + */ +int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb, + int16_t ApiKey, + int16_t minver, + int16_t maxver, + int *featuresp) { + return rd_kafka_broker_ApiVersion_supported_implementation( + rkb, ApiKey, minver, maxver, featuresp, rd_true /* do_lock */); +} + + +/** + * @brief Check and return supported ApiVersion for \p ApiKey. + * + * @returns the highest supported ApiVersion in the specified range (inclusive) + * or -1 if the ApiKey is not supported or no matching ApiVersion. + * The current feature set is also returned in \p featuresp + * + * @note Same as rd_kafka_broker_ApiVersion_supported except for locking. + * @locks rd_kafka_broker_lock() + * @locks_acquired none + * @locality any + */ +int16_t rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb, + int16_t ApiKey, + int16_t minver, + int16_t maxver, + int *featuresp) { + return rd_kafka_broker_ApiVersion_supported_implementation( + rkb, ApiKey, minver, maxver, featuresp, rd_false /* do_lock */); +} + /** * @brief Set broker state. * @@ -673,6 +715,9 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb, /* TODO(milind): check if this right. 
*/ mtx_lock(&rkb->rkb_rk->rk_telemetry.lock); if (rkb->rkb_rk->rk_telemetry.preferred_broker == rkb) { + rd_kafka_dbg(rkb->rkb_rk, TELEMETRY, "TELBRKLOST", + "Lost telemetry broker %s due to state change", + rkb->rkb_name); rd_kafka_broker_destroy( rkb->rkb_rk->rk_telemetry.preferred_broker); rkb->rkb_rk->rk_telemetry.preferred_broker = NULL; @@ -1349,15 +1394,15 @@ void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk) { * @locks rd_kafka_*lock() MUST be held * @locality any */ -static rd_kafka_broker_t * -rd_kafka_broker_random0(const char *func, - int line, - rd_kafka_t *rk, - rd_bool_t is_up, - int state, - int *filtered_cnt, - int (*filter)(rd_kafka_broker_t *rk, void *opaque), - void *opaque) { +rd_kafka_broker_t *rd_kafka_broker_random0(const char *func, + int line, + rd_kafka_t *rk, + rd_bool_t is_up, + int state, + int *filtered_cnt, + int (*filter)(rd_kafka_broker_t *rk, + void *opaque), + void *opaque) { rd_kafka_broker_t *rkb, *good = NULL; int cnt = 0; int fcnt = 0; @@ -1392,11 +1437,6 @@ rd_kafka_broker_random0(const char *func, return good; } -#define rd_kafka_broker_random(rk, state, filter, opaque) \ - rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_false, state, \ - NULL, filter, opaque) - - /** * @returns the broker (with refcnt increased) with the highest weight based * based on the provided weighing function. 
@@ -2277,20 +2317,11 @@ void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) { rkb, RD_KAFKAP_GetTelemetrySubscriptions, 0, 0, &features) != -1) { rd_kafka_t *rk = rkb->rkb_rk; - - mtx_lock(&rkb->rkb_rk->rk_telemetry.lock); - if (!rkb->rkb_rk->rk_telemetry.preferred_broker && - rk->rk_telemetry.state == RD_KAFKA_TELEMETRY_AWAIT_BROKER) { - rd_kafka_broker_keep(rkb); - rk->rk_telemetry.preferred_broker = rkb; - rk->rk_telemetry.state = - RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SCHEDULED; - rd_kafka_timer_start_oneshot( - &rk->rk_timers, &rk->rk_telemetry.request_timer, - rd_false, 1 /* immediate */, - rd_kafka_telemetry_fsm_tmr_cb, (void *)rk); - } - mtx_unlock(&rkb->rkb_rk->rk_telemetry.lock); + rd_kafka_op_t *rko = + rd_kafka_op_new(RD_KAFKA_OP_SET_TELEMETRY_BROKER); + rd_kafka_broker_keep(rkb); + rko->rko_u.telemetry_broker.rkb = rkb; + rd_kafka_q_enq(rk->rk_ops, rko); } } diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index 3ade1f76f9..e06b96518b 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -415,6 +415,12 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb, int16_t maxver, int *featuresp); +int16_t rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb, + int16_t ApiKey, + int16_t minver, + int16_t maxver, + int *featuresp); + rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0_fl(const char *func, int line, rd_kafka_t *rk, @@ -572,6 +578,25 @@ int rd_kafka_brokers_wait_state_change_async(rd_kafka_t *rk, rd_kafka_enq_once_t *eonce); void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk); +rd_kafka_broker_t *rd_kafka_broker_random0(const char *func, + int line, + rd_kafka_t *rk, + rd_bool_t is_up, + int state, + int *filtered_cnt, + int (*filter)(rd_kafka_broker_t *rk, + void *opaque), + void *opaque); + +#define rd_kafka_broker_random(rk, state, filter, opaque) \ + rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_false, state, \ + NULL, filter, opaque) + +#define rd_kafka_broker_random_up(rk, 
filter, opaque) \ + rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_true, \ + RD_KAFKA_BROKER_STATE_UP, NULL, filter, \ + opaque) + /** diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 6dc301c374..f65c12610d 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -240,7 +240,9 @@ typedef enum { RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SENT, RD_KAFKA_TELEMETRY_PUSH_SCHEDULED, RD_KAFKA_TELEMETRY_PUSH_SENT, - RD_KAFKA_TELEMETRY_TERMINATING + RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SCHEDULED, + RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SENT, + RD_KAFKA_TELEMETRY_TERMINATED, } rd_kafka_telemetry_state_t; @@ -251,7 +253,9 @@ rd_kafka_telemetry_state2str(rd_kafka_telemetry_state_t state) { "GetSubscriptionsSent", "PushScheduled", "PushSent", - "Terminating"}; + "TerminatingPushScheduled", + "TerminatingPushSent", + "Terminated"}; return names[state]; } @@ -639,17 +643,21 @@ struct rd_kafka_s { } rk_sasl; struct { - /* Fields for the control flow. */ + /* Fields for the control flow - unless guarded by lock, only + * accessed from main thread. */ /**< Current state of the telemetry state machine. */ rd_kafka_telemetry_state_t state; - /**< Preferred broker for sending metrics to. */ + /**< Preferred broker for sending telemetry (Lock protected). */ rd_kafka_broker_t *preferred_broker; /**< Timer for all the requests we schedule. */ rd_kafka_timer_t request_timer; /**< Lock for preferred telemetry broker and state. */ mtx_t lock; + /**< Used to wait for termination (Lock protected). */ + cnd_t termination_cnd; - /* Fields obtained from broker as a result of GetSubscriptions. + /* Fields obtained from broker as a result of GetSubscriptions - + * only accessed from main thread. 
*/ rd_kafka_uuid_t client_instance_id; int32_t subscription_id; diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 9ebb611c91..78159db6ce 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -114,6 +114,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:ALTERUSERSCRAMCREDENTIALS", [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = "REPLY:DESCRIBEUSERSCRAMCREDENTIALS", + [RD_KAFKA_OP_SET_TELEMETRY_BROKER] = "REPLY:RD_KAFKA_OP_SET_TELEMETRY_BROKER", + [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", }; if (type & RD_KAFKA_OP_REPLY) @@ -270,6 +272,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_SET_TELEMETRY_BROKER] = sizeof(rko->rko_u.telemetry_broker), + [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -468,6 +472,10 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { rd_kafka_topic_partition_list_destroy); break; + case RD_KAFKA_OP_SET_TELEMETRY_BROKER: + RD_IF_FREE(rko->rko_u.telemetry_broker.rkb, rd_kafka_broker_destroy); + break; + default: break; } diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 84564e8dba..ca89b839e9 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -172,6 +172,10 @@ typedef enum { RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, /* < Admin: AlterUserScramCredentials u.admin_request >*/ + RD_KAFKA_OP_SET_TELEMETRY_BROKER, /**< Set preferred broker for + telemetry. */ + RD_KAFKA_OP_TERMINATE_TELEMETRY, /**< Start termination sequence for + telemetry. */ RD_KAFKA_OP__END } rd_kafka_op_type_t; @@ -661,6 +665,11 @@ struct rd_kafka_op_s { } leaders; + struct { + /** Preferred broker for telemetry. 
*/ + rd_kafka_broker_t *rkb; + } telemetry_broker; + } rko_u; }; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 7aff3c86f1..649d5f6e7b 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -5365,11 +5365,15 @@ void rd_kafka_handle_GetTelemetrySubscriptions(rd_kafka_t *rk, } rd_kafka_handle_get_telemetry_subscriptions(rk, err); + return; + err_parse: err = rkbuf->rkbuf_err; goto err; err: + /* TODO: Add error handling actions, possibly call + * rd_kafka_handle_get_telemetry_subscriptions with error. */ return; } @@ -5400,12 +5404,14 @@ void rd_kafka_handle_PushTelemetry(rd_kafka_t *rk, goto err; } rd_kafka_handle_push_telemetry(rk, err); - + return; err_parse: err = rkbuf->rkbuf_err; goto err; err: + /* TODO: Add error handling actions, possibly call + * rd_kafka_handle_push_telemetry with error. */ return; } diff --git a/src/rdkafka_telemetry.c b/src/rdkafka_telemetry.c index 16fb2235bd..efdf5bd49a 100644 --- a/src/rdkafka_telemetry.c +++ b/src/rdkafka_telemetry.c @@ -33,21 +33,21 @@ #include "rdkafka_request.h" /** - * @brief Returns the preferred metrics broker or NULL if unavailable. + * @brief Filters broker by availability of GetTelemetrySubscription. * - * @locks_acquired rk_telemetry.lock + * @return 0 if GetTelemetrySubscription is supported, 1 otherwise. + * + * @locks rd_kafka_broker_lock() */ -static rd_kafka_broker_t *rd_kafka_get_preferred_broker(rd_kafka_t *rk) { - rd_kafka_broker_t *rkb = NULL; - - mtx_lock(&rk->rk_telemetry.lock); - if (rk->rk_telemetry.preferred_broker) { - rkb = rk->rk_telemetry.preferred_broker; - } - /* TODO: handle recalculation of preferred broker in case broker goes - * down. For now just return. 
*/ - mtx_unlock(&rk->rk_telemetry.lock); - return rkb; +static int +rd_kafka_filter_broker_by_GetTelemetrySubscription(rd_kafka_broker_t *rkb, + void *opaque) { + int features; + if (rd_kafka_broker_ApiVersion_supported0( + rkb, RD_KAFKAP_GetTelemetrySubscriptions, 0, 0, &features) != + -1) + return 0; + return 1; } /** @@ -56,6 +56,9 @@ static rd_kafka_broker_t *rd_kafka_get_preferred_broker(rd_kafka_t *rk) { * @param clear_control_flow_fields This determines if the control flow fields * need to be cleared. This should only be set * to true if the rk is terminating. + * @locality main thread + * @locks none + * @locks_acquired rk_telemetry.lock */ void rd_kafka_telemetry_clear(rd_kafka_t *rk, rd_bool_t clear_control_flow_fields) { @@ -68,6 +71,7 @@ void rd_kafka_telemetry_clear(rd_kafka_t *rk, } mtx_unlock(&rk->rk_telemetry.lock); mtx_destroy(&rk->rk_telemetry.lock); + cnd_destroy(&rk->rk_telemetry.termination_cnd); } if (rk->rk_telemetry.accepted_compression_types_cnt) { @@ -86,9 +90,32 @@ void rd_kafka_telemetry_clear(rd_kafka_t *rk, } } +/** + * @brief Sets the telemetry state to TERMINATED and signals the condition + * variable + * + * @locality main thread + * @locks none + * @locks_acquired rk_telemetry.lock + */ +static void rd_kafka_telemetry_set_terminated(rd_kafka_t *rk) { + rd_dassert(thrd_is_current(rk->rk_thread)); + + rd_kafka_dbg(rk, TELEMETRY, "TELTERM", + "Setting state to TERMINATED and signalling"); + + rk->rk_telemetry.state = RD_KAFKA_TELEMETRY_TERMINATED; + mtx_lock(&rk->rk_telemetry.lock); + cnd_signal(&rk->rk_telemetry.termination_cnd); + mtx_unlock(&rk->rk_telemetry.lock); +} + /** * @brief Enqueues a GetTelemetrySubscriptionsRequest. 
* + * @locks none + * @locks_acquired none + * @locality main thread */ static void rd_kafka_send_get_telemetry_subscriptions(rd_kafka_t *rk, rd_kafka_broker_t *rkb) { @@ -107,7 +134,13 @@ static void rd_kafka_send_get_telemetry_subscriptions(rd_kafka_t *rk, rk->rk_telemetry.state = RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SENT; } - +/** + * @brief Handles parsed GetTelemetrySubscriptions response. + * + * @locks none + * @locks_acquired none + * @locality main thread + */ void rd_kafka_handle_get_telemetry_subscriptions(rd_kafka_t *rk, rd_kafka_resp_err_t err) { rd_ts_t next_scheduled; @@ -149,6 +182,7 @@ void rd_kafka_handle_get_telemetry_subscriptions(rd_kafka_t *rk, next_scheduled, rd_kafka_telemetry_fsm_tmr_cb, rk); } + static void rd_kafka_send_push_telemetry(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_bool_t terminating) { @@ -161,16 +195,38 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk, void *metrics_payload = NULL; char *compression_type = "gzip"; - rd_kafka_dbg(rk, TELEMETRY, "PUSHSENT", "Sending PushTelemetryRequest"); + rd_kafka_dbg(rk, TELEMETRY, "PUSHSENT", + "Sending PushTelemetryRequest with terminating = %d", + terminating); rd_kafka_PushTelemetryRequest( rkb, &rk->rk_telemetry.client_instance_id, rk->rk_telemetry.subscription_id, terminating, compression_type, metrics_payload, 0, NULL, 0, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_handle_PushTelemetry, NULL); - rk->rk_telemetry.state = RD_KAFKA_TELEMETRY_PUSH_SENT; + + rk->rk_telemetry.state = terminating + ? RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SENT + : RD_KAFKA_TELEMETRY_PUSH_SENT; } + void rd_kafka_handle_push_telemetry(rd_kafka_t *rk, rd_kafka_resp_err_t err) { + + /* We only make a best-effort attempt to push telemetry while + * terminating, and don't care about any errors. 
*/ + if (rk->rk_telemetry.state == + RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SENT) { + rd_kafka_telemetry_set_terminated(rk); + return; + } + + /* There's a possibility that we sent a PushTelemetryRequest, and + * scheduled a termination before getting the response. In that case, we + * will enter this method in the TERMINATED state when/if we get a + * response, and we should not take any action. */ + if (rk->rk_telemetry.state != RD_KAFKA_TELEMETRY_PUSH_SENT) + return; + if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { rd_kafka_dbg(rk, TELEMETRY, "PUSHOK", "PushTelemetryRequest succeeded"); @@ -180,6 +236,7 @@ void rd_kafka_handle_push_telemetry(rd_kafka_t *rk, rd_kafka_resp_err_t err) { rk->rk_telemetry.push_interval_ms * 1000, rd_kafka_telemetry_fsm_tmr_cb, (void *)rk); } else { /* error */ + /* TODO: add specific error handling. */ rd_kafka_dbg(rk, TELEMETRY, "PUSHERR", "PushTelemetryRequest failed: %s", rd_kafka_err2str(err)); @@ -192,24 +249,155 @@ void rd_kafka_handle_push_telemetry(rd_kafka_t *rk, rd_kafka_resp_err_t err) { } } +/** + * @brief This method starts the termination for telemetry and awaits + * completion. + * + * @locks none + * @locks_acquired rk_telemetry.lock + * @locality app thread (normal case) or the main thread (when terminated + * during creation). + */ +void rd_kafka_telemetry_await_termination(rd_kafka_t *rk) { + rd_kafka_op_t *rko; + + /* In the case where we have a termination during creation, we can't + * send any telemetry. */ + if (thrd_is_current(rk->rk_thread)) { + /* We can change state since we're on the main thread. */ + rk->rk_telemetry.state = RD_KAFKA_TELEMETRY_TERMINATED; + return; + } + + rko = rd_kafka_op_new(RD_KAFKA_OP_TERMINATE_TELEMETRY); + rko->rko_rk = rk; + rd_kafka_q_enq(rk->rk_ops, rko); + + /* Await termination sequence completion. 
*/ + rd_kafka_dbg(rk, TELEMETRY, "TELTERM", + "Awaiting termination of telemetry."); + mtx_lock(&rk->rk_telemetry.lock); + cnd_wait(&rk->rk_telemetry.termination_cnd, &rk->rk_telemetry.lock); + mtx_unlock(&rk->rk_telemetry.lock); +} + +/** + * @brief Send a final push request before terminating. + * + * @locks none + * @locks_acquired none + * @locality main thread + * @note This method is on a best-effort basis. + */ +void rd_kafka_telemetry_schedule_termination(rd_kafka_t *rk) { + rd_kafka_dbg( + rk, TELEMETRY, "TELTERM", + "Starting rd_kafka_telemetry_schedule_termination in state %s", + rd_kafka_telemetry_state2str(rk->rk_telemetry.state)); + + if (rk->rk_telemetry.state != RD_KAFKA_TELEMETRY_PUSH_SCHEDULED) { + rd_kafka_telemetry_set_terminated(rk); + return; + } + + rk->rk_telemetry.state = RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SCHEDULED; + + rd_kafka_dbg(rk, TELEMETRY, "TELTERM", + "Sending final request for Push"); + rd_kafka_timer_override_once( + &rk->rk_timers, &rk->rk_telemetry.request_timer, 0 /* immediate */); +} + + +/** + * @brief Sets telemetry broker if we are in AWAIT_BROKER state. + * + * @locks none + * @locks_acquired rk_telemetry.lock + * @locality main thread + */ +void rd_kafka_set_telemetry_broker_maybe(rd_kafka_t *rk, + rd_kafka_broker_t *rkb) { + rd_dassert(thrd_is_current(rk->rk_thread)); + + /* The op triggering this method is scheduled by brokers without knowing + * if a preferred broker is already set. If it is set, this method is a + * no-op. 
*/ + if (rk->rk_telemetry.state != RD_KAFKA_TELEMETRY_AWAIT_BROKER) + return; + + mtx_lock(&rk->rk_telemetry.lock); + + if (rk->rk_telemetry.preferred_broker) { + mtx_unlock(&rk->rk_telemetry.lock); + return; + } + + rd_kafka_broker_keep(rkb); + rk->rk_telemetry.preferred_broker = rkb; + + mtx_unlock(&rk->rk_telemetry.lock); + + rd_kafka_dbg(rk, TELEMETRY, "TELBRKSET", + "Setting telemetry broker to %s\n", rkb->rkb_name); + + rk->rk_telemetry.state = RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SCHEDULED; + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rk->rk_telemetry.request_timer, rd_false, + 0 /* immediate */, rd_kafka_telemetry_fsm_tmr_cb, (void *)rk); +} + +/** + * @brief Returns the preferred metrics broker or NULL if unavailable. + * + * @locks none + * @locks_acquired rk_telemetry.lock, rd_kafka_wrlock() + * @locality main thread + */ +static rd_kafka_broker_t *rd_kafka_get_preferred_broker(rd_kafka_t *rk) { + rd_kafka_broker_t *rkb = NULL; + + mtx_lock(&rk->rk_telemetry.lock); + if (rk->rk_telemetry.preferred_broker) + rkb = rk->rk_telemetry.preferred_broker; + else { + /* If there is no preferred broker, that means that our previous + * one failed. Iterate through all available brokers to find + * one. */ + rd_kafka_wrlock(rk); + rkb = rd_kafka_broker_random_up( + rk, rd_kafka_filter_broker_by_GetTelemetrySubscription, + NULL); + rd_kafka_wrunlock(rk); + + /* No need to increase refcnt as broker_random_up does it + * already. */ + rk->rk_telemetry.preferred_broker = rkb; + + rd_kafka_dbg(rk, TELEMETRY, "TELBRKSET", + "Lost preferred broker, switching to new " + "preferred broker %d\n", + rkb ? rd_kafka_broker_id(rkb) : -1); + } + mtx_unlock(&rk->rk_telemetry.lock); + + return rkb; +} + /** * @brief Progress the telemetry state machine. 
* + * @locks none + * @locks_acquired none * @locality main thread */ static void rd_kafka_telemetry_fsm(rd_kafka_t *rk) { - rd_kafka_telemetry_state_t state; - rd_kafka_broker_t *preferred_broker; + rd_kafka_broker_t *preferred_broker = NULL; rd_dassert(rk); + rd_dassert(thrd_is_current(rk->rk_thread)); - /* We don't require a lock here, as the only way we can reach this - * function is if we've already set the state from the broker thread, - * and further state transitions happen only on the main thread. */ - mtx_lock(&rk->rk_telemetry.lock); - state = rk->rk_telemetry.state; - mtx_unlock(&rk->rk_telemetry.lock); - switch (state) { + switch (rk->rk_telemetry.state) { case RD_KAFKA_TELEMETRY_AWAIT_BROKER: rd_dassert(!*"Should never be awaiting a broker when the telemetry fsm is called."); break; @@ -217,7 +405,8 @@ static void rd_kafka_telemetry_fsm(rd_kafka_t *rk) { case RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SCHEDULED: preferred_broker = rd_kafka_get_preferred_broker(rk); if (!preferred_broker) { - state = RD_KAFKA_TELEMETRY_AWAIT_BROKER; + rk->rk_telemetry.state = + RD_KAFKA_TELEMETRY_AWAIT_BROKER; break; } rd_kafka_send_get_telemetry_subscriptions(rk, preferred_broker); @@ -226,7 +415,8 @@ static void rd_kafka_telemetry_fsm(rd_kafka_t *rk) { case RD_KAFKA_TELEMETRY_PUSH_SCHEDULED: preferred_broker = rd_kafka_get_preferred_broker(rk); if (!preferred_broker) { - state = RD_KAFKA_TELEMETRY_AWAIT_BROKER; + rk->rk_telemetry.state = + RD_KAFKA_TELEMETRY_AWAIT_BROKER; break; } rd_kafka_send_push_telemetry(rk, preferred_broker, rd_false); @@ -234,23 +424,38 @@ static void rd_kafka_telemetry_fsm(rd_kafka_t *rk) { case RD_KAFKA_TELEMETRY_PUSH_SENT: case RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SENT: + case RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SENT: rd_dassert(!*"Should never be awaiting response when the telemetry fsm is called."); break; - case RD_KAFKA_TELEMETRY_TERMINATING: + case RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SCHEDULED: preferred_broker = 
rd_kafka_get_preferred_broker(rk); if (!preferred_broker) { - state = RD_KAFKA_TELEMETRY_AWAIT_BROKER; + /* If there's no preferred broker, set state to + * terminated immediately to stop the app thread from + * waiting indefinitely. */ + rd_kafka_telemetry_set_terminated(rk); break; } rd_kafka_send_push_telemetry(rk, preferred_broker, rd_true); break; + case RD_KAFKA_TELEMETRY_TERMINATED: + rd_dassert(!*"Should not be terminated when the telemetry fsm is called."); + break; + default: rd_assert(!*"Unknown state"); } } +/** + * @brief Callback for FSM timer. + * + * @locks none + * @locks_acquired none + * @locality main thread + */ void rd_kafka_telemetry_fsm_tmr_cb(rd_kafka_timers_t *rkts, void *rk) { rd_kafka_telemetry_fsm(rk); } diff --git a/src/rdkafka_telemetry.h b/src/rdkafka_telemetry.h index 930573399e..d41f5b874d 100644 --- a/src/rdkafka_telemetry.h +++ b/src/rdkafka_telemetry.h @@ -30,13 +30,20 @@ #ifndef _RD_KAFKA_TELEMETRY_H_ #define _RD_KAFKA_TELEMETRY_H_ -void rd_kafka_telemetry_fsm_tmr_cb(rd_kafka_timers_t *rkts, void *rk); - void rd_kafka_handle_get_telemetry_subscriptions(rd_kafka_t *rk, rd_kafka_resp_err_t err); + void rd_kafka_handle_push_telemetry(rd_kafka_t *rk, rd_kafka_resp_err_t err); void rd_kafka_telemetry_clear(rd_kafka_t *rk, rd_bool_t clear_control_flow_fields); -#endif /* _RD_KAFKA_TELEMETRY_H_ */ \ No newline at end of file +void rd_kafka_telemetry_await_termination(rd_kafka_t *rk); + +void rd_kafka_telemetry_schedule_termination(rd_kafka_t *rk); + +void rd_kafka_set_telemetry_broker_maybe(rd_kafka_t *rk, rd_kafka_broker_t *rkb); + +void rd_kafka_telemetry_fsm_tmr_cb(rd_kafka_timers_t *rkts, void *rk); + +#endif /* _RD_KAFKA_TELEMETRY_H_ */ From 2e23b449aed1a61f7612110535ee8a96d680f758 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 7 Aug 2023 10:46:10 +0530 Subject: [PATCH 2/2] Address review comments --- src/rdkafka.c | 11 +++++------ src/rdkafka_telemetry.c | 9 ++++++++- 2 files changed, 13 insertions(+), 7 deletions(-) diff 
--git a/src/rdkafka.c b/src/rdkafka.c index c403a0ced6..9af1717037 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -1084,7 +1084,8 @@ static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) { if (!(flags & RD_KAFKA_DESTROY_F_IMMEDIATE)) rd_kafka_telemetry_await_termination(rk); - /* With the consumer closed, terminate the rest of librdkafka. */ + /* With the consumer and telemetry closed, terminate the rest of + * librdkafka. */ rd_atomic32_set(&rk->rk_terminate, flags | RD_KAFKA_DESTROY_F_TERMINATE); @@ -2117,11 +2118,9 @@ static int rd_kafka_thread_main(void *arg) { cnd_broadcast(&rk->rk_init_cnd); mtx_unlock(&rk->rk_init_lock); - while ( - likely(!rd_kafka_terminating(rk) || rd_kafka_q_len(rk->rk_ops) || - (rk->rk_cgrp && - (rk->rk_cgrp->rkcg_state != RD_KAFKA_CGRP_STATE_TERM)) || - (rk->rk_telemetry.state != RD_KAFKA_TELEMETRY_TERMINATED))) { + while (likely(!rd_kafka_terminating(rk) || rd_kafka_q_len(rk->rk_ops) || + (rk->rk_cgrp && (rk->rk_cgrp->rkcg_state != + RD_KAFKA_CGRP_STATE_TERM)))) { rd_ts_t sleeptime = rd_kafka_timers_next( &rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/); rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0, diff --git a/src/rdkafka_telemetry.c b/src/rdkafka_telemetry.c index efdf5bd49a..6d4c54b48d 100644 --- a/src/rdkafka_telemetry.c +++ b/src/rdkafka_telemetry.c @@ -277,8 +277,15 @@ void rd_kafka_telemetry_await_termination(rd_kafka_t *rk) { rd_kafka_dbg(rk, TELEMETRY, "TELTERM", "Awaiting termination of telemetry."); mtx_lock(&rk->rk_telemetry.lock); - cnd_wait(&rk->rk_telemetry.termination_cnd, &rk->rk_telemetry.lock); + cnd_timedwait_ms(&rk->rk_telemetry.termination_cnd, + &rk->rk_telemetry.lock, + /* TODO(milind): Evaluate this timeout after completion + of all metrics push, is it too long, or too short if + we include serialization? */ + 1000 /* timeout for waiting */); mtx_unlock(&rk->rk_telemetry.lock); + rd_kafka_dbg(rk, TELEMETRY, "TELTERM", + "Ended waiting for termination of telemetry."); } /**