Skip to content

Commit

Permalink
Feature: add "Inflight" to store info about inflight replication data
Browse files Browse the repository at this point in the history
But it is not used anywhere until replication progress control is moved
from `ReplicationCore` to `Engine`. `Inflight` will be part of the
**progress** data.
  • Loading branch information
drmingdrmer committed Jan 17, 2023
1 parent 7b67f11 commit 541e9d3
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 0 deletions.
2 changes: 2 additions & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ mod store_wrapper;
mod summary;
mod vote;

pub(crate) mod log_id_range;

mod engine;
pub mod error;
mod internal_server_state;
Expand Down
34 changes: 34 additions & 0 deletions openraft/src/log_id_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::fmt::Display;
use std::fmt::Formatter;

use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;

/// A log id range of continuous series of log entries.
///
/// The range of log to send is left open right close: `(prev_log_id, last_log_id]`.
#[derive(Clone, Copy, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct LogIdRange<NID: NodeId> {
/// The prev log id before the first to send, exclusive.
pub(crate) prev_log_id: Option<LogId<NID>>,

/// The last log id to send, inclusive.
pub(crate) last_log_id: Option<LogId<NID>>,
}

impl<NID: NodeId> Display for LogIdRange<NID> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "({}, {}]", self.prev_log_id.summary(), self.last_log_id.summary())
}
}

impl<NID: NodeId> LogIdRange<NID> {
pub(crate) fn new(prev: Option<LogId<NID>>, last: Option<LogId<NID>>) -> Self {
Self {
prev_log_id: prev,
last_log_id: last,
}
}
}
287 changes: 287 additions & 0 deletions openraft/src/progress/inflight.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
// TODO: remove it
#![allow(unused)]

use std::fmt::Display;
use std::fmt::Formatter;

use crate::log_id_range::LogIdRange;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::NodeId;

/// The inflight data being transmitting from leader to a follower/learner.
///
/// If inflight data is non-None, it's waiting for responses from a follower/learner.
/// The follower/learner respond with `ack()` or `conflict()` to update the state of inflight data.
#[derive(Clone, Copy, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) enum Inflight<NID: NodeId> {
None,

/// Being replicating a series of logs.
Logs(LogIdRange<NID>),

/// Being replicating a snapshot.
Snapshot {
/// The last log id snapshot includes.
///
/// It is None, if the snapshot is empty.
last_log_id: Option<LogId<NID>>,
},
}

impl<NID: NodeId> Display for Inflight<NID> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Inflight::None => {
write!(f, "None")
}
Inflight::Logs(l) => {
write!(f, "Logs:{}", l)
}
Inflight::Snapshot { last_log_id: last_next } => {
write!(f, "Snapshot:{}", last_next.summary())
}
}
}
}

impl<NID: NodeId> Inflight<NID> {
pub(crate) fn logs(prev: Option<LogId<NID>>, last: Option<LogId<NID>>) -> Self {
#![allow(clippy::nonminimal_bool)]
if !(prev < last) {
Self::None
} else {
Self::Logs(LogIdRange::new(prev, last))
}
}

pub(crate) fn snapshot(snapshot_last_log_id: Option<LogId<NID>>) -> Self {
Self::Snapshot {
last_log_id: snapshot_last_log_id,
}
}

pub(crate) fn is_none(&self) -> bool {
&Inflight::None == self
}

// test it if used
#[allow(dead_code)]
pub(crate) fn is_sending_log(&self) -> bool {
matches!(self, Inflight::Logs(_))
}

// test it if used
#[allow(dead_code)]
pub(crate) fn is_sending_snapshot(&self) -> bool {
matches!(self, Inflight::Snapshot { .. })
}

/// Update inflight state when log upto `upto` is acknowledged by a follower/learner.
pub(crate) fn ack(&mut self, upto: Option<LogId<NID>>) {
match self {
Inflight::None => {
unreachable!("no inflight data")
}
Inflight::Logs(logs) => {
*self = {
debug_assert!(upto >= logs.prev_log_id);
debug_assert!(upto <= logs.last_log_id);
Inflight::logs(upto, logs.last_log_id)
}
}
Inflight::Snapshot { last_log_id } => {
debug_assert_eq!(&upto, last_log_id);
*self = Inflight::None;
}
}
}

/// Update inflight state when a conflicting log id is responded by a follower/learner.
pub(crate) fn conflict(&mut self, conflict: u64) {
match self {
Inflight::None => {
unreachable!("no inflight data")
}
Inflight::Logs(logs) => {
// if prev_log_id==None, it will never conflict
debug_assert_eq!(Some(conflict), logs.prev_log_id.index());
*self = Inflight::None
}
Inflight::Snapshot { last_log_id: _ } => {
unreachable!("sending snapshot should not conflict");
}
}
}
}

