Skip to content

Commit

Permalink
dekaf: Expire Reads after 5m
Browse files Browse the repository at this point in the history
We saw problems before where something about a `journal::Client` would stop responding after some amount of time in use. We were seeing issues where consumers would stop seeing updates to journals after a certain amount of time, and I just realized that it's possible for a `Read` (which contains a `journal::Client`) to live for a long time. This times out a `Read` after 5 minutes, so it get a new `journal::Client`, and hopefully bypasses the issue.
  • Loading branch information
jshearer committed Oct 24, 2024
1 parent 04e8740 commit 9e1ca03
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum SessionDataPreviewState {

pub struct Session {
app: Arc<App>,
reads: HashMap<(TopicName, i32), PendingRead>,
reads: HashMap<(TopicName, i32), (PendingRead, std::time::Instant)>,
secret: String,
auth: Option<Authenticated>,
data_preview_state: SessionDataPreviewState,
Expand Down Expand Up @@ -454,16 +454,33 @@ impl Session {
}
};

if matches!(self.reads.get(&key), Some(pending) if pending.offset == fetch_offset) {
metrics::counter!(
"dekaf_fetch_requests",
"topic_name" => key.0.to_string(),
"partition_index" => key.1.to_string(),
"state" => "read_pending"
)
.increment(1);
continue; // Common case: fetch is at the pending offset.
match self.reads.get(&key) {
Some((_, started_at))
if started_at.elapsed() > std::time::Duration::from_secs(60 * 5) =>
{
metrics::counter!(
"dekaf_fetch_requests",
"topic_name" => key.0.to_string(),
"partition_index" => key.1.to_string(),
"state" => "read_expired"
)
.increment(1);
tracing::debug!(lifetime=?started_at.elapsed(), topic_name=?key.0,partition_index=?key.1, "Restarting expired Read");
self.reads.remove(&key);
}
Some(_) => {
metrics::counter!(
"dekaf_fetch_requests",
"topic_name" => key.0.to_string(),
"partition_index" => key.1.to_string(),
"state" => "read_pending"
)
.increment(1);
continue; // Common case: fetch is at the pending offset.
}
_ => {}
}

let Some(collection) = Collection::new(&client, &key.0).await? else {
metrics::counter!(
"dekaf_fetch_requests",
Expand Down Expand Up @@ -566,12 +583,16 @@ impl Session {
"started read",
);

if let Some(old) = self.reads.insert(key.clone(), pending) {
if let Some((old, started_at)) = self
.reads
.insert(key.clone(), (pending, std::time::Instant::now()))
{
tracing::warn!(
topic = topic_request.topic.as_str(),
partition = partition_request.partition,
old_offset = old.offset,
new_offset = fetch_offset,
read_lifetime = ?started_at.elapsed(),
"discarding pending read due to offset jump",
);
}
Expand All @@ -588,7 +609,7 @@ impl Session {
for partition_request in &topic_request.partitions {
key.1 = partition_request.partition;

let Some(pending) = self.reads.get_mut(&key) else {
let Some((pending, _)) = self.reads.get_mut(&key) else {
partition_responses.push(
PartitionData::default()
.with_partition_index(partition_request.partition)
Expand Down

0 comments on commit 9e1ca03

Please sign in to comment.