Skip to content

Commit

Permalink
A0-1592: make connection manager operate of validator network directly (
Browse files Browse the repository at this point in the history
#790)

* Make connection manager operate of validator network directly

* Fix and simplify network service tests
  • Loading branch information
maciejnems authored Dec 7, 2022
1 parent 5acf27d commit 4f29b37
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 470 deletions.
43 changes: 22 additions & 21 deletions finality-aleph/src/network/io.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,43 @@
use futures::channel::mpsc;

use crate::network::{
manager::{DataInSession, VersionedAuthentication},
ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO,
use crate::{
network::{
manager::{DataInSession, VersionedAuthentication},
ConnectionManagerIO, Data, Multiaddress, NetworkServiceIO as NetworkIO, SessionManagerIO,
},
validator_network::{Network as ValidatorNetwork, PublicKey},
};

type AuthenticationNetworkIO<D, M> = NetworkIO<VersionedAuthentication<M>, DataInSession<D>, M>;
type AuthenticationNetworkIO<M> = NetworkIO<VersionedAuthentication<M>>;

pub fn setup<D: Data, M: Multiaddress + 'static>() -> (
ConnectionManagerIO<D, M>,
AuthenticationNetworkIO<D, M>,
pub fn setup<
D: Data,
M: Multiaddress + 'static,
VN: ValidatorNetwork<M::PeerId, M, DataInSession<D>>,
>(
validator_network: VN,
) -> (
ConnectionManagerIO<D, M, VN>,
AuthenticationNetworkIO<M>,
SessionManagerIO<D>,
) {
)
where
M::PeerId: PublicKey,
{
// Prepare and start the network
let (commands_for_network, commands_from_io) = mpsc::unbounded();
let (data_for_network, data_from_user) = mpsc::unbounded();
let (messages_for_network, messages_from_user) = mpsc::unbounded();
let (commands_for_service, commands_from_user) = mpsc::unbounded();
let (messages_for_service, commands_from_manager) = mpsc::unbounded();
let (data_for_user, data_from_network) = mpsc::unbounded();
let (messages_for_user, messages_from_network) = mpsc::unbounded();

let connection_io = ConnectionManagerIO::new(
commands_for_network,
data_for_network,
messages_for_network,
commands_from_user,
commands_from_manager,
data_from_network,
messages_from_network,
validator_network,
);
let channels_for_network = NetworkIO::new(
data_from_user,
messages_from_user,
data_for_user,
messages_for_user,
commands_from_io,
);
let channels_for_network = NetworkIO::new(messages_from_user, messages_for_user);
let channels_for_session_manager =
SessionManagerIO::new(commands_for_service, messages_for_service);

Expand Down
69 changes: 39 additions & 30 deletions finality-aleph/src/network/manager/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
},
AddressedData, ConnectionCommand, Data, Multiaddress, NetworkIdentity, PeerId,
},
validator_network::{Network as ValidatorNetwork, PublicKey},
MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL,
};

Expand Down Expand Up @@ -595,22 +596,22 @@ impl<NI: NetworkIdentity, D: Data> Service<NI, D> {
}
}

