Skip to content

Commit

Permalink
Fix: compat07::SnapshotMeta should decode v08 SnapshotMeta
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Apr 15, 2023
1 parent 04e4060 commit 3433126
Show file tree
Hide file tree
Showing 13 changed files with 303 additions and 196 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ jobs:

# - A store with defensive checks returns error when unexpected accesses are sent to RaftStore.
# - Raft should not depend on defensive error to work correctly.
- name: Unit Tests, with and without defensive store
- name: Unit Tests
uses: actions-rs/cargo@v1
with:
command: test
Expand Down
236 changes: 53 additions & 183 deletions openraft/src/compat/compat07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,189 +68,21 @@
//! }
//! ```

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fmt::Debug;

use crate::compat::Compat;
use crate::compat::Upgrade;

impl Upgrade<crate::LogId<u64>> for or07::LogId {
fn upgrade(self) -> crate::LogId<u64> {
let committed_leader_id = crate::CommittedLeaderId::new(self.term, 0);
crate::LogId::new(committed_leader_id, self.index)
}
}

impl Upgrade<crate::Membership<u64, crate::EmptyNode>> for or07::Membership {
fn upgrade(self) -> crate::Membership<u64, crate::EmptyNode> {
let configs = self.get_configs().clone();
let nodes = self.all_nodes().iter().map(|nid| (*nid, crate::EmptyNode::new())).collect::<BTreeMap<_, _>>();
crate::Membership::new(configs, nodes)
}
}

impl Upgrade<crate::StoredMembership<u64, crate::EmptyNode>> for or07::EffectiveMembership {
fn upgrade(self) -> crate::StoredMembership<u64, crate::EmptyNode> {
let membership = self.membership.upgrade();
let log_id = self.log_id.upgrade();

crate::StoredMembership::new(Some(log_id), membership)
}
}

impl<C> Upgrade<crate::EntryPayload<C>> for or07::EntryPayload<C::D>
where
C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>,
<C as crate::RaftTypeConfig>::D: or07::AppData + Debug,
{
fn upgrade(self) -> crate::EntryPayload<C> {
match self {
Self::Blank => crate::EntryPayload::Blank,
Self::Membership(m) => crate::EntryPayload::Membership(m.upgrade()),
Self::Normal(d) => crate::EntryPayload::Normal(d),
}
}
}

impl<C> Upgrade<crate::Entry<C>> for or07::Entry<C::D>
where
C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>,
<C as crate::RaftTypeConfig>::D: or07::AppData + Debug,
{
fn upgrade(self) -> crate::Entry<C> {
let log_id = self.log_id.upgrade();
let payload = self.payload.upgrade();
crate::Entry { log_id, payload }
}
}

impl Upgrade<crate::Vote<u64>> for or07::HardState {
fn upgrade(self) -> crate::Vote<u64> {
// When it has not yet voted for any node, let it vote for any node won't break the consensus.
crate::Vote::new(self.current_term, self.voted_for.unwrap_or_default())
}
}

impl Upgrade<crate::SnapshotMeta<u64, crate::EmptyNode>> for or07::SnapshotMeta {
fn upgrade(self) -> crate::SnapshotMeta<u64, crate::EmptyNode> {
unimplemented!("can not upgrade SnapshotMeta")
}
}

/// v0.7 compatible LogId.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::LogId = serde_json::from_slice::<compat07::LogId>(&serialized_bytes)?.upgrade()
/// ```
pub type LogId = Compat<or07::LogId, crate::LogId<u64>>;

/// v0.7 compatible Vote(in v0.7 the corresponding type is `HardState`).
///
/// To load from either v0.7 or the latest format data and upgrade it to latest type:
/// ```ignore
/// let x: openraft::Vote = serde_json::from_slice::<compat07::Vote>(&serialized_bytes)?.upgrade()
/// ```
pub type Vote = Compat<or07::HardState, crate::Vote<u64>>;

/// v0.7 compatible SnapshotMeta.
///
/// SnapshotMeta can not be upgraded, an old snapshot should be discarded and a new one should be
/// re-built.
pub type SnapshotMeta = Compat<or07::SnapshotMeta, crate::SnapshotMeta<u64, crate::EmptyNode>>;

/// v0.7 compatible Membership.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::Membership = serde_json::from_slice::<compat07::Membership>(&serialized_bytes)?.upgrade()
/// ```
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Membership {
pub configs: Vec<BTreeSet<u64>>,
pub nodes: Option<BTreeMap<u64, crate::EmptyNode>>,
pub all_nodes: Option<BTreeSet<u64>>,
}

