Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): fixes stale chain metadata being sent to listening state #5039

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion base_layer/core/src/base_node/chain_metadata_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,19 @@ impl Display for PeerChainMetadata {

#[derive(Debug)]
pub enum ChainMetadataEvent {
PeerChainMetadataReceived(Vec<PeerChainMetadata>),
PeerChainMetadataReceived(PeerChainMetadata),
NetworkSilence,
}

impl ChainMetadataEvent {
pub fn peer_metadata(&self) -> Option<PeerChainMetadata> {
match self {
Self::PeerChainMetadataReceived(metadata) => Some(metadata.clone()),
_ => None,
}
}
}

#[derive(Clone)]
pub struct ChainMetadataHandle {
event_stream: broadcast::Sender<Arc<ChainMetadataEvent>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use log::*;
use tari_comms::connectivity::ConnectivityRequester;
use tari_p2p::services::liveness::LivenessHandle;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::broadcast;
Expand All @@ -38,17 +37,16 @@ impl ServiceInitializer for ChainMetadataServiceInitializer {
async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
debug!(target: LOG_TARGET, "Initializing Chain Metadata Service");
// Buffer size set to 1 because only the most recent metadata is applicable
let (publisher, _) = broadcast::channel(1);
let (publisher, _) = broadcast::channel(20);

let handle = ChainMetadataHandle::new(publisher.clone());
context.register_handle(handle);

context.spawn_until_shutdown(|handles| {
let liveness = handles.expect_handle::<LivenessHandle>();
let base_node = handles.expect_handle::<LocalNodeCommsInterface>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();

ChainMetadataService::new(liveness, base_node, connectivity, publisher).run()
ChainMetadataService::new(liveness, base_node, publisher).run()
});

debug!(target: LOG_TARGET, "Chain Metadata Service initialized");
Expand Down
166 changes: 27 additions & 139 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use num_format::{Locale, ToFormattedString};
use prost::Message;
use tari_common::log_if_error;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityRequester},
message::MessageExt,
};
use tari_comms::message::MessageExt;
use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent};
use tokio::sync::broadcast;

Expand All @@ -49,8 +46,6 @@ const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3;
pub(super) struct ChainMetadataService {
liveness: LivenessHandle,
base_node: LocalNodeCommsInterface,
peer_chain_metadata: Vec<PeerChainMetadata>,
connectivity: ConnectivityRequester,
event_publisher: broadcast::Sender<Arc<ChainMetadataEvent>>,
number_of_rounds_no_pings: u16,
}
Expand All @@ -64,14 +59,11 @@ impl ChainMetadataService {
pub fn new(
liveness: LivenessHandle,
base_node: LocalNodeCommsInterface,
connectivity: ConnectivityRequester,
event_publisher: broadcast::Sender<Arc<ChainMetadataEvent>>,
) -> Self {
Self {
liveness,
base_node,
peer_chain_metadata: Vec::new(),
connectivity,
event_publisher,
number_of_rounds_no_pings: 0,
}
Expand All @@ -81,7 +73,6 @@ impl ChainMetadataService {
pub async fn run(mut self) {
let mut liveness_event_stream = self.liveness.get_event_stream();
let mut block_event_stream = self.base_node.get_block_event_stream();
let mut connectivity_events = self.connectivity.get_event_subscription();

log_if_error!(
target: LOG_TARGET,
Expand All @@ -108,29 +99,10 @@ impl ChainMetadataService {
);
},

Ok(event) = connectivity_events.recv() => {
self.handle_connectivity_event(event);
}
}
}
}

fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
use ConnectivityEvent::{PeerBanned, PeerDisconnected};
match event {
PeerDisconnected(node_id) | PeerBanned(node_id) => {
if let Some(pos) = self.peer_chain_metadata.iter().position(|p| *p.node_id() == node_id) {
debug!(
target: LOG_TARGET,
"Removing disconnected/banned peer `{}` from chain metadata list ", node_id
);
self.peer_chain_metadata.remove(pos);
}
},
_ => {},
}
}

