Skip to content

Commit

Permalink
Improve: getting a snapshot does not block RaftCore task
Browse files Browse the repository at this point in the history
`RaftCore` no longer blocks on receiving a snapshot from the state-machine
worker while replicating a snapshot. Instead, it sends the `Receiver` to
the replication task and the replication task blocks on receiving the
snapshot.
  • Loading branch information
drmingdrmer committed Apr 16, 2023
1 parent 840f207 commit dcd18c5
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 27 deletions.
20 changes: 8 additions & 12 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1476,25 +1476,21 @@ where
let _ = node.tx_repl.send(Replicate::logs(id, log_id_range));
}
Inflight::Snapshot { id, last_log_id } => {
// TODO(2): move to another task.
let _ = last_log_id;

// Create a channel through which the state machine worker sends the snapshot and the
// replication worker receives it.
let (tx, rx) = oneshot::channel();

let cmd = sm::Command::get_snapshot(0, tx);
self.sm_handle
.send(cmd)
.map_err(|e| StorageIOError::read_snapshot(None, AnyError::error(e)))?;

let snapshot =
rx.await.map_err(|e| StorageIOError::read_snapshot(None, AnyError::error(e)))?;

tracing::debug!("snapshot: {}", snapshot.as_ref().map(|x| &x.meta).summary());

if let Some(snapshot) = snapshot {
debug_assert_eq!(last_log_id, snapshot.meta.last_log_id);
let _ = node.tx_repl.send(Replicate::snapshot(id, snapshot));
} else {
unreachable!("No snapshot");
}
// The replication channel must not be dropped (that would be a bug); if it is, return an error instead of panicking.
node.tx_repl.send(Replicate::snapshot(id, rx)).map_err(|_e| {
StorageIOError::read_snapshot(None, AnyError::error("replication channel closed"))
})?;
}
}
} else {
Expand Down
7 changes: 5 additions & 2 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where

#[tracing::instrument(level = "info", skip_all)]
async fn build_snapshot(&mut self) -> Result<SnapshotMeta<C::NodeId, C::Node>, StorageError<C::NodeId>> {
// TODO: move it to another task
// TODO(3): move it to another task
// use futures::future::abortable;
// let (fu, abort_handle) = abortable(async move { builder.build_snapshot().await });

Expand All @@ -240,7 +240,10 @@ where

let snapshot = self.state_machine.get_current_snapshot().await?;

tracing::info!("sending back snapshot");
tracing::info!(
"sending back snapshot: meta: {:?}",
snapshot.as_ref().map(|s| s.meta.summary())
);
let _ = tx.send(snapshot);
Ok(())
}
Expand Down
50 changes: 37 additions & 13 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use std::fmt::Formatter;
use std::io::SeekFrom;
use std::sync::Arc;

use anyerror::AnyError;
use futures::future::FutureExt;
pub(crate) use replication_session_id::ReplicationSessionId;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeek;
use tokio::io::AsyncSeekExt;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio::time::timeout;
Expand Down Expand Up @@ -45,6 +47,8 @@ use crate::RPCTypes;
use crate::RaftNetwork;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StorageIOError;
use crate::ToStorageResult;

/// The handle to a spawned replication stream.
Expand Down Expand Up @@ -169,7 +173,7 @@ where
repl_id = id;
match r_action {
Payload::Logs(log_id_range) => self.send_log_entries(id, log_id_range).await,
Payload::Snapshot(snapshot) => self.stream_snapshot(id, snapshot).await,
Payload::Snapshot(snapshot_rx) => self.stream_snapshot(id, snapshot_rx).await,
}
}
};
Expand Down Expand Up @@ -447,13 +451,33 @@ where
}
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
#[tracing::instrument(level = "info", skip_all)]
async fn stream_snapshot(
&mut self,
id: u64,
mut snapshot: Snapshot<C::NodeId, C::Node, SM::SnapshotData>,
rx: oneshot::Receiver<Option<Snapshot<C::NodeId, C::Node, SM::SnapshotData>>>,
) -> Result<(), ReplicationError<C::NodeId, C::Node>> {
tracing::debug!(id = display(id), snapshot = debug(&snapshot.meta), "stream_snapshot",);
tracing::info!(id = display(id), "{}", func_name!());

let snapshot = rx.await.map_err(|e| {
let io_err = StorageIOError::read_snapshot(None, AnyError::error(e));
StorageError::IO { source: io_err }
})?;

tracing::info!(
"received snapshot: id={}; meta:{}",
id,
snapshot.as_ref().map(|x| &x.meta).summary()
);

let mut snapshot = match snapshot {
None => {
let io_err = StorageIOError::read_snapshot(None, AnyError::error("snapshot not found"));
let sto_err = StorageError::IO { source: io_err };
return Err(ReplicationError::StorageError(sto_err));
}
Some(x) => x,
};

let err_x = || (ErrorSubject::Snapshot(Some(snapshot.meta.signature())), ErrorVerb::Read);

Expand Down Expand Up @@ -572,8 +596,8 @@ where
Payload::Logs(log_id_range) => {
format!("Logs{{id={}, {}}}", self.id, log_id_range)
}
Payload::Snapshot(snapshot) => {
format!("Snapshot{{id={}, {}}}", self.id, snapshot.meta.summary())
Payload::Snapshot(_) => {
format!("Snapshot{{id={}}}", self.id)
}
}
}
Expand All @@ -592,10 +616,10 @@ where
}
}

fn new_snapshot(id: u64, snapshot: Snapshot<NID, N, SD>) -> Self {
fn new_snapshot(id: u64, snapshot_rx: oneshot::Receiver<Option<Snapshot<NID, N, SD>>>) -> Self {
Self {
id,
payload: Payload::Snapshot(snapshot),
payload: Payload::Snapshot(snapshot_rx),
}
}
}
Expand All @@ -610,7 +634,7 @@ where
SD: AsyncRead + AsyncSeek + Send + Unpin + 'static,
{
Logs(LogIdRange<NID>),
Snapshot(Snapshot<NID, N, SD>),
Snapshot(oneshot::Receiver<Option<Snapshot<NID, N, SD>>>),
}

impl<NID, N, SD> Debug for Payload<NID, N, SD>
Expand All @@ -624,8 +648,8 @@ where
Self::Logs(log_id_range) => {
write!(f, "Logs({})", log_id_range)
}
Self::Snapshot(snapshot) => {
write!(f, "Snapshot({:?})", snapshot.meta)
Self::Snapshot(_) => {
write!(f, "Snapshot()")
}
}
}
Expand Down Expand Up @@ -665,8 +689,8 @@ where
Self::Data(Data::new_logs(id, log_id_range))
}

pub(crate) fn snapshot(id: u64, snapshot: Snapshot<NID, N, SD>) -> Self {
Self::Data(Data::new_snapshot(id, snapshot))
pub(crate) fn snapshot(id: u64, snapshot_rx: oneshot::Receiver<Option<Snapshot<NID, N, SD>>>) -> Self {
Self::Data(Data::new_snapshot(id, snapshot_rx))
}
}

Expand Down

0 comments on commit dcd18c5

Please sign in to comment.