Skip to content

Commit

Permalink
Recreate consumer offset directory after purging topic (#1269)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Oct 1, 2024
1 parent 65a2247 commit e8fb738
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ pub enum IggyError {
CannotDeleteConsumerOffsetsDirectory(String) = 3010,
#[error("Failed to delete consumer offset file for path: {0}")]
CannotDeleteConsumerOffsetFile(String) = 3011,
#[error("Failed to create consumer offsets directory for path: {0}")]
CannotCreateConsumerOffsetsDirectory(String) = 3012,
#[error("Failed to read consumers offsets from path: {0}")]
CannotReadConsumerOffsets(String) = 3020,
#[error("Segment not found")]
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.51"
version = "0.4.52"
edition = "2021"
build = "src/build.rs"

Expand Down
30 changes: 28 additions & 2 deletions server/src/streaming/partitions/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::sync::atomic::Ordering;

use crate::state::system::PartitionState;
use crate::streaming::partitions::partition::Partition;
use iggy::error::IggyError;
use std::path::Path;
use std::sync::atomic::Ordering;
use tokio::fs::create_dir;
use tracing::error;

impl Partition {
pub async fn load(&mut self, state: PartitionState) -> Result<(), IggyError> {
Expand Down Expand Up @@ -48,6 +50,30 @@ impl Partition {
.await?;
self.add_persisted_segment(0).await?;

if !Path::new(&self.consumer_offsets_path).exists()
&& create_dir(&self.consumer_offsets_path).await.is_err()
{
error!(
"Failed to create consumer offsets directory for partition with ID: {} for stream with ID: {} and topic with ID: {}.",
self.partition_id, self.stream_id, self.topic_id
);
return Err(IggyError::CannotCreateConsumerOffsetsDirectory(
self.consumer_offsets_path.to_owned(),
));
}

if !Path::new(&self.consumer_group_offsets_path).exists()
&& create_dir(&self.consumer_group_offsets_path).await.is_err()
{
error!(
"Failed to create consumer group offsets directory for partition with ID: {} for stream with ID: {} and topic with ID: {}.",
self.partition_id, self.stream_id, self.topic_id
);
return Err(IggyError::CannotCreateConsumerOffsetsDirectory(
self.consumer_group_offsets_path.to_owned(),
));
}

Ok(())
}
}

0 comments on commit e8fb738

Please sign in to comment.