/// Handle BlockEvents
async fn handle_block_event(&mut self, event: &BlockEvent) -> Result<(), ChainMetadataSyncError> {
match event {
Expand Down Expand Up @@ -166,8 +138,7 @@ impl ChainMetadataService {
);
self.number_of_rounds_no_pings = 0;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.collect_chain_state_from_ping_pong(event)?;
self.send_chain_metadata_to_event_publisher().await?;
self.send_chain_metadata_to_event_publisher(event).await?;
}
},
// Received a pong, check if our neighbour sent it and it contains ChainMetadata
Expand All @@ -179,8 +150,7 @@ impl ChainMetadataService {
);
self.number_of_rounds_no_pings = 0;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.collect_chain_state_from_ping_pong(event)?;
self.send_chain_metadata_to_event_publisher().await?;
self.send_chain_metadata_to_event_publisher(event).await?;
}
},
// New ping round has begun
Expand All @@ -197,10 +167,6 @@ impl ChainMetadataService {
self.number_of_rounds_no_pings = 0;
}
}
// Ensure that we're waiting for the correct amount of peers to respond
// and have allocated space for their replies

self.resize_chainstate_buffer(*num_peers);
},
}

Expand All @@ -212,31 +178,10 @@ impl ChainMetadataService {
Ok(())
}

async fn send_chain_metadata_to_event_publisher(&mut self) -> Result<(), ChainMetadataSyncError> {
// send only fails if there are no subscribers.
let _size = self
.event_publisher
.send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(
self.peer_chain_metadata.clone(),
)));

Ok(())
}

fn resize_chainstate_buffer(&mut self, n: usize) {
match self.peer_chain_metadata.capacity() {
cap if n > cap => {
let additional = n - self.peer_chain_metadata.len();
self.peer_chain_metadata.reserve_exact(additional);
},
cap if n < cap => {
self.peer_chain_metadata.shrink_to(cap);
},
_ => {},
}
}

fn collect_chain_state_from_ping_pong(&mut self, event: &PingPongEvent) -> Result<(), ChainMetadataSyncError> {
async fn send_chain_metadata_to_event_publisher(
&mut self,
event: &PingPongEvent,
) -> Result<(), ChainMetadataSyncError> {
let chain_metadata_bytes = event
.metadata
.get(MetadataKey::ChainMetadata)
Expand All @@ -252,19 +197,15 @@ impl ChainMetadataService {
chain_metadata.accumulated_difficulty().to_formatted_string(&Locale::en),
);

if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| *peer_chainstate.node_id() == event.node_id)
{
self.peer_chain_metadata.remove(pos);
}
let peer_chain_metadata = PeerChainMetadata::new(event.node_id.clone(), chain_metadata, event.latency);

// send only fails if there are no subscribers.
let _size = self
.event_publisher
.send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(
peer_chain_metadata,
)));

