Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #4941

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

librdkafka v2.6.3 is a maintenance release:

* DescribeConsumerGroup now supports new consumer protocol groups (#4922).
* Socket options are now all set before connection (#4893).
* Client certificate chain is now sent when using `ssl.certificate.pem`
or `ssl_certificate` or `ssl.keystore.location` (#4894).
Expand Down
18 changes: 17 additions & 1 deletion examples/describe_consumer_groups.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,19 @@ print_group_member_info(const rd_kafka_MemberDescription_t *member) {
printf(" Assignment:\n");
print_partition_list(stdout, topic_partitions, 0, " ");
}
const rd_kafka_MemberAssignment_t *target_assignment =
rd_kafka_MemberDescription_target_assignment(member);
const rd_kafka_topic_partition_list_t *target_topic_partitions =
rd_kafka_MemberAssignment_partitions(target_assignment);
if (!target_topic_partitions) {
printf(" No target assignment\n");
} else if (target_topic_partitions->cnt == 0) {
printf(" Empty target assignment\n");
} else {
printf(" Target assignment:\n");
print_partition_list(stdout, target_topic_partitions, 0,
" ");
}
}


Expand All @@ -194,6 +207,8 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) {
rd_kafka_ConsumerGroupDescription_partition_assignor(group);
rd_kafka_consumer_group_state_t state =
rd_kafka_ConsumerGroupDescription_state(group);
rd_kafka_consumer_group_type_t type =
rd_kafka_ConsumerGroupDescription_type(group);
authorized_operations =
rd_kafka_ConsumerGroupDescription_authorized_operations(
group, &authorized_operations_cnt);
Expand All @@ -212,9 +227,10 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) {
rd_kafka_Node_port(coordinator));
}
printf(
"Group \"%s\", partition assignor \"%s\", "
"Group \"%s\", partition assignor \"%s\", type \"%s\" "
" state %s%s, with %" PRId32 " member(s)\n",
group_id, partition_assignor,
rd_kafka_consumer_group_type_name(type),
rd_kafka_consumer_group_state_name(state), coordinator_desc,
member_cnt);
for (j = 0; j < authorized_operations_cnt; j++) {
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,7 @@ static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
[RD_KAFKAP_AlterClientQuotas] = rd_true,
[RD_KAFKAP_DescribeUserScramCredentials] = rd_true,
[RD_KAFKAP_AlterUserScramCredentials] = rd_true,
[RD_KAFKAP_ConsumerGroupDescribe] = rd_true,
}};
int i;
int cnt = 0;
Expand Down
46 changes: 46 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -5593,6 +5593,8 @@ typedef int rd_kafka_event_type_t;
#define RD_KAFKA_EVENT_LISTOFFSETS_RESULT 0x400000
/** ElectLeaders_result_t */
#define RD_KAFKA_EVENT_ELECTLEADERS_RESULT 0x800000
/** ConsumerGroupDescribe_result_t */
#define RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT 0x1000000

/**
* @returns the event type for the given event.
Expand Down Expand Up @@ -5752,6 +5754,7 @@ int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev);
* - RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT
* - RD_KAFKA_EVENT_LISTOFFSETS_RESULT
* - RD_KAFKA_EVENT_ELECTLEADERS_RESULT
* - RD_KAFKA_EVENT_CONSUMERGROUPDESCRIBE_RESULT
*/
RD_EXPORT
void *rd_kafka_event_opaque(rd_kafka_event_t *rkev);
Expand Down Expand Up @@ -8881,6 +8884,20 @@ RD_EXPORT
const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator(
const rd_kafka_ConsumerGroupDescription_t *grpdesc);

/**
* @brief Gets type for the \p grpdesc group.
*
* @param grpdesc The group description.
*
* @return A group type.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p grpdesc object.
*/
RD_EXPORT
rd_kafka_consumer_group_type_t rd_kafka_ConsumerGroupDescription_type(
const rd_kafka_ConsumerGroupDescription_t *grpdesc);

/**
* @brief Gets the members count of \p grpdesc group.
*
Expand Down Expand Up @@ -8993,6 +9010,35 @@ RD_EXPORT
const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions(
const rd_kafka_MemberAssignment_t *assignment);

/**
* @brief Gets target assignment of \p member.
*
* @param member The group member.
*
* @return The target assignment.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p member object.
*/
RD_EXPORT
const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_target_assignment(
const rd_kafka_MemberDescription_t *member);

/**
* @brief Gets target assigned partitions of a member \p assignment.
*
* @param assignment The group member assignment.
*
* @return The target assigned partitions.
*
* @remark The lifetime of the returned memory is the same
* as the lifetime of the \p assignment object.
*/
RD_EXPORT
const rd_kafka_topic_partition_list_t *
rd_kafka_MemberAssignment_target_partitions(
const rd_kafka_MemberAssignment_t *assignment);

/**@}*/

/**
Expand Down
Loading