Skip to content

Commit

Permalink
Unified rd_kafka_DescribeGroupsRequest
Browse files Browse the repository at this point in the history
Max allowed number of groups
Initial size calculation
Return an error in case of unsupported feature.
Use protocol field names
Remove const from error
  • Loading branch information
emasab committed Dec 2, 2022
1 parent 946d8c3 commit 545c3f7
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 103 deletions.
13 changes: 11 additions & 2 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -4749,10 +4749,19 @@ static void rd_kafka_ListGroups_resp_cb(rd_kafka_t *rk,
}

if (i > 0) {
rd_kafka_error_t *error;

state->wait_cnt++;
rd_kafka_DescribeGroupsRequest(
rkb, (const char **)grps, i, RD_KAFKA_REPLYQ(state->q, 0),
error = rd_kafka_DescribeGroupsRequest(
rkb, 0, (const char **)grps, i,
RD_KAFKA_REPLYQ(state->q, 0),
rd_kafka_DescribeGroups_resp_cb, state);
if (error) {
rd_kafka_DescribeGroups_resp_cb(
rk, rkb, rd_kafka_error_code(error), reply, request,
opaque);
rd_kafka_error_destroy(error);
}

while (i-- > 0)
rd_free(grps[i]);
Expand Down
186 changes: 103 additions & 83 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6276,17 +6276,26 @@ static rd_kafka_resp_err_t rd_kafka_admin_DescribeConsumerGroupsRequest(
void *opaque) {
int i;
char *group;
rd_kafka_resp_err_t err;
int groups_cnt = rd_list_cnt(groups);
rd_kafka_error_t *error = NULL;
const char **groups_arr = calloc(groups_cnt, sizeof(*groups_arr));

RD_LIST_FOREACH(group, groups, i) {
groups_arr[i] = rd_list_elem(groups, i);
}
rd_kafka_DescribeGroupsRequest(rkb, groups_arr, groups_cnt, replyq,
resp_cb, opaque);

error = rd_kafka_DescribeGroupsRequest(rkb, -1, groups_arr, groups_cnt,
replyq, resp_cb, opaque);
rd_free((void *)groups_arr);

if (error) {
rd_snprintf(errstr, errstr_size, "%s",
rd_kafka_error_string(error));
err = rd_kafka_error_code(error);
rd_kafka_error_destroy(error);
return err;
}

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

Expand All @@ -6300,16 +6309,22 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
char *errstr,
size_t errstr_size) {
const int log_decode_errors = LOG_ERR;
int cnt, nodeid, port;
int nodeid, port;
int16_t api_version;
int32_t cnt;
rd_kafka_op_t *rko_result = NULL;
rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
rd_kafka_Node_t *node = NULL;
rd_kafka_error_t *error = NULL;
char *group_s = NULL, *group_state_s = NULL, *proto_type_s = NULL,
*proto_s = NULL, *host = NULL, *member_id_s = NULL,
*client_id_s = NULL, *client_host_s = NULL;
char *group_id = NULL, *group_state = NULL, *proto_type = NULL,
*proto = NULL, *host = NULL;

rd_kafka_buf_read_i32(reply, &cnt);
api_version = rd_kafka_buf_ApiVersion(reply);
if (api_version >= 1) {
rd_kafka_buf_read_throttle_time(reply);
}

rd_kafka_buf_read_arraycnt(reply, &cnt, 100000);

rko_result = rd_kafka_admin_result_new(rko_req);
rd_list_init(&rko_result->rko_u.admin_result.results, cnt,
Expand All @@ -6322,75 +6337,74 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_broker_unlock(rkb);

node = rd_kafka_Node_new(nodeid, host, port, NULL);

while (cnt-- > 0) {
int16_t error_code;
rd_kafkap_str_t group, group_state, proto_type, proto;
rd_kafkap_str_t GroupId, GroupState, ProtocolType, ProtocolData;
rd_bool_t is_simple_consumer_group, is_consumer_protocol_type;
int member_cnt;
int32_t member_cnt;
rd_list_t members;
rd_kafka_ConsumerGroupDescription_t *grpdesc = NULL;

rd_kafka_buf_read_i16(reply, &error_code);
rd_kafka_buf_read_str(reply, &group);
rd_kafka_buf_read_str(reply, &group_state);
rd_kafka_buf_read_str(reply, &proto_type);
rd_kafka_buf_read_str(reply, &proto);
rd_kafka_buf_read_i32(reply, &member_cnt);
rd_kafka_buf_read_str(reply, &GroupId);
rd_kafka_buf_read_str(reply, &GroupState);
rd_kafka_buf_read_str(reply, &ProtocolType);
rd_kafka_buf_read_str(reply, &ProtocolData);
rd_kafka_buf_read_arraycnt(reply, &member_cnt, 100000);

group_s = RD_KAFKAP_STR_DUP(&group);
group_state_s = RD_KAFKAP_STR_DUP(&group_state);
proto_type_s = RD_KAFKAP_STR_DUP(&proto_type);
proto_s = RD_KAFKAP_STR_DUP(&proto);
group_id = RD_KAFKAP_STR_DUP(&GroupId);
group_state = RD_KAFKAP_STR_DUP(&GroupState);
proto_type = RD_KAFKAP_STR_DUP(&ProtocolType);
proto = RD_KAFKAP_STR_DUP(&ProtocolData);

if (error_code) {
error = rd_kafka_error_new(
error_code, "DescribeConsumerGroups: %s",
rd_kafka_err2str(error_code));
}

if (member_cnt > 100000) {
error = rd_kafka_error_new(
RD_KAFKA_RESP_ERR__BAD_MSG,
"Member count exceeds max size of 100000.");
}

is_simple_consumer_group = *proto_type_s == '\0';
is_simple_consumer_group = *proto_type == '\0';
is_consumer_protocol_type =
!strcmp(proto_type_s, CONSUMER_PROTOCOL_TYPE);
!strcmp(proto_type, CONSUMER_PROTOCOL_TYPE);
if (error == NULL && !is_simple_consumer_group &&
!is_consumer_protocol_type) {
error = rd_kafka_error_new(
RD_KAFKA_RESP_ERR__INVALID_ARG,
"GroupId %s is not a consumer group (%s).", group_s,
proto_type_s);
"GroupId %s is not a consumer group (%s).",
group_id, proto_type);
}

rd_list_init(&members, 0, rd_kafka_MemberDescription_free);

while (member_cnt-- > 0) {
rd_kafkap_str_t member_id, client_id, client_host;
char *member_id_s, *client_id_s, *client_host_s;
rd_kafkap_str_t MemberId, ClientId, ClientHost,
GroupInstanceId = RD_KAFKAP_STR_INITIALIZER;
char *member_id, *client_id, *client_host,
*group_instance_id = NULL;
rd_kafkap_bytes_t MemberMetadata, MemberAssignment;
rd_kafka_MemberDescription_t *member;
rd_kafka_MemberAssignment_t assignment_s = RD_ZERO_INIT;
rd_kafkap_bytes_t meta, assignment;
rd_kafka_MemberAssignment_t assignment = RD_ZERO_INIT;
rd_kafka_topic_partition_list_t *toppars = NULL;
rd_kafka_buf_t *rkbuf;

rd_kafka_buf_read_str(reply, &member_id);
rd_kafka_buf_read_str(reply, &client_id);
rd_kafka_buf_read_str(reply, &client_host);
rd_kafka_buf_read_bytes(reply, &meta);
rd_kafka_buf_read_bytes(reply, &assignment);
rd_kafka_buf_read_str(reply, &MemberId);
if (api_version >= 4) {
rd_kafka_buf_read_str(reply, &GroupInstanceId);
}
rd_kafka_buf_read_str(reply, &ClientId);
rd_kafka_buf_read_str(reply, &ClientHost);
rd_kafka_buf_read_bytes(reply, &MemberMetadata);
rd_kafka_buf_read_bytes(reply, &MemberAssignment);
if (error != NULL)
continue;

if (RD_KAFKAP_BYTES_LEN(&assignment) != 0) {
if (RD_KAFKAP_BYTES_LEN(&MemberAssignment) != 0) {
int16_t version;
/* Parse assignment */
rkbuf = rd_kafka_buf_new_shadow(
assignment.data,
RD_KAFKAP_BYTES_LEN(&assignment), NULL);
MemberAssignment.data,
RD_KAFKAP_BYTES_LEN(&MemberAssignment),
NULL);
/* Protocol parser needs a broker handle
* to log errors on. */
rkbuf->rkbuf_rkb = rkb;
Expand All @@ -6406,52 +6420,64 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
"Error reading topic "
"partitions");
}
assignment_s.topic_partitions = toppars;
assignment.topic_partitions = toppars;
}

member_id_s = RD_KAFKAP_STR_DUP(&member_id);
client_id_s = RD_KAFKAP_STR_DUP(&client_id);
client_host_s = RD_KAFKAP_STR_DUP(&client_host);
member_id = RD_KAFKAP_STR_DUP(&MemberId);
if (!RD_KAFKAP_STR_IS_NULL(&GroupInstanceId)) {
group_instance_id =
RD_KAFKAP_STR_DUP(&GroupInstanceId);
}
client_id = RD_KAFKAP_STR_DUP(&ClientId);
client_host = RD_KAFKAP_STR_DUP(&ClientHost);

member = rd_kafka_MemberDescription_new(
client_id_s, member_id_s, NULL, client_host_s,
assignment_s);
client_id, member_id, group_instance_id,
client_host, assignment);
rd_list_add(&members, member);
if (assignment_s.topic_partitions)
if (assignment.topic_partitions)
rd_kafka_topic_partition_list_destroy(
assignment_s.topic_partitions);
rd_free(member_id_s);
rd_free(client_id_s);
rd_free(client_host_s);
member_id_s = NULL;
client_id_s = NULL;
client_host_s = NULL;
assignment.topic_partitions);
rd_free(member_id);
rd_free(group_instance_id);
rd_free(client_id);
rd_free(client_host);
member_id = NULL;
group_instance_id = NULL;
client_id = NULL;
client_host = NULL;
}

