diff --git a/kafka/client.py b/kafka/client.py index 4cd9e24f5..9eb8a0dcc 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -150,8 +150,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): List of response objects in the same order as the supplied payloads """ + log.debug("Sending Payloads: %s" % payloads) + # Group the requests by topic+partition - original_keys = [] + brokers_for_payloads = [] payloads_by_broker = collections.defaultdict(list) for payload in payloads: @@ -159,67 +161,89 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): payload.partition) payloads_by_broker[leader].append(payload) - original_keys.append((payload.topic, payload.partition)) - - # Accumulate the responses in a dictionary - acc = {} - - # keep a list of payloads that were failed to be sent to brokers - failed_payloads = [] + brokers_for_payloads.append(leader) # For each broker, send the list of request payloads + # and collect the responses and errors + responses_by_broker = collections.defaultdict(list) + broker_failures = [] for broker, payloads in payloads_by_broker.items(): conn = self._get_conn(broker.host.decode('utf-8'), broker.port) requestId = self._next_id() request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) - failed = False # Send the request, recv the response try: conn.send(requestId, request) + except ConnectionError as e: + broker_failures.append(broker) + log.warning("Could not send request [%s] to server %s: %s", + binascii.b2a_hex(request), conn, e) + + for payload in payloads: + responses_by_broker[broker].append(FailedPayloadsError(payload)) + + # No exception, try to get response + else: + # decoder_fn=None signal that the server is expected to not # send a response. This probably only applies to # ProduceRequest w/ acks = 0 if decoder_fn is None: + for payload in payloads: + responses_by_broker[broker].append(None) continue try: response = conn.recv(requestId) except ConnectionError as e: + broker_failures.append(broker) log.warning("Could not receive response to request [%s] " - "from server %s: %s", binascii.b2a_hex(request), conn, e) - failed = True - except ConnectionError as e: - log.warning("Could not send request [%s] to server %s: %s", - binascii.b2a_hex(request), conn, e) - failed = True + "from server %s: %s", + binascii.b2a_hex(request), conn, e) - if failed: - failed_payloads += payloads - self.reset_all_metadata() - continue + for payload in payloads: + responses_by_broker[broker].append(FailedPayloadsError(payload)) - for response in decoder_fn(response): - acc[(response.topic, response.partition)] = response + else: + + for payload_response in decoder_fn(response): + responses_by_broker[broker].append(payload_response) - if failed_payloads: - raise FailedPayloadsError(failed_payloads) + # Connection errors generally mean stale metadata + # although sometimes it means incorrect api request + # Unfortunately there is no good way to tell the difference + # so we'll just reset metadata on all errors to be safe + if broker_failures: + self.reset_all_metadata() - # Order the accumulated responses by the original key order - return (acc[k] for k in original_keys) if acc else () + # Return responses in the same order as provided + responses_by_payload = [responses_by_broker[broker].pop(0) + for broker in brokers_for_payloads] + log.debug('Responses: %s' % responses_by_payload) + return responses_by_payload def __repr__(self): return '' % (self.client_id) def _raise_on_response_error(self, resp): + + # Response can be an unraised exception object (FailedPayloadsError) + if isinstance(resp, Exception): + raise resp + + # Or a server api error response try: kafka.common.check_error(resp) except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): self.reset_topic_metadata(resp.topic) raise + # Return False if no error to enable list comprehensions + return False + ################# # Public API # ################# @@ -396,14 +420,25 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, same order as the list of payloads specified Arguments: - payloads: list of ProduceRequest - fail_on_error: boolean, should we raise an Exception if we - encounter an API error? - callback: function, instead of returning the ProduceResponse, - first pass it through this function + payloads (list of ProduceRequest): produce requests to send to kafka + acks (int, optional): how many acks the servers should receive from replica + brokers before responding to the request. If it is 0, the server + will not send any response. If it is 1, the server will wait + until the data is written to the local log before sending a + response. If it is -1, the server will wait until the message + is committed by all in-sync replicas before sending a response. + For any value > 1, the server will wait for this number of acks to + occur (but the server will never wait for more acknowledgements than + there are in-sync replicas). defaults to 1. + timeout (int, optional): maximum time in milliseconds the server can + await the receipt of the number of acks, defaults to 1000. + fail_on_error (bool, optional): raise exceptions on connection and + server response errors, defaults to True. + callback (function, optional): instead of returning the ProduceResponse, + first pass it through this function, defaults to None. Returns: - list of ProduceResponse or callback(ProduceResponse), in the + list of ProduceResponses, or callback results if supplied, in the order of input payloads """ @@ -419,16 +454,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, resps = self._send_broker_aware_request(payloads, encoder, decoder) - out = [] - for resp in resps: - if fail_on_error is True: - self._raise_on_response_error(resp) - - if callback is not None: - out.append(callback(resp)) - else: - out.append(resp) - return out + return [resp if not callback else callback(resp) for resp in resps + if resp is not None and + (not fail_on_error or not self._raise_on_response_error(resp))] def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None, max_wait_time=100, min_bytes=4096): @@ -447,16 +475,8 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, payloads, encoder, KafkaProtocol.decode_fetch_response) - out = [] - for resp in resps: - if fail_on_error is True: - self._raise_on_response_error(resp) - - if callback is not None: - out.append(callback(resp)) - else: - out.append(resp) - return out + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] def send_offset_request(self, payloads=[], fail_on_error=True, callback=None): @@ -465,15 +485,8 @@ def send_offset_request(self, payloads=[], fail_on_error=True, KafkaProtocol.encode_offset_request, KafkaProtocol.decode_offset_response) - out = [] - for resp in resps: - if fail_on_error is True: - self._raise_on_response_error(resp) - if callback is not None: - out.append(callback(resp)) - else: - out.append(resp) - return out + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): @@ -482,16 +495,8 @@ def send_offset_commit_request(self, group, payloads=[], decoder = KafkaProtocol.decode_offset_commit_response resps = self._send_broker_aware_request(payloads, encoder, decoder) - out = [] - for resp in resps: - if fail_on_error is True: - self._raise_on_response_error(resp) - - if callback is not None: - out.append(callback(resp)) - else: - out.append(resp) - return out + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): @@ -501,12 +506,5 @@ def send_offset_fetch_request(self, group, payloads=[], decoder = KafkaProtocol.decode_offset_fetch_response resps = self._send_broker_aware_request(payloads, encoder, decoder) - out = [] - for resp in resps: - if fail_on_error is True: - self._raise_on_response_error(resp) - if callback is not None: - out.append(callback(resp)) - else: - out.append(resp) - return out + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)]