Merge pull request #761 from drmingdrmer/40-refact
Change: remove defensive check utilities
drmingdrmer authored Apr 11, 2023
2 parents f69b4fd + 26dc883 commit 8f597f6
Showing 12 changed files with 26 additions and 1,255 deletions.
11 changes: 0 additions & 11 deletions .github/workflows/ci.yaml
@@ -71,26 +71,17 @@ jobs:
fail-fast: false
matrix:
include:
# Base test
- toolchain: "nightly"
store_defensive: "on"
send_delay: "0"
features: ""

- toolchain: "stable"
store_defensive: "off"
send_delay: "0"
features: ""

# With network delay
- toolchain: "nightly"
store_defensive: "on"
send_delay: "30"
features: ""

# Feature-flag: Standard raft term
- toolchain: "nightly"
store_defensive: "on"
send_delay: "0"
features: "single-term-leader"

@@ -119,7 +110,6 @@ jobs:
RUST_TEST_THREADS: 2
RUST_LOG: debug
RUST_BACKTRACE: full
OPENRAFT_STORE_DEFENSIVE: ${{ matrix.store_defensive }}
OPENRAFT_NETWORK_SEND_DELAY: ${{ matrix.send_delay }}


@@ -133,7 +123,6 @@ jobs:
RUST_TEST_THREADS: 2
RUST_LOG: debug
RUST_BACKTRACE: full
OPENRAFT_STORE_DEFENSIVE: ${{ matrix.store_defensive }}
OPENRAFT_NETWORK_SEND_DELAY: ${{ matrix.send_delay }}


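The deleted `store_defensive` matrix entries and the `OPENRAFT_STORE_DEFENSIVE` variable only made sense while the defensive wrapper existed: the test jobs used the switch to decide whether the store under test should run the extra checks. A minimal sketch of how such an environment switch might be read, using a hypothetical `defensive_from_env` helper (this is an illustration, not the actual openraft test harness code):

```rust
// Hypothetical helper, for illustration only: read the on/off switch that the
// removed CI matrix entries used to set via OPENRAFT_STORE_DEFENSIVE.
fn defensive_from_env() -> bool {
    std::env::var("OPENRAFT_STORE_DEFENSIVE")
        .map(|v| v.eq_ignore_ascii_case("on") || v == "1")
        .unwrap_or(false)
}

fn main() {
    // With the defensive utilities removed, there is nothing left for this
    // switch to toggle, so the matrix entries and env vars are dropped.
    println!("defensive checks enabled: {}", defensive_from_env());
}
```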
314 changes: 0 additions & 314 deletions openraft/src/defensive.rs
@@ -2,326 +2,12 @@ use std::collections::Bound;
use std::fmt::Debug;
use std::ops::RangeBounds;

use async_trait::async_trait;

use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::DefensiveError;
use crate::ErrorSubject;
use crate::LogId;
use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Violation;
use crate::Vote;
use crate::Wrapper;

/// Defines defensive-check methods for RaftStorage, independent of the storage type.
// TODO: This can be extended with other methods as needed. Currently only the one for LogReader has been moved.
pub trait DefensiveCheckBase<C: RaftTypeConfig> {
/// Enable or disable defensive checks when calling storage APIs.
fn set_defensive(&self, v: bool);

fn is_defensive(&self) -> bool;

/// The range must not be empty; otherwise it is an inappropriate action.
fn defensive_nonempty_range<RB: RangeBounds<u64> + Clone + Debug + Send>(
&self,
range: RB,
) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}
let start = match range.start_bound() {
Bound::Included(i) => Some(*i),
Bound::Excluded(i) => Some(*i + 1),
Bound::Unbounded => None,
};

let end = match range.end_bound() {
Bound::Included(i) => Some(*i),
Bound::Excluded(i) => Some(*i - 1),
Bound::Unbounded => None,
};

if start.is_none() || end.is_none() {
return Ok(());
}

if start > end {
return Err(DefensiveError::new(ErrorSubject::Logs, Violation::RangeEmpty { start, end }).into());
}

Ok(())
}
}
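For reference, the emptiness check above can be read on its own: both bounds are normalized to inclusive indexes, any unbounded side passes, and a fully bounded range fails only when its normalized start exceeds its end. A self-contained sketch of the same logic, simplified to return a `bool` instead of openraft's `StorageError`:

```rust
use std::ops::{Bound, RangeBounds};

// Simplified restatement of defensive_nonempty_range: report whether a range
// covers at least one log index.
fn is_nonempty_range<RB: RangeBounds<u64>>(range: RB) -> bool {
    let start = match range.start_bound() {
        Bound::Included(i) => Some(*i),
        Bound::Excluded(i) => Some(*i + 1),
        Bound::Unbounded => None,
    };
    let end = match range.end_bound() {
        Bound::Included(i) => Some(*i),
        Bound::Excluded(i) => Some(*i - 1),
        Bound::Unbounded => None,
    };
    match (start, end) {
        (Some(s), Some(e)) => s <= e, // bounded on both sides: must cover something
        _ => true,                    // any unbounded side passes, as in the original
    }
}

fn main() {
    assert!(is_nonempty_range(3..=3)); // a single index is fine
    assert!(is_nonempty_range(3..));   // half-open ranges always pass
    assert!(!is_nonempty_range(5..5)); // empty: the original returns Violation::RangeEmpty
}
```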

/// Defines methods of defensive checks for RaftStorage.
#[async_trait]
pub trait DefensiveCheck<C, T>: DefensiveCheckBase<C>
where
C: RaftTypeConfig,
T: RaftStorage<C>,
Self: Wrapper<C, T>,
{
/// Ensure that logs with a greater index than last_applied also have a greater log_id.
/// The invariant must hold: `log.log_id.index > last_applied.index` implies
/// `log.log_id > last_applied`.
async fn defensive_no_dirty_log(&mut self) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

let last_log_id = self.inner().get_log_state().await?.last_log_id;
let (last_applied, _) = self.inner().last_applied_state().await?;

if last_log_id.index() > last_applied.index() && last_log_id < last_applied {
return Err(
DefensiveError::new(ErrorSubject::Log(last_log_id.unwrap()), Violation::DirtyLog {
higher_index_log_id: last_log_id.unwrap(),
lower_index_log_id: last_applied.unwrap(),
})
.into(),
);
}

Ok(())
}

/// Ensure that the stored vote never decreases across updates, and that for every term
/// there is only one value for voted_for.
async fn defensive_incremental_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

let h = self.inner().read_vote().await?;

let curr = h.unwrap_or_default();

if vote >= &curr {
// OK
} else {
return Err(
DefensiveError::new(ErrorSubject::Vote, Violation::VoteNotAscending { curr, to: *vote }).into(),
);
}

Ok(())
}

/// The log entries fed into a store must be consecutive; otherwise it is a bug.
async fn defensive_consecutive_input(&self, entries: &[C::Entry]) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

if entries.is_empty() {
return Ok(());
}

let mut prev_log_id = *entries[0].get_log_id();

for e in entries.iter().skip(1) {
if e.get_log_id().index != prev_log_id.index + 1 {
return Err(DefensiveError::new(ErrorSubject::Logs, Violation::LogsNonConsecutive {
prev: Some(prev_log_id),
next: *e.get_log_id(),
})
.into());
}

prev_log_id = *e.get_log_id();
}

Ok(())
}

/// Trying to feed in an empty entries slice is an inappropriate action.
///
/// The implementation has to avoid this; otherwise it may be a bug.
async fn defensive_nonempty_input(&self, entries: &[C::Entry]) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

if entries.is_empty() {
return Err(DefensiveError::new(ErrorSubject::Logs, Violation::LogsEmpty).into());
}

Ok(())
}

/// The index of the first entry to append has to be last_log_id.index + 1
async fn defensive_append_log_index_is_last_plus_one(
&mut self,
entries: &[C::Entry],
) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

let last_id = self.inner().get_log_state().await?.last_log_id;

let first_id = entries[0].get_log_id();
if last_id.next_index() != first_id.index {
return Err(
DefensiveError::new(ErrorSubject::Log(*first_id), Violation::LogsNonConsecutive {
prev: last_id,
next: *first_id,
})
.into(),
);
}

Ok(())
}

/// The log ids of the entries to append have to be greater than any known log id
async fn defensive_append_log_id_gt_last(&mut self, entries: &[C::Entry]) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

let last_id = self.inner().get_log_state().await?.last_log_id;

let first_id = *entries[0].get_log_id();
// TODO(xp): test first eq last.
// TODO(xp): test last == None is ok
if last_id.is_some() && Some(first_id) <= last_id {
return Err(
DefensiveError::new(ErrorSubject::Log(first_id), Violation::LogsNonConsecutive {
prev: last_id,
next: first_id,
})
.into(),
);
}

Ok(())
}

async fn defensive_purge_applied_le_last_applied(
&mut self,
upto: LogId<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
let (last_applied, _) = self.inner().last_applied_state().await?;
if Some(upto.index) > last_applied.index() {
return Err(
DefensiveError::new(ErrorSubject::Log(upto), Violation::PurgeNonApplied {
last_applied,
purge_upto: upto,
})
.into(),
);
}
Ok(())
}

async fn defensive_delete_conflict_gt_last_applied(
&mut self,
since: LogId<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
let (last_applied, _) = self.inner().last_applied_state().await?;
if Some(since.index) <= last_applied.index() {
return Err(
DefensiveError::new(ErrorSubject::Log(since), Violation::AppliedWontConflict {
last_applied,
first_conflict_log_id: since,
})
.into(),
);
}
Ok(())
}

/// The index of the first entry to apply to the state machine has to be last_applied_log_id.index + 1
async fn defensive_apply_index_is_last_applied_plus_one(
&mut self,
entries: &[C::Entry],
) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

let (last_id, _) = self.inner().last_applied_state().await?;

let first_id = *entries[0].get_log_id();
if last_id.next_index() != first_id.index {
return Err(
DefensiveError::new(ErrorSubject::Apply(first_id), Violation::ApplyNonConsecutive {
prev: last_id,
next: first_id,
})
.into(),
);
}

Ok(())
}

/// Requires the range to be at least half open: (-oo, n] or [n, +oo),
/// in order to keep log continuity.
async fn defensive_half_open_range<RB: RangeBounds<u64> + Clone + Debug + Send>(
&self,
range: RB,
) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

if let Bound::Unbounded = range.start_bound() {
return Ok(());
};

if let Bound::Unbounded = range.end_bound() {
return Ok(());
};

Err(DefensiveError::new(ErrorSubject::Logs, Violation::RangeNotHalfOpen {
start: range.start_bound().cloned(),
end: range.end_bound().cloned(),
})
.into())
}

/// A range operation such as get or delete has to actually cover some log entries in the store.
async fn defensive_range_hits_logs<RB: RangeBounds<u64> + Debug + Send>(
&self,
range: RB,
logs: &[C::Entry],
) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

check_range_matches_entries::<C, RB>(range, logs)?;
Ok(())
}

/// The log id of the entries to apply has to be greater than the last known one.
async fn defensive_apply_log_id_gt_last(&mut self, entries: &[C::Entry]) -> Result<(), StorageError<C::NodeId>> {
if !self.is_defensive() {
return Ok(());
}

let (last_id, _) = self.inner().last_applied_state().await?;

let first_id = *entries[0].get_log_id();
// TODO(xp): test first eq last
if Some(first_id) <= last_id {
return Err(
DefensiveError::new(ErrorSubject::Apply(first_id), Violation::ApplyNonConsecutive {
prev: last_id,
next: first_id,
})
.into(),
);
}

Ok(())
}
}

pub fn check_range_matches_entries<C: RaftTypeConfig, RB: RangeBounds<u64> + Debug + Send>(
range: RB,
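The rest of the removed trait enforces ordering invariants between the log and the applied state. Two of them are easy to see with concrete numbers; the sketch below uses a simplified `(term, index)` pair as a stand-in for openraft's `LogId` (it is an illustration, not the library's actual type):

```rust
// Simplified stand-in for a log id, ordered by (term, index).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SimpleLogId {
    term: u64,
    index: u64,
}

fn main() {
    // defensive_no_dirty_log: a log entry with a higher index than
    // last_applied must also compare greater; an older term at a higher
    // index is the "dirty log" violation.
    let last_log_id = SimpleLogId { term: 2, index: 8 };
    let last_applied = SimpleLogId { term: 3, index: 5 };
    let dirty = last_log_id.index > last_applied.index && last_log_id < last_applied;
    assert!(dirty); // this store state would be rejected

    // defensive_append_log_index_is_last_plus_one: the first entry of an
    // append must start exactly at last_log_id.index + 1.
    let first_to_append = SimpleLogId { term: 3, index: 9 };
    assert_eq!(first_to_append.index, last_log_id.index + 1);
}
```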
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
@@ -358,7 +358,7 @@ where
// Check if any replication task is going to use the logs that are going to be purged.
let mut in_use = false;
for (id, prog_entry) in self.leader.progress.iter() {
if prog_entry.is_inflight(&purge_upto) {
if prog_entry.is_log_range_inflight(&purge_upto) {
tracing::debug!("log {} is in use by {}", purge_upto, id);
in_use = true;
}
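The renamed call guards log purging: before truncating the log up to `purge_upto`, the leader checks that no follower's replication still needs those entries. A rough sketch of that guard pattern, using invented types (this is not openraft's actual `ProgressEntry` API):

```rust
use std::collections::BTreeMap;

// Hypothetical replication progress record, for illustration only.
struct Progress {
    // Log index range currently in flight to a follower, if any.
    inflight: Option<(u64, u64)>,
}

impl Progress {
    // Rough analogue of is_log_range_inflight(): does the in-flight range
    // still need entries at or below `purge_upto`?
    fn is_log_range_inflight(&self, purge_upto: u64) -> bool {
        matches!(self.inflight, Some((start, _end)) if start <= purge_upto)
    }
}

fn main() {
    let progress: BTreeMap<u64, Progress> = BTreeMap::from([
        (1, Progress { inflight: Some((3, 10)) }),
        (2, Progress { inflight: None }),
    ]);

    let purge_upto = 5;
    // Same shape as the loop in the diff: refuse to purge while any
    // replication task still uses logs at or below purge_upto.
    let in_use = progress.values().any(|p| p.is_log_range_inflight(purge_upto));
    assert!(in_use);
}
```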
(Diffs for the remaining changed files are not shown.)
