Skip to content

Commit

Permalink
Merge pull request #131 from confluentinc/errors_129
Browse files Browse the repository at this point in the history
Proper error string handling in Producer, issue #129
  • Loading branch information
edenhill authored Feb 27, 2017
2 parents 366b322 + 0613e58 commit ece3df4
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 9 deletions.
4 changes: 3 additions & 1 deletion confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
if (!rkm)
Py_RETURN_NONE;

msgobj = Message_new0(rkm);
msgobj = Message_new0(self, rkm);
rd_kafka_message_destroy(rkm);

return msgobj;
Expand Down Expand Up @@ -770,6 +770,8 @@ static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
return -1;
}

self->type = RD_KAFKA_CONSUMER;

if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
args, kwargs)))
return -1; /* Exception raised by ..conf_setup() */
Expand Down
4 changes: 3 additions & 1 deletion confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
goto done;
}

msgobj = Message_new0(rkm);
msgobj = Message_new0(self, rkm);

args = Py_BuildValue("(OO)",
Message_error((Message *)msgobj, NULL),
Expand Down Expand Up @@ -527,6 +527,8 @@ static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
return -1;
}

self->type = RD_KAFKA_PRODUCER;

if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
args, kwargs)))
return -1;
Expand Down
17 changes: 11 additions & 6 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@ PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) {
PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str) {
if (!err)
Py_RETURN_NONE;
return KafkaError_new0(err, "%s", str);
if (str)
return KafkaError_new0(err, "%s", str);
else
return KafkaError_new0(err, NULL);
}


Expand Down Expand Up @@ -529,17 +532,19 @@ PyTypeObject MessageType = {
/**
* @brief Internal factory to create Message object from message_t
*/
PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
Message *self;

self = (Message *)MessageType.tp_alloc(&MessageType, 0);
if (!self)
return NULL;

self->error = KafkaError_new_or_None(rkm->err,
rkm->err ?
rd_kafka_message_errstr(rkm) :
NULL);
/* Only use message error string on Consumer, for Producers
* it will contain the original message payload. */
self->error = KafkaError_new_or_None(
rkm->err,
(rkm->err && handle->type != RD_KAFKA_PRODUCER) ?
rd_kafka_message_errstr(rkm) : NULL);

if (rkm->rkt)
self->topic = cfl_PyUnistr(
Expand Down
3 changes: 2 additions & 1 deletion confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ typedef struct {
PyObject *stats_cb;
int initiated;
int tlskey; /* Thread-Local-Storage key */
rd_kafka_type_t type; /* Producer or consumer */

union {
/**
Expand Down Expand Up @@ -255,7 +256,7 @@ typedef struct {

extern PyTypeObject MessageType;

PyObject *Message_new0 (const rd_kafka_message_t *rkm);
PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm);
PyObject *Message_error (Message *self, PyObject *ignore);


Expand Down
24 changes: 24 additions & 0 deletions tests/test_Producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,27 @@ def produce_hi (self):
sp = SubProducer({'log.thread.name': True}, 'mytopic')
sp.produce('someother', value='not hello')
sp.produce_hi()


def test_dr_msg_errstr():
"""
Test that the error string for failed messages works (issue #129).
The underlying problem is that librdkafka reuses the message payload
for error value on Consumer messages, but on Producer messages the
payload is the original payload and no rich error string exists.
"""
p = Producer({"default.topic.config":{"message.timeout.ms":10}})

def handle_dr (err, msg):
# Neither message payloads must not affect the error string.
assert err is not None
assert err.code() == KafkaError._MSG_TIMED_OUT
assert "Message timed out" in err.str()

# Unicode safe string
p.produce('mytopic', "This is the message payload", on_delivery=handle_dr)

# Invalid unicode sequence
p.produce('mytopic', "\xc2\xc2", on_delivery=handle_dr)

p.flush()

0 comments on commit ece3df4

Please sign in to comment.