Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace generic broker message type with enum #80

Merged
merged 9 commits into from
Oct 16, 2023
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 6 additions & 11 deletions solar/src/actors/muxrpc/blobs_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ use log::{info, trace, warn};

use crate::{
actors::muxrpc::handler::{RpcHandler, RpcInput},
broker::ChBrokerSend,
broker::{BrokerMessage, ChBrokerSend},
node::BLOB_STORE,
storage::blob::ToBlobHashId,
Result,
};

pub enum RpcBlobsGetEvent {
Get(dto::BlobsGetIn),
}
#[derive(Debug, Clone)]
pub struct RpcBlobsGetEvent(pub dto::BlobsGetIn);

pub struct BlobsGetHandler<W>
where
Expand Down Expand Up @@ -69,18 +68,14 @@ where
_ => {}
}
}
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamRespose()) => {
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => {
return self.recv_cancelstream(api, *req_no).await;
}
RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => {
return self.recv_rpc_response(api, *req_no, res).await;
}
RpcInput::Message(msg) => {
if let Some(get_event) = msg.downcast_ref::<RpcBlobsGetEvent>() {
match get_event {
RpcBlobsGetEvent::Get(req) => return self.event_get(api, req).await,
}
}
RpcInput::Message(BrokerMessage::RpcBlobsGet(RpcBlobsGetEvent(req))) => {
return self.event_get(api, req).await;
}
_ => {}
}
Expand Down
27 changes: 9 additions & 18 deletions solar/src/actors/muxrpc/blobs_wants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ use log::{trace, warn};

use crate::{
actors::muxrpc::handler::{RpcHandler, RpcInput},
broker::{BrokerEvent, ChBrokerSend, Destination},
broker::{BrokerEvent, BrokerMessage, ChBrokerSend, Destination},
node::BLOB_STORE,
storage::blob::{StoBlobEvent, ToBlobHashId},
storage::blob::{StoreBlobEvent, ToBlobHashId},
Result,
};

enum RpcBlobsWantsEvent {
BroadcastWants(Vec<(String, i64)>),
}
#[derive(Debug, Clone)]
pub struct RpcBlobsWantsEvent(Vec<(String, i64)>);

#[derive(PartialEq)]
enum Wants {
Expand Down Expand Up @@ -153,18 +152,10 @@ where
}
}
RpcInput::Message(msg) => {
if let Some(wants_event) = msg.downcast_ref::<RpcBlobsWantsEvent>() {
match wants_event {
RpcBlobsWantsEvent::BroadcastWants(ids) => {
return self.event_wants_broadcast(api, ids).await
}
}
} else if let Some(stoblob_event) = msg.downcast_ref::<StoBlobEvent>() {
match stoblob_event {
StoBlobEvent::Added(blob_id) => {
return self.event_stoblob_added(api, blob_id).await
}
}
if let BrokerMessage::RpcBlobsWants(RpcBlobsWantsEvent(ids)) = msg {
return self.event_wants_broadcast(api, ids).await;
} else if let BrokerMessage::StoreBlob(StoreBlobEvent(blob_id)) = msg {
return self.event_stoblob_added(api, blob_id).await;
}
}
RpcInput::Timer => {
Expand Down Expand Up @@ -289,7 +280,7 @@ where
// broadcast other peers with the blobs I don't have
let broker_msg = BrokerEvent::new(
Destination::Broadcast,
RpcBlobsWantsEvent::BroadcastWants(broadcast),
BrokerMessage::RpcBlobsWants(RpcBlobsWantsEvent(broadcast)),
);
ch_broker.send(broker_msg).await.unwrap();

Expand Down
35 changes: 17 additions & 18 deletions solar/src/actors/muxrpc/history_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ use once_cell::sync::Lazy;
use regex::Regex;

use crate::{
actors::muxrpc::handler::{RpcHandler, RpcInput},
broker::{BrokerEvent, ChBrokerSend, Destination},
actors::muxrpc::{
blobs_get::RpcBlobsGetEvent,
handler::{RpcHandler, RpcInput},
},
broker::{BrokerEvent, BrokerMessage, ChBrokerSend, Destination},
config::{PEERS_TO_REPLICATE, RESYNC_CONFIG, SECRET_CONFIG},
node::BLOB_STORE,
node::KV_STORE,
storage::kv::StoKvEvent,
storage::kv::StoreKvEvent,
Result,
};

Expand Down Expand Up @@ -76,26 +79,19 @@ where
self.recv_rpc_response(api, ch_broker, *req_no, res).await
}
// Handle an incoming MUXRPC 'cancel stream' response.
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamRespose()) => {
RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => {
self.recv_cancelstream(api, *req_no).await
}
// Handle an incoming MUXRPC error response.
RpcInput::Network(req_no, rpc::RecvMsg::ErrorResponse(err)) => {
self.recv_error_response(api, *req_no, err).await
}
// Handle a broker message.
RpcInput::Message(msg) => {
if let Some(kv_event) = msg.downcast_ref::<StoKvEvent>() {
match kv_event {
// Notification from the key-value store indicating that
// a new message has just been appended to the feed
// identified by `id`.
StoKvEvent::IdChanged(id) => {
return self.recv_storageevent_idchanged(api, id).await
}
}
}
Ok(false)
RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(id))) => {
// Notification from the key-value store indicating that
// a new message has just been appended to the feed
// identified by `id`.
return self.recv_storageevent_idchanged(api, id).await;
}
// Handle a timer event.
RpcInput::Timer => self.on_timer(api).await,
Expand Down Expand Up @@ -239,12 +235,15 @@ where
// blobstore.
for key in self.extract_blob_refs(&msg) {
if !BLOB_STORE.read().await.exists(&key) {
let event = super::blobs_get::RpcBlobsGetEvent::Get(dto::BlobsGetIn {
let event = RpcBlobsGetEvent(dto::BlobsGetIn {
key,
size: None,
max: None,
});
let broker_msg = BrokerEvent::new(Destination::Broadcast, event);
let broker_msg = BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::RpcBlobsGet(event),
);
ch_broker.send(broker_msg).await.unwrap();
}
}
Expand Down
4 changes: 2 additions & 2 deletions solar/src/actors/muxrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ mod handler;
mod history_stream;
mod whoami;

pub use blobs_get::BlobsGetHandler;
pub use blobs_wants::BlobsWantsHandler;
pub use blobs_get::{BlobsGetHandler, RpcBlobsGetEvent};
pub use blobs_wants::{BlobsWantsHandler, RpcBlobsWantsEvent};
pub use get::GetHandler;
pub use handler::{RpcHandler, RpcInput};
pub use history_stream::HistoryStreamHandler;
Expand Down
43 changes: 32 additions & 11 deletions solar/src/actors/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::info;

use crate::{
actors::network::connection_manager::{ConnectionEvent, CONNECTION_MANAGER},
broker::*,
broker::{ActorEndpoint, Broker, BrokerEvent, BrokerMessage, Destination, BROKER},
config::{NETWORK_KEY, PEERS_TO_REPLICATE},
Result,
};
Expand Down Expand Up @@ -112,7 +112,10 @@ pub async fn actor(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Error(connection_data, err.to_string()),
BrokerMessage::Connection(ConnectionEvent::Error(
connection_data,
err.to_string(),
)),
))
.await?;
}
Expand All @@ -121,7 +124,9 @@ pub async fn actor(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Disconnected(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Disconnected(
connection_data.to_owned(),
)),
))
.await?;
}
Expand Down Expand Up @@ -160,7 +165,9 @@ pub async fn actor_inner(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Connecting(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Connecting(
connection_data.to_owned(),
)),
))
.await?;

