Skip to content

Commit

Permalink
KIP-345: add the group instance id
Browse files Browse the repository at this point in the history
to the DescribeConsumerGroups response
  • Loading branch information
emasab committed Dec 2, 2022
1 parent 545c3f7 commit 73b9c63
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To restore the previous behaviour, set `ssl.endpoint.identification.algorithm` t
Improvement in documentation for this property.
* Added a `resolve_cb` configuration setting that permits using custom DNS resolution logic.
* Added `rd_kafka_mock_broker_error_stack_cnt()`.
* Admin API DescribeGroups() now provides the group instance id for static members [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) (#4091).

## Fixes

Expand Down
12 changes: 8 additions & 4 deletions examples/describe_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,14 @@ print_groups_info(const rd_kafka_DescribeConsumerGroups_result_t *grpdesc,
for (j = 0; j < member_cnt; j++) {
const rd_kafka_MemberDescription_t *member =
rd_kafka_ConsumerGroupDescription_member(group, j);
printf(" Member \"%s\" with client-id %s, host %s\n",
rd_kafka_MemberDescription_consumer_id(member),
rd_kafka_MemberDescription_client_id(member),
rd_kafka_MemberDescription_host(member));
printf(
" Member \"%s\" with client-id %s,"
" group instance id: %s, host %s\n",
rd_kafka_MemberDescription_consumer_id(member),
rd_kafka_MemberDescription_client_id(member),
rd_kafka_MemberDescription_group_instance_id(
member),
rd_kafka_MemberDescription_host(member));
const rd_kafka_MemberAssignment_t *assignment =
rd_kafka_MemberDescription_assignment(member);
const rd_kafka_topic_partition_list_t
Expand Down
14 changes: 14 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -7934,6 +7934,20 @@ RD_EXPORT
const char *rd_kafka_MemberDescription_client_id(
const rd_kafka_MemberDescription_t *member);

/**
* @brief Gets group instance id of \p member.
*
* @param member The group member.
*
* @return The group instance id, or NULL if not available.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p member object.
*/
RD_EXPORT
const char *rd_kafka_MemberDescription_group_instance_id(
const rd_kafka_MemberDescription_t *member);

/**
* @brief Gets consumer id of \p member.
*
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6015,6 +6015,12 @@ const char *rd_kafka_MemberDescription_client_id(
return member->client_id;
}

const char *rd_kafka_MemberDescription_group_instance_id(
const rd_kafka_MemberDescription_t *member) {
rd_assert(member != NULL);
return member->group_instance_id;
}

const char *rd_kafka_MemberDescription_consumer_id(
const rd_kafka_MemberDescription_t *member) {
rd_assert(member != NULL);
Expand Down
67 changes: 63 additions & 4 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2597,6 +2597,9 @@ static void do_test_DescribeConsumerGroups(const char *what,
expected_DescribeConsumerGroups_result_t
expected[TEST_DESCRIBE_GROUPS_CNT] = RD_ZERO_INIT;
const char *describe_groups[TEST_DESCRIBE_GROUPS_CNT];
char group_instance_ids[TEST_DESCRIBE_GROUPS_CNT][512];
char client_ids[TEST_DESCRIBE_GROUPS_CNT][512];
rd_kafka_t *rks[TEST_DESCRIBE_GROUPS_CNT];
const rd_kafka_DescribeConsumerGroups_result_t *res;

SUB_TEST_QUICK("%s DescribeConsumerGroups with %s, request_timeout %d",
Expand Down Expand Up @@ -2627,10 +2630,27 @@ static void do_test_DescribeConsumerGroups(const char *what,
test_produce_msgs_easy(topic, testid, 0, msgs_cnt);

for (i = 0; i < TEST_DESCRIBE_GROUPS_CNT; i++) {
rd_kafka_conf_t *conf;
char *group_id = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
if (i < known_groups) {
test_consume_msgs_easy(group_id, topic, testid, -1,
msgs_cnt, NULL);
snprintf(group_instance_ids[i],
sizeof(group_instance_ids[i]),
"group_instance_id_%" PRId32, i);
snprintf(client_ids[i], sizeof(client_ids[i]),
"client_id_%" PRId32, i);

test_conf_init(&conf, NULL, 5);
test_conf_set(conf, "client.id", client_ids[i]);
test_conf_set(conf, "group.instance.id",
group_instance_ids[i]);
test_conf_set(conf, "session.timeout.ms", "5000");
test_conf_set(conf, "auto.offset.reset", "earliest");
rks[i] =
test_create_consumer(group_id, NULL, conf, NULL);
test_consumer_subscribe(rks[i], topic);
/* Consume messages */
test_consumer_poll("consumer", rks[i], testid, -1, -1,
msgs_cnt, NULL);
}
expected[i].group_id = group_id;
expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
Expand Down Expand Up @@ -2708,15 +2728,46 @@ static void do_test_DescribeConsumerGroups(const char *what,
i, exp->group_id,
rd_kafka_ConsumerGroupDescription_group_id(act));
if (i < known_groups) {
int member_count;
const rd_kafka_MemberDescription_t *member;
const char *client_id;
const char *group_instance_id;

TEST_ASSERT(state ==
RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY,
"Expected Empty state, got %s.",
RD_KAFKA_CONSUMER_GROUP_STATE_STABLE,
"Expected Stable state, got %s.",
rd_kafka_consumer_group_state_name(state));

TEST_ASSERT(
!rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(
act),
"Expected a normal consumer group, got a simple "
"one.");

member_count =
rd_kafka_ConsumerGroupDescription_member_count(act);
TEST_ASSERT(member_count == 1,
"Expected one member, got %d.",
member_count);

member =
rd_kafka_ConsumerGroupDescription_member(act, 0);

client_id =
rd_kafka_MemberDescription_client_id(member);
TEST_ASSERT(!strcmp(client_id, client_ids[i]),
"Expected client id \"%s\","
" got \"%s\".",
client_ids[i], client_id);

group_instance_id =
rd_kafka_MemberDescription_group_instance_id(
member);
TEST_ASSERT(
!strcmp(group_instance_id, group_instance_ids[i]),
"Expected group instance id \"%s\","
" got \"%s\".",
group_instance_ids[i], group_instance_id);
} else {
TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_DEAD,
"Expected Dead state, got %s.",
Expand All @@ -2730,6 +2781,14 @@ static void do_test_DescribeConsumerGroups(const char *what,

rd_kafka_event_destroy(rkev);

for (i = 0; i < known_groups; i++) {
test_consumer_close(rks[i]);
rd_kafka_destroy(rks[i]);
}

/* Wait session timeout + 1s. Because using static group membership */
rd_sleep(6);

test_DeleteGroups_simple(rk, NULL, (char **)describe_groups,
known_groups, NULL);

Expand Down

0 comments on commit 73b9c63

Please sign in to comment.