diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 27ed4da99..ed41d3098 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -390,6 +390,12 @@ def __init__(self, *topics, **configs): self._subscription.subscribe(topics=topics) self._client.set_topics(topics) + def bootstrap_connected(self): + """Return True if the bootstrap is connected.""" + if self._client._bootstrap_fails > 0: + return False + return True + def assign(self, partitions): """Manually assign a list of TopicPartitions to this consumer. diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index e6bd3b9a6..95e797a9c 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -23,7 +23,6 @@ from kafka.serializer import Serializer from kafka.structs import TopicPartition - log = logging.getLogger(__name__) PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger() @@ -376,13 +375,13 @@ def __init__(self, **configs): reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', - wakeup_timeout_ms=self.config['max_block_ms'], - **self.config) + self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], + **self.config) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: - self.config['api_version'] = client.config['api_version'] + self.config['api_version'] = self._client.config['api_version'] if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' @@ -398,9 +397,9 @@ def __init__(self, **configs): message_version = self._max_usable_produce_magic() self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config) - self._metadata = client.cluster + self._metadata = self._client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) - self._sender = Sender(client, self._metadata, + self._sender = Sender(self._client, self._metadata, self._accumulator, self._metrics, guarantee_message_order=guarantee_message_order, **self.config) @@ -412,14 +411,22 @@ def __init__(self, **configs): atexit.register(self._cleanup) log.debug("Kafka producer started") + def bootstrap_connected(self): + """Return True if the bootstrap is connected.""" + if self._client._bootstrap_fails > 0: + return False + return True + def _cleanup_factory(self): """Build a cleanup clojure that doesn't increase our ref count""" _self = weakref.proxy(self) + def wrapper(): try: _self.close(timeout=0) except (ReferenceError, AttributeError): pass + return wrapper def _unregister_cleanup(self):