diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 4fd8a1b33..a7114b545 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -271,6 +271,47 @@ def _refresh_controller_id(self): "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}." .format(version)) + def _find_group_coordinator_id_send_request(self, group_id): + """Send a FindCoordinatorRequest to a broker. + + :param group_id: The consumer group ID. This is typically the group + name as a string. + :return: A message future + """ + # TODO add support for dynamically picking version of + # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. + # When I experimented with this, the coordinator value returned in + # GroupCoordinatorResponse_v1 didn't match the value returned by + # GroupCoordinatorResponse_v0 and I couldn't figure out why. + version = 0 # version = self._matching_api_version(GroupCoordinatorRequest) + if version <= 0: + request = GroupCoordinatorRequest[version](group_id) + else: + raise NotImplementedError( + "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(self._client.least_loaded_node(), request) + + def _find_group_coordinator_id_process_response(self, response): + """Process a FindCoordinatorResponse. + + :param response: a FindCoordinatorResponse. + :return: The node_id of the broker that is the coordinator. + """ + if response.API_VERSION <= 0: + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + # Note: When error_type.retriable, Java will retry... see + # KafkaAdminClient's handleFindCoordinatorError method + raise error_type( + "FindCoordinatorRequest failed with response '{}'." + .format(response)) + else: + raise NotImplementedError( + "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + return response.coordinator_id + def _find_group_coordinator_id(self, group_id): """Find the broker node_id of the coordinator of the given group. @@ -283,35 +324,10 @@ def _find_group_coordinator_id(self, group_id): :return: The node_id of the broker that is the coordinator. """ # Note: Java may change how this is implemented in KAFKA-6791. - # - # TODO add support for dynamically picking version of - # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. - # When I experimented with this, GroupCoordinatorResponse_v1 didn't - # match GroupCoordinatorResponse_v0 and I couldn't figure out why. - gc_request = GroupCoordinatorRequest[0](group_id) - future = self._send_request_to_node(self._client.least_loaded_node(), gc_request) - + future = self._find_group_coordinator_id_send_request(group_id) self._wait_for_futures([future]) - - gc_response = future.value - # use the extra error checking in add_group_coordinator() rather than - # immediately returning the group coordinator. - success = self._client.cluster.add_group_coordinator(group_id, gc_response) - if not success: - error_type = Errors.for_code(gc_response.error_code) - assert error_type is not Errors.NoError - # Note: When error_type.retriable, Java will retry... see - # KafkaAdminClient's handleFindCoordinatorError method - raise error_type( - "Could not identify group coordinator for group_id '{}' from response '{}'." - .format(group_id, gc_response)) - group_coordinator = self._client.cluster.coordinator_for_group(group_id) - # will be None if the coordinator was never populated, which should never happen here - assert group_coordinator is not None - # will be -1 if add_group_coordinator() failed... but by this point the - # error should have been raised. - assert group_coordinator != -1 - return group_coordinator + response = future.value + return self._find_group_coordinator_id_process_response(response) def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. @@ -329,7 +345,6 @@ def _send_request_to_node(self, node_id, request): self._client.poll() return self._client.send(node_id, request) - def _send_request_to_controller(self, request): """Send a Kafka protocol message to the cluster controller.