Producer dies when producing to a just created topic #4904

Open
5 of 7 tasks
marcin-krystianc opened this issue Nov 18, 2024 · 1 comment · May be fixed by #4905


Description

Occasionally, if we create a topic and start producing to it immediately afterwards, the producer fails with an "unknown topic or partition" error. The problem occurs regardless of the topic.metadata.propagation.max.ms setting.

How to reproduce

  1. When I run the tests from the tests directory, a few of them fail with an "Unknown topic or partition" error, e.g.:
...
| 0042_many_topics                         |     FAILED | 903.862s | test_dr_msg_cb():1996: Message delivery (to rdkafkatest_rnd5c8f8ba61d9c48a0_0042_many_topics [0]) failed: expected Success, got Broker: Unknown topic or partition
...
  2. Using our test application
  • start a test cluster
  • run
dotnet run -c Release --project /workspace/KafkaPlayground/dotnet/KafkaTool.csproj \
producer \
--config allow.auto.create.topics=false \
--config bootstrap.servers=$BROKERS \
--config debug=all \
--topics=3000 \
--partitions=1 \
--replication-factor=3 \
--min-isr=2 \
--topic-stem=my-topic \
--messages-per-second=50000 \
\
--config request.timeout.ms=195000 \
--config message.timeout.ms=195000 \
--config request.required.acks=-1 \
--config max.in.flight.requests.per.connection=1 \
--config topic.metadata.refresh.interval.ms=10000 \
--config topic.metadata.propagation.max.ms=75000 \
\
--reporting-cycle=1000 \
--recreate-topics-delay=10 \
--recreate-topics-batch-size=500 &> log.txt

logs:

08:50:28 info: Producer0:[0] kafka-log Facility:TOPIC, Message[thrd:app]: New local topic: my-topic-1067
08:50:28 info: Producer0:[0] kafka-log Facility:TOPPARNEW, Message[thrd:app]: NEW my-topic-1067 [-1] 0x7ef2fc3c2c10 refcnt 0x7ef2fc3c2ca0 (at rd_kafka_topic_new0:488)
08:50:28 info: Producer0:[0] kafka-log Facility:CONF, Message[thrd:app]: Topic "my-topic-1067" configuration (default_topic_conf):
08:50:29 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 metadata information unknown
08:50:29 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 partition count is zero: should refresh metadata
08:50:29 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:63321/2:   Topic my-topic-1067 with 0 partitions: Broker: Unknown topic or partition
08:50:29 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Error in metadata reply for topic my-topic-1067 (PartCnt 0): Broker: Unknown topic or partition
08:50:29 info: Producer0:[0] kafka-log Facility:TOPICPROP, Message[thrd:main]: Topic my-topic-1067 does not exist, allowing 73812ms for metadata propagation before marking topic as non-existent
08:50:29 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:57867/4:   Topic my-topic-1067 with 0 partitions: Broker: Unknown topic or partition
08:50:29 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Error in metadata reply for topic my-topic-1067 (PartCnt 0): Broker: Unknown topic or partition
08:50:29 info: Producer0:[0] kafka-log Facility:TOPICPROP, Message[thrd:main]: Topic my-topic-1067 does not exist, allowing 73403ms for metadata propagation before marking topic as non-existent
08:50:30 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 metadata information unknown
08:50:30 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 partition count is zero: should refresh metadata
08:50:31 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 metadata information unknown
08:50:31 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 partition count is zero: should refresh metadata
08:50:32 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 metadata information unknown
08:50:32 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1067 partition count is zero: should refresh metadata
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:57867/4:   Topic my-topic-1067 with 1 partitions
08:50:33 info: Producer0:[0] kafka-log Facility:STATE, Message[thrd:main]: Topic my-topic-1067 changed state unknown -> exists
08:50:33 info: Producer0:[0] kafka-log Facility:PARTCNT, Message[thrd:main]: Topic my-topic-1067 partition count changed from 0 to 1
08:50:33 info: Producer0:[0] kafka-log Facility:TOPPARNEW, Message[thrd:main]: NEW my-topic-1067 [0] 0x7ef3081e4080 refcnt 0x7ef3081e4110 (at rd_kafka_topic_partition_cnt_update:937)
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1067 changed id from AAAAAAAAAAAAAAAAAAAAAA to e3VNQmjsTYi3Mkg+SV5V7g
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1067 [0] Leader 2 Epoch 0
08:50:33 info: Producer0:[0] kafka-log Facility:BROKER, Message[thrd:main]: my-topic-1067 [0]: leader -1 epoch -1 -> leader 2 epoch 0
08:50:33 info: Producer0:[0] kafka-log Facility:BRKDELGT, Message[thrd:main]: my-topic-1067 [0]: delegate to broker localhost:63321/2 (rktp 0x7ef3081e4080, term 0, ref 3)
08:50:33 info: Producer0:[0] kafka-log Facility:BRKDELGT, Message[thrd:main]: my-topic-1067 [0]: delegating to broker localhost:63321/2 for partition with 0 messages (0 bytes) queued
08:50:33 info: Producer0:[0] kafka-log Facility:BRKMIGR, Message[thrd:main]: Migrating topic my-topic-1067 [0] 0x7ef3081e4080 from (none) to localhost:63321/2 (sending PARTITION_JOIN to localhost:63321/2)
08:50:33 info: Producer0:[0] kafka-log Facility:PARTCNT, Message[thrd:main]: Partitioning 33 unassigned messages in topic my-topic-1067 to 1 partitions
08:50:33 info: Producer0:[0] kafka-log Facility:UAS, Message[thrd:main]: 33/33 messages were partitioned in topic my-topic-1067
08:50:33 info: Producer0:[0] kafka-log Facility:TOPBRK, Message[thrd:localhost:63321/bootstrap]: localhost:63321/2: Topic my-topic-1067 [0]: joining broker (rktp 0x7ef3081e4080, 33 message(s) queued)
08:50:33 info: Producer0:[0] kafka-log Facility:FETCHADD, Message[thrd:localhost:63321/bootstrap]: localhost:63321/2: Added my-topic-1067 [0] to active list (146 entries, opv 0, 33 messages queued): joining
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:57867/4:   Topic my-topic-1067 with 1 partitions
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1067 [0] Leader 2 Epoch 0
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:57867/4:   Topic my-topic-1067 with 1 partitions
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1067 [0] Leader 2 Epoch 0
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:63321/2:   Topic my-topic-1067 with 1 partitions
08:50:33 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1067 [0] Leader 2 Epoch 0
08:50:34 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:63321/2:   Topic my-topic-1067 with 1 partitions
08:50:34 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1067 [0] Leader 2 Epoch 0
08:50:34 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:62035/3:   Topic my-topic-1067 with 0 partitions: Broker: Unknown topic or partition
08:50:34 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Error in metadata reply for topic my-topic-1067 (PartCnt 0): Broker: Unknown topic or partition
08:50:34 info: Producer0:[0] kafka-log Facility:STATE, Message[thrd:main]: Topic my-topic-1067 changed state exists -> notexists
08:50:34 info: Producer0:[0] kafka-log Facility:PARTCNT, Message[thrd:main]: Topic my-topic-1067 partition count changed from 1 to 0
08:50:34 info: Producer0:[0] kafka-log Facility:REMOVE, Message[thrd:main]: my-topic-1067 [0] no longer reported in metadata
08:50:34 info: Producer0:[0] kafka-log Facility:BRKMIGR, Message[thrd:main]: my-topic-1067 [0] 0x7ef3081e4080 sending final LEAVE for removal by localhost:63321/2
08:50:34 info: Producer0:[0] kafka-log Facility:PARTCNT, Message[thrd:main]: Failing all 0 unassigned messages in topic my-topic-1067 since topic does not exist: Broker: Unknown topic or partition
08:50:34 info: Producer0:[0] kafka-log Facility:UAS, Message[thrd:main]: 0/0 messages were partitioned in topic my-topic-1067
08:50:34 info: Producer0:[0] kafka-log Facility:TOPBRK, Message[thrd:localhost:63321/bootstrap]: localhost:63321/2: Topic my-topic-1067 [0]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x7ef3081e4080)
08:50:34 info: Producer0:[0] kafka-log Facility:FETCHADD, Message[thrd:localhost:63321/bootstrap]: localhost:63321/2: Removed my-topic-1067 [0] from active list (1005 entries, opv 0): leaving
08:50:34 info: Producer0:[0] kafka-log Facility:TOPBRK, Message[thrd:localhost:63321/bootstrap]: localhost:63321/2: Topic my-topic-1067 [0]: no next broker, failing 34 message(s) in partition queue
08:50:34 info: Producer0:[0] kafka-log Facility:TOPPARREMOVE, Message[thrd:localhost:63321/bootstrap]: Removing toppar my-topic-1067 [0] 0x7ef3081e4080
08:50:34 info: Producer0:[0] kafka-log Facility:DESTROY, Message[thrd:localhost:63321/bootstrap]: my-topic-1067 [0]: 0x7ef3081e4080 DESTROY_FINAL

We can observe that the state of topic my-topic-1067 changed from unknown to exists, and a second later from exists to notexists.
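The sequence in the log can be replayed with a toy model. This is only a sketch: it is not librdkafka's code and not the change proposed in #4905, and all names (`TopicTracker`, `on_metadata`, `replay`, `grace_when_exists`) are hypothetical. It assumes, as the log suggests, that the propagation grace period is honoured only while the topic state is still unknown, and contrasts that with a variant that keeps honouring it after the topic has been seen once.

```cpp
#include <chrono>

using Millis = std::chrono::milliseconds;

enum class TopicState { Unknown, Exists, NotExists };

// Hypothetical model of a client-side topic state tracker.
class TopicTracker {
public:
    // propagation_max plays the role of topic.metadata.propagation.max.ms.
    // grace_when_exists == false models the behaviour inferred from the log;
    // true models a variant that tolerates stale replies inside the window.
    TopicTracker(Millis propagation_max, bool grace_when_exists)
        : propagation_max_(propagation_max),
          grace_when_exists_(grace_when_exists) {}

    // age: time since the local topic object was created.
    // topic_listed: whether this metadata reply knew the topic.
    void on_metadata(Millis age, bool topic_listed) {
        if (topic_listed) {
            state_ = TopicState::Exists;
            return;
        }
        const bool in_grace = age < propagation_max_;
        if (state_ == TopicState::Unknown && in_grace)
            return;  // keep waiting for metadata to propagate
        if (state_ == TopicState::Exists && grace_when_exists_ && in_grace)
            return;  // treat the reply as stale, keep the topic
        state_ = TopicState::NotExists;
    }

    TopicState state() const { return state_; }

private:
    Millis propagation_max_;
    bool grace_when_exists_;
    TopicState state_ = TopicState::Unknown;
};

// Replays the three kinds of replies seen in the log for my-topic-1067,
// with ages approximated from the timestamps (topic created at 08:50:28).
TopicState replay(bool grace_when_exists) {
    TopicTracker tracker(Millis(75000), grace_when_exists);
    tracker.on_metadata(Millis(1000), false);  // 08:50:29, unknown topic
    tracker.on_metadata(Millis(5000), true);   // 08:50:33, 1 partition
    tracker.on_metadata(Millis(6000), false);  // 08:50:34, stale broker
    return tracker.state();
}
```

Under these assumptions, `replay(false)` ends in `TopicState::NotExists`, matching the log, while `replay(true)` stays in `TopicState::Exists` because the stale reply arrives well inside the 75000 ms window.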

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version: 2.6.0
  • Apache Kafka version: 3.8.0
  • librdkafka client configuration: (See parameters)
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Operating system: ubuntu:22.04(x64)
  • Provide broker log excerpts
  • Critical issue

stativ commented Dec 19, 2024

I think I've been running into this exact issue. Although my logs are a bit different, the referenced #4905 seems to fix it: I've been running my test on repeat for a few dozen minutes without a crash, whereas before I would get a crash within about a minute. I'm attaching part of the question that I intended to ask before I found this bug report, since it might help others find this issue.

The topic I create has two partitions and is assigned to two brokers on creation. I was not able to reproduce the problem with a single-partition topic, but I didn't try very hard since I already had a case that could be reproduced very quickly. For testing, I use a small four-node Kafka cluster running in Docker locally on my development machine.

Sometimes, it happens that messages are not written to the topic. This seems to happen randomly. Whenever this happens, a message like the following is logged:

PARTCNT[thrd:main]: Topic TProducerMirrorBack partition count changed from 2 to 0

I tried adding a while loop, between the code that creates the topic and the producer, that calls ::RdKafka::Handle::metadata() repeatedly until the metadata reports a non-zero number of partitions. However, the problem persists even when the metadata reports a non-zero number of partitions with non-empty replicas and ISRs. When the problem occurs, the message "partition count changed from 2 to 0" always appears after the metadata() check has passed.
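For readers who want to try the same check, a polling loop of this kind might look roughly like the sketch below. The helper `wait_for_partitions` and its parameters are hypothetical; the partition count is supplied through a callback so that the sketch does not depend on a running cluster. In real code the callback would presumably wrap ::RdKafka::Handle::metadata() and count the partitions reported for the topic.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: polls `partition_count` until it reports at least one
// partition or `timeout` elapses. Returns true if partitions were reported.
bool wait_for_partitions(const std::function<int()>& partition_count,
                         std::chrono::milliseconds timeout,
                         std::chrono::milliseconds interval) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    for (;;) {
        if (partition_count() > 0)
            return true;   // metadata now lists the topic's partitions
        if (std::chrono::steady_clock::now() >= deadline)
            return false;  // gave up waiting
        std::this_thread::sleep_for(interval);
    }
}
```

As described above, passing this check did not prevent the failure: a single metadata reply only reflects the view of the broker that answered, so a later reply from a broker that has not yet learned about the topic can still reset the partition count to 0.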

I was able to work around the problem temporarily by adding a sleep(2) between the code that creates the topic and the code that writes to it.

The essence of the code I use to create the topic follows. I simplified the error checks to keep it concise; none of the handled errors actually occur.

char errstr_[512];
rd_kafka_resp_err_t err_;
// some broker assignments
std::vector< std::int32_t > brokerassignments_{1, 2};

// prepare a topic description with some broker assignment
std::unique_ptr< rd_kafka_queue_t, decltype(&rd_kafka_queue_destroy) > queue_(
      rd_kafka_queue_new(rk_),
      &rd_kafka_queue_destroy);
std::unique_ptr< rd_kafka_NewTopic_t, decltype(&rd_kafka_NewTopic_destroy) > newtopic_(
      rd_kafka_NewTopic_new(
          "topic",
          2,
          -1,
          errstr_,
          std::size(errstr_)),
      &rd_kafka_NewTopic_destroy);
for(std::size_t partitionid_(0); partitionid_ < brokerassignments_.size(); ++partitionid_) {
    std::int32_t broker_ids_[] = { brokerassignments_[partitionid_] };
    if(rd_kafka_NewTopic_set_replica_assignment(
            newtopic_.get(),
            static_cast< std::int32_t >(partitionid_),
            broker_ids_,
            std::size(broker_ids_),
            errstr_,
            std::size(errstr_)) != RD_KAFKA_RESP_ERR_NO_ERROR) {
        throw std::runtime_error("NewTopic error");
    }
}

// create the topic
rd_kafka_NewTopic_t* newtopics_[] = { newtopic_.get() };
rd_kafka_CreateTopics(rk_, newtopics_, std::size(newtopics_), nullptr, queue_.get());

// wait for the result
std::unique_ptr< rd_kafka_event_t, decltype(&rd_kafka_event_destroy) > event_(
      rd_kafka_queue_poll(queue_.get(), pimpl->timeout),
      &rd_kafka_event_destroy);
if(!event_) {
    throw std::runtime_error("timeout");
}

// check the results
if(rd_kafka_event_error(event_.get()) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    throw std::runtime_error("CreateTopics error");
}
// success, but we still check that the topic was created successfully, just to be sure
const rd_kafka_CreateTopics_result_t* createresult_(rd_kafka_event_CreateTopics_result(event_.get()));
size_t topicnum_;
const rd_kafka_topic_result_t** topicresult_(rd_kafka_CreateTopics_result_topics(createresult_, &topicnum_));
for(std::size_t i_(0); i_ < topicnum_; ++i_) {
    if(rd_kafka_topic_result_error(topicresult_[i_]) != RD_KAFKA_RESP_ERR_NO_ERROR) {
        throw std::runtime_error("some error");
    }
}
