POC
anchitj committed Dec 10, 2024
1 parent cb8c19c commit 773ac3c
Showing 9 changed files with 530 additions and 101 deletions.
26 changes: 26 additions & 0 deletions src/rdbuf.c
@@ -938,6 +938,32 @@ size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size) {
return size;
}

// TODO: Dedupe the code
size_t rd_slice_read_into_buf(rd_slice_t *slice, rd_buf_t *rbuf, size_t size) {
        size_t remains = size;
        size_t rlen;
        const void *p;
        size_t orig_end = slice->end;

        if (unlikely(rd_slice_remains(slice) < size))
                return 0;

        /* Temporarily shrink slice to offset + \p size */
        slice->end = rd_slice_abs_offset(slice) + size;

        while ((rlen = rd_slice_reader(slice, &p))) {
                rd_dassert(remains >= rlen);
                rd_buf_write(rbuf, p, rlen);
                remains -= rlen;
        }

        rd_dassert(remains == 0);

        /* Restore original size */
        slice->end = orig_end;

        return size;
}
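/*
 * Hypothetical usage sketch (not part of this commit): copy `size` bytes
 * starting at absolute offset `offset` from one buffer into another using
 * the helper above. It relies only on the existing rd_slice_init() API from
 * rdbuf.h; the name example_copy_range() is illustrative.
 */
static size_t example_copy_range(const rd_buf_t *src, rd_buf_t *dst,
                                 size_t offset, size_t size) {
        rd_slice_t slice;

        if (rd_slice_init(&slice, src, offset, size) == -1)
                return 0; /* offset/size out of bounds */

        /* Appends exactly `size` bytes to `dst` on success, 0 on failure. */
        return rd_slice_read_into_buf(&slice, dst, size);
}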

/**
* @brief Read \p size bytes from absolute slice offset \p offset
2 changes: 2 additions & 0 deletions src/rdbuf.h
@@ -308,6 +308,8 @@ size_t rd_slice_reader(rd_slice_t *slice, const void **p);
size_t rd_slice_peeker(const rd_slice_t *slice, const void **p);

size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size);
size_t rd_slice_read_into_buf(rd_slice_t *slice, rd_buf_t *rbuf, size_t size);

size_t
rd_slice_peek(const rd_slice_t *slice, size_t offset, void *dst, size_t size);

59 changes: 45 additions & 14 deletions src/rdkafka_broker.c
@@ -768,6 +768,15 @@ void rd_kafka_broker_conn_closed(rd_kafka_broker_t *rkb,
rd_kafka_broker_fail(rkb, log_level, err, "%s", errstr);
}

rd_bool_t buf_contains_toppar(rd_kafka_buf_t *rkbuf, rd_kafka_toppar_t *rktp) {
        if (!RD_MAP_IS_EMPTY(&rkbuf->rkbuf_u.Produce.batch_map)) {
                rd_kafka_topic_partition_t *rktpar =
                    rd_kafka_topic_partition_new_from_rktp(rktp);
                rd_bool_t found =
                    RD_MAP_GET(&rkbuf->rkbuf_u.Produce.batch_map, rktpar) !=
                    NULL;
                /* Free the temporary lookup key to avoid leaking one
                 * rd_kafka_topic_partition_t per scanned buffer. */
                rd_kafka_topic_partition_destroy(rktpar);
                return found;
        } else
                return rkbuf->rkbuf_u.Produce.batch.rktp == rktp;
}

/**
* @brief Purge requests in \p rkbq matching request \p ApiKey
@@ -792,7 +801,7 @@ static int rd_kafka_broker_bufq_purge_by_toppar(rd_kafka_broker_t *rkb,
TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {

if (rkbuf->rkbuf_reqhdr.ApiKey != ApiKey ||
rkbuf->rkbuf_u.Produce.batch.rktp != rktp ||
!buf_contains_toppar(rkbuf, rktp) ||
/* Skip partially sent buffers and let them transmit.
* The alternative would be to kill the connection here,
* which is more drastic and costly. */
@@ -3912,14 +3921,17 @@ rd_kafka_broker_outbufs_space(rd_kafka_broker_t *rkb) {
* @locks none
* @locality broker thread
*/
static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
const rd_kafka_pid_t pid,
rd_ts_t now,
rd_ts_t *next_wakeup,
rd_bool_t do_timeout_scan,
rd_bool_t may_send,
rd_bool_t flushing) {
static int
rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
const rd_kafka_pid_t pid,
rd_ts_t now,
rd_ts_t *next_wakeup,
rd_bool_t do_timeout_scan,
rd_bool_t may_send,
rd_bool_t flushing,
rd_bool_t multi_batch_request,
map_topic_partition_buf_t *map_topic_batch) {
int cnt = 0;
int r;
rd_kafka_msg_t *rkm;
@@ -4168,10 +4180,15 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
/* Send Produce requests for this toppar, honouring the
* queue backpressure threshold. */
for (reqcnt = 0; reqcnt < max_requests; reqcnt++) {
r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid);
if (likely(r > 0))
r = rd_kafka_ProduceRequest(rkb, rktp, pid, epoch_base_msgid,
multi_batch_request,
map_topic_batch);
if (likely(r > 0)) {
cnt += r;
else
if (multi_batch_request)
/* One batch for a toppar in a single request */
break;
} else
break;
}

@@ -4193,7 +4210,6 @@ static int rd_kafka_toppar_producer_serve(rd_kafka_broker_t *rkb,
}



/**
* @brief Produce from all toppars assigned to this broker.
*
@@ -4212,6 +4228,8 @@ static int rd_kafka_broker_produce_toppars(rd_kafka_broker_t *rkb,
rd_kafka_pid_t pid = RD_KAFKA_PID_INITIALIZER;
rd_bool_t may_send = rd_true;
rd_bool_t flushing = rd_false;
rd_bool_t multi_batch_request =
rd_kafka_is_idempotent(rkb->rkb_rk) ? rd_false : rd_true;

/* Round-robin serve each toppar. */
rktp = rkb->rkb_active_toppar_next;
@@ -4239,20 +4257,33 @@

flushing = may_send && rd_atomic32_get(&rkb->rkb_rk->rk_flushing) > 0;

// TODO: Fix value destructor
map_topic_partition_buf_t map_topic_batch_bufq = RD_MAP_INITIALIZER(
rkb->rkb_toppar_cnt, rd_kafka_topic_partition_cmp,
rd_kafka_topic_partition_hash,
rd_kafka_topic_partition_destroy_free, NULL);
do {
rd_ts_t this_next_wakeup = ret_next_wakeup;

/* Try producing toppar */
cnt += rd_kafka_toppar_producer_serve(
rkb, rktp, pid, now, &this_next_wakeup, do_timeout_scan,
may_send, flushing);
may_send, flushing, multi_batch_request,
&map_topic_batch_bufq);

rd_kafka_set_next_wakeup(&ret_next_wakeup, this_next_wakeup);

} while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp,
rktp_activelink)) !=
rkb->rkb_active_toppar_next);

        if (multi_batch_request && !RD_MAP_IS_EMPTY(&map_topic_batch_bufq)) {
                rd_kafka_MultiBatchProduceRequest(rkb, pid,
                                                  &map_topic_batch_bufq);
        }

        RD_MAP_DESTROY(&map_topic_batch_bufq);

/* Update next starting toppar to produce in round-robin list. */
rd_kafka_broker_active_toppar_next(
rkb,
8 changes: 8 additions & 0 deletions src/rdkafka_buf.c
@@ -57,6 +57,14 @@ void rd_kafka_buf_destroy_final(rd_kafka_buf_t *rkbuf) {

        case RD_KAFKAP_Produce: {
                rd_kafka_msgbatch_t *msgbatch;
                rd_kafka_topic_partition_t *key;

                rd_kafka_msgbatch_destroy(&rkbuf->rkbuf_batch);
                RD_MAP_FOREACH(key, msgbatch,
                               &rkbuf->rkbuf_u.Produce.batch_map) {
                        rd_kafka_msgbatch_destroy(msgbatch);
                        rd_free(msgbatch);
                }
                RD_MAP_DESTROY(&rkbuf->rkbuf_u.Produce.batch_map);
                break;
        }
}

10 changes: 10 additions & 0 deletions src/rdkafka_buf.h
@@ -38,6 +38,10 @@
typedef struct rd_kafka_broker_s rd_kafka_broker_t;

#define RD_KAFKA_HEADERS_IOV_CNT 2
typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
                    rd_kafka_msgbatch_t *) map_topic_partition_msgbatch_t;
typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
                    rd_kafka_buf_t *) map_topic_partition_buf_t;
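/*
 * Illustrative sketch (not part of this commit) of how a
 * map_topic_partition_buf_t is used with the generic RD_MAP API (rdmap.h);
 * "mytopic" and partition 0 are placeholders:
 *
 *   map_topic_partition_buf_t map = RD_MAP_INITIALIZER(
 *           16, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash,
 *           rd_kafka_topic_partition_destroy_free,
 *           NULL); // value destructor: buffers are owned elsewhere
 *
 *   RD_MAP_SET(&map, rd_kafka_topic_partition_new("mytopic", 0), rkbuf);
 *
 *   const rd_kafka_topic_partition_t *rktpar;
 *   rd_kafka_buf_t *buf;
 *   RD_MAP_FOREACH(rktpar, buf, &map)
 *           ... append buf to the combined Produce request ...
 *
 *   RD_MAP_DESTROY(&map); // frees the allocated keys, not the buffers
 */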


/**
@@ -399,6 +403,12 @@ struct rd_kafka_buf_s { /* rd_kafka_buf_t */
} Metadata;
struct {
rd_kafka_msgbatch_t batch; /**< MessageSet/batch */

                        map_topic_partition_msgbatch_t batch_map;
                        size_t batch_start_pos; /* Pos where Record batch
                                                 * starts in the buf */
                        size_t batch_end_pos;   /* Pos after Record batch +
                                                 * Partition tags in the buf */
} Produce;
struct {
rd_bool_t commit; /**< true = txn commit,
3 changes: 2 additions & 1 deletion src/rdkafka_msg.h
@@ -83,7 +83,8 @@ typedef struct rd_kafka_Produce_result {
rd_kafka_Produce_result_record_error_t
*record_errors; /**< Errors for records that caused the batch to be
dropped */
int32_t record_errors_cnt; /**< record_errors count */
int32_t record_errors_cnt; /**< record_errors count */
rd_kafka_resp_err_t errorcode; /**< error code from the response */
} rd_kafka_Produce_result_t;

typedef struct rd_kafka_msg_s {
6 changes: 6 additions & 0 deletions src/rdkafka_msgset_writer.c
@@ -460,6 +460,8 @@ rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) {
/* MessageSetSize: Will be finalized later*/
msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_arraycnt_pos(rkbuf);

rkbuf->rkbuf_u.Produce.batch_start_pos = msetw->msetw_of_MessageSetSize;

if (msetw->msetw_MsgVersion == 2) {
/* MessageSet v2 header */
rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
@@ -1399,6 +1401,10 @@ rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw,

/* Partition tags */
rd_kafka_buf_write_tags_empty(rkbuf);

rkbuf->rkbuf_u.Produce.batch_end_pos =
rd_buf_write_pos(&rkbuf->rkbuf_buf);

/* Topics tags */
rd_kafka_buf_write_tags_empty(rkbuf);
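/*
 * Hypothetical sketch (not part of this commit): with batch_start_pos and
 * batch_end_pos recorded above, the serialized batch of one per-partition
 * Produce buffer could be copied into a combined multi-batch request buffer
 * using the new rd_slice_read_into_buf() helper. The actual assembly done by
 * rd_kafka_MultiBatchProduceRequest() is not shown in this excerpt;
 * example_append_batch() is an illustrative name only.
 */
static void example_append_batch(rd_kafka_buf_t *src_rkbuf, rd_buf_t *dst) {
        size_t start = src_rkbuf->rkbuf_u.Produce.batch_start_pos;
        size_t len   = src_rkbuf->rkbuf_u.Produce.batch_end_pos - start;
        rd_slice_t slice;

        if (rd_slice_init(&slice, &src_rkbuf->rkbuf_buf, start, len) == 0)
                rd_slice_read_into_buf(&slice, dst, len);
}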
