Skip to content

Commit

Permalink
Change: remove ClientWriteRequest
Browse files Browse the repository at this point in the history
Remove struct `ClientWriteRequest`.
`ClientWriteRequest` is barely a wrapper that does not provide any
additional function.

`Raft::client_write(ClientWriteRequest)` is changed to
`Raft::client_write(app_data: D)`, where `D` is application defined
`AppData` implementation.

- Fix: #171
  • Loading branch information
drmingdrmer committed Aug 1, 2022
1 parent a3175a6 commit 8c7f085
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 72 deletions.
5 changes: 1 addition & 4 deletions examples/raft-kv-memstore/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use actix_web::web::Data;
use actix_web::Responder;
use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::raft::ClientWriteRequest;
use openraft::EntryPayload;
use web::Json;

use crate::app::ExampleApp;
Expand All @@ -23,8 +21,7 @@ use crate::ExampleNodeId;
*/
#[post("/write")]
pub async fn write(app: Data<ExampleApp>, req: Json<ExampleRequest>) -> actix_web::Result<impl Responder> {
let request = ClientWriteRequest::new(EntryPayload::Normal(req.0));
let response = app.raft.client_write(request).await;
let response = app.raft.client_write(req.0).await;
Ok(Json(response))
}

Expand Down
5 changes: 1 addition & 4 deletions examples/raft-kv-rocksdb/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use std::sync::Arc;

use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::raft::ClientWriteRequest;
use openraft::EntryPayload;
use tide::Body;
use tide::Request;
use tide::Response;
Expand All @@ -30,8 +28,7 @@ pub fn rest(app: &mut Server) {
*/
async fn write(mut req: Request<Arc<ExampleApp>>) -> tide::Result {
let body = req.body_json().await?;
let request = ClientWriteRequest::new(EntryPayload::Normal(body));
let res = req.state().raft.client_write(request).await;
let res = req.state().raft.client_write(body).await;
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
}

Expand Down
30 changes: 11 additions & 19 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,9 +1324,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.reject_with_forward_to_leader(tx);
}
}
RaftMsg::ClientWriteRequest { rpc, tx } => {
RaftMsg::ClientWriteRequest { payload: rpc, tx } => {
if is_leader() {
self.write_entry(rpc.payload, Some(tx)).await?;
self.write_entry(rpc, Some(tx)).await?;
} else {
self.reject_with_forward_to_leader(tx);
}
Expand Down Expand Up @@ -1454,31 +1454,23 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
target: C::NodeId,
result: Result<LogId<C::NodeId>, String>,
) -> Result<(), StorageError<C::NodeId>> {
// Update target's match index & check if it is awaiting removal.

tracing::debug!(
target = display(target),
result = debug(&result),
"handle_update_matched"
);

// TODO(xp): a leader has to refuse a message from a previous leader.
if let Some(l) = &self.leader_data {
if !l.nodes.contains_key(&target) {
return Ok(());
};
} else {
// no longer a leader.
tracing::warn!(
target = display(target),
result = debug(&result),
"received replication update but no longer a leader"
);
return Ok(());
if tracing::enabled!(Level::DEBUG) {
if let Some(l) = &self.leader_data {
if !l.nodes.contains_key(&target) {
tracing::warn!("leader has removed target: {}", target);
return Ok(());
};
} else {
unreachable!("no longer a leader, received message from previous leader");
}
}

tracing::debug!("update matched: {:?}", result);

let matched = match result {
Ok(matched) => matched,
Err(_err_str) => {
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ impl<NID: NodeId> Engine<NID> {
}

// Seen a higher log.
// TODO: if already installed a timer with can_be_leader==false, it should not install a timer with
// can_be_leader==true.
if resp.last_log_id > self.state.last_log_id() {
self.push_command(Command::InstallElectionTimer { can_be_leader: false });
} else {
Expand Down
54 changes: 13 additions & 41 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
///
/// These are application specific requirements, and must be implemented by the application which is
/// being built on top of Raft.
#[tracing::instrument(level = "debug", skip(self, rpc))]
pub async fn client_write(
&self,
rpc: ClientWriteRequest<C>,
) -> Result<ClientWriteResponse<C>, ClientWriteError<C::NodeId>> {
#[tracing::instrument(level = "debug", skip_all)]
pub async fn client_write(&self, app_data: C::D) -> Result<ClientWriteResponse<C>, ClientWriteError<C::NodeId>> {
let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::ClientWriteRequest { rpc, tx }, rx).await
self.call_core(
RaftMsg::ClientWriteRequest {
payload: EntryPayload::Normal(app_data),
tx,
},
rx,
)
.await
}

/// Initialize a pristine Raft node with the given config.
Expand Down Expand Up @@ -772,7 +776,7 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
},

ClientWriteRequest {
rpc: ClientWriteRequest<C>,
payload: EntryPayload<C>,
tx: RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>,
},
CheckIsLeaderRequest {
Expand Down Expand Up @@ -888,7 +892,7 @@ where
RaftMsg::SnapshotUpdate { update } => {
format!("SnapshotUpdate: {:?}", update)
}
RaftMsg::ClientWriteRequest { rpc, .. } => {
RaftMsg::ClientWriteRequest { payload: rpc, .. } => {
format!("ClientWriteRequest: {}", rpc.summary())
}
RaftMsg::CheckIsLeaderRequest { .. } => "CheckIsLeaderRequest".to_string(),
Expand Down Expand Up @@ -1118,39 +1122,7 @@ pub struct InstallSnapshotResponse<NID: NodeId> {
pub vote: Vote<NID>,
}

//////////////////////////////////////////////////////////////////////////////////////////////////

/// An application specific client request to update the state of the system (§5.1).
///
/// The entry of this payload will be appended to the Raft log and then applied to the Raft state
/// machine according to the Raft protocol.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct ClientWriteRequest<C: RaftTypeConfig> {
/// The application specific contents of this client request.
pub(crate) payload: EntryPayload<C>,
}

impl<C: RaftTypeConfig> Debug for ClientWriteRequest<C>
where C::D: Debug
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClientWriteRequest").field("payload", &self.payload).finish()
}
}

impl<C: RaftTypeConfig> MessageSummary<ClientWriteRequest<C>> for ClientWriteRequest<C> {
fn summary(&self) -> String {
self.payload.summary()
}
}

impl<C: RaftTypeConfig> ClientWriteRequest<C> {
pub fn new(entry: EntryPayload<C>) -> Self {
Self { payload: entry }
}
}

/// The response to a `ClientRequest`.
/// The response to a client-request.
#[cfg_attr(
feature = "serde",
derive(serde::Deserialize, serde::Serialize),
Expand Down
5 changes: 1 addition & 4 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use openraft::metrics::Wait;
use openraft::raft::AddLearnerResponse;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::ClientWriteRequest;
use openraft::raft::InstallSnapshotRequest;
use openraft::raft::InstallSnapshotResponse;
use openraft::raft::VoteRequest;
Expand Down Expand Up @@ -651,9 +650,7 @@ where
.clone()
};

let payload = EntryPayload::<C>::Normal(req);

node.0.client_write(ClientWriteRequest::<C>::new(payload)).await.map(|res| res.data)
node.0.client_write(req).await.map(|res| res.data)
}

/// Assert that the cluster is in a pristine state, with all nodes as learners.
Expand Down

0 comments on commit 8c7f085

Please sign in to comment.