Commit
dekaf: Bump kafka-protocol to 0.13.0 and upgrade the Fetch API to support flexible versions.

I _believe_ this should fix the warning about `Message at offset xx might be too large to fetch, try increasing receive.message.max.bytes`.
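
As context for the fix, here is a minimal sketch (assumed usage, not code from this commit) of what "flexible versions" means for the Fetch API: versions 12 and later of the Kafka Fetch protocol use compact strings/arrays and tagged fields on the wire, and kafka-protocol 0.13.0 can encode them. The sketch assumes the crate's `Encodable` trait and the `Default::default()` plus `with_*` builder style that also appears in the diff below; the function name and the `bytes` dependency are illustrative.

```rust
use bytes::BytesMut;
use kafka_protocol::messages::FetchResponse;
use kafka_protocol::protocol::Encodable;

// Hypothetical helper, not taken from this commit.
fn encode_fetch_response(resp: &FetchResponse, version: i16) -> BytesMut {
    // Fetch v12 and later are "flexible" versions: compact strings/arrays and
    // tagged fields on the wire. kafka-protocol picks the encoding from the
    // `version` argument passed here.
    let mut buf = BytesMut::new();
    resp.encode(&mut buf, version)
        .expect("FetchResponse should encode at a supported version");
    buf
}

fn main() {
    let resp = FetchResponse::default()
        .with_throttle_time_ms(0)
        .with_session_id(0);
    // Assumption: v12 is the first flexible Fetch version per the Kafka protocol spec.
    let bytes = encode_fetch_response(&resp, 12);
    println!("encoded {} bytes", bytes.len());
}
```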
jshearer committed Oct 29, 2024
1 parent 98041fc commit ee70d45
Showing 4 changed files with 86 additions and 148 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 2 additions & 4 deletions Cargo.toml
@@ -59,7 +59,7 @@ flate2 = "1.0"
futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
-fxhash = "0.2" # Used in `json` crate. Replace with xxhash.
+fxhash = "0.2" # Used in `json` crate. Replace with xxhash.
hex = "0.4.3"
hexdump = "0.1"
humantime = "2.1"
@@ -72,9 +72,7 @@ jemalloc-ctl = "0.3"
json-patch = "0.3"
jsonwebtoken = { version = "9", default-features = false }
js-sys = "0.3.60"
-# TODO(jshearer): Swap back after 0.13.0 is released, which includes
-# https://github.com/tychedelia/kafka-protocol-rs/pull/81
-kafka-protocol = { git = "https://github.com/tychedelia/kafka-protocol-rs.git", rev = "cabe835" }
+kafka-protocol = "0.13.0"
lazy_static = "1.4"
libc = "0.2"
librocksdb-sys = { version = "0.16.0", default-features = false, features = [
28 changes: 16 additions & 12 deletions crates/dekaf/src/api_client.rs
@@ -513,7 +513,8 @@ impl KafkaApiClient {

let controller = resp
.brokers
-.get(&resp.controller_id)
+.iter()
+.find(|broker| broker.node_id == resp.controller_id)
.context("Failed to find controller")?;

let controller_url = format!("tcp://{}:{}", controller.host.to_string(), controller.port);
@@ -529,7 +530,8 @@
let version = self
.versions
.api_keys
-.get(&api_key)
+.iter()
+.find(|version| version.api_key == api_key)
.context(format!("Unknown API key {api_key}"))?;

Ok(version.to_owned())
@@ -553,18 +555,20 @@
let resp = coord.send_request(req, None).await?;
tracing::debug!(metadata=?resp, "Got metadata response");

-if resp
-.topics
-.iter()
-.all(|(name, topic)| topic_names.contains(&name) && topic.error_code == 0)
-{
+if resp.topics.iter().all(|topic| {
+topic
+.name
+.as_ref()
+.map(|topic_name| topic_names.contains(topic_name) && topic.error_code == 0)
+.unwrap_or(false)
+}) {
return Ok(());
} else {
-let mut topics_map = kafka_protocol::indexmap::IndexMap::new();
+let mut topics_map = vec![];
for topic_name in topic_names.into_iter() {
-topics_map.insert(
-topic_name,
+topics_map.push(
messages::create_topics_request::CreatableTopic::default()
+.with_name(topic_name)
.with_replication_factor(2)
.with_num_partitions(-1),
);
@@ -573,11 +577,11 @@
let create_resp = coord.send_request(create_req, None).await?;
tracing::debug!(create_response=?create_resp, "Got create response");

-for (name, topic) in create_resp.topics {
+for topic in create_resp.topics {
if topic.error_code > 0 {
let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
tracing::warn!(
-topic = name.to_string(),
+topic = topic.name.to_string(),
error = ?err,
message = topic.error_message.map(|m|m.to_string()),
"Failed to create topic"
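
A minimal sketch (assumed usage, not code from this commit) of the lookup pattern the api_client.rs hunks above adopt: in kafka-protocol 0.13.0, response collections such as `MetadataResponse::brokers` are plain `Vec`s whose elements carry their own key (`node_id` here), rather than maps keyed by that id, so lookups become `iter().find(...)`. The module path and error handling below mirror the surrounding code but are assumptions about the crate's generated API.

```rust
use anyhow::Context as _;
use kafka_protocol::messages::metadata_response::{MetadataResponse, MetadataResponseBroker};

// Hypothetical helper illustrating the Vec-based lookup.
fn find_controller(resp: &MetadataResponse) -> anyhow::Result<&MetadataResponseBroker> {
    resp.brokers
        .iter()
        .find(|broker| broker.node_id == resp.controller_id)
        .context("Failed to find controller")
}
```

The trade-off is a linear scan instead of a keyed lookup, which is negligible for the handful of brokers or API keys carried in these responses.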
(Diff for the remaining changed file did not load and is not shown.)

0 comments on commit ee70d45
