diff --git a/Cargo.lock b/Cargo.lock index cc3669d6c..bf178f64a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3876,7 +3876,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.51" +version = "0.4.52" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/sdk/src/error.rs b/sdk/src/error.rs index e8b2d33d3..206a11a50 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -272,6 +272,8 @@ pub enum IggyError { CannotDeleteConsumerOffsetsDirectory(String) = 3010, #[error("Failed to delete consumer offset file for path: {0}")] CannotDeleteConsumerOffsetFile(String) = 3011, + #[error("Failed to create consumer offsets directory for path: {0}")] + CannotCreateConsumerOffsetsDirectory(String) = 3012, #[error("Failed to read consumers offsets from path: {0}")] CannotReadConsumerOffsets(String) = 3020, #[error("Segment not found")] diff --git a/server/Cargo.toml b/server/Cargo.toml index 51282ec9b..391832121 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.51" +version = "0.4.52" edition = "2021" build = "src/build.rs" diff --git a/server/src/streaming/partitions/persistence.rs b/server/src/streaming/partitions/persistence.rs index 75764e80b..f34009f74 100644 --- a/server/src/streaming/partitions/persistence.rs +++ b/server/src/streaming/partitions/persistence.rs @@ -1,8 +1,10 @@ -use std::sync::atomic::Ordering; - use crate::state::system::PartitionState; use crate::streaming::partitions::partition::Partition; use iggy::error::IggyError; +use std::path::Path; +use std::sync::atomic::Ordering; +use tokio::fs::create_dir; +use tracing::error; impl Partition { pub async fn load(&mut self, state: PartitionState) -> Result<(), IggyError> { @@ -48,6 +50,30 @@ impl Partition { .await?; self.add_persisted_segment(0).await?; + if !Path::new(&self.consumer_offsets_path).exists() + && create_dir(&self.consumer_offsets_path).await.is_err() + { + error!( + "Failed to create consumer offsets directory for partition with ID: {} for stream with ID: {} and topic with ID: {}.", + self.partition_id, self.stream_id, self.topic_id + ); + return Err(IggyError::CannotCreateConsumerOffsetsDirectory( + self.consumer_offsets_path.to_owned(), + )); + } + + if !Path::new(&self.consumer_group_offsets_path).exists() + && create_dir(&self.consumer_group_offsets_path).await.is_err() + { + error!( + "Failed to create consumer group offsets directory for partition with ID: {} for stream with ID: {} and topic with ID: {}.", + self.partition_id, self.stream_id, self.topic_id + ); + return Err(IggyError::CannotCreateConsumerOffsetsDirectory( + self.consumer_group_offsets_path.to_owned(), + )); + } + Ok(()) } }