if (api_version >= 3) {
/* TODO: implement KIP-430 */
int32_t authorized_operations;
rd_kafka_buf_read_i32(reply, &authorized_operations);
}

if (error == NULL) {
grpdesc = rd_kafka_ConsumerGroupDescription_new(
group_s, is_simple_consumer_group, &members,
proto_s,
rd_kafka_consumer_group_state_code(group_state_s),
group_id, is_simple_consumer_group, &members, proto,
rd_kafka_consumer_group_state_code(group_state),
node, error);
} else {
grpdesc = rd_kafka_ConsumerGroupDescription_new_error(
group_s, error);
group_id, error);
}
rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc);
if (error)
rd_kafka_error_destroy(error);
rd_list_destroy(&members);
rd_free(group_s);
rd_free(group_state_s);
rd_free(proto_type_s);
rd_free(proto_s);
error = NULL;
group_s = NULL;
group_state_s = NULL;
proto_type_s = NULL;
proto_s = NULL;
rd_free(group_id);
rd_free(group_state);
rd_free(proto_type);
rd_free(proto);
error = NULL;
group_id = NULL;
group_state = NULL;
proto_type = NULL;
proto = NULL;
}

if (host)
rd_free(host);
if (node)
Expand All @@ -6460,20 +6486,14 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
return RD_KAFKA_RESP_ERR_NO_ERROR;

