Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848:Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config #4939

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

librdkafka v2.6.3 is a maintenance release:

* Extend Config Apis to support group config (#4939).
* Socket options are now all set before connection (#4893).
* Client certificate chain is now sent when using `ssl.certificate.pem`
or `ssl_certificate` or `ssl.keystore.location` (#4894).
Expand Down
29 changes: 27 additions & 2 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2911,6 +2911,23 @@ const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) {
}


int8_t map_from_resource_type_to_config_resource_type(
rd_kafka_ResourceType_t restype) {
if (restype == RD_KAFKA_RESOURCE_GROUP)
return RD_KAFKA_CONFIG_RESOURCE_GROUP;

return restype;
}

rd_kafka_ResourceType_t
map_from_config_resource_type_to_resource_type(int8_t config_resource_type) {
if (config_resource_type == RD_KAFKA_CONFIG_RESOURCE_GROUP)
return RD_KAFKA_RESOURCE_GROUP;

return config_resource_type;
}


rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype,
const char *resname) {
Expand Down Expand Up @@ -3368,6 +3385,7 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req,
int16_t error_code;
rd_kafkap_str_t error_msg;
int8_t res_type;
int8_t config_resource_type;
rd_kafkap_str_t kres_name;
char *res_name;
char *this_errstr = NULL;
Expand All @@ -3377,11 +3395,14 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_i16(reply, &error_code);
rd_kafka_buf_read_str(reply, &error_msg);
rd_kafka_buf_read_i8(reply, &res_type);
rd_kafka_buf_read_i8(reply, &config_resource_type);
rd_kafka_buf_read_str(reply, &kres_name);
RD_KAFKAP_STR_DUPA(&res_name, &kres_name);
rd_kafka_buf_skip_tags(reply);

res_type = map_from_config_resource_type_to_resource_type(
config_resource_type);

if (error_code) {
if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
RD_KAFKAP_STR_LEN(&error_msg) == 0)
Expand Down Expand Up @@ -3638,6 +3659,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,
for (i = 0; i < (int)res_cnt; i++) {
int16_t error_code;
rd_kafkap_str_t error_msg;
int8_t config_resource_type;
int8_t res_type;
rd_kafkap_str_t kres_name;
char *res_name;
Expand All @@ -3649,10 +3671,13 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_i16(reply, &error_code);
rd_kafka_buf_read_str(reply, &error_msg);
rd_kafka_buf_read_i8(reply, &res_type);
rd_kafka_buf_read_i8(reply, &config_resource_type);
rd_kafka_buf_read_str(reply, &kres_name);
RD_KAFKAP_STR_DUPA(&res_name, &kres_name);

res_type = map_from_config_resource_type_to_resource_type(
config_resource_type);

if (error_code) {
if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
RD_KAFKAP_STR_LEN(&error_msg) == 0)
Expand Down
29 changes: 29 additions & 0 deletions src/rdkafka_admin.h
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a problem that the error message contain 32 instead of 3. We are not exposing 32 to the user. I think we should modify the error code in the response as well though it is difficult to catch all the cases. Can be an improvement though.

ConfigResource result: 3,my-group: error: Unknown resource type 32

Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,35 @@ struct rd_kafka_ConfigResource_result_s {
* but with response error values. */
};

/**
* @brief Resource type specific to config apis.
*/
typedef enum rd_kafka_ConfigResourceType_t {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc.

RD_KAFKA_CONFIG_RESOURCE_GROUP = 32,
} rd_kafka_ConfigResourceType_t;

/**
* @brief Maps rd_kafka_ResourceType_t to int8_t(rd_kafka_ConfigResourceType_t)
* for Config Apis. We are incorrectly using rd_kafka_ResourceType_t in
* both Config Apis and ACL Apis. So, we need this function to map the
* resource type internally to rd_kafka_ConfigResourceType_t. Like the
* enum value for GROUP is 32 in Config Apis, but it is 3 for ACL Apis.
*/
int8_t
map_from_resource_type_to_config_resource_type(rd_kafka_ResourceType_t restype);

/**
* @brief Maps int8_t(rd_kafka_ConfigResourceType_t) to rd_kafka_ResourceType_t
* for Config Apis. We are incorrectly using rd_kafka_ResourceType_t in
* both Config Apis and ACL Apis. So, we need this function to map the
* int8_t(rd_kafka_ConfigResourceType_t) internally to
* rd_kafka_ResourceType_t. Like the enum value for GROUP is 32 in Config
* Apis, but it is 3 for ACL Apis.
*/
rd_kafka_ResourceType_t
map_from_config_resource_type_to_resource_type(int8_t config_resource_type);


/**@}*/


Expand Down
8 changes: 6 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -5371,7 +5371,9 @@ rd_kafka_resp_err_t rd_kafka_IncrementalAlterConfigsRequest(
int ei;

/* ResourceType */
rd_kafka_buf_write_i8(rkbuf, config->restype);
rd_kafka_buf_write_i8(
rkbuf, map_from_resource_type_to_config_resource_type(
config->restype));

/* ResourceName */
rd_kafka_buf_write_str(rkbuf, config->name, -1);
Expand Down Expand Up @@ -5465,7 +5467,9 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
int ei;

/* resource_type */
rd_kafka_buf_write_i8(rkbuf, config->restype);
rd_kafka_buf_write_i8(
rkbuf, map_from_resource_type_to_config_resource_type(
config->restype));

/* resource_name */
rd_kafka_buf_write_str(rkbuf, config->name, -1);
Expand Down
167 changes: 166 additions & 1 deletion tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
*/
static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 3
#define MY_CONFRES_CNT 4
char *topics[MY_CONFRES_CNT];
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
rd_kafka_AdminOptions_t *options;
Expand Down Expand Up @@ -935,6 +935,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
/** Test the test helper, for use in other tests. */
do {
const char *broker_id = tsprintf("%d", avail_brokers[0]);
const char *group_id = "my-group";
const char *confs_set_append[] = {
"compression.type", "SET", "lz4",
"cleanup.policy", "APPEND", "compact"};
Expand All @@ -947,6 +948,10 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
const char *confs_delete_subtract_broker[] = {
"background.threads", "DELETE", "",
"log.cleanup.policy", "SUBTRACT", "compact"};
const char *confs_set_group[] = {"consumer.session.timeout.ms",
"SET", "50000"};
const char *confs_delete_group[] = {
"consumer.session.timeout.ms", "DELETE", ""};

TEST_SAY("Testing test helper with SET and APPEND\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC,
Expand All @@ -969,6 +974,16 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
test_IncrementalAlterConfigs_simple(
rk, RD_KAFKA_RESOURCE_BROKER, broker_id,
confs_delete_subtract_broker, 2);
TEST_SAY(
"Testing test helper with SET with GROUP resource type\n");
test_IncrementalAlterConfigs_simple(
rk, RD_KAFKA_RESOURCE_GROUP, group_id, confs_set_group, 1);
TEST_SAY(
"Testing test helper with DELETE with GROUP resource "
"type\n");
test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP,
group_id,
confs_delete_group, 1);
TEST_SAY("End testing test helper\n");
} while (0);

Expand Down Expand Up @@ -1035,6 +1050,24 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk,
exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN;
ci++;

/**
* ConfigResource #3: valid group config
*/
configs[ci] =
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, "my-group");

error = rd_kafka_ConfigResource_add_incremental_config(
configs[ci], "consumer.session.timeout.ms",
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "50000");
TEST_ASSERT(!error, "%s", rd_kafka_error_string(error));
if (test_broker_version >= TEST_BRKVER(3, 8, 0, 0) &&
!test_consumer_group_protocol_classic()) {
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
} else {
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST;
}
ci++;

/*
* Timeout options
*/
Expand Down Expand Up @@ -1334,6 +1367,137 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
SUB_TEST_PASS();
}

/**
* @brief Test DescribeConfigs for groups
*/
static void do_test_DescribeConfigs_groups(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu) {
#define MY_CONFRES_CNT 1
rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
rd_kafka_AdminOptions_t *options;
rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
rd_kafka_event_t *rkev;
rd_kafka_resp_err_t err;
const rd_kafka_DescribeConfigs_result_t *res;
const rd_kafka_ConfigResource_t **rconfigs;
char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
size_t rconfig_cnt;
char errstr[128];
const char *errstr2;
int ci = 0;
int i;
int fails = 0;

SUB_TEST_QUICK();

/*
* ConfigResource #0: group config, for a non-existent group.
*/
/*
* ConfigResource #3: group config, for a non-existent group.
*/
configs[ci] =
rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, group);
if (test_broker_version >= TEST_BRKVER(3, 8, 0, 0) &&
!test_consumer_group_protocol_classic()) {
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
} else {
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST;
}
ci++;

/*
* Timeout options
*/
options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
sizeof(errstr));
TEST_ASSERT(!err, "%s", errstr);

/*
* Fire off request
*/
rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu);