/// Input/output interface for the connectiona manager service.
pub struct IO<D: Data, M: Multiaddress> {
commands_for_network: mpsc::UnboundedSender<ConnectionCommand<M>>,
data_for_network: mpsc::UnboundedSender<AddressedData<DataInSession<D>, M::PeerId>>,
/// Input/output interface for the connection manager service.
pub struct IO<D: Data, M: Multiaddress, VN: ValidatorNetwork<M::PeerId, M, DataInSession<D>>>
where
M::PeerId: PublicKey,
{
authentications_for_network: mpsc::UnboundedSender<VersionedAuthentication<M>>,
commands_from_user: mpsc::UnboundedReceiver<SessionCommand<D>>,
messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>,
data_from_network: mpsc::UnboundedReceiver<DataInSession<D>>,
authentications_from_network: mpsc::UnboundedReceiver<VersionedAuthentication<M>>,
validator_network: VN,
}

/// Errors that can happen during the network service operations.
#[derive(Debug, PartialEq, Eq)]
pub enum Error {
NetworkSend,
CommandSend,
/// Should never be fatal.
UserSend,
/// Should never be fatal.
Expand All @@ -620,31 +621,28 @@ pub enum Error {
NetworkChannel,
}

impl<D: Data, M: Multiaddress> IO<D, M> {
impl<D: Data, M: Multiaddress, VN: ValidatorNetwork<M::PeerId, M, DataInSession<D>>> IO<D, M, VN>
where
M::PeerId: PublicKey,
{
pub fn new(
commands_for_network: mpsc::UnboundedSender<ConnectionCommand<M>>,
data_for_network: mpsc::UnboundedSender<AddressedData<DataInSession<D>, M::PeerId>>,
authentications_for_network: mpsc::UnboundedSender<VersionedAuthentication<M>>,
commands_from_user: mpsc::UnboundedReceiver<SessionCommand<D>>,
messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>,
data_from_network: mpsc::UnboundedReceiver<DataInSession<D>>,
authentications_from_network: mpsc::UnboundedReceiver<VersionedAuthentication<M>>,
) -> IO<D, M> {
validator_network: VN,
) -> IO<D, M, VN> {
IO {
commands_for_network,
data_for_network,
authentications_for_network,
commands_from_user,
messages_from_user,
data_from_network,
authentications_from_network,
validator_network,
}
}

fn send_data(&self, to_send: AddressedData<DataInSession<D>, M::PeerId>) -> Result<(), Error> {
self.data_for_network
.unbounded_send(to_send)
.map_err(|_| Error::NetworkSend)
fn send_data(&self, to_send: AddressedData<DataInSession<D>, M::PeerId>) {
self.validator_network.send(to_send.0, to_send.1)
}

fn send_authentication(&self, to_send: DiscoveryMessage<M>) -> Result<(), Error> {
Expand All @@ -653,21 +651,32 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
.map_err(|_| Error::NetworkSend)
}

fn send_command(&self, to_send: ConnectionCommand<M>) -> Result<(), Error> {
self.commands_for_network
.unbounded_send(to_send)
.map_err(|_| Error::CommandSend)
fn handle_connection_command(&mut self, connection_command: ConnectionCommand<M>) {
match connection_command {
ConnectionCommand::AddReserved(addresses) => {
for multi in addresses {
if let Some(peer_id) = multi.get_peer_id() {
self.validator_network.add_connection(peer_id, vec![multi]);
}
}
}
ConnectionCommand::DelReserved(peers) => {
for peer in peers {
self.validator_network.remove_connection(peer);
}
}
};
}

fn send(
&self,
fn handle_service_actions(
&mut self,
ServiceActions {
maybe_command,
maybe_message,
}: ServiceActions<M>,
) -> Result<(), Error> {
if let Some(command) = maybe_command {
self.send_command(command)?;
self.handle_connection_command(command);
}
if let Some(message) = maybe_message {
self.send_authentication(message)?;
Expand Down Expand Up @@ -695,7 +704,7 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
trace!(target: "aleph-network", "Manager received a command from user");
match maybe_command {
Some(command) => match service.on_command(command).await {
Ok(to_send) => self.send(to_send)?,
Ok(to_send) => self.handle_service_actions(to_send)?,
Err(e) => warn!(target: "aleph-network", "Failed to update handler: {:?}", e),
},
None => return Err(Error::CommandsChannel),
Expand All @@ -705,12 +714,12 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
trace!(target: "aleph-network", "Manager received a message from user");
match maybe_message {
Some((message, session_id, recipient)) => for message in service.on_user_message(message, session_id, recipient) {
self.send_data(message)?;
self.send_data(message);
},
None => return Err(Error::MessageChannel),
}
},
maybe_data = self.data_from_network.next() => {
maybe_data = self.validator_network.next() => {
trace!(target: "aleph-network", "Manager received some data from network");
match maybe_data {
Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) {
Expand All @@ -727,7 +736,7 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
trace!(target: "aleph-network", "Manager received an authentication from network");
match maybe_authentication {
Some(authentication) => match authentication.try_into() {
Ok(message) => self.send(service.on_discovery_message(message))?,
Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?,
Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e),
},
None => return Err(Error::NetworkChannel),
Expand All @@ -736,7 +745,7 @@ impl<D: Data, M: Multiaddress> IO<D, M> {
_ = maintenance.tick() => {
debug!(target: "aleph-network", "Manager starts maintenence");
match service.retry_session_start().await {
Ok(to_send) => self.send(to_send)?,
Ok(to_send) => self.handle_service_actions(to_send)?,
Err(e) => warn!(target: "aleph-network", "Retry failed to update handler: {:?}", e),
}
for to_send in service.discovery() {
Expand Down
42 changes: 1 addition & 41 deletions finality-aleph/src/network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ use tokio::time::timeout;

use crate::{
crypto::{AuthorityPen, AuthorityVerifier},
network::{
manager::VersionedAuthentication, AddressedData, ConnectionCommand, Event, EventStream,
Multiaddress, Network, NetworkIdentity, NetworkSender, NetworkServiceIO, Protocol,
},
network::{Event, EventStream, Network, NetworkIdentity, NetworkSender, Protocol},
testing::mocks::validator_network::{random_identity, MockMultiaddress},
validator_network::mock::MockPublicKey,
AuthorityId, NodeIndex,
Expand Down Expand Up @@ -101,43 +98,6 @@ pub type MockEvent = Event<MockMultiaddress, MockPublicKey>;

pub type MockData = Vec<u8>;

pub struct MockIO<M: Multiaddress> {
pub messages_for_network: mpsc::UnboundedSender<VersionedAuthentication<M>>,
pub data_for_network: mpsc::UnboundedSender<AddressedData<MockData, M::PeerId>>,
pub messages_from_network: mpsc::UnboundedReceiver<VersionedAuthentication<M>>,
pub data_from_network: mpsc::UnboundedReceiver<MockData>,
pub commands_for_network: mpsc::UnboundedSender<ConnectionCommand<M>>,
}

impl<M: Multiaddress + 'static> MockIO<M> {
pub fn new() -> (
MockIO<M>,
NetworkServiceIO<VersionedAuthentication<M>, MockData, M>,
) {
let (messages_for_network, messages_from_user) = mpsc::unbounded();
let (data_for_network, data_from_user) = mpsc::unbounded();
let (messages_for_user, messages_from_network) = mpsc::unbounded();
let (data_for_user, data_from_network) = mpsc::unbounded();
let (commands_for_network, commands_from_manager) = mpsc::unbounded();
(
MockIO {
messages_for_network,
data_for_network,
messages_from_network,
data_from_network,
commands_for_network,
},
NetworkServiceIO::new(
data_from_user,
messages_from_user,
data_for_user,
messages_for_user,
commands_from_manager,
),
)
}
}

pub struct MockEventStream(mpsc::UnboundedReceiver<MockEvent>);

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send
}

/// Represents the address of an arbitrary node.
pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync {
pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static {
type PeerId: PeerId;

/// Returns the peer id associated with this multiaddress if it exists and is unique.
Expand Down
Loading

0 comments on commit 4f29b37

Please sign in to comment.