self.peer_chain_metadata.push(PeerChainMetadata::new(
event.node_id.clone(),
chain_metadata,
event.latency,
));
Ok(())
}
}
Expand All @@ -274,13 +215,7 @@ mod test {
use std::convert::TryInto;

use futures::StreamExt;
use tari_comms::{
peer_manager::NodeId,
test_utils::{
mocks::{create_connectivity_mock, ConnectivityManagerMockState},
node_identity::build_many_node_identities,
},
};
use tari_comms::peer_manager::NodeId;
use tari_p2p::services::liveness::{
mock::{create_p2p_liveness_mock, LivenessMockState},
LivenessRequest,
Expand Down Expand Up @@ -323,33 +258,24 @@ mod test {
fn setup() -> (
ChainMetadataService,
LivenessMockState,
ConnectivityManagerMockState,
reply_channel::TryReceiver<NodeCommsRequest, NodeCommsResponse, CommsInterfaceError>,
broadcast::Receiver<Arc<ChainMetadataEvent>>,
) {
let (liveness_handle, mock, _) = create_p2p_liveness_mock(1);
let liveness_mock_state = mock.get_mock_state();
task::spawn(mock.run());

let (base_node, base_node_receiver) = create_base_node_nci();
let (publisher, _) = broadcast::channel(1);

let (connectivity, mock) = create_connectivity_mock();
let connectivity_mock_state = mock.get_shared_state();
task::spawn(mock.run());
let (publisher, event_rx) = broadcast::channel(10);

let service = ChainMetadataService::new(liveness_handle, base_node, connectivity, publisher);
let service = ChainMetadataService::new(liveness_handle, base_node, publisher);

(
service,
liveness_mock_state,
connectivity_mock_state,
base_node_receiver,
)
(service, liveness_mock_state, base_node_receiver, event_rx)
}

#[tokio::test]
async fn update_liveness_chain_metadata() {
let (mut service, liveness_mock_state, _, mut base_node_receiver) = setup();
let (mut service, liveness_mock_state, mut base_node_receiver, _) = setup();

let mut proto_chain_metadata = create_sample_proto_chain_metadata();
proto_chain_metadata.height_of_longest_chain = Some(123);
Expand All @@ -375,7 +301,7 @@ mod test {
}
#[tokio::test]
async fn handle_liveness_event_ok() {
let (mut service, _, _, _) = setup();
let (mut service, _, _, mut events_rx) = setup();

let mut metadata = Metadata::new();
let proto_chain_metadata = create_sample_proto_chain_metadata();
Expand All @@ -388,57 +314,19 @@ mod test {
latency: None,
};

// To prevent the chain metadata buffer being flushed after receiving a single pong event,
// extend it's capacity to 2
service.peer_chain_metadata.reserve_exact(2);
let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
service.handle_liveness_event(&sample_event).await.unwrap();
assert_eq!(service.peer_chain_metadata.len(), 1);
let metadata = service.peer_chain_metadata.remove(0);
let metadata = events_rx.recv().await.unwrap().peer_metadata().unwrap();
assert_eq!(*metadata.node_id(), node_id);
assert_eq!(
metadata.claimed_chain_metadata().height_of_longest_chain(),
proto_chain_metadata.height_of_longest_chain.unwrap()
);
}

#[tokio::test]
async fn handle_liveness_event_banned_peer() {
let (mut service, _, _, _) = setup();

let mut metadata = Metadata::new();
let proto_chain_metadata = create_sample_proto_chain_metadata();
metadata.insert(MetadataKey::ChainMetadata, proto_chain_metadata.to_encoded_bytes());

service.peer_chain_metadata.reserve_exact(3);

let nodes = build_many_node_identities(2, Default::default());
for node in &nodes {
let pong_event = PingPongEvent {
metadata: metadata.clone(),
node_id: node.node_id().clone(),
latency: None,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
service.handle_liveness_event(&sample_event).await.unwrap();
}

assert!(service
.peer_chain_metadata
.iter()
.any(|p| p.node_id() == nodes[0].node_id()));
service.handle_connectivity_event(ConnectivityEvent::PeerBanned(nodes[0].node_id().clone()));
// Check that banned peer was removed
assert!(service
.peer_chain_metadata
.iter()
.all(|p| p.node_id() != nodes[0].node_id()));
}

#[tokio::test]
async fn handle_liveness_event_no_metadata() {
let (mut service, _, _, _) = setup();
let (mut service, _, _, mut event_rx) = setup();

let metadata = Metadata::new();
let node_id = NodeId::new();
Expand All @@ -450,12 +338,12 @@ mod test {

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
service.handle_liveness_event(&sample_event).await.unwrap();
assert!(service.peer_chain_metadata.is_empty());
assert!(event_rx.try_recv().is_err());
}

#[tokio::test]
async fn handle_liveness_event_bad_metadata() {
let (mut service, _, _, _) = setup();
let (mut service, _, _, mut event_rx) = setup();

let mut metadata = Metadata::new();
metadata.insert(MetadataKey::ChainMetadata, b"no-good".to_vec());
Expand All @@ -469,6 +357,6 @@ mod test {
let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
let err = service.handle_liveness_event(&sample_event).await.unwrap_err();
unpack_enum!(ChainMetadataSyncError::DecodeError(_err) = err);
assert_eq!(service.peer_chain_metadata.len(), 0);
assert!(event_rx.try_recv().is_err());
}
}
Loading