impl Upgrade<crate::Membership<u64, crate::EmptyNode>> for Membership {
fn upgrade(self) -> crate::Membership<u64, crate::EmptyNode> {
if let Some(ns) = self.nodes {
crate::Membership::new(self.configs, ns)
} else {
crate::Membership::new(self.configs, self.all_nodes.unwrap())
}
}
}

/// v0.7 compatible StoredMembership.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::StoredMembership = serde_json::from_slice::<compat07::StoredMembership>(&serialized_bytes)?.upgrade()
/// ```
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct StoredMembership {
pub log_id: Option<LogId>,
pub membership: Membership,
#[serde(skip)]
pub quorum_set: Option<()>,
#[serde(skip)]
pub voter_ids: Option<()>,
}

impl Upgrade<crate::StoredMembership<u64, crate::EmptyNode>> for StoredMembership {
fn upgrade(self) -> crate::StoredMembership<u64, crate::EmptyNode> {
crate::StoredMembership::new(self.log_id.map(|lid| lid.upgrade()), self.membership.upgrade())
}
}

/// v0.7 compatible EntryPayload.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::EntryPayload = serde_json::from_slice::<compat07::EntryPayload>(&serialized_bytes)?.upgrade()
/// ```
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum EntryPayload<C: crate::RaftTypeConfig> {
Blank,
Normal(C::D),
Membership(Membership),
}

impl<C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>> Upgrade<crate::EntryPayload<C>>
for EntryPayload<C>
{
fn upgrade(self) -> crate::EntryPayload<C> {
match self {
EntryPayload::Blank => crate::EntryPayload::Blank,
EntryPayload::Normal(d) => crate::EntryPayload::Normal(d),
EntryPayload::Membership(m) => crate::EntryPayload::Membership(m.upgrade()),
}
}
}

/// v0.7 compatible Entry.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::Entry = serde_json::from_slice::<compat07::Entry>(&serialized_bytes)?.upgrade()
/// ```
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(bound = "")]
pub struct Entry<C: crate::RaftTypeConfig> {
pub log_id: LogId,
pub payload: EntryPayload<C>,
}

impl<C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>> Upgrade<crate::Entry<C>> for Entry<C> {
fn upgrade(self) -> crate::Entry<C> {
crate::Entry {
log_id: self.log_id.upgrade(),
payload: self.payload.upgrade(),
}
}
}
mod entry;
mod entry_payload;
mod log_id;
mod membership;
mod snapshot_meta;
mod stored_membership;
mod vote;

pub use entry::Entry;
pub use entry_payload::EntryPayload;
pub use log_id::LogId;
pub use membership::Membership;
pub use snapshot_meta::SnapshotMeta;
pub use stored_membership::StoredMembership;
pub use vote::Vote;

pub mod testing {
use std::path::Path;
Expand Down Expand Up @@ -818,4 +650,42 @@ mod tests {
assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&c.upgrade())?);
Ok(())
}

#[test]
fn test_serde_snapshot_meta() -> anyhow::Result<()> {
use super::SnapshotMeta;

let m8 = || {
crate::Membership::new(
vec![btreeset! {1,2}],
btreemap! {1=>crate::EmptyNode{}, 2=>crate::EmptyNode{}},
)
};
let v7 = or07::SnapshotMeta {
last_log_id: Some(or07::LogId::new(10, 5)),
snapshot_id: "a".to_string(),
};
let want = crate::SnapshotMeta {
last_log_id: Some(crate::LogId::new(crate::CommittedLeaderId::new(10, 0), 5)),
last_membership: crate::StoredMembership::new(
Some(crate::LogId::new(crate::CommittedLeaderId::new(10, 0), 5)),
m8(),
),
snapshot_id: "a".to_string(),
};

let s = serde_json::to_string(&v7)?;
let c: SnapshotMeta = serde_json::from_str(&s)?;
assert!(
c.try_upgrade().is_err(),
"snapshot_meta can not be upgrade because it has no membership"
);

let s = serde_json::to_string(&want)?;
let c: SnapshotMeta = serde_json::from_str(&s)?;
let c = c.try_upgrade().unwrap();
assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&c)?);

Ok(())
}
}
39 changes: 39 additions & 0 deletions openraft/src/compat/compat07/entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::fmt::Debug;

use super::EntryPayload;
use super::LogId;
use crate::compat::Upgrade;

