Skip to content

Commit

Permalink
Proper use of message error on Producer (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Feb 19, 2017
1 parent d415bb9 commit b535590
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 8 deletions.
4 changes: 3 additions & 1 deletion confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
if (!rkm)
Py_RETURN_NONE;

msgobj = Message_new0(rkm);
msgobj = Message_new0(self, rkm);
rd_kafka_message_destroy(rkm);

return msgobj;
Expand Down Expand Up @@ -770,6 +770,8 @@ static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
return -1;
}

self->type = RD_KAFKA_CONSUMER;

if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
args, kwargs)))
return -1; /* Exception raised by ..conf_setup() */
Expand Down
4 changes: 3 additions & 1 deletion confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
goto done;
}

msgobj = Message_new0(rkm);
msgobj = Message_new0(self, rkm);

args = Py_BuildValue("(OO)",
Message_error((Message *)msgobj, NULL),
Expand Down Expand Up @@ -527,6 +527,8 @@ static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
return -1;
}

self->type = RD_KAFKA_PRODUCER;

if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
args, kwargs)))
return -1;
Expand Down
12 changes: 7 additions & 5 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -529,17 +529,19 @@ PyTypeObject MessageType = {
/**
* @brief Internal factory to create Message object from message_t
*/
PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
Message *self;

self = (Message *)MessageType.tp_alloc(&MessageType, 0);
if (!self)
return NULL;

self->error = KafkaError_new_or_None(rkm->err,
rkm->err ?
rd_kafka_message_errstr(rkm) :
NULL);
/* Only use message error string on Consumer, for Producers
* it will contain the original message payload. */
self->error = KafkaError_new_or_None(
rkm->err,
(rkm->err && handle->type != RD_KAFKA_PRODUCER) ?
rd_kafka_message_errstr(rkm) : NULL);

if (rkm->rkt)
self->topic = cfl_PyUnistr(
Expand Down
3 changes: 2 additions & 1 deletion confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ typedef struct {
PyObject *stats_cb;
int initiated;
int tlskey; /* Thread-Local-Storage key */
rd_kafka_type_t type; /* Producer or consumer */

union {
/**
Expand Down Expand Up @@ -255,7 +256,7 @@ typedef struct {

extern PyTypeObject MessageType;

PyObject *Message_new0 (const rd_kafka_message_t *rkm);
PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm);
PyObject *Message_error (Message *self, PyObject *ignore);


Expand Down
24 changes: 24 additions & 0 deletions tests/test_Producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,27 @@ def produce_hi (self):
sp = SubProducer({'log.thread.name': True}, 'mytopic')
sp.produce('someother', value='not hello')
sp.produce_hi()


def test_dr_msg_errstr():
"""
Test that the error string for failed messages works (issue #129).
The underlying problem is that librdkafka reuses the message payload
for error value on Consumer messages, but on Producer messages the
payload is the original payload and no rich error string exists.
"""
p = Producer({"default.topic.config":{"message.timeout.ms":10}})

def handle_dr (err, msg):
# Neither message payloads must not affect the error string.
assert err is not None
assert err.code() == KafkaError._MSG_TIMED_OUT
assert "Message timed out" in err.str()

# Unicode safe string
p.produce('mytopic', "This is the message payload", on_delivery=handle_dr)

# Invalid unicode sequence
p.produce('mytopic', "\xc2\xc2", on_delivery=handle_dr)

p.flush()

0 comments on commit b535590

Please sign in to comment.