diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index cdfa40e09..5f8e7a9c4 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -562,23 +562,60 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non # describe delegation_token protocol not yet implemented # Note: send the request to the least_loaded_node() - def describe_consumer_groups(self, group_ids): + def describe_consumer_groups(self, group_ids, group_coordinator_id=None): """Describe a set of consumer groups. - :param group_ids: A list of consumer group id names - :return: Appropriate version of DescribeGroupsResponse class + Any errors are immediately raised. + + :param group_ids: A list of consumer group IDs. These are typically the + group names as strings. + :param group_coordinator_id: The node_id of the groups' coordinator + broker. If set to None, it will query the cluster for each group to + find that group's coordinator. Explicitly specifying this can be + useful for avoiding extra network round trips if you already know + the group coordinator. This is only useful when all the group_ids + have the same coordinator, otherwise it will error. Default: None. + :return: A list of group descriptions. For now the group descriptions + are the raw results from the DescribeGroupsResponse. Long-term, we + plan to change this to return namedtuples as well as decoding the + partition assignments. """ + group_descriptions = [] version = self._matching_api_version(DescribeGroupsRequest) - if version <= 1: - request = DescribeGroupsRequest[version]( - groups = group_ids - ) - else: - raise NotImplementedError( - "Support for DescribeGroups v{} has not yet been added to KafkaAdmin." - .format(version)) - # TODO this is completely broken, as it needs to send to the group coordinator - # return self._send(request) + for group_id in group_ids: + if group_coordinator_id is None: + this_groups_coordinator_id = self._find_group_coordinator_id(group_id) + if version <= 1: + # Note: KAFKA-6788 A potential optimization is to group the + # request per coordinator and send one request with a list of + # all consumer groups. Java still hasn't implemented this + # because the error checking is hard to get right when some + # groups error and others don't. + request = DescribeGroupsRequest[version](groups=(group_id,)) + response = self._send_request_to_node(this_groups_coordinator_id, request) + assert len(response.groups) == 1 + # TODO need to implement converting the response tuple into + # a more accessible interface like a namedtuple and then stop + # hardcoding tuple indices here. Several Java examples, + # including KafkaAdminClient.java + group_description = response.groups[0] + error_code = group_description[0] + error_type = Errors.for_code(error_code) + # Java has the note: KAFKA-6789, we can retry based on the error code + if error_type is not Errors.NoError: + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + # TODO Java checks the group protocol type, and if consumer + # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes + # the members' partition assignments... that hasn't yet been + # implemented here so just return the raw struct results + group_descriptions.append(group_description) + else: + raise NotImplementedError( + "Support for DescribeGroups v{} has not yet been added to KafkaAdmin." + .format(version)) + return group_descriptions def list_consumer_groups(self): """List all consumer groups known to the cluster.