/// v0.7 compatible Entry.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::Entry = serde_json::from_slice::<compat07::Entry>(&serialized_bytes)?.upgrade()
/// ```
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(bound = "")]
pub struct Entry<C: crate::RaftTypeConfig> {
pub log_id: LogId,
pub payload: EntryPayload<C>,
}

impl<C> Upgrade<crate::Entry<C>> for or07::Entry<C::D>
where
C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>,
<C as crate::RaftTypeConfig>::D: or07::AppData + Debug,
{
fn upgrade(self) -> crate::Entry<C> {
let log_id = self.log_id.upgrade();
let payload = self.payload.upgrade();
crate::Entry { log_id, payload }
}
}

impl<C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>> Upgrade<crate::Entry<C>> for Entry<C> {
fn upgrade(self) -> crate::Entry<C> {
crate::Entry {
log_id: self.log_id.upgrade(),
payload: self.payload.upgrade(),
}
}
}
43 changes: 43 additions & 0 deletions openraft/src/compat/compat07/entry_payload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::fmt::Debug;

use super::Membership;
use crate::compat::Upgrade;

/// v0.7 compatible EntryPayload.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::EntryPayload = serde_json::from_slice::<compat07::EntryPayload>(&serialized_bytes)?.upgrade()
/// ```
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum EntryPayload<C: crate::RaftTypeConfig> {
Blank,
Normal(C::D),
Membership(Membership),
}

impl<C> Upgrade<crate::EntryPayload<C>> for or07::EntryPayload<C::D>
where
C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>,
<C as crate::RaftTypeConfig>::D: or07::AppData + Debug,
{
fn upgrade(self) -> crate::EntryPayload<C> {
match self {
Self::Blank => crate::EntryPayload::Blank,
Self::Membership(m) => crate::EntryPayload::Membership(m.upgrade()),
Self::Normal(d) => crate::EntryPayload::Normal(d),
}
}
}

impl<C: crate::RaftTypeConfig<NodeId = u64, Node = crate::EmptyNode>> Upgrade<crate::EntryPayload<C>>
for EntryPayload<C>
{
fn upgrade(self) -> crate::EntryPayload<C> {
match self {
EntryPayload::Blank => crate::EntryPayload::Blank,
EntryPayload::Normal(d) => crate::EntryPayload::Normal(d),
EntryPayload::Membership(m) => crate::EntryPayload::Membership(m.upgrade()),
}
}
}
17 changes: 17 additions & 0 deletions openraft/src/compat/compat07/log_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::compat::Compat;
use crate::compat::Upgrade;

/// v0.7 compatible LogId.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::LogId = serde_json::from_slice::<compat07::LogId>(&serialized_bytes)?.upgrade()
/// ```
pub type LogId = Compat<or07::LogId, crate::LogId<u64>>;

impl Upgrade<crate::LogId<u64>> for or07::LogId {
fn upgrade(self) -> crate::LogId<u64> {
let committed_leader_id = crate::CommittedLeaderId::new(self.term, 0);
crate::LogId::new(committed_leader_id, self.index)
}
}
36 changes: 36 additions & 0 deletions openraft/src/compat/compat07/membership.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fmt::Debug;

use crate::compat::Upgrade;

/// v0.7 compatible Membership.
///
/// To load from either v0.7 or the latest format data and upgrade it to the latest type:
/// ```ignore
/// let x:openraft::Membership = serde_json::from_slice::<compat07::Membership>(&serialized_bytes)?.upgrade()
/// ```
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Membership {
pub configs: Vec<BTreeSet<u64>>,
pub nodes: Option<BTreeMap<u64, crate::EmptyNode>>,
pub all_nodes: Option<BTreeSet<u64>>,
}

impl Upgrade<crate::Membership<u64, crate::EmptyNode>> for or07::Membership {
fn upgrade(self) -> crate::Membership<u64, crate::EmptyNode> {
let configs = self.get_configs().clone();
let nodes = self.all_nodes().iter().map(|nid| (*nid, crate::EmptyNode::new())).collect::<BTreeMap<_, _>>();
crate::Membership::new(configs, nodes)
}
}

impl Upgrade<crate::Membership<u64, crate::EmptyNode>> for Membership {
fn upgrade(self) -> crate::Membership<u64, crate::EmptyNode> {
if let Some(ns) = self.nodes {
crate::Membership::new(self.configs, ns)
} else {
crate::Membership::new(self.configs, self.all_nodes.unwrap())
}
}
}
Loading

0 comments on commit 3433126

Please sign in to comment.