Skip to content

Commit

Permalink
Change: API: move default impl methods in RaftStorage to StorageHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 4, 2022
1 parent 84b32de commit f99ade3
Show file tree
Hide file tree
Showing 18 changed files with 276 additions and 222 deletions.
11 changes: 7 additions & 4 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::MessageSummary;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::StorageError;
use crate::StorageHelper;
use crate::Update;

impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
Expand Down Expand Up @@ -113,7 +114,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// TODO(xp): get_membership() should have a defensive check to ensure it always returns Some() if node is
// initialized. Because a node always commit a membership log as the first log entry.
let membership = self.storage.get_membership().await?;
let membership = StorageHelper::new(&self.storage).get_membership().await?;

// TODO(xp): This is a dirty patch:
// When a node starts in a single-node mode, it does not append an initial log
Expand Down Expand Up @@ -284,7 +285,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let index = log_id.index;

// TODO(xp): this is a naive impl. Batch loading entries from storage.
let log = self.storage.try_get_log_entry(index).await?;
let log = StorageHelper::new(&self.storage).try_get_log_entry(index).await?;

if let Some(local) = log {
if local.log_id == log_id {
Expand Down Expand Up @@ -317,7 +318,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

let index = log_id.index;

let log = self.storage.try_get_log_entry(index).await?;
let log = StorageHelper::new(&self.storage).try_get_log_entry(index).await?;
tracing::debug!(
"check log id matching: local: {:?} remote: {}",
log.as_ref().map(|x| x.log_id),
Expand Down Expand Up @@ -393,7 +394,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// Drain entries from the beginning of the cache up to commit index.

let entries = self.storage.get_log_entries(self.last_applied.next_index()..self.committed.next_index()).await?;
let entries = StorageHelper::new(&self.storage)
.get_log_entries(self.last_applied.next_index()..self.committed.next_index())
.await?;

let last_log_id = entries.last().map(|x| x.log_id).unwrap();

Expand Down
3 changes: 2 additions & 1 deletion openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::MessageSummary;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::StorageError;
use crate::StorageHelper;

/// A wrapper around a ClientRequest which has been transformed into an Entry, along with its response channel.
pub(super) struct ClientRequestEntry<D: AppData, R: AppDataResponse> {
Expand Down Expand Up @@ -304,7 +305,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
};

if index != expected_next_index {
let entries = self.core.storage.get_log_entries(expected_next_index..index).await?;
let entries = StorageHelper::new(&self.core.storage).get_log_entries(expected_next_index..index).await?;

if let Some(entry) = entries.last() {
self.core.last_applied = Some(entry.log_id);
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::RaftNetwork;
use crate::RaftStorage;
use crate::SnapshotSegmentId;
use crate::StorageError;
use crate::StorageHelper;
use crate::StorageIOError;
use crate::Update;

Expand Down Expand Up @@ -249,7 +250,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

// There could be unknown membership in the snapshot.
let membership = self.storage.get_membership().await?;
let membership = StorageHelper::new(&self.storage).get_membership().await?;
tracing::debug!("storage membership: {:?}", membership);

assert!(membership.is_some());
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::types::v070::RaftNetwork;
use crate::types::v070::RaftStorage;
use crate::types::v070::StorageError;
use crate::MessageSummary;
use crate::StorageHelper;
use crate::Update;

impl EffectiveMembership {
Expand Down Expand Up @@ -223,7 +224,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
async fn do_main(&mut self) -> Result<(), Fatal> {
tracing::debug!("raft node is initializing");

let state = self.storage.get_initial_state().await?;
let state = StorageHelper::new(&self.storage).get_initial_state().await?;

// TODO(xp): this is not necessary.
self.storage.save_hard_state(&state.hard_state).await?;
Expand Down Expand Up @@ -600,7 +601,7 @@ where
return Ok(());
}

let log_id = sto.get_log_id(end - 1).await?;
let log_id = StorageHelper::new(&sto).get_log_id(end - 1).await?;
sto.purge_logs_upto(log_id).await
}

Expand Down
2 changes: 2 additions & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod types;

#[cfg(test)]
mod metrics_wait_test;
mod storage_helper;

pub use async_trait;

Expand All @@ -35,6 +36,7 @@ pub use crate::raft::Raft;
pub use crate::raft_types::LogIdOptionExt;
pub use crate::raft_types::Update;
pub use crate::replication::ReplicationMetrics;
pub use crate::storage_helper::StorageHelper;
pub use crate::store_ext::StoreExt;
pub use crate::store_wrapper::Wrapper;
pub use crate::summary::MessageSummary;
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::Snapshot;
use crate::StorageHelper;

#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationMetrics {
Expand Down Expand Up @@ -297,7 +298,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
let prev_log_id = if prev_index == last_purged.index() {
last_purged
} else if let Some(prev_i) = prev_index {
let first = self.storage.try_get_log_entry(prev_i).await?;
let first = StorageHelper::new(&self.storage).try_get_log_entry(prev_i).await?;
match first {
Some(f) => Some(f.log_id),
None => {
Expand Down
153 changes: 153 additions & 0 deletions openraft/src/storage_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::RangeBounds;

use crate::defensive::check_range_matches_entries;
use crate::AppData;
use crate::AppDataResponse;
use crate::EffectiveMembership;
use crate::Entry;
use crate::EntryPayload;
use crate::InitialState;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftStorage;
use crate::StorageError;

/// StorageHelper provides additional methods to access a RaftStorage implementation.
pub struct StorageHelper<'a, D, R, S, Sto>
where
D: AppData,
R: AppDataResponse,

S: RaftStorage<D, R>,
Sto: AsRef<S>,
{
pub(crate) sto: &'a Sto,
_p: PhantomData<(D, R, S)>,
}

impl<'a, D, R, S, Sto> StorageHelper<'a, D, R, S, Sto>
where
D: AppData,
R: AppDataResponse,
S: RaftStorage<D, R>,
Sto: AsRef<S>,
{
pub fn new(sto: &'a Sto) -> Self {
Self {
sto,
_p: Default::default(),
}
}

/// Get Raft's state information from storage.
///
/// When the Raft node is first started, it will call this interface to fetch the last known state from stable
/// storage.
pub async fn get_initial_state(&self) -> Result<InitialState, StorageError> {
let hs = self.sto.as_ref().read_hard_state().await?;
let st = self.sto.as_ref().get_log_state().await?;
let mut last_log_id = st.last_log_id;
let (last_applied, _) = self.sto.as_ref().last_applied_state().await?;
let membership = self.get_membership().await?;

// Clean up dirty state: snapshot is installed but logs are not cleaned.
if last_log_id < last_applied {
self.sto.as_ref().purge_logs_upto(last_applied.unwrap()).await?;
last_log_id = last_applied;
}

Ok(InitialState {
last_log_id,
last_applied,
hard_state: hs.unwrap_or_default(),
last_membership: membership,
})
}

/// Returns the last membership config found in log or state machine.
pub async fn get_membership(&self) -> Result<Option<EffectiveMembership>, StorageError> {
let (_, sm_mem) = self.sto.as_ref().last_applied_state().await?;

let sm_mem_index = match &sm_mem {
None => 0,
Some(mem) => mem.log_id.index,
};

let log_mem = self.last_membership_in_log(sm_mem_index + 1).await?;

if log_mem.is_some() {
return Ok(log_mem);
}

Ok(sm_mem)
}

/// Get the latest membership config found in the log.
///
/// This method should returns membership with the greatest log index which is `>=since_index`.
/// If no such membership log is found, it returns `None`, e.g., when logs are cleaned after being applied.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn last_membership_in_log(&self, since_index: u64) -> Result<Option<EffectiveMembership>, StorageError> {
let st = self.sto.as_ref().get_log_state().await?;

let mut end = st.last_log_id.next_index();
let start = std::cmp::max(st.last_purged_log_id.next_index(), since_index);
let step = 64;

while start < end {
let step_start = std::cmp::max(start, end.saturating_sub(step));
let entries = self.sto.as_ref().try_get_log_entries(step_start..end).await?;

for ent in entries.iter().rev() {
if let EntryPayload::Membership(ref mem) = ent.payload {
return Ok(Some(EffectiveMembership {
log_id: ent.log_id,
membership: mem.clone(),
}));
}
}

end = end.saturating_sub(step);
}

Ok(None)
}

/// Get a series of log entries from storage.
///
/// Similar to `try_get_log_entries` except an error will be returned if there is an entry not found in the
/// specified range.
pub async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&self,
range: RB,
) -> Result<Vec<Entry<D>>, StorageError> {
let res = self.sto.as_ref().try_get_log_entries(range.clone()).await?;

check_range_matches_entries(range, &res)?;

Ok(res)
}

/// Try to get an log entry.
///
/// It does not return an error if the log entry at `log_index` is not found.
pub async fn try_get_log_entry(&self, log_index: u64) -> Result<Option<Entry<D>>, StorageError> {
let mut res = self.sto.as_ref().try_get_log_entries(log_index..(log_index + 1)).await?;
Ok(res.pop())
}

/// Get the log id of the entry at `index`.
pub async fn get_log_id(&self, log_index: u64) -> Result<LogId, StorageError> {
let st = self.sto.as_ref().get_log_state().await?;

if Some(log_index) == st.last_purged_log_id.index() {
return Ok(st.last_purged_log_id.unwrap());
}

let entries = self.get_log_entries(log_index..=log_index).await?;

Ok(entries[0].log_id)
}
}
Loading

0 comments on commit f99ade3

Please sign in to comment.