Skip to content

Commit

Permalink
Merge pull request #803 from drmingdrmer/50-serialize-snapshot-receive
Browse files Browse the repository at this point in the history
Improve: move receiving snapshot chunk to sm::Worker
  • Loading branch information
drmingdrmer authored Apr 27, 2023
2 parents d9dd7d0 + 5415420 commit aece016
Show file tree
Hide file tree
Showing 25 changed files with 337 additions and 206 deletions.
6 changes: 6 additions & 0 deletions openraft/src/core/command_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#[derive(Debug, Clone)]
#[derive(Default)]
pub(crate) struct CommandState {
/// The sequence number of the last finished sm command.
pub(crate) finished_sm_seq: u64,
}
1 change: 1 addition & 0 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! storage or forward messages to other raft nodes.

pub(crate) mod balancer;
pub(crate) mod command_state;
pub(crate) mod notify;
mod raft_core;
mod replication_state;
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/core/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ where C: RaftTypeConfig
},
}

impl<C> Notify<C>
where C: RaftTypeConfig
{
pub(crate) fn sm(command_result: sm::CommandResult<C>) -> Self {
Self::StateMachine { command_result }
}
}

impl<C> MessageSummary<Notify<C>> for Notify<C>
where C: RaftTypeConfig
{
Expand Down
122 changes: 38 additions & 84 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ use tracing::Span;
use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::core::balancer::Balancer;
use crate::core::command_state::CommandState;
use crate::core::notify::Notify;
use crate::core::sm;
use crate::core::sm::CommandSeq;
use crate::core::ServerState;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplaySlice;
Expand All @@ -42,7 +44,6 @@ use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
Expand All @@ -61,7 +62,6 @@ use crate::raft::ClientWriteResponse;
use crate::raft::ClientWriteTx;
use crate::raft::ExternalCommand;
use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::InstallSnapshotTx;
use crate::raft::RaftMsg;
use crate::raft::ResultSender;
Expand Down Expand Up @@ -195,6 +195,8 @@ where

pub(crate) tx_metrics: watch::Sender<RaftMetrics<C::NodeId, C::Node>>,

pub(crate) command_state: CommandState,

pub(crate) span: Span,

pub(crate) _p: PhantomData<SM>,
Expand Down Expand Up @@ -555,82 +557,16 @@ where
/// Invoked by leader to send chunks of a snapshot to a follower.
///
/// Leaders always send chunks in order. It is important to note that, according to the Raft
/// spec, a log may only have one snapshot at any time. As snapshot contents are application
/// specific, the Raft log will only store a pointer to the snapshot file along with the
/// index & term.
/// spec, a node may only have one snapshot at any time. As snapshot contents are application
/// specific.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn handle_install_snapshot_request(
pub(crate) fn handle_install_snapshot_request(
&mut self,
req: InstallSnapshotRequest<C>,
tx: InstallSnapshotTx<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
// TODO: move receiving to another thread.
tracing::info!(req = display(req.summary()));

let snapshot_meta = req.meta.clone();
let done = req.done;

let res = self.engine.vote_handler().accept_vote(&req.vote, tx, |state, _rejected| {
Ok(InstallSnapshotResponse {
vote: *state.vote_ref(),
})
});

let tx = match res {
Ok(tx) => tx,
Err(_) => return Ok(()),
};

// TODO(2): This is still blocking, to make it non-blocking, we need to move receiving to another
// thread.
let (recv_tx, recv_rx) = oneshot::channel::<Result<(), InstallSnapshotError>>();

let cmd = sm::Command::receive(req, recv_tx);

self.sm_handle.send(cmd).map_err(|_e| {
StorageIOError::write_snapshot(
Some(snapshot_meta.signature()),
AnyError::error("sm-worker channel closed"),
)
})?;

let recv_res = recv_rx.await.map_err(|_e| {
StorageIOError::write_snapshot(
Some(snapshot_meta.signature()),
AnyError::error("sm-worker channel closed"),
)
})?;

if let Err(e) = recv_res {
self.engine.output.push_command(Command::Respond {
when: None,
resp: Respond::new(Err(e), tx),
});
return Ok(());
}

let mut condition = None;

if done {
// If to install snapshot, we can only respond when snapshot is successfully installed.
condition = Some(Condition::Applied {
log_id: snapshot_meta.last_log_id,
});

self.engine.following_handler().install_snapshot(snapshot_meta);
}

self.engine.output.push_command(Command::Respond {
when: condition,
resp: Respond::new(
Ok(InstallSnapshotResponse {
vote: *self.engine.state.vote_ref(),
}),
tx,
),
});

Ok(())
) {
tracing::info!(req = display(req.summary()), "{}", func_name!());
self.engine.handle_install_snapshot(req, tx);
}

/// Trigger a snapshot building(log compaction) job if there is no pending building job.
Expand Down Expand Up @@ -717,6 +653,7 @@ where
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn apply_to_state_machine(
&mut self,
seq: CommandSeq,
since: u64,
upto_index: u64,
) -> Result<(), StorageError<C::NodeId>> {
Expand All @@ -743,7 +680,7 @@ where

let last_applied = *entries[entries.len() - 1].get_log_id();

let cmd = sm::Command::apply(entries);
let cmd = sm::Command::apply(entries).with_seq(seq);
self.sm_handle.send(cmd).map_err(|e| StorageIOError::apply(last_applied, AnyError::error(e)))?;

Ok(())
Expand Down Expand Up @@ -1141,11 +1078,11 @@ where
RaftMsg::InstallSnapshot { rpc, tx } => {
tracing::info!(
req = display(rpc.summary()),
"received RaftMst::IntallSnapshot: {}",
"received RaftMst::InstallSnapshot: {}",
func_name!()
);

self.handle_install_snapshot_request(rpc, tx).await?;
self.handle_install_snapshot_request(rpc, tx);
}
RaftMsg::CheckIsLeaderRequest { tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
Expand Down Expand Up @@ -1335,6 +1272,14 @@ where
Notify::StateMachine { command_result } => {
tracing::debug!("sm::StateMachine command result: {:?}", command_result);

debug_assert!(
self.command_state.finished_sm_seq < command_result.command_seq,
"sm::StateMachine command result is out of order: expect {} < {}",
self.command_state.finished_sm_seq,
command_result.command_seq
);
self.command_state.finished_sm_seq = command_result.command_seq;

match command_result.result? {
sm::Response::BuildSnapshot(meta) => {
tracing::info!(
Expand All @@ -1344,9 +1289,6 @@ where
);
self.engine.finish_building_snapshot(meta);
}
sm::Response::GetSnapshot(_) => {
tracing::info!("sm::StateMachine command done: GetSnapshot: {}", func_name!());
}
sm::Response::ReceiveSnapshotChunk(_) => {
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());
}
Expand Down Expand Up @@ -1515,7 +1457,10 @@ where
SM: RaftStateMachine<C>,
{
async fn run_command<'e>(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C::NodeId>> {
if let Some(condition) = cmd.condition() {
let condition = cmd.condition();
tracing::debug!("condition: {:?}", condition);

if let Some(condition) = condition {
match condition {
Condition::LogFlushed { .. } => {
todo!()
Expand All @@ -1530,8 +1475,16 @@ where
return Ok(Some(cmd));
}
}
Condition::StateMachineCommand { .. } => {
todo!()
Condition::StateMachineCommand { command_seq } => {
if self.command_state.finished_sm_seq < *command_seq {
tracing::debug!(
"sm::Command({}) has not yet finished({}), postpone cmd: {:?}",
command_seq,
self.command_state.finished_sm_seq,
cmd
);
return Ok(Some(cmd));
}
}
}
}
Expand Down Expand Up @@ -1584,10 +1537,11 @@ where
}
}
Command::Apply {
seq,
ref already_committed,
ref upto,
} => {
self.apply_to_state_machine(already_committed.next_index(), upto.index).await?;
self.apply_to_state_machine(seq, already_committed.next_index(), upto.index).await?;
}
Command::Replicate { req, target } => {
if let Some(l) = &self.leader_data {
Expand Down
41 changes: 16 additions & 25 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use tokio::sync::oneshot;

use crate::display_ext::DisplaySlice;
use crate::error::InstallSnapshotError;
use crate::log_id::RaftLogId;
use crate::raft::InstallSnapshotRequest;
use crate::MessageSummary;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::SnapshotMeta;
Expand All @@ -35,26 +33,24 @@ where C: RaftTypeConfig
impl<C> Command<C>
where C: RaftTypeConfig
{
/// Generate the next command seq with atomic increment.
#[allow(dead_code)]
fn next_seq() -> CommandSeq {
static SEQ: AtomicU64 = AtomicU64::new(1);
SEQ.fetch_add(1, Ordering::Relaxed)
}

pub(crate) fn new(payload: CommandPayload<C>) -> Self {
Self {
// seq: Self::next_seq(),
seq: 0,
payload,
}
Self { seq: 0, payload }
}

#[allow(dead_code)]
pub(crate) fn seq(&self) -> CommandSeq {
self.seq
}

pub(crate) fn with_seq(mut self, seq: CommandSeq) -> Self {
self.seq = seq;
self
}

pub(crate) fn set_seq(&mut self, seq: CommandSeq) {
self.seq = seq;
}

pub(crate) fn build_snapshot() -> Self {
let payload = CommandPayload::BuildSnapshot;
Command::new(payload)
Expand All @@ -65,14 +61,12 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn receive(
req: InstallSnapshotRequest<C>,
tx: oneshot::Sender<Result<(), InstallSnapshotError>>,
) -> Self {
let payload = CommandPayload::ReceiveSnapshotChunk { req, tx };
pub(crate) fn receive(req: InstallSnapshotRequest<C>) -> Self {
let payload = CommandPayload::ReceiveSnapshotChunk { req };
Command::new(payload)
}

// TODO: all sm command should have a command seq.
pub(crate) fn install_snapshot(snapshot_meta: SnapshotMeta<C::NodeId, C::Node>) -> Self {
let payload = CommandPayload::FinalizeSnapshot {
install: true,
Expand Down Expand Up @@ -117,10 +111,7 @@ where C: RaftTypeConfig
/// If it is the final chunk, the snapshot stream will be closed and saved.
///
/// Installing a snapshot includes two steps: ReceiveSnapshotChunk and FinalizeSnapshot.
ReceiveSnapshotChunk {
req: InstallSnapshotRequest<C>,
tx: oneshot::Sender<Result<(), InstallSnapshotError>>,
},
ReceiveSnapshotChunk { req: InstallSnapshotRequest<C> },

/// After receiving all chunks, finalize the snapshot by installing it or discarding it,
/// if the snapshot is stale(the snapshot last log id is smaller than the local committed).
Expand All @@ -142,7 +133,7 @@ where C: RaftTypeConfig
CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"),
CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"),
CommandPayload::ReceiveSnapshotChunk { req, .. } => {
write!(f, "ReceiveSnapshotChunk: {:?}", req)
write!(f, "ReceiveSnapshotChunk: {}", req.summary())
}
CommandPayload::FinalizeSnapshot { install, snapshot_meta } => {
write!(f, "FinalizeSnapshot: install:{} {:?}", install, snapshot_meta)
Expand Down
Loading

0 comments on commit aece016

Please sign in to comment.