diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d0e8491f..b3785a8b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ librdkafka v2.6.3 is a maintenance release: +* Extend Config Apis to support group config (#4939). * Socket options are now all set before connection (#4893). * Client certificate chain is now sent when using `ssl.certificate.pem` or `ssl_certificate` or `ssl.keystore.location` (#4894). diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 41934e349..87ad02df6 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -2911,6 +2911,23 @@ const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) { } +int8_t map_from_resource_type_to_config_resource_type( + rd_kafka_ResourceType_t restype) { + if (restype == RD_KAFKA_RESOURCE_GROUP) + return RD_KAFKA_CONFIG_RESOURCE_GROUP; + + return restype; +} + +rd_kafka_ResourceType_t +map_from_config_resource_type_to_resource_type(int8_t config_resource_type) { + if (config_resource_type == RD_KAFKA_CONFIG_RESOURCE_GROUP) + return RD_KAFKA_RESOURCE_GROUP; + + return config_resource_type; +} + + rd_kafka_ConfigResource_t * rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype, const char *resname) { @@ -3368,6 +3385,7 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req, int16_t error_code; rd_kafkap_str_t error_msg; int8_t res_type; + int8_t config_resource_type; rd_kafkap_str_t kres_name; char *res_name; char *this_errstr = NULL; @@ -3377,11 +3395,14 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); - rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_i8(reply, &config_resource_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); rd_kafka_buf_skip_tags(reply); + res_type = map_from_config_resource_type_to_resource_type( + config_resource_type); + if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) @@ -3638,6 +3659,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, for (i = 0; i < (int)res_cnt; i++) { int16_t error_code; rd_kafkap_str_t error_msg; + int8_t config_resource_type; int8_t res_type; rd_kafkap_str_t kres_name; char *res_name; @@ -3649,10 +3671,13 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_buf_read_i16(reply, &error_code); rd_kafka_buf_read_str(reply, &error_msg); - rd_kafka_buf_read_i8(reply, &res_type); + rd_kafka_buf_read_i8(reply, &config_resource_type); rd_kafka_buf_read_str(reply, &kres_name); RD_KAFKAP_STR_DUPA(&res_name, &kres_name); + res_type = map_from_config_resource_type_to_resource_type( + config_resource_type); + if (error_code) { if (RD_KAFKAP_STR_IS_NULL(&error_msg) || RD_KAFKAP_STR_LEN(&error_msg) == 0) diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index ad58fe5cc..c8a02458a 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -283,6 +283,35 @@ struct rd_kafka_ConfigResource_result_s { * but with response error values. */ }; +/** + * @brief Resource type specific to config apis. + */ +typedef enum rd_kafka_ConfigResourceType_t { + RD_KAFKA_CONFIG_RESOURCE_GROUP = 32, +} rd_kafka_ConfigResourceType_t; + +/** + * @brief Maps rd_kafka_ResourceType_t to int8_t(rd_kafka_ConfigResourceType_t) + * for Config Apis. We are incorrectly using rd_kafka_ResourceType_t in + * both Config Apis and ACL Apis. So, we need this function to map the + * resource type internally to rd_kafka_ConfigResourceType_t. Like the + * enum value for GROUP is 32 in Config Apis, but it is 3 for ACL Apis. + */ +int8_t +map_from_resource_type_to_config_resource_type(rd_kafka_ResourceType_t restype); + +/** + * @brief Maps int8_t(rd_kafka_ConfigResourceType_t) to rd_kafka_ResourceType_t + * for Config Apis. We are incorrectly using rd_kafka_ResourceType_t in + * both Config Apis and ACL Apis. So, we need this function to map the + * int8_t(rd_kafka_ConfigResourceType_t) internally to + * rd_kafka_ResourceType_t. Like the enum value for GROUP is 32 in Config + * Apis, but it is 3 for ACL Apis. + */ +rd_kafka_ResourceType_t +map_from_config_resource_type_to_resource_type(int8_t config_resource_type); + + /**@}*/ diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index ac04343fa..c23f728dd 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -5371,7 +5371,9 @@ rd_kafka_resp_err_t rd_kafka_IncrementalAlterConfigsRequest( int ei; /* ResourceType */ - rd_kafka_buf_write_i8(rkbuf, config->restype); + rd_kafka_buf_write_i8( + rkbuf, map_from_resource_type_to_config_resource_type( + config->restype)); /* ResourceName */ rd_kafka_buf_write_str(rkbuf, config->name, -1); @@ -5465,7 +5467,9 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( int ei; /* resource_type */ - rd_kafka_buf_write_i8(rkbuf, config->restype); + rd_kafka_buf_write_i8( + rkbuf, map_from_resource_type_to_config_resource_type( + config->restype)); /* resource_name */ rd_kafka_buf_write_str(rkbuf, config->name, -1); diff --git a/tests/0081-admin.c b/tests/0081-admin.c index 9144c400c..b541e3fa8 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -902,7 +902,7 @@ static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { */ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { -#define MY_CONFRES_CNT 3 +#define MY_CONFRES_CNT 4 char *topics[MY_CONFRES_CNT]; rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; rd_kafka_AdminOptions_t *options; @@ -935,6 +935,7 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, /** Test the test helper, for use in other tests. */ do { const char *broker_id = tsprintf("%d", avail_brokers[0]); + const char *group_id = "my-group"; const char *confs_set_append[] = { "compression.type", "SET", "lz4", "cleanup.policy", "APPEND", "compact"}; @@ -947,6 +948,10 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, const char *confs_delete_subtract_broker[] = { "background.threads", "DELETE", "", "log.cleanup.policy", "SUBTRACT", "compact"}; + const char *confs_set_group[] = {"consumer.session.timeout.ms", + "SET", "50000"}; + const char *confs_delete_group[] = { + "consumer.session.timeout.ms", "DELETE", ""}; TEST_SAY("Testing test helper with SET and APPEND\n"); test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_TOPIC, @@ -969,6 +974,16 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, test_IncrementalAlterConfigs_simple( rk, RD_KAFKA_RESOURCE_BROKER, broker_id, confs_delete_subtract_broker, 2); + TEST_SAY( + "Testing test helper with SET with GROUP resource type\n"); + test_IncrementalAlterConfigs_simple( + rk, RD_KAFKA_RESOURCE_GROUP, group_id, confs_set_group, 1); + TEST_SAY( + "Testing test helper with DELETE with GROUP resource " + "type\n"); + test_IncrementalAlterConfigs_simple(rk, RD_KAFKA_RESOURCE_GROUP, + group_id, + confs_delete_group, 1); TEST_SAY("End testing test helper\n"); } while (0); @@ -1035,6 +1050,24 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN; ci++; + /** + * ConfigResource #3: valid group config + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, "my-group"); + + error = rd_kafka_ConfigResource_add_incremental_config( + configs[ci], "consumer.session.timeout.ms", + RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET, "50000"); + TEST_ASSERT(!error, "%s", rd_kafka_error_string(error)); + if (test_broker_version >= TEST_BRKVER(3, 8, 0, 0) && + !test_consumer_group_protocol_classic()) { + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + } else { + exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; + } + ci++; + /* * Timeout options */ @@ -1334,6 +1367,137 @@ static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { SUB_TEST_PASS(); } +/** + * @brief Test DescribeConfigs for groups + */ +static void do_test_DescribeConfigs_groups(rd_kafka_t *rk, + rd_kafka_queue_t *rkqu) { +#define MY_CONFRES_CNT 1 + rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; + rd_kafka_AdminOptions_t *options; + rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT]; + rd_kafka_event_t *rkev; + rd_kafka_resp_err_t err; + const rd_kafka_DescribeConfigs_result_t *res; + const rd_kafka_ConfigResource_t **rconfigs; + char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + size_t rconfig_cnt; + char errstr[128]; + const char *errstr2; + int ci = 0; + int i; + int fails = 0; + + SUB_TEST_QUICK(); + + /* + * ConfigResource #0: group config, for a non-existent group. + */ + /* + * ConfigResource #3: group config, for a non-existent group. + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_GROUP, group); + if (test_broker_version >= TEST_BRKVER(3, 8, 0, 0) && + !test_consumer_group_protocol_classic()) { + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + } else { + exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; + } + ci++; + + /* + * Timeout options + */ + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); + err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr, + sizeof(errstr)); + TEST_ASSERT(!err, "%s", errstr); + + /* + * Fire off request + */ + rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu); + + /* + * Wait for result + */ + rkev = test_wait_admin_result( + rkqu, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 10000 + 1000); + + /* + * Extract result + */ + res = rd_kafka_event_DescribeConfigs_result(rkev); + TEST_ASSERT(res, "Expected DescribeConfigs result, not %s", + rd_kafka_event_name(rkev)); + + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(!err, "Expected success, not %s: %s", + rd_kafka_err2name(err), errstr2); + + rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt); + TEST_ASSERT((int)rconfig_cnt == ci, + "Expected %d result resources, got %" PRIusz "\n", ci, + rconfig_cnt); + + /* + * Verify status per resource + */ + for (i = 0; i < (int)rconfig_cnt; i++) { + const rd_kafka_ConfigEntry_t **entries; + size_t entry_cnt; + + err = rd_kafka_ConfigResource_error(rconfigs[i]); + errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]); + + entries = + rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt); + + TEST_SAY( + "ConfigResource #%d: type %s (%d), \"%s\": " + "%" PRIusz " ConfigEntries, error %s (%s)\n", + i, + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(rconfigs[i])), + rd_kafka_ConfigResource_type(rconfigs[i]), + rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt, + rd_kafka_err2name(err), errstr2 ? errstr2 : ""); + + test_print_ConfigEntry_array(entries, entry_cnt, 1); + + if (rd_kafka_ConfigResource_type(rconfigs[i]) != + rd_kafka_ConfigResource_type(configs[i]) || + strcmp(rd_kafka_ConfigResource_name(rconfigs[i]), + rd_kafka_ConfigResource_name(configs[i]))) { + TEST_FAIL_LATER( + "ConfigResource #%d: " + "expected type %s name %s, " + "got type %s name %s", + i, + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(configs[i])), + rd_kafka_ConfigResource_name(configs[i]), + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(rconfigs[i])), + rd_kafka_ConfigResource_name(rconfigs[i])); + fails++; + } + } + + TEST_ASSERT(!fails, "See %d previous failure(s)", fails); + + rd_kafka_event_destroy(rkev); + + rd_kafka_ConfigResource_destroy_array(configs, ci); + + TEST_LATER_CHECK(); +#undef MY_CONFRES_CNT + + SUB_TEST_PASS(); +} + /** * @brief Test CreateAcls */ @@ -5258,6 +5422,7 @@ static void do_test_apis(rd_kafka_type_t cltype) { /* DescribeConfigs */ do_test_DescribeConfigs(rk, mainq); + do_test_DescribeConfigs_groups(rk, mainq); /* Delete records */ do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0);