From a1420389f2b3e44f42de165d569667ea4dfbf4fe Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Sat, 17 Nov 2018 04:11:52 -0800 Subject: [PATCH] Various docstring / pep8 / code hygiene cleanups --- kafka/admin/kafka.py | 157 ++++++++++++++++++++++++------------------- 1 file changed, 86 insertions(+), 71 deletions(-) diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index ca5ad565f..8e0a7565c 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -21,10 +21,12 @@ from kafka.structs import TopicPartition, OffsetAndMetadata from kafka.version import __version__ + log = logging.getLogger(__name__) + class KafkaAdmin(object): - """An class for administering the kafka cluster. + """A class for administering the Kafka cluster. Warning: This is an unstable interface that was recently added and is subject to @@ -35,10 +37,9 @@ class KafkaAdmin(object): The KafkaAdmin class will negotiate for the latest version of each message protocol format supported by both the kafka-python client library and the - kafka broker. Usage of optional fields from protocol versions that are not + Kafka broker. Usage of optional fields from protocol versions that are not supported by the broker will result in IncompatibleBrokerVersion exceptions. - Use of this class requires a minimum broker version >= 0.10.0.0. Keyword Arguments: @@ -167,16 +168,16 @@ class KafkaAdmin(object): 'sasl_kerberos_service_name': 'kafka', # metrics configs - 'metric_reporters' : [], + 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, } def __init__(self, **configs): - log.debug("Starting Kafka administration interface") + log.debug("Starting KafkaAdmin interface.") extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,)) + raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs)) self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) @@ -189,8 +190,9 @@ def __init__(self, **configs): reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='admin', - **self.config) + self._client = KafkaClient(metrics=self._metrics, + metric_group_prefix='admin', + **self.config) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: @@ -198,46 +200,49 @@ def __init__(self, **configs): self._closed = False self._refresh_controller_id() - log.debug('Kafka administration interface started') + log.debug("KafkaAdmin interface started.") def close(self): - """Close the administration connection to the kafka broker""" + """Close the KafkaAdmin connection to the Kafka broker.""" if not hasattr(self, '_closed') or self._closed: - log.info('Kafka administration interface already closed') + log.info("KafkaAdmin interface already closed.") return self._metrics.close() self._client.close() self._closed = True - log.debug('Kafka administration interface has closed') + log.debug("KafkaAdmin interface has closed.") def _matching_api_version(self, operation): - """Find matching api version, the lesser of either the latest api version the library supports, or - the max version supported by the broker + """Find the latest version of the protocol operation supported by both + this library and the broker. + + This resolves to the lesser of either the latest api version this + library supports, or the max version supported by the broker. - :param operation: An operation array from kafka.protocol - :return: The max matching version number between client and broker + :param operation: A list of protocol operation versions from kafka.protocol. + :return: The max matching version number between client and broker. """ version = min(len(operation) - 1, self._client.get_api_versions()[operation[0].API_KEY][1]) if version < self._client.get_api_versions()[operation[0].API_KEY][0]: - # max library version is less than min broker version. Not sure any brokers - # actually set a min version greater than 0 right now, tho. But maybe in the future? + # max library version is less than min broker version. Currently, + # no Kafka versions specify a min msg version. Maybe in the future? raise IncompatibleBrokerVersion( - "No version of the '{}' kafka protocol is supported by both the client and broker." + "No version of the '{}' Kafka protocol is supported by both the client and broker." .format(operation.__name__)) return version def _validate_timeout(self, timeout_ms): - """Validate the timeout is set or use the configuration default + """Validate the timeout is set or use the configuration default. - :param timeout_ms: The timeout provided by api call, in milliseconds - :return: The timeout to use for the operation + :param timeout_ms: The timeout provided by api call, in milliseconds. + :return: The timeout to use for the operation. """ return timeout_ms or self.config['request_timeout_ms'] def _refresh_controller_id(self): - """Determine the kafka cluster controller.""" + """Determine the Kafka cluster controller.""" version = self._matching_api_version(MetadataRequest) if 1 <= version <= 6: request = MetadataRequest[version]() @@ -293,31 +298,34 @@ def _find_group_coordinator_id(self, group_id): assert group_coordinator != -1 return group_coordinator - def _send_request_to_node(self, node, request): - """Send a kafka protocol message to a specific broker. Will block until the message result is received. + def _send_request_to_node(self, node_id, request): + """Send a Kafka protocol message to a specific broker. - :param node: The broker id to which to send the message - :param request: The message to send - :return: The kafka protocol response for the message - :exception: The exception if the message could not be sent + Will block until the message result is received. + + :param node_id: The broker id to which to send the message. + :param request: The message to send. + :return: The Kafka protocol response for the message. + :exception: The exception if the message could not be sent. """ - while not self._client.ready(node): - # connection to broker not ready, poll until it is or send will fail with NodeNotReadyError + while not self._client.ready(node_id): + # poll until the connection to broker is ready, otherwise send() + # will fail with NodeNotReadyError self._client.poll() - future = self._client.send(node, request) + future = self._client.send(node_id, request) self._client.poll(future=future) if future.succeeded(): return future.value else: - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type def _send_request_to_controller(self, request): - """Send a kafka protocol message to the cluster controller. + """Send a Kafka protocol message to the cluster controller. Will block until the message result is received. - :param request: The message to send - :return: The kafka protocol response for the message + :param request: The message to send. + :return: The Kafka protocol response for the message. """ tries = 2 # in case our cached self._controller_id is outdated while tries: @@ -357,11 +365,12 @@ def _convert_new_topic_request(new_topic): def create_topics(self, new_topics, timeout_ms=None, validate_only=False): """Create new topics in the cluster. - :param new_topics: Array of NewTopic objects - :param timeout_ms: Milliseconds to wait for new topics to be created before broker returns + :param new_topics: A list of NewTopic objects. + :param timeout_ms: Milliseconds to wait for new topics to be created + before the broker returns. :param validate_only: If True, don't actually create new topics. Not supported by all versions. Default: False - :return: Appropriate version of CreateTopicResponse class + :return: Appropriate version of CreateTopicResponse class. """ version = self._matching_api_version(CreateTopicsRequest) timeout_ms = self._validate_timeout(timeout_ms) @@ -371,40 +380,44 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False): "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." .format(self.config['api_version'])) request = CreateTopicsRequest[version]( - create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout = timeout_ms + create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + timeout=timeout_ms ) elif version <= 2: request = CreateTopicsRequest[version]( - create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout = timeout_ms, - validate_only = validate_only + create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + timeout=timeout_ms, + validate_only=validate_only ) else: raise NotImplementedError( "Support for CreateTopics v{} has not yet been added to KafkaAdmin." .format(version)) + # TODO convert structs to a more pythonic interface + # TODO raise exceptions if errors return self._send_request_to_controller(request) def delete_topics(self, topics, timeout_ms=None): - """Delete topics from the cluster + """Delete topics from the cluster. - :param topics: Array of topic name strings - :param timeout_ms: Milliseconds to wait for topics to be deleted before broker returns - :return: Appropriate version of DeleteTopicsResponse class + :param topics: A list of topic name strings. + :param timeout_ms: Milliseconds to wait for topics to be deleted + before the broker returns. + :return: Appropriate version of DeleteTopicsResponse class. """ version = self._matching_api_version(DeleteTopicsRequest) timeout_ms = self._validate_timeout(timeout_ms) if version <= 1: request = DeleteTopicsRequest[version]( - topics = topics, - timeout = timeout_ms + topics=topics, + timeout=timeout_ms ) + response = self._send_request_to_controller(request) else: raise NotImplementedError( "Support for DeleteTopics v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send_request_to_controller(request) + return response # list topics functionality is in ClusterMetadata # Note: if implemented here, send the request to the least_loaded_node() @@ -435,14 +448,15 @@ def _convert_describe_config_resource_request(config_resource): ) def describe_configs(self, config_resources, include_synonyms=False): - """Fetch configuration parameters for one or more kafka resources. + """Fetch configuration parameters for one or more Kafka resources. - :param config_resources: An array of ConfigResource objects. - Any keys in ConfigResource.configs dict will be used to filter the result. The configs dict should be None - to get all values. An empty dict will get zero values (as per kafka protocol). - :param include_synonyms: If True, return synonyms in response. Not + :param config_resources: An list of ConfigResource objects. + Any keys in ConfigResource.configs dict will be used to filter the + result. Setting the configs dict to None will get all values. An + empty dict will get zero values (as per Kafka protocol). + :param include_synonyms: If True, return synonyms in response. Not supported by all versions. Default: False. - :return: Appropriate version of DescribeConfigsResponse class + :return: Appropriate version of DescribeConfigsResponse class. """ version = self._matching_api_version(DescribeConfigsRequest) if version == 0: @@ -451,12 +465,12 @@ def describe_configs(self, config_resources, include_synonyms=False): "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." .format(self.config['api_version'])) request = DescribeConfigsRequest[version]( - resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources] + resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources] ) - elif version <= 1: + elif version == 1: request = DescribeConfigsRequest[version]( - resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources], - include_synonyms = include_synonyms + resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources], + include_synonyms=include_synonyms ) else: raise NotImplementedError( @@ -475,7 +489,7 @@ def _convert_alter_config_resource_request(config_resource): ) def alter_configs(self, config_resources): - """Alter configuration parameters of one or more kafka resources. + """Alter configuration parameters of one or more Kafka resources. Warning: This is currently broken for BROKER resources because those must be @@ -483,13 +497,13 @@ def alter_configs(self, config_resources): least-loaded node. See the comment in the source code for details. We would happily accept a PR fixing this. - :param config_resources: An array of ConfigResource objects. - :return: Appropriate version of AlterConfigsResponse class + :param config_resources: A list of ConfigResource objects. + :return: Appropriate version of AlterConfigsResponse class. """ version = self._matching_api_version(AlterConfigsRequest) if version == 0: request = AlterConfigsRequest[version]( - resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] + resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] ) else: raise NotImplementedError( @@ -522,19 +536,20 @@ def _convert_create_partitions_request(topic_name, new_partitions): def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False): """Create additional partitions for an existing topic. - :param topic_partitions: A map of topic name strings to NewPartition objects - :param timeout_ms: Milliseconds to wait for new partitions to be created before broker returns + :param topic_partitions: A map of topic name strings to NewPartition objects. + :param timeout_ms: Milliseconds to wait for new partitions to be + created before the broker returns. :param validate_only: If True, don't actually create new partitions. Default: False - :return: Appropriate version of CreatePartitionsResponse class + :return: Appropriate version of CreatePartitionsResponse class. """ version = self._matching_api_version(CreatePartitionsRequest) timeout_ms = self._validate_timeout(timeout_ms) if version == 0: request = CreatePartitionsRequest[version]( - topic_partitions = [self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], - timeout = timeout_ms, - validate_only = validate_only + topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], + timeout=timeout_ms, + validate_only=validate_only ) else: raise NotImplementedError(