Expand All @@ -171,7 +178,9 @@ pub async fn actor_inner(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Handshaking(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Handshaking(
connection_data.to_owned(),
)),
))
.await?;

Expand All @@ -185,7 +194,9 @@ pub async fn actor_inner(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Connected(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Connected(
connection_data.to_owned(),
)),
))
.await?;

Expand All @@ -205,15 +216,19 @@ pub async fn actor_inner(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Connecting(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Connecting(
connection_data.to_owned(),
)),
))
.await?;

// Send 'handshaking' connection event message via the broker.
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Handshaking(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Handshaking(
connection_data.to_owned(),
)),
))
.await?;

Expand All @@ -238,7 +253,9 @@ pub async fn actor_inner(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Connected(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Connected(
connection_data.to_owned(),
)),
))
.await?;

Expand All @@ -259,7 +276,9 @@ pub async fn actor_inner(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Disconnecting(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Disconnecting(
connection_data.to_owned(),
)),
))
.await?;

Expand All @@ -286,7 +305,9 @@ pub async fn actor_inner(
ch_broker
.send(BrokerEvent::new(
Destination::Broadcast,
ConnectionEvent::Disconnecting(connection_data.to_owned()),
BrokerMessage::Connection(ConnectionEvent::Disconnecting(
connection_data.to_owned(),
)),
))
.await?;

Expand Down
Loading