Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broker selection and client termination [KIP-714] #4382

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,13 @@ static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) {
rd_kafka_consumer_close(rk);
}

/* With the consumer closed, terminate the rest of librdkafka. */
/* Await telemetry termination. This method blocks until the last
* PushTelemetry request is sent (if possible). */
if (!(flags & RD_KAFKA_DESTROY_F_IMMEDIATE))
rd_kafka_telemetry_await_termination(rk);

/* With the consumer and telemetry closed, terminate the rest of
* librdkafka. */
rd_atomic32_set(&rk->rk_terminate,
flags | RD_KAFKA_DESTROY_F_TERMINATE);

Expand Down Expand Up @@ -2250,6 +2256,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);

mtx_init(&rk->rk_telemetry.lock, mtx_plain);
cnd_init(&rk->rk_telemetry.termination_cnd);

rd_atomic64_init(&rk->rk_ts_last_poll, rk->rk_ts_created);
rd_atomic32_init(&rk->rk_flushing, 0);
Expand Down Expand Up @@ -3989,6 +3996,15 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
rd_kafka_purge(rk, rko->rko_u.purge.flags);
break;

case RD_KAFKA_OP_SET_TELEMETRY_BROKER:
rd_kafka_set_telemetry_broker_maybe(
rk, rko->rko_u.telemetry_broker.rkb);
break;

case RD_KAFKA_OP_TERMINATE_TELEMETRY:
rd_kafka_telemetry_schedule_termination(rko->rko_rk);
break;

default:
/* If op has a callback set (e.g., OAUTHBEARER_REFRESH),
* call it. */
Expand Down
117 changes: 74 additions & 43 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,29 +237,28 @@ static void rd_kafka_broker_features_set(rd_kafka_broker_t *rkb, int features) {


/**
* @brief Check and return supported ApiVersion for \p ApiKey.
*
* @returns the highest supported ApiVersion in the specified range (inclusive)
* or -1 if the ApiKey is not supported or no matching ApiVersion.
* The current feature set is also returned in \p featuresp
* @locks none
* @locality any
* @brief Implementation for rd_kafka_broker_ApiVersion_supported and
* rd_kafka_broker_ApiVersion_supported0.
*/
int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
int16_t ApiKey,
int16_t minver,
int16_t maxver,
int *featuresp) {
static int16_t
rd_kafka_broker_ApiVersion_supported_implementation(rd_kafka_broker_t *rkb,
int16_t ApiKey,
int16_t minver,
int16_t maxver,
int *featuresp,
rd_bool_t do_lock) {
struct rd_kafka_ApiVersion skel = {.ApiKey = ApiKey};
struct rd_kafka_ApiVersion ret = RD_ZERO_INIT, *retp;

rd_kafka_broker_lock(rkb);
if (do_lock)
rd_kafka_broker_lock(rkb);
if (featuresp)
*featuresp = rkb->rkb_features;

if (rkb->rkb_features & RD_KAFKA_FEATURE_UNITTEST) {
/* For unit tests let the broker support everything. */
rd_kafka_broker_unlock(rkb);
if (do_lock)
rd_kafka_broker_unlock(rkb);
return maxver;
}

Expand All @@ -268,7 +267,9 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
sizeof(*rkb->rkb_ApiVersions), rd_kafka_ApiVersion_key_cmp);
if (retp)
ret = *retp;
rd_kafka_broker_unlock(rkb);

if (do_lock)
rd_kafka_broker_unlock(rkb);

if (!retp)
return -1;
Expand All @@ -285,6 +286,47 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
}


/**
* @brief Check and return supported ApiVersion for \p ApiKey.
*
* @returns the highest supported ApiVersion in the specified range (inclusive)
* or -1 if the ApiKey is not supported or no matching ApiVersion.
* The current feature set is also returned in \p featuresp
* @locks none
* @locks_acquired rd_kafka_broker_lock()
* @locality any
*/
int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
int16_t ApiKey,
int16_t minver,
int16_t maxver,
int *featuresp) {
return rd_kafka_broker_ApiVersion_supported_implementation(
rkb, ApiKey, minver, maxver, featuresp, rd_true /* do_lock */);
}


/**
* @brief Check and return supported ApiVersion for \p ApiKey.
*
* @returns the highest supported ApiVersion in the specified range (inclusive)
* or -1 if the ApiKey is not supported or no matching ApiVersion.
* The current feature set is also returned in \p featuresp
*
* @note Same as rd_kafka_broker_ApiVersion_supported except for locking.
* @locks rd_kafka_broker_lock()
* @locks_acquired none
* @locality any
*/
int16_t rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb,
int16_t ApiKey,
int16_t minver,
int16_t maxver,
int *featuresp) {
return rd_kafka_broker_ApiVersion_supported_implementation(
rkb, ApiKey, minver, maxver, featuresp, rd_false /* do_lock */);
}

