Store offset commit metadata when calling rd_kafka_offsets_store #4084

8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
# librdkafka v2.1.0

### Consumer fixes

* Store offset commit metadata in `rd_kafka_offsets_store` (#4084).



# librdkafka v2.0.2

librdkafka v2.0.2 is a bugfix release:
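The changelog entry above describes the new application-facing behaviour. As a minimal sketch (not from the PR), assuming a consumer `rk` created with enable.auto.offset.store=false and a just-processed message `rkm`; the helper name is illustrative:

#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

/* Illustrative helper: store an offset together with commit metadata. */
static void store_with_metadata(rd_kafka_t *rk, const rd_kafka_message_t *rkm,
                                const char *metadata) {
        rd_kafka_topic_partition_list_t *offsets =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
            offsets, rd_kafka_topic_name(rkm->rkt), rkm->partition);

        rktpar->offset        = rkm->offset + 1; /* next offset to commit */
        rktpar->metadata_size = strlen(metadata);
        rktpar->metadata      = malloc(rktpar->metadata_size);
        memcpy(rktpar->metadata, metadata, rktpar->metadata_size);

        /* The offset and a copy of the metadata are stored and will be
         * committed by the next (auto) commit; error handling omitted. */
        rd_kafka_offsets_store(rk, offsets);

        free(rktpar->metadata); /* librdkafka keeps its own copy */
        rktpar->metadata = NULL;
        rd_kafka_topic_partition_list_destroy(offsets);
}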
15 changes: 11 additions & 4 deletions src/rdkafka_assignment.c
@@ -335,15 +335,22 @@ static int rd_kafka_assignment_serve_removals(rd_kafka_t *rk) {

/* Save the currently stored offset on .removed
* so it will be committed below. */
rktpar->offset = rktp->rktp_stored_offset;
rktpar->metadata_size = rktp->rktp_stored_metadata_size;
if (rktp->rktp_stored_metadata) {
rktpar->metadata =
rd_malloc(rktp->rktp_stored_metadata_size);
memcpy(rktpar->metadata, rktp->rktp_stored_metadata,
rktp->rktp_stored_metadata_size);
}
valid_offsets += !RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset);

/* Reset the stored offset to invalid so that
* a manual offset-less commit() or the auto-committer
* will not commit a stored offset from a previous
* assignment (issue #2782). */
rd_kafka_offset_store0(rktp, RD_KAFKA_OFFSET_INVALID, NULL, 0,
rd_true, RD_DONT_LOCK);

/* Partition is no longer desired */
rd_kafka_toppar_desired_del(rktp);
@@ -733,7 +740,7 @@ rd_kafka_assignment_add(rd_kafka_t *rk,

/* Reset the stored offset to INVALID to avoid the race
* condition described in rdkafka_offset.h */
rd_kafka_offset_store0(rktp, RD_KAFKA_OFFSET_INVALID, NULL, 0,
rd_true /* force */, RD_DONT_LOCK);

rd_kafka_toppar_unlock(rktp);
20 changes: 14 additions & 6 deletions src/rdkafka_offset.c
@@ -553,7 +553,13 @@ rd_kafka_offset_broker_commit(rd_kafka_toppar_t *rktp, const char *reason) {
offsets = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
rktpar->offset = rktp->rktp_committing_offset;
rktpar->metadata_size = rktp->rktp_stored_metadata_size;
if (rktp->rktp_stored_metadata) {
rktpar->metadata = rd_malloc(rktp->rktp_stored_metadata_size);
memcpy(rktpar->metadata, rktp->rktp_stored_metadata,
rktpar->metadata_size);
}

rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSETCMT",
"%.*s [%" PRId32 "]: committing offset %" PRId64 ": %s",
@@ -646,7 +652,7 @@ rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *app_rkt,
}
rd_kafka_topic_rdunlock(rkt);

err = rd_kafka_offset_store0(rktp, offset + 1, NULL, 0,
rd_false /* Don't force */, RD_DO_LOCK);

rd_kafka_toppar_destroy(rktp);
@@ -677,9 +683,10 @@ rd_kafka_offsets_store(rd_kafka_t *rk,
continue;
}

rktpar->err = rd_kafka_offset_store0(
rktp, rktpar->offset, rktpar->metadata,
rktpar->metadata_size, rd_false /* don't force */,
RD_DO_LOCK);
rd_kafka_toppar_destroy(rktp);

if (rktpar->err)
@@ -1066,7 +1073,8 @@ rd_kafka_resp_err_t rd_kafka_offset_store_stop(rd_kafka_toppar_t *rktp) {
rktp->rktp_stored_offset == RD_KAFKA_OFFSET_INVALID &&
rktp->rktp_offsets_fin.eof_offset > 0)
rd_kafka_offset_store0(rktp, rktp->rktp_offsets_fin.eof_offset,
NULL, 0, rd_true /* force */,
RD_DONT_LOCK);

/* Commit offset to backing store.
* This might be an async operation. */
20 changes: 17 additions & 3 deletions src/rdkafka_offset.h
@@ -72,6 +72,8 @@ const char *rd_kafka_offset2str(int64_t offset);
* the stored offset to .._INVALID to provide a clean state.
*
* @param offset Offset to set, may be an absolute offset or .._INVALID.
* @param metadata Metadata to be set, may be NULL.
* @param metadata_size Size of the metadata to be set.
* @param force Forcibly set \p offset regardless of assignment state.
* @param do_lock Whether to lock the \p rktp or not (already locked by caller).
*
@@ -83,6 +85,8 @@ const char *rd_kafka_offset2str(int64_t offset);
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_offset_store0(rd_kafka_toppar_t *rktp,
int64_t offset,
void *metadata,
size_t metadata_size,
rd_bool_t force,
rd_dolock_t do_lock) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -94,9 +98,19 @@ rd_kafka_offset_store0(rd_kafka_toppar_t *rktp,
!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED) &&
!rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk)))
err = RD_KAFKA_RESP_ERR__STATE;
else {
if (rktp->rktp_stored_metadata) {
rd_free(rktp->rktp_stored_metadata);
rktp->rktp_stored_metadata = NULL;
}
rktp->rktp_stored_offset = offset;
rktp->rktp_stored_metadata_size = metadata_size;
if (metadata) {
rktp->rktp_stored_metadata = rd_malloc(metadata_size);
memcpy(rktp->rktp_stored_metadata, metadata,
rktp->rktp_stored_metadata_size);
}
}
if (do_lock)
rd_kafka_toppar_unlock(rktp);

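The ownership rule introduced by the new parameters is that the toppar keeps its own copy of the metadata, freeing whatever was stored before. A simplified standalone sketch of that copy-on-store pattern, using hypothetical names rather than librdkafka internals:

#include <stdlib.h>
#include <string.h>

/* Simplified sketch (not actual librdkafka code): the previous copy is freed
 * and, if metadata is given, a private copy of metadata_size bytes is kept. */
struct stored_state {
        void *metadata;
        size_t metadata_size;
};

static void store_metadata(struct stored_state *s, const void *metadata,
                           size_t metadata_size) {
        free(s->metadata);        /* drop the previously stored copy, if any */
        s->metadata      = NULL;
        s->metadata_size = metadata_size;
        if (metadata) {           /* keep a private copy of the new metadata */
                s->metadata = malloc(metadata_size);
                memcpy(s->metadata, metadata, metadata_size);
        }
}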
2 changes: 1 addition & 1 deletion src/rdkafka_op.c
@@ -926,7 +926,7 @@ void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {
rd_kafka_toppar_lock(rktp);
rktp->rktp_app_offset = offset;
if (rk->rk_conf.enable_auto_offset_store)
rd_kafka_offset_store0(rktp, offset, NULL, 0,
/* force: ignore assignment state */
rd_true, RD_DONT_LOCK);
rd_kafka_toppar_unlock(rktp);
10 changes: 10 additions & 0 deletions src/rdkafka_partition.c
@@ -340,6 +340,7 @@ void rd_kafka_toppar_destroy_final(rd_kafka_toppar_t *rktp) {

rd_refcnt_destroy(&rktp->rktp_refcnt);

rd_free(rktp->rktp_stored_metadata);
rd_free(rktp);
}

@@ -3109,6 +3110,15 @@ int rd_kafka_topic_partition_list_set_offsets(
rktp->rktp_committed_offset) {
verb = "setting stored";
rktpar->offset = rktp->rktp_stored_offset;
rktpar->metadata_size =
rktp->rktp_stored_metadata_size;
if (rktp->rktp_stored_metadata) {
rktpar->metadata = rd_malloc(
rktp->rktp_stored_metadata_size);
memcpy(rktpar->metadata,
rktp->rktp_stored_metadata,
rktpar->metadata_size);
}
} else {
rktpar->offset = RD_KAFKA_OFFSET_INVALID;
}
29 changes: 16 additions & 13 deletions src/rdkafka_partition.h
@@ -252,19 +252,22 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
* absolute timestamp
* expires. */

int64_t rktp_query_offset; /* Offset to query broker for*/
int64_t rktp_next_offset; /* Next offset to start
* fetching from.
* Locality: toppar thread */
int64_t rktp_last_next_offset; /* Last next_offset handled
* by fetch_decide().
* Locality: broker thread */
int64_t rktp_app_offset; /* Last offset delivered to
* application + 1.
* Is reset to INVALID_OFFSET
* when partition is
* unassigned/stopped/seeked. */
int64_t rktp_stored_offset; /* Last stored offset, but
* maybe not committed yet. */
void *rktp_stored_metadata;
size_t rktp_stored_metadata_size; /* Last stored metadata, but
* maybe not committed yet. */
int64_t rktp_committing_offset; /* Offset currently being
* committed */
59 changes: 54 additions & 5 deletions tests/0130-store_offsets.c
@@ -30,8 +30,8 @@


/**
* Verify that offsets_store() commits the right offsets and metadata,
* and is not allowed for unassigned partitions.
*/
static void do_test_store_unassigned(void) {
const char *topic = test_mk_topic_name("0130_store_unassigned", 1);
@@ -40,6 +40,7 @@ static void do_test_store_unassigned(void) {
rd_kafka_topic_partition_list_t *parts;
rd_kafka_resp_err_t err;
rd_kafka_message_t *rkmessage;
char metadata[] = "metadata";
const int64_t proper_offset = 900, bad_offset = 300;

SUB_TEST_QUICK();
@@ -60,8 +61,13 @@
TEST_SAY("Consume one message\n");
test_consumer_poll_once(c, NULL, tmout_multip(3000));

parts->elems[0].offset = proper_offset;
parts->elems[0].metadata_size = sizeof metadata;
parts->elems[0].metadata = malloc(parts->elems[0].metadata_size);
memcpy(parts->elems[0].metadata, metadata,
parts->elems[0].metadata_size);
TEST_SAY("Storing offset %" PRId64
" with metadata while assigned: should succeed\n",
parts->elems[0].offset);
TEST_CALL_ERR__(rd_kafka_offsets_store(c, parts));

@@ -71,7 +77,10 @@
TEST_SAY("Unassigning partitions and trying to store again\n");
TEST_CALL_ERR__(rd_kafka_assign(c, NULL));

parts->elems[0].offset = bad_offset;
parts->elems[0].metadata_size = 0;
rd_free(parts->elems[0].metadata);
parts->elems[0].metadata = NULL;
TEST_SAY("Storing offset %" PRId64 " while unassigned: should fail\n",
parts->elems[0].offset);
err = rd_kafka_offsets_store(c, parts);
@@ -108,9 +117,49 @@ static void do_test_store_unassigned(void) {
"offset %" PRId64 ", not %" PRId64,
proper_offset, rkmessage->offset);

TEST_SAY(
"Retrieving committed offsets to verify committed offset "
"metadata\n");
rd_kafka_topic_partition_list_t *committed_toppar;
committed_toppar = rd_kafka_topic_partition_list_new(1);

Review comment (Contributor): This needs to be destroyed at the end of the test, rd_kafka_topic_partition_list_destroy(committed_toppar);

Reply (Author): Good catch! Fixed this in 5d37ab1.

rd_kafka_topic_partition_list_add(committed_toppar, topic, 0);
TEST_CALL_ERR__(
rd_kafka_committed(c, committed_toppar, tmout_multip(3000)));
TEST_ASSERT(committed_toppar->elems[0].offset == proper_offset,
"Expected committed offset to be %" PRId64 ", not %" PRId64,
proper_offset, committed_toppar->elems[0].offset);
TEST_ASSERT(committed_toppar->elems[0].metadata != NULL,
"Expected metadata to not be NULL");
TEST_ASSERT(strcmp(committed_toppar->elems[0].metadata, metadata) == 0,
"Expected metadata to be %s, not %s", metadata,
(char *)committed_toppar->elems[0].metadata);

TEST_SAY("Storing next offset without metadata\n");
parts->elems[0].offset = proper_offset + 1;
TEST_CALL_ERR__(rd_kafka_offsets_store(c, parts));

TEST_SAY("Committing\n");
TEST_CALL_ERR__(rd_kafka_commit(c, NULL, rd_false /*sync*/));

TEST_SAY(
"Retrieving committed offset to verify empty committed offset "
"metadata\n");
rd_kafka_topic_partition_list_t *committed_toppar_empty;
committed_toppar_empty = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(committed_toppar_empty, topic, 0);
TEST_CALL_ERR__(
rd_kafka_committed(c, committed_toppar_empty, tmout_multip(3000)));
TEST_ASSERT(committed_toppar_empty->elems[0].offset == proper_offset + 1,
"Expected committed offset to be %" PRId64 ", not %" PRId64,
proper_offset + 1, committed_toppar_empty->elems[0].offset);
TEST_ASSERT(committed_toppar_empty->elems[0].metadata == NULL,
"Expected metadata to be NULL");

rd_kafka_message_destroy(rkmessage);

rd_kafka_topic_partition_list_destroy(parts);
rd_kafka_topic_partition_list_destroy(committed_toppar);
rd_kafka_topic_partition_list_destroy(committed_toppar_empty);

rd_kafka_consumer_close(c);
rd_kafka_destroy(c);
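The test above verifies the round-trip by fetching committed offsets with rd_kafka_committed(). For reference, a hedged sketch of how an application could perform the same check with the public API; the helper name, the 5000 ms timeout, and partition 0 are assumptions for illustration, not from the PR:

#include <stdio.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Illustrative helper: print the committed offset and metadata, if any. */
static void print_committed(rd_kafka_t *rk, const char *topic) {
        rd_kafka_topic_partition_list_t *committed =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(committed, topic, 0);

        if (!rd_kafka_committed(rk, committed, 5000 /* timeout ms */)) {
                printf("committed offset %" PRId64 "\n",
                       committed->elems[0].offset);
                if (committed->elems[0].metadata)
                        printf("metadata: %.*s\n",
                               (int)committed->elems[0].metadata_size,
                               (const char *)committed->elems[0].metadata);
        }

        rd_kafka_topic_partition_list_destroy(committed);
}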