err_parse:
if (group_s)
rd_free(group_s);
if (group_state_s)
rd_free(group_state_s);
if (proto_type_s)
rd_free(proto_type_s);
if (proto_s)
rd_free(proto_s);
if (member_id_s)
rd_free(member_id_s);
if (client_id_s)
rd_free(client_id_s);
if (client_host_s)
rd_free(client_host_s);
if (group_id)
rd_free(group_id);
if (group_state)
rd_free(group_state);
if (proto_type)
rd_free(proto_type);
if (proto)
rd_free(proto);
if (error)
rd_kafka_error_destroy(error);
if (host)
Expand Down
65 changes: 53 additions & 12 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1904,28 +1904,69 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
}

/**
* Send DescribeGroupsRequest
* @brief Construct and send DescribeGroupsRequest to \p rkb
* with the groups (const char *) in \p groups.
* Uses \p max_ApiVersion as maximum API version,
* pass -1 to use the maximum available version.
*
* The response (unparsed) will be enqueued on \p replyq
* for handling by \p resp_cb (with \p opaque passed).
*
* @return NULL on success, a new error instance that must be
* released with rd_kafka_error_destroy() in case of error.
*/
void rd_kafka_DescribeGroupsRequest(rd_kafka_broker_t *rkb,
const char **groups,
int group_cnt,
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
void *opaque) {
/**
 * @brief Construct and enqueue a DescribeGroupsRequest for \p group_cnt
 *        group ids in \p groups on broker \p rkb.
 *
 * @param max_ApiVersion Maximum DescribeGroups API version to use;
 *                       pass -1 to allow the highest version this client
 *                       implements (4).
 * @param groups Array of \p group_cnt group id strings to describe.
 * @param replyq Reply queue the unparsed response is enqueued on, handled
 *               by \p resp_cb with \p opaque passed through.
 *
 * @return NULL on success, or a new error object (caller must destroy with
 *         rd_kafka_error_destroy()) if no usable API version is supported
 *         by the broker.
 */
rd_kafka_error_t *rd_kafka_DescribeGroupsRequest(rd_kafka_broker_t *rkb,
                                                 int16_t max_ApiVersion,
                                                 const char **groups,
                                                 size_t group_cnt,
                                                 rd_kafka_replyq_t replyq,
                                                 rd_kafka_resp_cb_t *resp_cb,
                                                 void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        size_t of_GroupsArrayCnt;

        /* -1 means "newest we know": cap at v4, the highest version this
         * implementation handles. */
        if (max_ApiVersion < 0)
                max_ApiVersion = 4;

        if (max_ApiVersion > ApiVersion) {
                /* Remark: don't check if max_ApiVersion is zero, since
                 * rd_kafka_broker_ApiVersion_supported() cannot be checked
                 * reliably from the application thread. */
                ApiVersion = rd_kafka_broker_ApiVersion_supported(
                    rkb, RD_KAFKAP_DescribeGroups, 0, max_ApiVersion, NULL);
        }

        /* -1 from the version query means the broker supports no version
         * in the requested range: report instead of sending. */
        if (ApiVersion == -1) {
                return rd_kafka_error_new(
                    RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "Unsupported feature.");
        }

        /* Initial size estimate only; the buffer grows as needed.
         * 32 bytes per group id is a heuristic, not a limit. */
        rkbuf = rd_kafka_buf_new_flexver_request(
            rkb, RD_KAFKAP_DescribeGroups, 1,
            4 /* rd_kafka_buf_write_arraycnt_pos */ +
                1 /* IncludeAuthorizedOperations */ + 1 /* tags */ +
                32 * group_cnt /* Groups */,
            rd_false);

        /* write Groups.
         * group_cnt is known up front, so the array count is finalized
         * immediately after reserving its position; the group id strings
         * are then appended (note: in reverse array order, which is
         * protocol-valid since order carries no meaning here). */
        of_GroupsArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
        rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, group_cnt);
        while (group_cnt-- > 0)
                rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1);

        /* write IncludeAuthorizedOperations (v3+): always rd_false until
         * KIP-430 support is implemented. */
        if (ApiVersion >= 3) {
                /* TODO: implement KIP-430 */
                rd_kafka_buf_write_bool(rkbuf, rd_false);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
        return NULL;
}



/**
* @brief Generic handler for Metadata responses
*
Expand Down
Loading

0 comments on commit 545c3f7

Please sign in to comment.