/**
* @brief Set broker state.
*
Expand Down Expand Up @@ -673,6 +715,9 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
/* TODO(milind): check if this right. */
mtx_lock(&rkb->rkb_rk->rk_telemetry.lock);
if (rkb->rkb_rk->rk_telemetry.preferred_broker == rkb) {
rd_kafka_dbg(rkb->rkb_rk, TELEMETRY, "TELBRKLOST",
"Lost telemetry broker %s due to state change",
rkb->rkb_name);
rd_kafka_broker_destroy(
rkb->rkb_rk->rk_telemetry.preferred_broker);
rkb->rkb_rk->rk_telemetry.preferred_broker = NULL;
Expand Down Expand Up @@ -1349,15 +1394,15 @@ void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk) {
* @locks rd_kafka_*lock() MUST be held
* @locality any
*/
static rd_kafka_broker_t *
rd_kafka_broker_random0(const char *func,
int line,
rd_kafka_t *rk,
rd_bool_t is_up,
int state,
int *filtered_cnt,
int (*filter)(rd_kafka_broker_t *rk, void *opaque),
void *opaque) {
rd_kafka_broker_t *rd_kafka_broker_random0(const char *func,
int line,
rd_kafka_t *rk,
rd_bool_t is_up,
int state,
int *filtered_cnt,
int (*filter)(rd_kafka_broker_t *rk,
void *opaque),
void *opaque) {
rd_kafka_broker_t *rkb, *good = NULL;
int cnt = 0;
int fcnt = 0;
Expand Down Expand Up @@ -1392,11 +1437,6 @@ rd_kafka_broker_random0(const char *func,
return good;
}

#define rd_kafka_broker_random(rk, state, filter, opaque) \
rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_false, state, \
NULL, filter, opaque)


/**
* @returns the broker (with refcnt increased) with the highest weight based
* based on the provided weighing function.
Expand Down Expand Up @@ -2277,20 +2317,11 @@ void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {
rkb, RD_KAFKAP_GetTelemetrySubscriptions, 0, 0, &features) !=
-1) {
rd_kafka_t *rk = rkb->rkb_rk;

mtx_lock(&rkb->rkb_rk->rk_telemetry.lock);
if (!rkb->rkb_rk->rk_telemetry.preferred_broker &&
rk->rk_telemetry.state == RD_KAFKA_TELEMETRY_AWAIT_BROKER) {
rd_kafka_broker_keep(rkb);
rk->rk_telemetry.preferred_broker = rkb;
rk->rk_telemetry.state =
RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SCHEDULED;
rd_kafka_timer_start_oneshot(
&rk->rk_timers, &rk->rk_telemetry.request_timer,
rd_false, 1 /* immediate */,
rd_kafka_telemetry_fsm_tmr_cb, (void *)rk);
}
mtx_unlock(&rkb->rkb_rk->rk_telemetry.lock);
rd_kafka_op_t *rko =
rd_kafka_op_new(RD_KAFKA_OP_SET_TELEMETRY_BROKER);
rd_kafka_broker_keep(rkb);
rko->rko_u.telemetry_broker.rkb = rkb;
rd_kafka_q_enq(rk->rk_ops, rko);
}
}

Expand Down
25 changes: 25 additions & 0 deletions src/rdkafka_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ int16_t rd_kafka_broker_ApiVersion_supported(rd_kafka_broker_t *rkb,
int16_t maxver,
int *featuresp);

int16_t rd_kafka_broker_ApiVersion_supported0(rd_kafka_broker_t *rkb,
int16_t ApiKey,
int16_t minver,
int16_t maxver,
int *featuresp);

rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0_fl(const char *func,
int line,
rd_kafka_t *rk,
Expand Down Expand Up @@ -572,6 +578,25 @@ int rd_kafka_brokers_wait_state_change_async(rd_kafka_t *rk,
rd_kafka_enq_once_t *eonce);
void rd_kafka_brokers_broadcast_state_change(rd_kafka_t *rk);

rd_kafka_broker_t *rd_kafka_broker_random0(const char *func,
int line,
rd_kafka_t *rk,
rd_bool_t is_up,
int state,
int *filtered_cnt,
int (*filter)(rd_kafka_broker_t *rk,
void *opaque),
void *opaque);

#define rd_kafka_broker_random(rk, state, filter, opaque) \
rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_false, state, \
NULL, filter, opaque)

#define rd_kafka_broker_random_up(rk, filter, opaque) \
rd_kafka_broker_random0(__FUNCTION__, __LINE__, rk, rd_true, \
RD_KAFKA_BROKER_STATE_UP, NULL, filter, \
opaque)



