Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1586: At most one discovery message #734

Merged
merged 2 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 36 additions & 42 deletions finality-aleph/src/network/manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,20 @@ impl<M: Multiaddress> Discovery<M> {
}
}

/// Returns messages that should be sent as part of authority discovery at this moment.
/// Returns a message that should be sent as part of authority discovery at this moment.
pub fn discover_authorities(
&mut self,
handler: &SessionHandler<M>,
) -> Vec<DiscoveryCommand<M>> {
) -> Option<DiscoveryCommand<M>> {
let authentication = match handler.authentication() {
Some(authentication) => authentication,
None => return Vec::new(),
None => return None,
};

let missing_authorities = handler.missing_nodes();
let node_count = handler.node_count();
info!(target: "aleph-network", "{}/{} authorities known for session {}.", node_count.0-missing_authorities.len(), node_count.0, handler.session_id().0);
vec![authentication_broadcast(authentication)]
Some(authentication_broadcast(authentication))
}

/// Checks the authentication using the handler and returns the addresses we should be
Expand All @@ -104,39 +104,37 @@ impl<M: Multiaddress> Discovery<M> {
&mut self,
authentication: Authentication<M>,
handler: &mut SessionHandler<M>,
) -> (Vec<M>, Vec<DiscoveryCommand<M>>) {
) -> (Vec<M>, Option<DiscoveryCommand<M>>) {
debug!(target: "aleph-network", "Handling broadcast with authentication {:?}.", authentication);
let addresses = self.handle_authentication(authentication.clone(), handler);
if addresses.is_empty() {
return (Vec::new(), Vec::new());
return (Vec::new(), None);
}
let node_id = authentication.0.creator();
let mut messages = Vec::new();
if self.should_rebroadcast(&node_id) {
trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication);
self.last_broadcast.insert(node_id, Instant::now());
messages.push(authentication_broadcast(authentication));
if !self.should_rebroadcast(&node_id) {
return (addresses, None);
}
(addresses, messages)
trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication);
self.last_broadcast.insert(node_id, Instant::now());
(addresses, Some(authentication_broadcast(authentication)))
}

/// Analyzes the provided message and returns all the new multiaddresses we should
/// be connected to if we want to stay connected to the committee and any messages
/// that we should send as a result of it.
/// be connected to if we want to stay connected to the committee and an optional
/// message that we should send as a result of it.
pub fn handle_message(
&mut self,
message: DiscoveryMessage<M>,
handler: &mut SessionHandler<M>,
) -> (Vec<M>, Vec<DiscoveryCommand<M>>) {
) -> (Vec<M>, Option<DiscoveryCommand<M>>) {
use DiscoveryMessage::*;
match message {
AuthenticationBroadcast(authentication) => {
self.handle_broadcast(authentication, handler)
}
Authentication(authentication) => (
self.handle_authentication(authentication, handler),
Vec::new(),
),
Authentication(authentication) => {
(self.handle_authentication(authentication, handler), None)
}
}
}
}
Expand Down Expand Up @@ -215,11 +213,9 @@ mod tests {
for num_nodes in 2..NUM_NODES {
let (mut discovery, mut handlers, _) = build_number(num_nodes).await;
let handler = &mut handlers[0];
let mut messages = discovery.discover_authorities(handler);
assert_eq!(messages.len(), 1);
let message = messages.pop().unwrap();
let message = discovery.discover_authorities(handler);
assert_eq!(
message,
message.expect("there is a discovery message"),
(
DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()),
DataCommand::Broadcast
Expand All @@ -231,41 +227,39 @@ mod tests {
#[tokio::test]
async fn non_validator_discover_authorities_returns_empty_vector() {
let (mut discovery, _, non_validator) = build().await;
let messages = discovery.discover_authorities(&non_validator);
assert!(messages.is_empty());
let message = discovery.discover_authorities(&non_validator);
assert!(message.is_none());
}

#[tokio::test]
async fn rebroadcasts_and_accepts_addresses() {
let (mut discovery, mut handlers, _) = build().await;
let authentication = handlers[1].authentication().unwrap();
let handler = &mut handlers[0];
let (addresses, commands) = discovery.handle_message(
let (addresses, command) = discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
assert_eq!(addresses, authentication.0.addresses());
assert_eq!(commands.len(), 1);
assert!(commands.iter().any(|command| matches!(command, (
assert!(matches!(command, Some((
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
) if rebroadcast_authentication == &authentication)));
)) if rebroadcast_authentication == authentication));
}

#[tokio::test]
async fn non_validators_rebroadcasts() {
let (mut discovery, handlers, mut non_validator) = build().await;
let authentication = handlers[1].authentication().unwrap();
let (addresses, commands) = discovery.handle_message(
let (addresses, command) = discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
&mut non_validator,
);
assert_eq!(addresses, authentication.0.addresses());
assert_eq!(commands.len(), 1);
assert!(commands.iter().any(|command| matches!(command, (
assert!(matches!(command, Some((
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
) if rebroadcast_authentication == &authentication)));
)) if rebroadcast_authentication == authentication));
}

#[tokio::test]
Expand All @@ -275,12 +269,12 @@ mod tests {
let (_, signature) = handlers[2].authentication().unwrap();
let authentication = (auth_data, signature);
let handler = &mut handlers[0];
let (addresses, commands) = discovery.handle_message(
let (addresses, command) = discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication),
handler,
);
assert!(addresses.is_empty());
assert!(commands.is_empty());
assert!(command.is_none());
}

#[tokio::test]
Expand All @@ -293,15 +287,15 @@ mod tests {
handler,
);
sleep(Duration::from_millis(MS_COOLDOWN + 5));
let (addresses, commands) = discovery.handle_message(
let (addresses, command) = discovery.handle_message(
DiscoveryMessage::AuthenticationBroadcast(authentication.clone()),
handler,
);
assert_eq!(addresses, authentication.0.addresses());
assert!(commands.iter().any(|command| matches!(command, (
assert!(matches!(command, Some((
DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication),
DataCommand::Broadcast,
) if rebroadcast_authentication == &authentication)));
)) if rebroadcast_authentication == authentication));
}

#[tokio::test]
Expand All @@ -310,12 +304,12 @@ mod tests {
let expected_address = handlers[1].authentication().unwrap().0.addresses()[0].encode();
let authentication = handlers[1].authentication().unwrap();
let handler = &mut handlers[0];
let (addresses, commands) =
let (addresses, command) =
discovery.handle_message(DiscoveryMessage::Authentication(authentication), handler);
assert_eq!(addresses.len(), 1);
let address = addresses[0].encode();
assert_eq!(address, expected_address);
assert!(commands.is_empty());
assert!(command.is_none());
}

#[tokio::test]
Expand All @@ -325,11 +319,11 @@ mod tests {
let (_, signature) = handlers[2].authentication().unwrap();
let incorrect_authentication = (auth_data, signature);
let handler = &mut handlers[0];
let (addresses, commands) = discovery.handle_message(
let (addresses, command) = discovery.handle_message(
DiscoveryMessage::Authentication(incorrect_authentication),
handler,
);
assert!(addresses.is_empty());
assert!(commands.is_empty());
assert!(command.is_none());
}
}
Loading