/*
* Wait for result
*/
rkev = test_wait_admin_result(
rkqu, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 10000 + 1000);

/*
* Extract result
*/
res = rd_kafka_event_DescribeConfigs_result(rkev);
TEST_ASSERT(res, "Expected DescribeConfigs result, not %s",
rd_kafka_event_name(rkev));

err = rd_kafka_event_error(rkev);
errstr2 = rd_kafka_event_error_string(rkev);
TEST_ASSERT(!err, "Expected success, not %s: %s",
rd_kafka_err2name(err), errstr2);

rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
TEST_ASSERT((int)rconfig_cnt == ci,
"Expected %d result resources, got %" PRIusz "\n", ci,
rconfig_cnt);

/*
* Verify status per resource
*/
for (i = 0; i < (int)rconfig_cnt; i++) {
const rd_kafka_ConfigEntry_t **entries;
size_t entry_cnt;

err = rd_kafka_ConfigResource_error(rconfigs[i]);
errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);

entries =
rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt);

TEST_SAY(
"ConfigResource #%d: type %s (%d), \"%s\": "
"%" PRIusz " ConfigEntries, error %s (%s)\n",
i,
rd_kafka_ResourceType_name(
rd_kafka_ConfigResource_type(rconfigs[i])),
rd_kafka_ConfigResource_type(rconfigs[i]),
rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt,
rd_kafka_err2name(err), errstr2 ? errstr2 : "");