#[cfg(test)]
mod tests {
use crate::log_id_range::LogIdRange;
use crate::progress::Inflight;
use crate::LeaderId;
use crate::LogId;

fn log_id(index: u64) -> LogId<u64> {
LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
index,
}
}

#[test]
fn test_inflight_create() -> anyhow::Result<()> {
// Logs
let l = Inflight::logs(Some(log_id(5)), Some(log_id(10)));
assert_eq!(Inflight::Logs(LogIdRange::new(Some(log_id(5)), Some(log_id(10)))), l);
assert!(!l.is_none());

// Empty range
let l = Inflight::logs(Some(log_id(11)), Some(log_id(10)));
assert_eq!(Inflight::None, l);
assert!(l.is_none());

// Snapshot
let l = Inflight::snapshot(Some(log_id(10)));
assert_eq!(
Inflight::Snapshot {
last_log_id: Some(log_id(10))
},
l
);
assert!(!l.is_none());

Ok(())
}

#[test]
fn test_inflight_is_xxx() -> anyhow::Result<()> {
let l = Inflight::<u64>::None;
assert!(l.is_none());

let l = Inflight::logs(Some(log_id(5)), Some(log_id(10)));
assert!(l.is_sending_log());

let l = Inflight::snapshot(Some(log_id(10)));
assert!(l.is_sending_snapshot());

Ok(())
}

#[test]
fn test_inflight_ack() -> anyhow::Result<()> {
// Update matching when no inflight data
{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::<u64>::None;
f.ack(Some(log_id(4)));
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "Inflight::None can not ack");
}

// Update matching when transmitting by logs
{
let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10)));

f.ack(Some(log_id(5)));
assert_eq!(Inflight::logs(Some(log_id(5)), Some(log_id(10))), f);

f.ack(Some(log_id(6)));
assert_eq!(Inflight::logs(Some(log_id(6)), Some(log_id(10))), f);

f.ack(Some(log_id(9)));
assert_eq!(Inflight::logs(Some(log_id(9)), Some(log_id(10))), f);

f.ack(Some(log_id(10)));
assert_eq!(Inflight::None, f);

{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10)));
f.ack(Some(log_id(4)));
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "non-matching ack < prev_log_id");
}

{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10)));
f.ack(Some(log_id(11)));
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "non-matching ack > prev_log_id");
}
}

// Update matching when transmitting by snapshot
{
{
let mut f = Inflight::snapshot(Some(log_id(5)));
f.ack(Some(log_id(5)));
assert_eq!(Inflight::None, f, "valid ack");
}

{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::snapshot(Some(log_id(5)));
f.ack(Some(log_id(4)));
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "non-matching ack != snapshot.last_log_id");
}
}

Ok(())
}

#[test]
fn test_inflight_conflict() -> anyhow::Result<()> {
{
let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10)));
f.conflict(5);
assert_eq!(Inflight::None, f, "valid conflict");
}

{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10)));
f.conflict(4);
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "non-matching conflict < prev_log_id");
}

{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::logs(Some(log_id(5)), Some(log_id(10)));
f.conflict(6);
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "non-matching conflict > prev_log_id");
}

{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::<u64>::None;
f.conflict(5);
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "conflict is not expected by Inflight::None");
}

{
let res = std::panic::catch_unwind(|| {
let mut f = Inflight::snapshot(Some(log_id(5)));
f.conflict(5);
});
tracing::info!("res: {:?}", res);
assert!(res.is_err(), "conflict is not expected by Inflight::Snapshot");
}

Ok(())
}
}
4 changes: 4 additions & 0 deletions openraft/src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
#[cfg(test)]
mod bench;
pub(crate) mod entry;
mod inflight;

use std::borrow::Borrow;
use std::fmt::Debug;
use std::slice::Iter;

// TODO: remove it
#[allow(unused_imports)] pub(crate) use inflight::Inflight;

use crate::quorum::QuorumSet;

/// Track progress of several incremental values.
Expand Down

0 comments on commit 541e9d3

Please sign in to comment.