diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index fbbbcc2a2..01db6a98c 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -5,7 +5,8 @@ import socket from kafka.client_async import KafkaClient, selectors from kafka.errors import ( - KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError) + IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError, + NodeNotReadyError, NotControllerError) from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, @@ -25,9 +26,11 @@ class KafkaAdmin(object): nicer, more pythonic objects. Unfortunately, this will likely break those interfaces. - The KafkaAdmin class will negotiate for the latest version of each message protocol format supported - by both the kafka-python client library and the kafka broker. Usage of optional fields from protocol - versions that are not supported by the broker will result in UnsupportedVersionError exceptions. + The KafkaAdmin class will negotiate for the latest version of each message + protocol format supported by both the kafka-python client library and the + kafka broker. Usage of optional fields from protocol versions that are not + supported by the broker will result in IncompatibleBrokerVersion exceptions. + Use of this class requires a minimum broker version >= 0.10.0.0. @@ -223,8 +226,8 @@ def _matching_api_version(self, operation): if version < self._client.get_api_versions()[operation[0].API_KEY][0]: # max library version is less than min broker version. Not sure any brokers # actually set a min version greater than 0 right now, tho. But maybe in the future? - raise UnsupportedVersionError( - "Could not find matching protocol version for {}" + raise IncompatibleBrokerVersion( + "No version of the '{}' kafka protocol is supported by both the client and broker." .format(operation.__name__)) return version @@ -246,9 +249,9 @@ def _refresh_controller_id(self): self._controller_id = response.controller_id version = self._client.check_version(self._controller_id) if version < (0, 10, 0): - raise UnsupportedVersionError( - "Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0" - .format(version)) + raise IncompatibleBrokerVersion( + "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0." + .format(version)) def _send_request_to_node(self, node, request): """Send a kafka protocol message to a specific broker. Will block until the message result is received. @@ -311,9 +314,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None): timeout_ms = self._validate_timeout(timeout_ms) if version == 0: if validate_only: - raise UnsupportedVersionError( - "validate_only not supported on cluster version {}" - .format(self.config['api_version'])) + raise IncompatibleBrokerVersion( + "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." + .format(self.config['api_version'])) request = CreateTopicsRequest[version]( create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout = timeout_ms @@ -326,10 +329,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None): validate_only = validate_only ) else: - raise UnsupportedVersionError( - "missing implementation of CreateTopics for library supported version {}" - .format(version) - ) + raise NotImplementedError( + "Support for CreateTopics v{} has not yet been added to KafkaAdmin." + .format(version)) return self._send(request) def delete_topics(self, topics, timeout_ms=None): @@ -347,9 +349,9 @@ def delete_topics(self, topics, timeout_ms=None): timeout = timeout_ms ) else: - raise UnsupportedVersionError( - "missing implementation of DeleteTopics for library supported version {}" - .format(version)) + raise NotImplementedError( + "Support for DeleteTopics v{} has not yet been added to KafkaAdmin." + .format(version)) return self._send(request) # list topics functionality is in ClusterMetadata @@ -386,9 +388,9 @@ def describe_configs(self, config_resources, include_synonyms=None): version = self._matching_api_version(DescribeConfigsRequest) if version == 0: if include_synonyms: - raise UnsupportedVersionError( - "include_synonyms not supported on cluster version {}" - .format(self.config['api_version'])) + raise IncompatibleBrokerVersion( + "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." + .format(self.config['api_version'])) request = DescribeConfigsRequest[version]( resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources] ) @@ -399,9 +401,9 @@ def describe_configs(self, config_resources, include_synonyms=None): include_synonyms = include_synonyms ) else: - raise UnsupportedVersionError( - "missing implementation of DescribeConfigs for library supported version {}" - .format(version)) + raise NotImplementedError( + "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin." + .format(version)) return self._send(request) @staticmethod @@ -426,9 +428,9 @@ def alter_configs(self, config_resources): resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] ) else: - raise UnsupportedVersionError( - "missing implementation of AlterConfigs for library supported version {}" - .format(version)) + raise NotImplementedError( + "Support for AlterConfigs v{} has not yet been added to KafkaAdmin." + .format(version)) return self._send(request) # alter replica logs dir protocol not implemented @@ -463,9 +465,9 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non validate_only = validate_only ) else: - raise UnsupportedVersionError( - "missing implementation of CreatePartitions for library supported version {}" - .format(version)) + raise NotImplementedError( + "Support for CreatePartitions v{} has not yet been added to KafkaAdmin." + .format(version)) return self._send(request) # delete records protocol not implemented @@ -490,9 +492,9 @@ def describe_consumer_groups(self, group_ids): groups = group_ids ) else: - raise UnsupportedVersionError( - "missing implementation of DescribeGroups for library supported version {}" - .format(version)) + raise NotImplementedError( + "Support for DescribeGroups v{} has not yet been added to KafkaAdmin." + .format(version)) return self._send(request) def list_consumer_groups(self): @@ -504,9 +506,9 @@ def list_consumer_groups(self): if version <= 1: request = ListGroupsRequest[version]() else: - raise UnsupportedVersionError( - "missing implementation of ListGroups for library supported version {}" - .format(version)) + raise NotImplementedError( + "Support for ListGroups v{} has not yet been added to KafkaAdmin." + .format(version)) return self._send(request) # delete groups protocol not implemented diff --git a/kafka/conn.py b/kafka/conn.py index 5ec97575f..471bae7ed 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -881,7 +881,7 @@ def get_api_versions(self): .format(version)) # _api_versions is set as a side effect of check_versions() on a cluster # that supports 0.10.0 or later - return self._api_versions; + return self._api_versions def _infer_broker_version_from_api_versions(self, api_versions): # The logic here is to check the list of supported request versions diff --git a/kafka/errors.py b/kafka/errors.py index fb9576c3f..118e4302b 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -62,6 +62,10 @@ class UnrecognizedBrokerVersion(KafkaError): pass +class IncompatibleBrokerVersion(KafkaError): + pass + + class CommitFailedError(KafkaError): def __init__(self, *args, **kwargs): super(CommitFailedError, self).__init__(