Metadata cache by topic id and fixes for failing metadata tests:
- cache is updated on full metadata refresh
  and not cleared
- fast metadata refresh stops without
  leader change when there are no stale
  leader epochs
- handling broker isn't updated on stale leader
  epoch
emasab committed Mar 26, 2024
1 parent ce3ce56 commit 268cb01
Showing 7 changed files with 338 additions and 106 deletions.
30 changes: 30 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,36 @@ librdkafka v2.3.1 is a maintenance release:
check the [release notes](https://www.openssl.org/news/cl30.txt).
* Integration tests can be started in KRaft mode and run against any
GitHub Kafka branch other than the released versions.
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4660)
* Fixes to metadata cache expiration, metadata refresh interruption, and
  avoidance of stale metadata usage (#4660).
* Fix to main loop timeout calculation leading to a tight loop for
  up to 1 ms (#4660).


## Fixes

### General fixes

* Metadata cache was cleared on full metadata refresh, leading to unnecessary
  refreshes and occasional `UNKNOWN_TOPIC_OR_PART` errors. Solved by updating
  the cache for existing or hinted entries instead of clearing them.
  Happening since 2.1.0 (#4660).
* Metadata refreshes without partition leader change could lead to a loop of
  metadata calls at regular intervals. Solved by stopping the metadata refresh
  when all existing metadata is non-stale. Happening since 2.3.0 (#4660).
* A partition migration could be performed with stale metadata while the
  partition was undergoing validation and being retried after an error.
  Solved by performing the partition migration only with a non-stale leader
  epoch. Happening since 2.1.0 (#4660).
* In the librdkafka main thread loop, when it was awakened less than 1 ms
  before a timeout expired, it served the ops queue with a zero timeout,
  leading to increased CPU usage until the timeout was reached.
  Happening since 1.x (#4660).




# librdkafka v2.3.0
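The first fix listed above (updating the metadata cache on a full refresh instead of clearing it) can be pictured with a minimal, self-contained C sketch. This is not librdkafka code: the `topic_entry` struct and `cache_topic_update()` helper are hypothetical stand-ins for the real cache, which is indexed with AVL trees rather than a fixed array.

```c
/* Hypothetical sketch: refresh cache entries in place instead of clearing
 * the whole cache before applying a full metadata response.
 * Not librdkafka code. */
#include <stdio.h>
#include <string.h>
#include <time.h>

#define CACHE_SIZE 4

struct topic_entry {
        char name[32];
        int partition_cnt;
        time_t ts_update;
        int valid; /* 1 if the slot is in use */
};

static struct topic_entry cache[CACHE_SIZE];

/* Update-or-insert a single topic seen in a metadata response. */
static void cache_topic_update(const char *name, int partition_cnt) {
        int i, free_slot = -1;

        for (i = 0; i < CACHE_SIZE; i++) {
                if (cache[i].valid && !strcmp(cache[i].name, name)) {
                        cache[i].partition_cnt = partition_cnt;
                        cache[i].ts_update     = time(NULL);
                        return;
                }
                if (!cache[i].valid && free_slot < 0)
                        free_slot = i;
        }

        if (free_slot >= 0) {
                snprintf(cache[free_slot].name, sizeof(cache[free_slot].name),
                         "%s", name);
                cache[free_slot].partition_cnt = partition_cnt;
                cache[free_slot].ts_update     = time(NULL);
                cache[free_slot].valid         = 1;
        }
}

int main(void) {
        int i;

        /* Entries cached from an earlier metadata response. */
        cache_topic_update("orders", 3);
        cache_topic_update("payments", 6);

        /* Full refresh: update what the response contains and leave the
         * other entries to expire on their own, instead of wiping the
         * table and forcing extra refreshes for topics still in use. */
        cache_topic_update("orders", 12);

        for (i = 0; i < CACHE_SIZE; i++)
                if (cache[i].valid)
                        printf("%s: %d partitions\n", cache[i].name,
                               cache[i].partition_cnt);
        return 0;
}
```

Keeping existing entries valid while they are refreshed in place is what removes the window in which a cleared entry would surface as a spurious `UNKNOWN_TOPIC_OR_PART` error.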
7 changes: 6 additions & 1 deletion src/rdkafka.c
@@ -2127,7 +2127,12 @@ static int rd_kafka_thread_main(void *arg) {
RD_KAFKA_CGRP_STATE_TERM)))) {
rd_ts_t sleeptime = rd_kafka_timers_next(
&rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/);
rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
/* Round up to avoid calling serve with timeout 0 ms
* in a tight loop until 1 ms has passed. */
int timeout_ms = (int)(sleeptime / 1000);
if (sleeptime % 1000 > 0)
timeout_ms++;
rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
rd_kafka_cgrp_serve(rk->rk_cgrp);
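The src/rdkafka.c hunk above replaces a truncating microseconds-to-milliseconds division with one that rounds up, so a wakeup less than 1 ms before a timer expires no longer becomes a 0 ms timeout and a tight loop. A standalone sketch of the same arithmetic, with a made-up helper name:

```c
/* Standalone illustration of the rounding applied above: convert a timeout
 * from microseconds to milliseconds, rounding up so a sub-millisecond
 * remainder does not collapse to a 0 ms (busy-looping) timeout.
 * The helper name is hypothetical. */
#include <assert.h>
#include <stdint.h>

static int us_to_ms_ceil(int64_t sleeptime_us) {
        int timeout_ms = (int)(sleeptime_us / 1000);

        if (sleeptime_us % 1000 > 0)
                timeout_ms++; /* round the remainder up to the next ms */
        return timeout_ms;
}

int main(void) {
        assert(us_to_ms_ceil(999) == 1); /* was 0 before the fix */
        assert(us_to_ms_ceil(1000) == 1);
        assert(us_to_ms_ceil(1001) == 2);
        return 0;
}
```

The cost is that the ops queue may be served up to 1 ms after the exact expiry, which the in-diff comment accepts in exchange for not spinning on a zero timeout.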
134 changes: 94 additions & 40 deletions src/rdkafka_metadata.c
@@ -474,12 +474,14 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_metadata_internal_t *mdi = NULL;
rd_kafka_metadata_t *md = NULL;
size_t rkb_namelen;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;

const rd_list_t *requested_topics = request_topics;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
const int log_decode_errors = LOG_ERR;
rd_list_t *missing_topics = NULL;
rd_list_t *missing_topic_ids = NULL;

const rd_list_t *requested_topics = request_topics;
const rd_list_t *requested_topic_ids = NULL;
rd_bool_t all_topics = rd_false;
rd_bool_t cgrp_update = rd_false;
rd_bool_t has_reliable_leader_epochs =
rd_kafka_has_reliable_leader_epochs(rkb);
int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
@@ -496,8 +498,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_bool_t compute_racks = has_client_rack;

if (request) {
requested_topics = request->rkbuf_u.Metadata.topics;
all_topics = request->rkbuf_u.Metadata.all_topics;
requested_topics = request->rkbuf_u.Metadata.topics;
requested_topic_ids = request->rkbuf_u.Metadata.topic_ids;
all_topics = request->rkbuf_u.Metadata.all_topics;
cgrp_update =
request->rkbuf_u.Metadata.cgrp_update && rk->rk_cgrp;
compute_racks |= request->rkbuf_u.Metadata.force_racks;
@@ -519,6 +522,9 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
if (requested_topics)
missing_topics =
rd_list_copy(requested_topics, rd_list_string_copy, NULL);
if (requested_topic_ids)
missing_topic_ids =
rd_list_copy(requested_topic_ids, rd_list_Uuid_copy, NULL);

rd_kafka_broker_lock(rkb);
rkb_namelen = strlen(rkb->rkb_name) + 1;
@@ -833,34 +839,37 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
&mdi->topics[i]);

// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
if (requested_topics)
rd_list_free_cb(missing_topics,
rd_list_remove_cmp(missing_topics,
md->topics[i].topic,
(void *)strcmp));
if (!all_topics) {
/* Only update cache when not asking
* for all topics. */

rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt);
cache_changes++;
rd_kafka_wrunlock(rk);
}
}
if (requested_topic_ids)
rd_list_free_cb(
missing_topic_ids,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
/* Only update cache when not asking
* for all topics or cache entry
* already exists. */
rd_kafka_wrlock(rk);
cache_changes +=
rd_kafka_metadata_cache_topic_update(
rk, &md->topics[i], &mdi->topics[i],
rd_false /*propagate later*/,
/* use has_client_rack rather than
compute_racks. We need cached rack ids
only in case we need to rejoin the group
if they change and client.rack is set
(KIP-881). */
has_client_rack, mdi->brokers,
md->broker_cnt,
all_topics /*cache entry needs to exist
*if all_topics*/);
rd_kafka_wrunlock(rk);
}

// TODO: Should be done for missing_topic_ids as well.
        /* Requested topics not seen in metadata? Propagate to topic code. */
if (missing_topics) {
char *topic;
Expand Down Expand Up @@ -892,6 +901,41 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}
}
}
if (missing_topic_ids) {
rd_kafka_Uuid_t *topic_id;
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"%d/%d requested topic(s) seen in metadata",
rd_list_cnt(requested_topic_ids) -
rd_list_cnt(missing_topic_ids),
rd_list_cnt(requested_topic_ids));
for (i = 0; i < rd_list_cnt(missing_topic_ids); i++) {
rd_kafka_Uuid_t *missing_topic_id =
missing_topic_ids->rl_elems[i];
rd_rkb_dbg(rkb, TOPIC, "METADATA", "wanted %s",
rd_kafka_Uuid_base64str(missing_topic_id));
}
RD_LIST_FOREACH(topic_id, missing_topic_ids, i) {
rd_kafka_topic_t *rkt;

rd_kafka_rdlock(rk);
rkt = rd_kafka_topic_find_by_topic_id(rkb->rkb_rk,
*topic_id);
rd_kafka_rdunlock(rk);
if (rkt) {
/* Received metadata response contained no
* information about topic 'rkt' and thus
* indicates the topic is not available in the
* cluster.
* Mark the topic as non-existent */
rd_kafka_topic_wrlock(rkt);
rd_kafka_topic_set_notexists(
rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
rd_kafka_topic_wrunlock(rkt);

rd_kafka_topic_destroy0(rkt);
}
}
}


rd_kafka_wrlock(rkb->rkb_rk);
@@ -956,17 +1000,18 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
"Caching full metadata with "
"%d broker(s) and %d topic(s): %s",
md->broker_cnt, md->topic_cnt, reason);
} else {
if (cache_changes)
rd_kafka_metadata_cache_propagate_changes(rk);
rd_kafka_metadata_cache_expiry_start(rk);
}

if (cache_changes) {
rd_kafka_metadata_cache_propagate_changes(rk);
}
rd_kafka_metadata_cache_expiry_start(rk);

// TODO: Should be done for requested_topic_ids as well.
/* Remove cache hints for the originally requested topics. */
if (requested_topics)
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
if (requested_topic_ids)
rd_kafka_metadata_cache_purge_hints(rk, requested_topic_ids);

rd_kafka_wrunlock(rkb->rkb_rk);

@@ -982,7 +1027,8 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
* which may contain only a sub-set of the subscribed topics (namely
* the effective subscription of available topics) as to not
* propagate non-included topics as non-existent. */
if (cgrp_update && (requested_topics || all_topics))
if (cgrp_update &&
(requested_topics || requested_topic_ids || all_topics))
rd_kafka_cgrp_metadata_update_check(rkb->rkb_rk->rk_cgrp,
rd_true /*do join*/);

@@ -995,10 +1041,10 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
}

done:

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);

/* This metadata request was triggered by someone wanting
* the metadata information back as a reply, so send that reply now.
Expand All @@ -1013,18 +1059,26 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
err_parse:
err = rkbuf->rkbuf_err;
err:
// TODO: Should be done for requested_topic_ids as well.
if (requested_topics) {
/* Failed requests shall purge cache hints for
* the requested topics. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
rd_kafka_wrunlock(rkb->rkb_rk);
}
if (requested_topic_ids) {
/* Failed requests shall purge cache hints for
* the requested topics. */
rd_kafka_wrlock(rkb->rkb_rk);
rd_kafka_metadata_cache_purge_hints_by_id(rk,
requested_topic_ids);
rd_kafka_wrunlock(rkb->rkb_rk);
}

// TODO: Should be done for requested_topic_ids as well.
if (missing_topics)
rd_list_destroy(missing_topics);
if (missing_topic_ids)
rd_list_destroy(missing_topic_ids);
rd_tmpabuf_destroy(&tbuf);

return err;
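The parser changes above track requested topic ids with the same pattern already used for topic names: copy the requested ids into a "missing" list, remove each id that appears in the response, and mark whatever is left as non-existent. Below is a simplified, self-contained sketch of that bookkeeping; `topic_uuid_t` and its helpers are illustrative stand-ins, not librdkafka's `rd_kafka_Uuid_t`/`rd_list_t` API.

```c
/* Simplified sketch of the "missing topic ids" bookkeeping: start from a
 * copy of the requested ids, drop each id seen in the response, and what
 * remains was not returned by the broker. Types and helpers are
 * illustrative, not librdkafka's. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
        uint64_t msb, lsb;
} topic_uuid_t;

static bool uuid_eq(const topic_uuid_t *a, const topic_uuid_t *b) {
        return a->msb == b->msb && a->lsb == b->lsb;
}

/* Remove one matching id by swapping the last element into its place. */
static void remove_id(topic_uuid_t *ids, size_t *cnt, const topic_uuid_t *id) {
        size_t i;

        for (i = 0; i < *cnt; i++) {
                if (uuid_eq(&ids[i], id)) {
                        ids[i] = ids[--(*cnt)];
                        return;
                }
        }
}

int main(void) {
        /* Requested ids, copied up front (like missing_topic_ids above). */
        topic_uuid_t missing[3] = {{1, 1}, {2, 2}, {3, 3}};
        size_t missing_cnt      = 3;
        size_t i;

        /* Ids actually present in the metadata response. */
        topic_uuid_t seen[2] = {{1, 1}, {3, 3}};
        for (i = 0; i < 2; i++)
                remove_id(missing, &missing_cnt, &seen[i]);

        /* Whatever is left would be marked non-existent. */
        for (i = 0; i < missing_cnt; i++)
                printf("missing topic id: %llu-%llu\n",
                       (unsigned long long)missing[i].msb,
                       (unsigned long long)missing[i].lsb);
        return 0;
}
```

In the diff itself the removal is done with `rd_list_remove_cmp()` and `rd_kafka_Uuid_ptr_cmp()`, and each leftover topic is looked up with `rd_kafka_topic_find_by_topic_id()` and marked with `rd_kafka_topic_set_notexists()`.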
17 changes: 13 additions & 4 deletions src/rdkafka_metadata.h
@@ -219,7 +219,8 @@ rd_kafka_metadata_new_topic_with_partition_replicas_mock(int replication_factor,
*/

struct rd_kafka_metadata_cache_entry {
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode; /* rkmc_avl */
rd_avl_node_t rkmce_avlnode_by_id; /* rkmc_avl_by_id */
TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
rd_ts_t rkmce_ts_expires; /* Expire time */
rd_ts_t rkmce_ts_insert; /* Insert time */
Expand All @@ -243,6 +244,7 @@ struct rd_kafka_metadata_cache_entry {

struct rd_kafka_metadata_cache {
rd_avl_t rkmc_avl;
rd_avl_t rkmc_avl_by_id;
TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry;
rd_kafka_timer_t rkmc_expiry_tmr;
int rkmc_cnt;
@@ -271,19 +273,26 @@ struct rd_kafka_metadata_cache {
int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic);
void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk);
int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts);
void rd_kafka_metadata_cache_topic_update(
int rd_kafka_metadata_cache_topic_update(
rd_kafka_t *rk,
const rd_kafka_metadata_topic_t *mdt,
rd_kafka_metadata_topic_t *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
rd_bool_t propagate,
rd_bool_t include_metadata,
rd_kafka_metadata_broker_internal_t *brokers,
size_t broker_cnt);
size_t broker_cnt,
rd_bool_t only_existing);
void rd_kafka_metadata_cache_propagate_changes(rd_kafka_t *rk);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find(rd_kafka_t *rk, const char *topic, int valid);
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find_by_id(rd_kafka_t *rk,
const rd_kafka_Uuid_t,
int valid);
void rd_kafka_metadata_cache_purge_hints(rd_kafka_t *rk,
const rd_list_t *topics);
void rd_kafka_metadata_cache_purge_hints_by_id(rd_kafka_t *rk,
const rd_list_t *topics);
int rd_kafka_metadata_cache_hint(rd_kafka_t *rk,
const rd_list_t *topics,
rd_list_t *dst,
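The header changes above give each cache entry a second AVL node (`rkmce_avlnode_by_id`) and the cache itself a second tree (`rkmc_avl_by_id`), so the same entry can be located either by topic name or by topic id. A conceptual, self-contained sketch of that dual-index idea follows; it uses linear scans instead of AVL trees and every name in it is hypothetical.

```c
/* Conceptual sketch of a cache whose entries are reachable through two
 * indexes: by topic name and by topic id. librdkafka uses two AVL trees
 * whose nodes live inside the same entry; this illustration uses a plain
 * array and linear scans. Not librdkafka code. */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct {
        uint64_t msb, lsb;
} topic_uuid_t;

struct cache_entry {
        char name[32];
        topic_uuid_t topic_id;
        int partition_cnt;
};

#define ENTRY_CNT 2
static struct cache_entry entries[ENTRY_CNT] = {
    {"orders", {1, 1}, 12},
    {"payments", {2, 2}, 6},
};

static struct cache_entry *find_by_name(const char *name) {
        int i;
        for (i = 0; i < ENTRY_CNT; i++)
                if (!strcmp(entries[i].name, name))
                        return &entries[i];
        return NULL;
}

static struct cache_entry *find_by_id(topic_uuid_t id) {
        int i;
        for (i = 0; i < ENTRY_CNT; i++)
                if (entries[i].topic_id.msb == id.msb &&
                    entries[i].topic_id.lsb == id.lsb)
                        return &entries[i];
        return NULL;
}

int main(void) {
        topic_uuid_t id = {2, 2};

        printf("%d\n", find_by_name("orders")->partition_cnt); /* 12 */
        printf("%d\n", find_by_id(id)->partition_cnt);         /* 6 */
        return 0;
}
```

Because both lookups resolve to the same entry object, an update made through the by-name path is immediately visible through the by-id path, which is what keeps the two views of the cache consistent.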