Skip to content

Commit

Permalink
Use protocol field names
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Dec 2, 2022
1 parent a11ae32 commit 6a484c4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
50 changes: 25 additions & 25 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -5694,7 +5694,7 @@ rd_kafka_ListConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
rd_list_t valid, errors;
const rd_kafka_ListConsumerGroupsResult_t *list_result;
char *group_s = NULL, *group_state_s = NULL, *proto_type_s = NULL;
char *group_id = NULL, *group_state = NULL, *proto_type = NULL;

api_version = rd_kafka_buf_ApiVersion(reply);
if (api_version >= 1) {
Expand Down Expand Up @@ -5724,52 +5724,52 @@ rd_kafka_ListConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_ListConsumerGroupsResult_free);

for (i = 0; i < cnt; i++) {
rd_kafkap_str_t grp, protocol_type, grp_state = RD_ZERO_INIT;
rd_kafkap_str_t GroupId, ProtocolType,
GroupState = RD_ZERO_INIT;
rd_kafka_ConsumerGroupListing_t *group_listing;
rd_bool_t is_simple_consumer_group, is_consumer_protocol_type;
rd_kafka_consumer_group_state_t state =
RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN;

rd_kafka_buf_read_str(reply, &grp);
rd_kafka_buf_read_str(reply, &protocol_type);
rd_kafka_buf_read_str(reply, &GroupId);
rd_kafka_buf_read_str(reply, &ProtocolType);
if (api_version >= 4) {
rd_kafka_buf_read_str(reply, &grp_state);
rd_kafka_buf_read_str(reply, &GroupState);
}
rd_kafka_buf_skip_tags(reply);

group_s = RD_KAFKAP_STR_DUP(&grp);
proto_type_s = RD_KAFKAP_STR_DUP(&protocol_type);
group_id = RD_KAFKAP_STR_DUP(&GroupId);
proto_type = RD_KAFKAP_STR_DUP(&ProtocolType);
if (api_version >= 4) {
group_state_s = RD_KAFKAP_STR_DUP(&grp_state);
state =
rd_kafka_consumer_group_state_code(group_state_s);
group_state = RD_KAFKAP_STR_DUP(&GroupState);
state = rd_kafka_consumer_group_state_code(group_state);
}

is_simple_consumer_group = *proto_type_s == '\0';
is_simple_consumer_group = *proto_type == '\0';
is_consumer_protocol_type =
!strcmp(proto_type_s, CONSUMER_PROTOCOL_TYPE);
!strcmp(proto_type, CONSUMER_PROTOCOL_TYPE);
if (is_simple_consumer_group || is_consumer_protocol_type) {
group_listing = rd_kafka_ConsumerGroupListing_new(
group_s, is_simple_consumer_group, state);
group_id, is_simple_consumer_group, state);
rd_list_add(&valid, group_listing);
}

rd_free(group_s);
rd_free(group_state_s);
rd_free(proto_type_s);
group_s = NULL;
group_state_s = NULL;
proto_type_s = NULL;
rd_free(group_id);
rd_free(group_state);
rd_free(proto_type);
group_id = NULL;
group_state = NULL;
proto_type = NULL;
}
rd_kafka_buf_skip_tags(reply);

err_parse:
if (group_s)
rd_free(group_s);
if (group_state_s)
rd_free(group_state_s);
if (proto_type_s)
rd_free(proto_type_s);
if (group_id)
rd_free(group_id);
if (group_state)
rd_free(group_state);
if (proto_type)
rd_free(proto_type);

if (reply->rkbuf_err) {
error_code = reply->rkbuf_err;
Expand Down
5 changes: 3 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1887,11 +1887,12 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
,
is_flexver);
if (ApiVersion >= 4) {
size_t states_arraycnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
size_t of_GroupsArrayCnt =
rd_kafka_buf_write_arraycnt_pos(rkbuf);
for (i = 0; i < states_cnt; i++) {
rd_kafka_buf_write_kstr(rkbuf, states[i]);
}
rd_kafka_buf_finalize_arraycnt(rkbuf, states_arraycnt, i);
rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, i);
}
if (is_flexver) {
rd_kafka_buf_write_tags(rkbuf);
Expand Down

0 comments on commit 6a484c4

Please sign in to comment.