Skip to content

Commit

Permalink
Change: EffectiveMembership.log_id to Option<LogId>
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Apr 5, 2022
1 parent 3830d24 commit 81cd344
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 22 deletions.
3 changes: 2 additions & 1 deletion example-raft-kv/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct ExampleSnapshot {
pub struct ExampleStateMachine {
pub last_applied_log: Option<LogId<ExampleNodeId>>,

// TODO: it should not be Option.
pub last_membership: Option<EffectiveMembership<ExampleTypeConfig>>,

/// Application data.
Expand Down Expand Up @@ -287,7 +288,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}
},
EntryPayload::Membership(ref mem) => {
sm.last_membership = Some(EffectiveMembership::new(entry.log_id, mem.clone()));
sm.last_membership = Some(EffectiveMembership::new(Some(entry.log_id), mem.clone()));
res.push(ExampleResponse { value: None })
}
};
Expand Down
2 changes: 1 addition & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
res.push(ClientResponse(previous));
}
EntryPayload::Membership(ref mem) => {
sm.last_membership = Some(EffectiveMembership::new(entry.log_id, mem.clone()));
sm.last_membership = Some(EffectiveMembership::new(Some(entry.log_id), mem.clone()));
res.push(ClientResponse(None))
}
};
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
fn has_pending_config(&self) -> bool {
// The last membership config is not committed yet.
// Can not process the next one.
self.core.committed < Some(self.core.effective_membership.log_id)
self.core.committed < self.core.effective_membership.log_id
}

