-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP 848:Extend DescribeConfigs and IncrementalAlterConfigs to support GROUP Config #4939
base: master
Are you sure you want to change the base?
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First high level pass
src/rdkafka_admin.c
Outdated
@@ -2911,6 +2921,47 @@ const char *rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype) { | |||
} | |||
|
|||
|
|||
rd_kafka_InternalConfigResourceType_t | |||
map_to_internal_config_resourcetype(rd_kafka_ResourceType_t resourcetype) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name both consistently. map_from_x_to_y
src/rdkafka_admin.c
Outdated
} | ||
} | ||
|
||
rd_kafka_ResourceType_t map_from_internal_config_resourcetype( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name both consistently. map_from_x_to_y
src/rdkafka_admin.c
Outdated
default: | ||
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't make anything default. Add an assert instead that the value is not supported.
src/rdkafka_admin.c
Outdated
default: | ||
return RD_KAFKA_RESOURCE_UNKNOWN; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same
src/rdkafka_admin.c
Outdated
if (api_version >= 3) { | ||
rd_kafka_buf_read_i8(reply, &config_type); | ||
rd_kafka_buf_read_str(reply, &documentation); | ||
entry->type = config_type; | ||
entry->documentation = | ||
RD_KAFKAP_STR_DUP(&documentation); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this change necessary for KIP-848 changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No this change was for upgrading the API version. Have removed it.
src/rdkafka_admin.c
Outdated
@@ -3805,6 +3880,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, | |||
config); | |||
config = NULL; | |||
} | |||
rd_kafka_buf_skip_tags(reply); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
src/rdkafka_request.c
Outdated
/* include_synonyms */ | ||
rd_kafka_buf_write_i8(rkbuf, 1); | ||
} | ||
|
||
if (ApiVersion >= 3) { | ||
/* include_documentation */ | ||
rd_kafka_buf_write_i8(rkbuf, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why always true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed it since wasn't neccesarry as we are not upgrading the api version
src/rdkafka_request.c
Outdated
@@ -5445,7 +5447,7 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( | |||
} | |||
|
|||
ApiVersion = rd_kafka_broker_ApiVersion_supported( | |||
rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL); | |||
rkb, RD_KAFKAP_DescribeConfigs, 0, 3, NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest not to increase the version if it is not necessary. We would need to improve APIs as well to support that and there would be bunch of other things as well which we should be taking care of.
src/rdkafka_request.c
Outdated
/* include_documentation */ | ||
rd_kafka_buf_write_i8(rkbuf, 1); | ||
} | ||
rd_kafka_buf_write_tags_empty(rkbuf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to write tags if we are not supporting flexver.
src/rdkafka_request.c
Outdated
@@ -5484,14 +5489,21 @@ rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( | |||
/* config_name */ | |||
rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); | |||
} | |||
rd_kafka_buf_write_tags_empty(rkbuf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do the changes for the comments which are valid now.
src/rdkafka_admin.c
Outdated
if (resourcetype > RD_KAFKA_RESOURCE__CNT) { | ||
rd_assert("Invalid resource type"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this default case in the below switch.
src/rdkafka_admin.c
Outdated
rd_kafka_ResourceType_t resourcetype) { | ||
|
||
if (resourcetype > RD_KAFKA_RESOURCE__CNT) { | ||
rd_assert("Invalid resource type"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Provide information about the resource type value in the assert as well.
src/rdkafka_admin.c
Outdated
|
||
if (internal_resourcetype != RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP && | ||
internal_resourcetype > RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT) { | ||
rd_assert("Recieved invalid resource type"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Provide information about the resource type value in the assert as well.
src/rdkafka_admin.c
Outdated
case RD_KAFKA_RESOURCE__CNT: | ||
return RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't want cnt conversion. This is incorrect in my opinion.
src/rdkafka_admin.c
Outdated
case RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT: | ||
return RD_KAFKA_RESOURCE__CNT; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Count conversion is not needed.
src/rdkafka_admin.h
Outdated
@@ -283,6 +283,33 @@ struct rd_kafka_ConfigResource_result_s { | |||
* but with response error values. */ | |||
}; | |||
|
|||
typedef enum rd_kafka_InternalConfigResourceType_t { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use Internal with ConfigResource. Use Java name which is rd_kafka_ConfigResource_t
src/rdkafka_admin.h
Outdated
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_UNKNOWN = 0, | ||
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_ANY = 1, | ||
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_TOPIC = 2, | ||
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_GROUP = | ||
32, // Changed value for config APIs | ||
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_BROKER = 4, | ||
RD_KAFKA_INTERNAL_RESOURCE_CONFIG_CNT, | ||
} rd_kafka_InternalConfigResourceType_t; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do this conversion only for GROUP right now. We can think about others later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests next.
src/rdkafka_admin.c
Outdated
rd_kafka_ConfigResourceType_t map_from_resource_type_to_config_resource_type( | ||
rd_kafka_ResourceType_t resourcetype) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rd_kafka_ConfigResourceType_t map_from_resource_type_to_config_resource_type( | |
rd_kafka_ResourceType_t resourcetype) { | |
int8_t map_from_resource_type_to_config_resource_type( | |
rd_kafka_ResourceType_t restype) { |
src/rdkafka_admin.c
Outdated
rd_kafka_ResourceType_t map_from_config_resource_type_to_resource_type( | ||
rd_kafka_ConfigResourceType_t internal_resourcetype) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rd_kafka_ResourceType_t map_from_config_resource_type_to_resource_type( | |
rd_kafka_ConfigResourceType_t internal_resourcetype) { | |
rd_kafka_ResourceType_t map_from_config_resource_type_to_resource_type( | |
int8_t config_resource_type) { |
src/rdkafka_admin.c
Outdated
if (resourcetype == RD_KAFKA_RESOURCE_GROUP) { | ||
return RD_KAFKA_CONFIG_RESOURCE_GROUP; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (resourcetype == RD_KAFKA_RESOURCE_GROUP) { | |
return RD_KAFKA_CONFIG_RESOURCE_GROUP; | |
} | |
if (resourcetype == RD_KAFKA_RESOURCE_GROUP) | |
return RD_KAFKA_CONFIG_RESOURCE_GROUP; |
src/rdkafka_admin.c
Outdated
if (internal_resourcetype == RD_KAFKA_CONFIG_RESOURCE_GROUP) { | ||
return RD_KAFKA_RESOURCE_GROUP; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (internal_resourcetype == RD_KAFKA_CONFIG_RESOURCE_GROUP) { | |
return RD_KAFKA_RESOURCE_GROUP; | |
} | |
if (internal_resourcetype == RD_KAFKA_CONFIG_RESOURCE_GROUP) | |
return RD_KAFKA_RESOURCE_GROUP; |
src/rdkafka_admin.c
Outdated
@@ -3377,11 +3397,14 @@ rd_kafka_IncrementalAlterConfigsResponse_parse(rd_kafka_op_t *rko_req, | |||
|
|||
rd_kafka_buf_read_i16(reply, &error_code); | |||
rd_kafka_buf_read_str(reply, &error_msg); | |||
rd_kafka_buf_read_i8(reply, &res_type); | |||
rd_kafka_buf_read_i8(reply, &internal_res_type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config_resource_type
src/rdkafka_admin.c
Outdated
@@ -3638,6 +3661,7 @@ rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t *rko_req, | |||
for (i = 0; i < (int)res_cnt; i++) { | |||
int16_t error_code; | |||
rd_kafkap_str_t error_msg; | |||
int8_t internal_res_type; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config_resource_type
src/rdkafka_admin.h
Outdated
typedef enum rd_kafka_ConfigResourceType_t { | ||
RD_KAFKA_CONFIG_RESOURCE_GROUP = 32, // Changed value for config APIs | ||
} rd_kafka_ConfigResourceType_t; | ||
|
||
/** | ||
* @brief Map from rd_kafka_ResourceType_t to | ||
* rd_kafka_ConfigResourceType_t | ||
*/ | ||
rd_kafka_ConfigResourceType_t map_from_resource_type_to_config_resource_type( | ||
rd_kafka_ResourceType_t resourcetype); | ||
|
||
/** | ||
* @brief Map from rd_kafka_ConfigResourceType_t to | ||
* rd_kafka_ResourceType_t | ||
*/ | ||
|
||
rd_kafka_ResourceType_t map_from_config_resource_type_to_resource_type( | ||
rd_kafka_ConfigResourceType_t internal_resourcetype); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same name, variable and type changes here as well.
src/rdkafka_admin.h
Outdated
@@ -283,6 +283,26 @@ struct rd_kafka_ConfigResource_result_s { | |||
* but with response error values. */ | |||
}; | |||
|
|||
typedef enum rd_kafka_ConfigResourceType_t { | |||
RD_KAFKA_CONFIG_RESOURCE_GROUP = 32, // Changed value for config APIs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RD_KAFKA_CONFIG_RESOURCE_GROUP = 32, // Changed value for config APIs | |
RD_KAFKA_CONFIG_RESOURCE_GROUP = 32, |
src/rdkafka_admin.h
Outdated
/** | ||
* @brief Map from int8_t to rd_kafka_ResourceType_t | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove extra line.
@@ -283,6 +283,24 @@ struct rd_kafka_ConfigResource_result_s { | |||
* but with response error values. */ | |||
}; | |||
|
|||
typedef enum rd_kafka_ConfigResourceType_t { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add doc.
tests/0081-admin.c
Outdated
if (!test_consumer_group_protocol_classic()) { | ||
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; | ||
} else { | ||
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check broker version as well here.
tests/0081-admin.c
Outdated
if (!test_consumer_group_protocol_classic()) { | ||
exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; | ||
} else { | ||
exp_err[ci] = RD_KAFKA_RESP_ERR_INVALID_REQUEST; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
do_test_DescribeConfigs(rk, mainq); | ||
do_test_DescribeConfigs_groups(rk, mainq); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not add new condition in do_test_DescribeConfigs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a problem that the error message contain 32 instead of 3. We are not exposing 32 to the user. I think we should modify the error code in the response as well though it is difficult to catch all the cases. Can be an improvement though.
ConfigResource result: 3,my-group: error: Unknown resource type 32
src/rdkafka_admin.h
Outdated
} rd_kafka_ConfigResourceType_t; | ||
|
||
/** | ||
* @brief Map from rd_kafka_ResourceType_t to int8_t |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better doc explaining what and why are we doing this.
src/rdkafka_admin.h
Outdated
map_from_resource_type_to_config_resource_type(rd_kafka_ResourceType_t restype); | ||
|
||
/** | ||
* @brief Map from int8_t to rd_kafka_ResourceType_t |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better doc explaining what and why are we doing this.
tests/0081-admin.c
Outdated
@@ -947,6 +948,10 @@ static void do_test_IncrementalAlterConfigs(rd_kafka_t *rk, | |||
const char *confs_delete_subtract_broker[] = { | |||
"background.threads", "DELETE", "", | |||
"log.cleanup.policy", "SUBTRACT", "compact"}; | |||
const char *confs_set_append_group[] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const char *confs_set_append_group[] = { | |
const char *confs_set_group[] = { |
tests/0081-admin.c
Outdated
rd_kafka_ConfigResource_type(rconfigs[i])), | ||
rd_kafka_ConfigResource_name(rconfigs[i])); | ||
fails++; | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
continue;
not needed.
This PR intends to add support for Group Resource type for IncrementalALterConfigs API as specified in KIP 848 and extended the DescribeConfigs support till API version 3.
Additional changed the integration test to check for Group Resource Type.