/**
Expand Down
18 changes: 13 additions & 5 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ typedef enum {
RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SENT,
RD_KAFKA_TELEMETRY_PUSH_SCHEDULED,
RD_KAFKA_TELEMETRY_PUSH_SENT,
RD_KAFKA_TELEMETRY_TERMINATING
RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SCHEDULED,
RD_KAFKA_TELEMETRY_TERMINATING_PUSH_SENT,
RD_KAFKA_TELEMETRY_TERMINATED,
} rd_kafka_telemetry_state_t;


Expand All @@ -251,7 +253,9 @@ rd_kafka_telemetry_state2str(rd_kafka_telemetry_state_t state) {
"GetSubscriptionsSent",
"PushScheduled",
"PushSent",
"Terminating"};
"TerminatingPushScheduled",
"TerminatingPushSent",
"Terminated"};
return names[state];
}

Expand Down Expand Up @@ -639,17 +643,21 @@ struct rd_kafka_s {
} rk_sasl;

struct {
/* Fields for the control flow. */
/* Fields for the control flow - unless guarded by lock, only
* accessed from main thread. */
/**< Current state of the telemetry state machine. */
rd_kafka_telemetry_state_t state;
/**< Preferred broker for sending metrics to. */
/**< Preferred broker for sending telemetry (Lock protected). */
rd_kafka_broker_t *preferred_broker;
/**< Timer for all the requests we schedule. */
rd_kafka_timer_t request_timer;
/**< Lock for preferred telemetry broker and state. */
mtx_t lock;
/**< Used to wait for termination (Lock protected). */
cnd_t termination_cnd;

/* Fields obtained from broker as a result of GetSubscriptions.
/* Fields obtained from broker as a result of GetSubscriptions -
* only accessed from main thread.
*/
rd_kafka_uuid_t client_instance_id;
int32_t subscription_id;
Expand Down
8 changes: 8 additions & 0 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
"REPLY:ALTERUSERSCRAMCREDENTIALS",
[RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] =
"REPLY:DESCRIBEUSERSCRAMCREDENTIALS",
[RD_KAFKA_OP_SET_TELEMETRY_BROKER] = "REPLY:RD_KAFKA_OP_SET_TELEMETRY_BROKER",
[RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY",
};

if (type & RD_KAFKA_OP_REPLY)
Expand Down Expand Up @@ -270,6 +272,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] =
sizeof(rko->rko_u.admin_request),
[RD_KAFKA_OP_SET_TELEMETRY_BROKER] = sizeof(rko->rko_u.telemetry_broker),
[RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY,
};
size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

Expand Down Expand Up @@ -468,6 +472,10 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
rd_kafka_topic_partition_list_destroy);
break;

case RD_KAFKA_OP_SET_TELEMETRY_BROKER:
RD_IF_FREE(rko->rko_u.telemetry_broker.rkb, rd_kafka_broker_destroy);
break;

default:
break;
}
Expand Down
9 changes: 9 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ typedef enum {
RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, /* < Admin:
AlterUserScramCredentials
u.admin_request >*/
RD_KAFKA_OP_SET_TELEMETRY_BROKER, /**< Set preferred broker for
telemetry. */
RD_KAFKA_OP_TERMINATE_TELEMETRY, /**< Start termination sequence for
telemetry. */
RD_KAFKA_OP__END
} rd_kafka_op_type_t;

Expand Down Expand Up @@ -661,6 +665,11 @@ struct rd_kafka_op_s {

} leaders;

struct {
/** Preferred broker for telemetry. */
rd_kafka_broker_t *rkb;
} telemetry_broker;

} rko_u;
};

Expand Down
8 changes: 7 additions & 1 deletion src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -5365,11 +5365,15 @@ void rd_kafka_handle_GetTelemetrySubscriptions(rd_kafka_t *rk,
}

rd_kafka_handle_get_telemetry_subscriptions(rk, err);
return;

err_parse:
err = rkbuf->rkbuf_err;
goto err;

err:
/* TODO: Add error handling actions, possibly call
* rd_kafka_handle_get_telemetry_subscriptions with error. */
return;
}

Expand Down Expand Up @@ -5400,12 +5404,14 @@ void rd_kafka_handle_PushTelemetry(rd_kafka_t *rk,
goto err;
}
rd_kafka_handle_push_telemetry(rk, err);

return;
err_parse:
err = rkbuf->rkbuf_err;
goto err;

err:
/* TODO: Add error handling actions, possibly call
* rd_kafka_handle_push_telemetry with error. */
return;
}

Expand Down
Loading