#[tracing::instrument(level = "debug", skip(self, tx))]
Expand All @@ -187,7 +187,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
if self.has_pending_config() {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::InProgress(InProgress {
membership_log_id: self.core.effective_membership.log_id,
// has_pending_config() implies an existing membership log.
membership_log_id: self.core.effective_membership.log_id.unwrap(),
}),
)));
return Ok(());
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let last_conf_change = entries
.iter()
.filter_map(|ent| match &ent.payload {
EntryPayload::Membership(conf) => Some(EffectiveMembership::new(ent.log_id, conf.clone())),
EntryPayload::Membership(conf) => Some(EffectiveMembership::new(Some(ent.log_id), conf.clone())),
_ => None,
})
.last();
Expand Down
9 changes: 4 additions & 5 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ use crate::Update;
#[derive(Clone, Default, Eq, Serialize, Deserialize)]
pub struct EffectiveMembership<C: RaftTypeConfig> {
/// The id of the log that applies this membership config
/// TODO: this `log_id` should be an `Option<LogId>`.
pub log_id: LogId<C::NodeId>,
pub log_id: Option<LogId<C::NodeId>>,

pub membership: Membership<C::NodeId>,

Expand All @@ -103,7 +102,7 @@ impl<C: RaftTypeConfig> PartialEq for EffectiveMembership<C> {
}

impl<C: RaftTypeConfig> EffectiveMembership<C> {
pub fn new(log_id: LogId<C::NodeId>, membership: Membership<C::NodeId>) -> Self {
pub fn new(log_id: Option<LogId<C::NodeId>>, membership: Membership<C::NodeId>) -> Self {
let all_members = membership.build_member_ids();
Self {
log_id,
Expand Down Expand Up @@ -147,7 +146,7 @@ impl<C: RaftTypeConfig> EffectiveMembership<C> {

impl<C: RaftTypeConfig> MessageSummary for EffectiveMembership<C> {
fn summary(&self) -> String {
format!("{{log_id:{} membership:{}}}", self.log_id, self.membership.summary())
format!("{{log_id:{:?} membership:{}}}", self.log_id, self.membership.summary())
}
}

Expand Down Expand Up @@ -612,7 +611,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.last_log_id = Some(log_id);

if let EntryPayload::Membership(mem) = &entry.payload {
self.effective_membership = Arc::new(EffectiveMembership::new(entry.log_id, mem.clone()));
self.effective_membership = Arc::new(EffectiveMembership::new(Some(entry.log_id), mem.clone()));
}

Ok(entry)
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/metrics_wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn test_wait() -> anyhow::Result<()> {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.membership_config = Arc::new(EffectiveMembership::new(
LogId::default(),
None,
Membership::new(vec![btreeset! {1,2}], None),
));
let rst = tx.send(update);
Expand All @@ -113,7 +113,7 @@ async fn test_wait() -> anyhow::Result<()> {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.membership_config = Arc::new(EffectiveMembership::new(
LogId::default(),
None,
Membership::new(vec![btreeset! {}, btreeset! {1,2}], None),
));
let rst = tx.send(update);
Expand Down Expand Up @@ -205,7 +205,7 @@ fn init_wait_test<C: RaftTypeConfig>() -> (RaftMetrics<C>, Wait<C>, watch::Sende
last_applied: None,
current_leader: None,
membership_config: Arc::new(EffectiveMembership::new(
LogId::default(),
None,
Membership::new(vec![btreeset! {}], None),
)),

Expand Down
8 changes: 4 additions & 4 deletions openraft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ where C: RaftTypeConfig
async fn get_membership(&mut self) -> Result<Option<EffectiveMembership<C>>, StorageError<C::NodeId>> {
let (_, sm_mem) = self.last_applied_state().await?;

let sm_mem_index = match &sm_mem {
let sm_mem_next_index = match &sm_mem {
None => 0,
Some(mem) => mem.log_id.index,
Some(mem) => mem.log_id.next_index(),
};

let log_mem = self.last_membership_in_log(sm_mem_index + 1).await?;
let log_mem = self.last_membership_in_log(sm_mem_next_index).await?;

if log_mem.is_some() {
return Ok(log_mem);
Expand Down Expand Up @@ -230,7 +230,7 @@ where C: RaftTypeConfig

for ent in entries.iter().rev() {
if let EntryPayload::Membership(ref mem) = ent.payload {
return Ok(Some(EffectiveMembership::new(ent.log_id, mem.clone())));
return Ok(Some(EffectiveMembership::new(Some(ent.log_id), mem.clone())));
}
}

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ where
assert_eq!(Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 3)), applied);
assert_eq!(
Some(EffectiveMembership::new(
LogId::new(LeaderId::new(1, NODE_ID.into()), 3),
Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 3)),
Membership::new(vec![btreeset! {1,2}], None)
)),
membership
Expand All @@ -658,7 +658,7 @@ where
assert_eq!(Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 5)), applied);
assert_eq!(
Some(EffectiveMembership::new(
LogId::new(LeaderId::new(1, NODE_ID.into()), 3),
Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 3)),
Membership::new(vec![btreeset! {1,2}], None)
)),
membership
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn initialization() -> Result<()> {
let sm_mem = sto.last_applied_state().await?.1;
assert_eq!(
Some(EffectiveMembership::new(
LogId::new(LeaderId::new(0, 0), 0),
Some(LogId::new(LeaderId::new(0, 0), 0)),
Membership::new(vec![btreeset! {0,1,2}], None)
)),
sm_mem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn state_machine_apply_membership() -> Result<()> {
let mut sto = router.get_storage_handle(&i)?;
assert_eq!(
Some(EffectiveMembership::new(
LogId::new(LeaderId::new(0, 0), 0),
Some(LogId::new(LeaderId::new(0, 0), 0)),
Membership::new(vec![btreeset! {0}], None)
)),
sto.last_applied_state().await?.1
Expand Down Expand Up @@ -102,7 +102,7 @@ async fn state_machine_apply_membership() -> Result<()> {
let (_, last_membership) = sto.last_applied_state().await?;
assert_eq!(
Some(EffectiveMembership::new(
LogId::new(LeaderId::new(1, 0), log_index),
Some(LogId::new(LeaderId::new(1, 0), log_index)),
Membership::new(vec![btreeset! {0, 1, 2}], Some(btreeset! {3,4}))
)),
last_membership
Expand Down

0 comments on commit 81cd344

Please sign in to comment.