test_print_ConfigEntry_array(entries, entry_cnt, 1);

if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
rd_kafka_ConfigResource_type(configs[i]) ||
strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
rd_kafka_ConfigResource_name(configs[i]))) {
TEST_FAIL_LATER(
"ConfigResource #%d: "
"expected type %s name %s, "
"got type %s name %s",
i,
rd_kafka_ResourceType_name(
rd_kafka_ConfigResource_type(configs[i])),
rd_kafka_ConfigResource_name(configs[i]),
rd_kafka_ResourceType_name(
rd_kafka_ConfigResource_type(rconfigs[i])),
rd_kafka_ConfigResource_name(rconfigs[i]));
fails++;
}
}

TEST_ASSERT(!fails, "See %d previous failure(s)", fails);

rd_kafka_event_destroy(rkev);

rd_kafka_ConfigResource_destroy_array(configs, ci);

TEST_LATER_CHECK();
#undef MY_CONFRES_CNT

SUB_TEST_PASS();
}

/**
* @brief Test CreateAcls
*/
Expand Down Expand Up @@ -5258,6 +5422,7 @@ static void do_test_apis(rd_kafka_type_t cltype) {

/* DescribeConfigs */
do_test_DescribeConfigs(rk, mainq);
do_test_DescribeConfigs_groups(rk, mainq);
Comment on lines 5424 to +5425
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add new condition in do_test_DescribeConfigs?


/* Delete records */
do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0);
Expand Down