Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop using broker-errors for client-side problems #1639

Merged
merged 1 commit into from
Nov 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import socket
from kafka.client_async import KafkaClient, selectors
from kafka.errors import (
KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError)
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
Expand All @@ -25,9 +26,11 @@ class KafkaAdmin(object):
nicer, more pythonic objects. Unfortunately, this will likely break
those interfaces.

The KafkaAdmin class will negotiate for the latest version of each message protocol format supported
by both the kafka-python client library and the kafka broker. Usage of optional fields from protocol
versions that are not supported by the broker will result in UnsupportedVersionError exceptions.
The KafkaAdmin class will negotiate for the latest version of each message
protocol format supported by both the kafka-python client library and the
kafka broker. Usage of optional fields from protocol versions that are not
supported by the broker will result in IncompatibleBrokerVersion exceptions.


Use of this class requires a minimum broker version >= 0.10.0.0.

Expand Down Expand Up @@ -223,8 +226,8 @@ def _matching_api_version(self, operation):
if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
# max library version is less than min broker version. Not sure any brokers
# actually set a min version greater than 0 right now, tho. But maybe in the future?
raise UnsupportedVersionError(
"Could not find matching protocol version for {}"
raise IncompatibleBrokerVersion(
"No version of the '{}' kafka protocol is supported by both the client and broker."
.format(operation.__name__))
return version

Expand All @@ -246,9 +249,9 @@ def _refresh_controller_id(self):
self._controller_id = response.controller_id
version = self._client.check_version(self._controller_id)
if version < (0, 10, 0):
raise UnsupportedVersionError(
"Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
.format(version))
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(version))

def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
Expand Down Expand Up @@ -311,9 +314,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if validate_only:
raise UnsupportedVersionError(
"validate_only not supported on cluster version {}"
.format(self.config['api_version']))
raise IncompatibleBrokerVersion(
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = CreateTopicsRequest[version](
create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout = timeout_ms
Expand All @@ -326,10 +329,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
validate_only = validate_only
)
else:
raise UnsupportedVersionError(
"missing implementation of CreateTopics for library supported version {}"
.format(version)
)
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

def delete_topics(self, topics, timeout_ms=None):
Expand All @@ -347,9 +349,9 @@ def delete_topics(self, topics, timeout_ms=None):
timeout = timeout_ms
)
else:
raise UnsupportedVersionError(
"missing implementation of DeleteTopics for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# list topics functionality is in ClusterMetadata
Expand Down Expand Up @@ -386,9 +388,9 @@ def describe_configs(self, config_resources, include_synonyms=None):
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
raise UnsupportedVersionError(
"include_synonyms not supported on cluster version {}"
.format(self.config['api_version']))
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = DescribeConfigsRequest[version](
resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
)
Expand All @@ -399,9 +401,9 @@ def describe_configs(self, config_resources, include_synonyms=None):
include_synonyms = include_synonyms
)
else:
raise UnsupportedVersionError(
"missing implementation of DescribeConfigs for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

@staticmethod
Expand All @@ -426,9 +428,9 @@ def alter_configs(self, config_resources):
resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
else:
raise UnsupportedVersionError(
"missing implementation of AlterConfigs for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# alter replica logs dir protocol not implemented
Expand Down Expand Up @@ -463,9 +465,9 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
validate_only = validate_only
)
else:
raise UnsupportedVersionError(
"missing implementation of CreatePartitions for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# delete records protocol not implemented
Expand All @@ -490,9 +492,9 @@ def describe_consumer_groups(self, group_ids):
groups = group_ids
)
else:
raise UnsupportedVersionError(
"missing implementation of DescribeGroups for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

def list_consumer_groups(self):
Expand All @@ -504,9 +506,9 @@ def list_consumer_groups(self):
if version <= 1:
request = ListGroupsRequest[version]()
else:
raise UnsupportedVersionError(
"missing implementation of ListGroups for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# delete groups protocol not implemented
2 changes: 1 addition & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ def get_api_versions(self):
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
# that supports 0.10.0 or later
return self._api_versions;
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
Expand Down
4 changes: 4 additions & 0 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class UnrecognizedBrokerVersion(KafkaError):
pass


class IncompatibleBrokerVersion(KafkaError):
pass


class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
super(CommitFailedError, self).__init__(
Expand Down