diff --git a/Cargo.toml b/Cargo.toml
index 02bfa30..afdd80b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,18 +12,35 @@ members = [
     "crates/vrf",
     "crates/crypto",
    "crates/enclaves",
+    "crates/gossip",
     "crates/types",
     "demos/test_conflict",
     "demos/coll_tx",
     "demos/vlc_dag",
     "demos/tee_vlc",
+    "demos/test_vlc_net",
 ]
 
+[profile.dev]
+opt-level = 1
+debug = true
+
+[profile.dev.package."*"]
+opt-level = 3
+
 [profile.release]
-# debug = true
+opt-level = "z"
+debug = true
+lto = true
 strip = true
 incremental = true
 
+[profile.bench]
+debug = true
+
+[profile.artifact]
+inherits = "release"
+
 [dependencies]
 anyhow = { version = "1.0.75", features = ["backtrace"] }
 async-trait = "0.1.74"
diff --git a/crates/README.md b/crates/README.md
index e9584bf..f021fe0 100644
--- a/crates/README.md
+++ b/crates/README.md
@@ -25,6 +25,11 @@ The crates folder of Chronos includes core functional code crates and utility li
 - This module provides some common utilities of TEE (Trusted Execution Environment) Enclaves.
 - For examples: AWS nitro enclave, Mircosoft Azure, Intel SGX, etc.
 
+## [gossip](./gossip/)
+
+- This module provides the gossip network toolkit with customizable parameters.
+- It implements a basic gossip network using libp2p. It currently supports discovery via mdns and bootnodes.
+
 ## [crypto](./crypto/)
 
 - Some common crypto utilities, signatures, verify, and hash functions for elliptic curve.
diff --git a/crates/gossip/Cargo.toml b/crates/gossip/Cargo.toml
new file mode 100644
index 0000000..fa9745b
--- /dev/null
+++ b/crates/gossip/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "gossip"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+crypto = { path = "../crypto" }
+color-eyre = "0.6.2"
+futures = "0.3"
+ahash = "0.8"
+anyhow = "1.0"
+serde = { version = "1.0", features = ["derive"] }
+libp2p = { version = "0.54.1", features = [
+    "gossipsub",
+    "mdns",
+    "autonat",
+    "identify",
+    "noise",
+    "kad",
+    "macros",
+    "ping",
+    "tcp",
+    "tokio",
+    "upnp",
+    "yamux",
+] }
+multiaddr = "0.18"
+libp2p-swarm-test = "0.4"
+tokio = { version = "1.25.0", features = [
+    "macros",
+    "rt-multi-thread",
+    "signal",
+] }
+tracing = "0.1"
+sha2 = "0.10.8"
+
+[features]
+mdns = []
\ No newline at end of file
diff --git a/crates/gossip/src/discovery.rs b/crates/gossip/src/discovery.rs
new file mode 100644
index 0000000..900890a
--- /dev/null
+++ b/crates/gossip/src/discovery.rs
@@ -0,0 +1,654 @@
+// Copyright 2019-2024 ChainSafe Systems
+// SPDX-License-Identifier: Apache-2.0, MIT
+
+use std::{
+    cmp,
+    collections::VecDeque,
+    task::{Context, Poll},
+    time::Duration,
+};
+
+use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
+use libp2p::{
+    autonat,
+    core::Multiaddr,
+    identify,
+    identity::{PeerId, PublicKey},
+    kad,
+    mdns::{tokio::Behaviour as Mdns, Event as MdnsEvent},
+    multiaddr::Protocol,
+    swarm::{
+        behaviour::toggle::Toggle,
+        derive_prelude::*,
+        dial_opts::{DialOpts, PeerCondition},
+        NetworkBehaviour, ToSwarm,
+    },
+    upnp, StreamProtocol,
+};
+use tokio::time::Interval;
+use tracing::{debug, info, trace, warn};
+
+pub const HETU_VERSION_STRING: &str = "0.1";
+
+#[derive(NetworkBehaviour)]
+pub struct DerivedDiscoveryBehaviour {
+    /// Kademlia discovery.
+    kademlia: Toggle<kad::Behaviour<kad::store::MemoryStore>>,
+    /// Discovers nodes on the local network.
+    mdns: Toggle<Mdns>,
+    /// [`identify::Behaviour`] needs to be manually hooked up with [`kad::Behaviour`] to make discovery work.
+    identify: identify::Behaviour,
+    /// Detects whether the node is publicly reachable behind a NAT.
+    autonat: Toggle<autonat::Behaviour>,
+    /// `UPnP` port mapping that automatically tries to map external ports to internal addresses on the gateway.
+    upnp: upnp::tokio::Behaviour,
+}
+
+impl DerivedDiscoveryBehaviour {
+    pub fn get_kademlia_mut(&mut self) -> Option<&mut kad::Behaviour<kad::store::MemoryStore>> {
+        self.kademlia.as_mut()
+    }
+}
+
+/// Event generated by the `DiscoveryBehaviour`.
+#[derive(Debug)]
+pub enum DiscoveryEvent {
+    /// Event that notifies that we connected to the node with the given peer
+    /// id.
+    PeerConnected(PeerId),
+
+    /// Event that notifies that we disconnected with the node with the given
+    /// peer id.
+    PeerDisconnected(PeerId),
+
+    /// Discovery event
+    Discovery(Box<DerivedDiscoveryBehaviourEvent>),
+}
+
+/// `DiscoveryBehaviour` configuration.
+///
+/// Note: In order to discover nodes or load and store values via Kademlia one
+/// has to add at least one protocol.
+pub struct DiscoveryConfig<'a> {
+    local_peer_id: PeerId,
+    local_public_key: PublicKey,
+    user_defined: Vec<(PeerId, Multiaddr)>,
+    target_peer_count: u64,
+    enable_mdns: bool,
+    enable_auto_nat: bool,
+    enable_kademlia: bool,
+    network_name: &'a str,
+}
+
+impl<'a> DiscoveryConfig<'a> {
+    /// Create a default configuration with the given public key.
+    pub fn new(local_public_key: PublicKey, network_name: &'a str) -> Self {
+        DiscoveryConfig {
+            local_peer_id: local_public_key.to_peer_id(),
+            local_public_key,
+            user_defined: Vec::new(),
+            target_peer_count: u64::MAX,
+            enable_mdns: false,
+            enable_auto_nat: false,
+            enable_kademlia: true,
+            network_name,
+        }
+    }
+
+    /// Set the number of connected peers at which we pause discovery.
+    pub fn target_peer_count(mut self, limit: u64) -> Self {
+        self.target_peer_count = limit;
+        self
+    }
+
+    /// Set custom nodes which never expire, e.g. bootstrap or reserved nodes.
+    pub fn with_user_defined(
+        mut self,
+        user_defined: impl IntoIterator<Item = Multiaddr>,
+    ) -> anyhow::Result<Self> {
+        for mut addr in user_defined.into_iter() {
+            if let Some(Protocol::P2p(peer_id)) = addr.pop() {
+                self.user_defined.push((peer_id, addr))
+            } else {
+                anyhow::bail!("Failed to parse peer id from {addr}")
+            }
+        }
+        Ok(self)
+    }
+
+    /// Configures if MDNS is enabled.
+    pub fn with_mdns(mut self, value: bool) -> Self {
+        self.enable_mdns = value;
+        self
+    }
+
+    /// Configures if Kademlia is enabled.
+    pub fn with_kademlia(mut self, value: bool) -> Self {
+        self.enable_kademlia = value;
+        self
+    }
+
+    /// Create a `DiscoveryBehaviour` from this configuration.
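+    ///
+    /// Kademlia and mDNS are built according to the flags above; Kademlia is
+    /// seeded with the user-defined peers and an initial bootstrap is kicked
+    /// off, while `identify` and `UPnP` are always installed.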
+    pub fn finish(self) -> anyhow::Result<DiscoveryBehaviour> {
+        let DiscoveryConfig {
+            local_peer_id,
+            local_public_key,
+            user_defined,
+            target_peer_count,
+            enable_mdns,
+            enable_auto_nat,
+            enable_kademlia,
+            network_name,
+        } = self;
+
+        let mut peers = HashSet::new();
+        let kademlia_opt = if enable_kademlia {
+            let mut kademlia = new_kademlia(
+                local_peer_id,
+                StreamProtocol::try_from_owned(format!("/hetu/kad/{network_name}/kad/1.0.0"))?,
+            );
+            for (peer_id, addr) in &user_defined {
+                kademlia.add_address(peer_id, addr.clone());
+                peers.insert(*peer_id);
+            }
+            if let Err(e) = kademlia.bootstrap() {
+                warn!("Kademlia bootstrap failed: {}", e);
+            }
+            Some(kademlia)
+        } else {
+            None
+        };
+
+        let mdns_opt = if enable_mdns {
+            Some(Mdns::new(Default::default(), local_peer_id).expect("Could not start mDNS"))
+        } else {
+            None
+        };
+        let auto_nat_opt = if enable_auto_nat {
+            Some(autonat::Behaviour::new(local_peer_id, Default::default()))
+        } else {
+            None
+        };
+
+        Ok(DiscoveryBehaviour {
+            discovery: DerivedDiscoveryBehaviour {
+                kademlia: kademlia_opt.into(),
+                mdns: mdns_opt.into(),
+                identify: identify::Behaviour::new(
+                    identify::Config::new("ipfs/0.1.0".into(), local_public_key)
+                        .with_agent_version(format!("hetu-{}", HETU_VERSION_STRING))
+                        .with_push_listen_addr_updates(true),
+                ),
+                autonat: auto_nat_opt.into(),
+                upnp: Default::default(),
+            },
+            next_kad_random_query: tokio::time::interval(Duration::from_secs(1)),
+            duration_to_next_kad: Duration::from_secs(1),
+            pending_events: VecDeque::new(),
+            n_node_connected: 0,
+            peers,
+            peer_info: HashMap::new(),
+            target_peer_count,
+            custom_seed_peers: user_defined,
+            pending_dial_opts: VecDeque::new(),
+        })
+    }
+}
+
+pub fn new_kademlia(
+    peer_id: PeerId,
+    protocol: StreamProtocol,
+) -> kad::Behaviour<kad::store::MemoryStore> {
+    let store = kad::store::MemoryStore::new(peer_id);
+    let kad_config = kad::Config::new(protocol);
+
+    let mut kademlia = kad::Behaviour::with_config(peer_id, store, kad_config);
+    // `set_mode(Server)` fixes https://github.com/ChainSafe/forest/issues/3620
+    // but it should not be required as the behaviour should automatically switch to server mode
+    // according to the doc. It might be a bug in `libp2p`.
+    // We should fix the bug or report with a minimal reproduction.
+    kademlia.set_mode(Some(kad::Mode::Server));
+    kademlia
+}
+
+/// Implementation of `NetworkBehaviour` that discovers the nodes on the
+/// network.
+// Behaviours that manage connections should come first, to get rid of some panics in debug build.
+pub struct DiscoveryBehaviour {
+    /// Derived discovery behaviour.
+    discovery: DerivedDiscoveryBehaviour,
+    /// Stream that fires when we need to perform the next random Kademlia
+    /// query.
+    next_kad_random_query: Interval,
+    /// After `next_kad_random_query` triggers, the next one triggers after this
+    /// duration.
+    duration_to_next_kad: Duration,
+    /// Events to return in priority when polled.
+    pending_events: VecDeque<DiscoveryEvent>,
+    /// Number of nodes we're currently connected to.
+    n_node_connected: u64,
+    /// Keeps hash set of peers connected.
+    pub peers: HashSet<PeerId>,
+    /// Keeps hash map of peers and their information.
+    pub peer_info: HashMap<PeerId, PeerInfo>,
+    /// Number of connected peers to pause discovery on.
+    pub target_peer_count: u64,
+    /// Seed peers
+    custom_seed_peers: Vec<(PeerId, Multiaddr)>,
+    /// Options to configure dials to known peers.
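+    /// These are drained in `poll` and issued as `ToSwarm::Dial` requests;
+    /// `bootstrap` fills this queue with the seed peers when Kademlia is disabled.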
+    pending_dial_opts: VecDeque<DialOpts>,
+}
+
+#[derive(Default)]
+pub struct PeerInfo {
+    pub addresses: HashSet<Multiaddr>,
+    pub identify_info: Option<identify::Info>,
+}
+
+impl DiscoveryBehaviour {
+    /// Returns reference to peer set.
+    pub fn peers(&self) -> &HashSet<PeerId> {
+        &self.peers
+    }
+
+    pub fn seed_peers(&self) -> &Vec<(PeerId, Multiaddr)> {
+        &self.custom_seed_peers
+    }
+
+    pub fn node_connected_count(&self) -> u64 {
+        self.n_node_connected
+    }
+
+    pub fn remove_peer(&mut self, peer_id: &PeerId) {
+        if let Some(nat) = self.discovery.autonat.as_mut() {
+            nat.remove_server(peer_id);
+        }
+        if let Some(kad) = self.discovery.get_kademlia_mut() {
+            kad.remove_peer(peer_id);
+        }
+    }
+
+    /// Returns a map of peer ids and their multi-addresses
+    pub fn peer_addresses(&self) -> HashMap<PeerId, HashSet<Multiaddr>> {
+        self.peer_info
+            .iter()
+            .map(|(peer_id, info)| (*peer_id, info.addresses.clone()))
+            .collect()
+    }
+
+    pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
+        self.peer_info.get(peer_id)
+    }
+
+    /// Bootstrap Kademlia network
+    pub fn bootstrap(&mut self) -> Result<kad::QueryId, String> {
+        if let Some(active_kad) = self.discovery.kademlia.as_mut() {
+            active_kad.bootstrap().map_err(|e| e.to_string())
+        } else {
+            // Manually dial to seed peers when kademlia is disabled
+            for (peer_id, address) in &self.custom_seed_peers {
+                self.pending_dial_opts.push_back(
+                    DialOpts::peer_id(*peer_id)
+                        .condition(PeerCondition::Disconnected)
+                        .addresses(vec![address.clone()])
+                        .build(),
+                );
+            }
+            Err("Kademlia is not activated".to_string())
+        }
+    }
+
+    /// Gets the NAT status.
+    pub fn nat_status(&self) -> Option<autonat::NatStatus> {
+        self.discovery.autonat.as_ref().map(|nat| nat.nat_status())
+    }
+}
+
+impl NetworkBehaviour for DiscoveryBehaviour {
+    type ConnectionHandler = <DerivedDiscoveryBehaviour as NetworkBehaviour>::ConnectionHandler;
+    type ToSwarm = DiscoveryEvent;
+
+    fn handle_established_inbound_connection(
+        &mut self,
+        connection_id: ConnectionId,
+        peer: PeerId,
+        local_addr: &libp2p::Multiaddr,
+        remote_addr: &libp2p::Multiaddr,
+    ) -> Result<THandler<Self>, ConnectionDenied> {
+        self.peer_info
+            .entry(peer)
+            .or_default()
+            .addresses
+            .insert(remote_addr.clone());
+        self.discovery.handle_established_inbound_connection(
+            connection_id,
+            peer,
+            local_addr,
+            remote_addr,
+        )
+    }
+
+    fn handle_established_outbound_connection(
+        &mut self,
+        connection_id: ConnectionId,
+        peer: PeerId,
+        addr: &libp2p::Multiaddr,
+        role_override: libp2p::core::Endpoint,
+        port_use: PortUse,
+    ) -> Result<THandler<Self>, ConnectionDenied> {
+        self.peer_info
+            .entry(peer)
+            .or_default()
+            .addresses
+            .insert(addr.clone());
+        self.discovery.handle_established_outbound_connection(
+            connection_id,
+            peer,
+            addr,
+            role_override,
+            port_use,
+        )
+    }
+
+    fn handle_pending_inbound_connection(
+        &mut self,
+        connection_id: ConnectionId,
+        local_addr: &libp2p::Multiaddr,
+        remote_addr: &libp2p::Multiaddr,
+    ) -> Result<(), ConnectionDenied> {
+        self.discovery
+            .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
+    }
+
+    fn handle_pending_outbound_connection(
+        &mut self,
+        connection_id: ConnectionId,
+        maybe_peer: Option<PeerId>,
+        addresses: &[libp2p::Multiaddr],
+        effective_role: libp2p::core::Endpoint,
+    ) -> Result<Vec<libp2p::Multiaddr>, ConnectionDenied> {
+        self.discovery.handle_pending_outbound_connection(
+            connection_id,
+            maybe_peer,
+            addresses,
+            effective_role,
+        )
+    }
+
+    fn on_swarm_event(&mut self, event: FromSwarm) {
+        match &event {
+            FromSwarm::ConnectionEstablished(e) => {
+                if e.other_established == 0 {
+                    self.n_node_connected += 1;
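+                    // First established connection to this peer: track it and
+                    // surface a `PeerConnected` event.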
+                    self.peers.insert(e.peer_id);
+                    self.pending_events
+                        .push_back(DiscoveryEvent::PeerConnected(e.peer_id));
+                }
+            }
+            FromSwarm::ConnectionClosed(e) => {
+                if e.remaining_established == 0 {
+                    self.n_node_connected -= 1;
+                    self.peers.remove(&e.peer_id);
+                    self.peer_info.remove(&e.peer_id);
+                    self.pending_events
+                        .push_back(DiscoveryEvent::PeerDisconnected(e.peer_id));
+                }
+            }
+            _ => {}
+        };
+        self.discovery.on_swarm_event(event)
+    }
+
+    fn on_connection_handler_event(
+        &mut self,
+        peer_id: PeerId,
+        connection: ConnectionId,
+        event: THandlerOutEvent<Self>,
+    ) {
+        self.discovery
+            .on_connection_handler_event(peer_id, connection, event);
+    }
+
+    #[allow(clippy::type_complexity)]
+    fn poll(
+        &mut self,
+        cx: &mut Context,
+    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
+        // Immediately drain any events queued in `pending_events`.
+        if let Some(ev) = self.pending_events.pop_front() {
+            return Poll::Ready(ToSwarm::GenerateEvent(ev));
+        }
+
+        // Dial to peers
+        if let Some(opts) = self.pending_dial_opts.pop_front() {
+            return Poll::Ready(ToSwarm::Dial { opts });
+        }
+
+        // Poll the stream that fires when we need to start a random Kademlia query.
+        while self.next_kad_random_query.poll_tick(cx).is_ready() {
+            if self.n_node_connected < self.target_peer_count {
+                // We still have not hit the discovery max, send random request for peers.
+                let random_peer_id = PeerId::random();
+                debug!(
+                    "Libp2p <= Starting random Kademlia request for {:?}",
+                    random_peer_id
+                );
+                if let Some(kademlia) = self.discovery.kademlia.as_mut() {
+                    kademlia.get_closest_peers(random_peer_id);
+                }
+            }
+
+            // Schedule the next random query with exponentially increasing delay,
+            // capped at 60 seconds.
+            self.next_kad_random_query = tokio::time::interval(self.duration_to_next_kad);
+            // we need to reset the interval, otherwise the next tick completes immediately.
+            self.next_kad_random_query.reset();
+
+            self.duration_to_next_kad =
+                cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60));
+        }
+
+        // Poll discovery events.
+        while let Poll::Ready(ev) = self.discovery.poll(cx) {
+            match ev {
+                ToSwarm::GenerateEvent(ev) => {
+                    match &ev {
+                        DerivedDiscoveryBehaviourEvent::Identify(ev) => {
+                            if let identify::Event::Received { peer_id, info, .. } = ev {
+                                self.peer_info.entry(*peer_id).or_default().identify_info =
+                                    Some(info.clone());
+                                if let Some(kademlia) = self.discovery.kademlia.as_mut() {
+                                    for address in &info.listen_addrs {
+                                        if self.n_node_connected >= self.target_peer_count {
+                                            // Already over discovery max, don't add new peers.
+                                            continue;
+                                        }
+                                        info!("identify protocol add node: {:?}, n_node_connected: {:?}", address, self.n_node_connected);
+                                        kademlia.add_address(peer_id, address.clone());
+                                    }
+                                }
+                            }
+                        }
+                        DerivedDiscoveryBehaviourEvent::Autonat(_) => {}
+                        DerivedDiscoveryBehaviourEvent::Upnp(ev) => match ev {
+                            upnp::Event::NewExternalAddr(addr) => {
+                                info!("UPnP NewExternalAddr: {addr}");
+                            }
+                            upnp::Event::ExpiredExternalAddr(addr) => {
+                                info!("UPnP ExpiredExternalAddr: {addr}");
+                            }
+                            upnp::Event::GatewayNotFound => {
+                                info!("UPnP GatewayNotFound");
+                            }
+                            upnp::Event::NonRoutableGateway => {
+                                info!("UPnP NonRoutableGateway");
+                            }
+                        },
+                        DerivedDiscoveryBehaviourEvent::Kademlia(ev) => match ev {
+                            // Adding to Kademlia buckets is automatic with our config,
+                            // no need to do manually.
+                            kad::Event::RoutingUpdated { .. } => {}
+                            kad::Event::RoutablePeer { .. } => {}
+                            kad::Event::PendingRoutablePeer { ..
} => {
+                                // Intentionally ignore
+                            }
+                            other => {
+                                trace!("Libp2p => Unhandled Kademlia event: {:?}", other)
+                            }
+                        },
+                        DerivedDiscoveryBehaviourEvent::Mdns(ev) => match ev {
+                            MdnsEvent::Discovered(list) => {
+                                if self.n_node_connected >= self.target_peer_count {
+                                    // Already over discovery max, don't add discovered peers.
+                                    // We could potentially buffer these addresses to be added later,
+                                    // but mdns is not an important use case and may be removed in future.
+                                    continue;
+                                }
+
+                                info!(
+                                    "mdns n_node_connected {:?}, target_peer_count {:?}",
+                                    self.n_node_connected, self.target_peer_count
+                                );
+                                // Add any discovered peers to Kademlia
+                                for (peer_id, multiaddr) in list {
+                                    if let Some(kad) = self.discovery.kademlia.as_mut() {
+                                        kad.add_address(peer_id, multiaddr.clone());
+                                    }
+                                }
+                            }
+                            MdnsEvent::Expired(list) => {
+                                for (peer_id, multiaddr) in list {
+                                    debug!("mDNS discover peer has expired: {peer_id}");
+                                    if let Some(kad) = self.discovery.kademlia.as_mut() {
+                                        kad.remove_address(peer_id, &multiaddr);
+                                    }
+                                }
+                            }
+                        },
+                    }
+                    self.pending_events
+                        .push_back(DiscoveryEvent::Discovery(Box::new(ev)));
+                }
+                ToSwarm::Dial { opts } => {
+                    return Poll::Ready(ToSwarm::Dial { opts });
+                }
+                ToSwarm::NotifyHandler {
+                    peer_id,
+                    handler,
+                    event,
+                } => {
+                    return Poll::Ready(ToSwarm::NotifyHandler {
+                        peer_id,
+                        handler,
+                        event,
+                    })
+                }
+                ToSwarm::CloseConnection {
+                    peer_id,
+                    connection,
+                } => {
+                    return Poll::Ready(ToSwarm::CloseConnection {
+                        peer_id,
+                        connection,
+                    })
+                }
+                ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
+                ToSwarm::RemoveListener { id } => {
+                    return Poll::Ready(ToSwarm::RemoveListener { id })
+                }
+                ToSwarm::NewExternalAddrCandidate(addr) => {
+                    return Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr))
+                }
+                ToSwarm::ExternalAddrConfirmed(addr) => {
+                    return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr))
+                }
+                ToSwarm::ExternalAddrExpired(addr) => {
+                    return Poll::Ready(ToSwarm::ExternalAddrExpired(addr))
+                }
+                _ => {}
+            }
+        }
+
+        Poll::Pending
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use libp2p::{identity::Keypair, swarm::SwarmEvent, Swarm};
+    use libp2p_swarm_test::SwarmExt as _;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn kademlia_test() {
+        fn new_discovery(
+            keypair: Keypair,
+            seed_peers: impl IntoIterator<Item = Multiaddr>,
+        ) -> DiscoveryBehaviour {
+            DiscoveryConfig::new(keypair.public(), "calibnet")
+                .with_mdns(false)
+                .with_kademlia(true)
+                .with_user_defined(seed_peers)
+                .unwrap()
+                .target_peer_count(128)
+                .finish()
+                .unwrap()
+        }
+
+        let mut b = Swarm::new_ephemeral(|k| new_discovery(k, vec![]));
+        b.listen().with_memory_addr_external().await;
+        let b_peer_id = *b.local_peer_id();
+        let b_addresses: Vec<_> = b
+            .external_addresses()
+            .map(|addr| {
+                let mut addr = addr.clone();
+                addr.push(multiaddr::Protocol::P2p(b_peer_id));
+                addr
+            })
+            .collect();
+
+        let mut c = Swarm::new_ephemeral(|k| new_discovery(k, vec![]));
+        c.listen().with_memory_addr_external().await;
+        let c_peer_id = *c.local_peer_id();
+        if let Some(c_kad) = c.behaviour_mut().discovery.kademlia.as_mut() {
+            for addr in b.external_addresses() {
+                c_kad.add_address(&b_peer_id, addr.clone());
+            }
+        }
+
+        let mut a = Swarm::new_ephemeral(|k| new_discovery(k, b_addresses));
+
+        // Bootstrap `a` and `c`
+        a.behaviour_mut().bootstrap().unwrap();
+        c.behaviour_mut().bootstrap().unwrap();
+
+        // Run event loop of `b` and `c`
+        tokio::spawn(b.loop_on_next());
+        tokio::spawn(c.loop_on_next());
+
+        // Wait until `c` is connected to `a`
+        a.wait(|e| match e {
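+            // `a` was only seeded with `b`'s addresses, so completing here means
+            // `a` discovered and connected to `c` through Kademlia.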
+            SwarmEvent::Behaviour(DiscoveryEvent::PeerConnected(peer_id)) => {
+                if peer_id == c_peer_id {
+                    Some(())
+                } else {
+                    None
+                }
+            }
+            _ => None,
+        })
+        .await;
+    }
+}
diff --git a/crates/gossip/src/lib.rs b/crates/gossip/src/lib.rs
new file mode 100644
index 0000000..7f7cf6f
--- /dev/null
+++ b/crates/gossip/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod network;
+pub mod discovery;
+pub mod peer_manager;
diff --git a/crates/gossip/src/network.rs b/crates/gossip/src/network.rs
new file mode 100644
index 0000000..b4af2a0
--- /dev/null
+++ b/crates/gossip/src/network.rs
@@ -0,0 +1,531 @@
+//! Gossip implements a basic gossip network using libp2p.
+//! It currently supports discovery via mdns, bootnodes, and the DHT.
+use ahash::HashSetExt;
+use color_eyre::eyre::{eyre, Result, WrapErr};
+use crypto::core::DigestHash;
+use futures::StreamExt;
+use libp2p::kad::{BootstrapResult, QueryId};
+use libp2p::{
+    connection_limits,
+    allow_block_list::{self, BlockedPeers},
+    core::upgrade::Version,
+    gossipsub::{self, Message, MessageId, TopicHash},
+    identity::{self, Keypair},
+    noise, ping,
+    swarm::{NetworkBehaviour, Swarm, SwarmEvent},
+    tcp, yamux, Multiaddr, PeerId, Transport,
+};
+use serde::{Deserialize, Serialize};
+use std::{
+    pin::Pin,
+    str::FromStr,
+    task::{Context, Poll},
+    time::Duration,
+};
+use tracing::{debug, info, trace, warn};
+
+pub use libp2p::gossipsub::Sha256Topic;
+
+use crate::discovery::{DiscoveryBehaviour, DiscoveryConfig};
+use crate::peer_manager::PeerManager;
+
+pub const LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/9600";
+// Configures the maximum number of concurrent established connections per peer, regardless of direction (incoming or outgoing).
+const MAX_ESTABLISHED_PER_PEER: u32 = 4;
+
+#[derive(NetworkBehaviour)]
+pub struct GossipnetBehaviour {
+    // Behaviours that manage connections should come first, to get rid of some panics in debug build.
+    connection_limits: connection_limits::Behaviour,
+    blocked_peers: allow_block_list::Behaviour<BlockedPeers>,
+    ping: ping::Behaviour,
+    gossipsub: gossipsub::Behaviour,
+    pub discovery: DiscoveryBehaviour,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+pub enum Discover {
+    Config,
+    MDns,
+    DHT,
+}
+
+impl FromStr for Discover {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "config" => Ok(Discover::Config),
+            "mdns" => Ok(Discover::MDns),
+            "dht" => Ok(Discover::DHT),
+            _ => Err(format!("Invalid value for Discover: {}", s)),
+        }
+    }
+}
+
+impl Default for Discover {
+    fn default() -> Self {
+        Discover::DHT
+    }
+}
+
+pub struct NetworkBuilder {
+    local_key: Keypair,
+    bootnodes: Option<Vec<String>>,
+    listen_addr: String,
+}
+
+impl NetworkBuilder {
+    pub fn new(local_key: Keypair) -> Self {
+        Self {
+            local_key,
+            bootnodes: None,
+            listen_addr: LISTEN_ADDR.to_string(),
+        }
+    }
+
+    pub fn bootnodes(mut self, bootnodes: Vec<String>) -> Self {
+        self.bootnodes = Some(bootnodes);
+        self
+    }
+
+    pub fn listen_addr(mut self, listen_addr: String) -> Self {
+        self.listen_addr = listen_addr;
+        self
+    }
+
+    pub fn build(self) -> Result<Network> {
+        Network::new(
+            self.local_key,
+            self.bootnodes,
+            self.listen_addr,
+            Discover::Config,
+            50,
+            1.0,
+        )
+    }
+}
+
+impl Default for NetworkBuilder {
+    fn default() -> Self {
+        Self::new(identity::Keypair::generate_ed25519())
+    }
+}
+
+pub struct Network {
+    pub local_key: Keypair,
+    pub multiaddr: Multiaddr,
+    swarm: Swarm<GossipnetBehaviour>,
+    peer_manager: PeerManager,
+    _discover: Discover,
+    terminated: bool,
+}
+
+impl Network {
+    pub fn new(
+        local_key: Keypair,
+        bootnodes: Option<Vec<String>>,
+        listen_addr: String,
+        discover: Discover,
+        max_discover_node: u32,
+        disconnect_rate: f64,
+    ) -> Result<Self> {
+        let local_peer_id = PeerId::from(local_key.public());
+
+        let transport = tcp::tokio::Transport::default()
+            .upgrade(Version::V1Lazy)
+            .authenticate(noise::Config::new(&local_key)?)
+            .multiplex(yamux::Config::default())
+            .boxed();
+
+        let gossipsub = Network::new_gossip(&local_key)?;
+        let swarm_config = libp2p::swarm::Config::with_tokio_executor()
+            .with_idle_connection_timeout(Duration::from_secs(300));
+
+        let bootlist: Vec<Multiaddr> = if let Some(addrs) = bootnodes {
+            addrs
+                .into_iter()
+                .filter_map(|addr| match Multiaddr::from_str(&addr) {
+                    Ok(remote) => Some(remote),
+                    Err(e) => {
+                        warn!("Failed to parse address: {}, error: {:?}", addr, e);
+                        None
+                    }
+                })
+                .collect()
+        } else {
+            Vec::new()
+        };
+
+        let (enable_mdns, enable_kad) = match discover {
+            Discover::MDns => (true, false),
+            Discover::DHT => (false, true),
+            _ => (false, false),
+        };
+        let discovery = DiscoveryConfig::new(local_key.public(), "hetu")
+            .with_mdns(enable_mdns)
+            .with_kademlia(enable_kad)
+            .with_user_defined(bootlist)
+            .map_err(|e| eyre!("bootnode format error: {}", e))?
+            .target_peer_count(max_discover_node as u64)
+            .finish()
+            .map_err(|e| eyre!("bootstrap error: {}", e))?;
+
+        let connection_limits = connection_limits::Behaviour::new(
+            connection_limits::ConnectionLimits::default()
+                .with_max_pending_incoming(Some(
+                    max_discover_node.saturating_mul(MAX_ESTABLISHED_PER_PEER),
+                ))
+                .with_max_pending_outgoing(Some(
+                    max_discover_node.saturating_mul(MAX_ESTABLISHED_PER_PEER),
+                ))
+                .with_max_established_incoming(Some(max_discover_node))
+                .with_max_established_outgoing(Some(max_discover_node))
+                .with_max_established_per_peer(Some(MAX_ESTABLISHED_PER_PEER)),
+        );
+
+        let mut swarm = {
+            let behaviour = GossipnetBehaviour {
+                connection_limits,
+                blocked_peers: allow_block_list::Behaviour::default(),
+                gossipsub,
+                ping: ping::Behaviour::default(),
+                discovery,
+            };
+            Swarm::new(transport, behaviour, local_peer_id, swarm_config)
+        };
+
+        swarm.listen_on(listen_addr.parse()?)?;
+
+        // Bootstrap with Kademlia or config dial
+        if let Err(e) = swarm.behaviour_mut().discovery.bootstrap() {
+            warn!("Failed to bootstrap with Kademlia: {e}");
+        }
+
+        let multiaddr = Multiaddr::from_str(&listen_addr)?;
+        let peer_manager = PeerManager::new(disconnect_rate);
+        Ok(Network {
+            local_key,
+            multiaddr,
+            swarm,
+            peer_manager,
+            _discover: discover,
+            terminated: false,
+        })
+    }
+
+    fn new_gossip(local_key: &Keypair) -> Result<gossipsub::Behaviour> {
+        let message_id_fn = |message: &gossipsub::Message| {
+            use DigestHash as _;
+            let hash = message.data.blake2();
+            gossipsub::MessageId::from(hash.as_bytes())
+        };
+        let gossipsub_config = gossipsub::ConfigBuilder::default()
+            .heartbeat_interval(Duration::from_secs(1))
+            .duplicate_cache_time(Duration::from_secs(10))
+            .max_transmit_size(1024 * 1024) // 1 MB
+            .history_length(2)
+            .history_gossip(1)
+            .gossip_lazy(5) // Minimum number of peers to gossip to
+            .gossip_factor(0.25) // Fraction of peers to gossip to
+            .gossip_retransimission(3) // Number of times to retransmit messages
+            .validation_mode(gossipsub::ValidationMode::Strict) // the default is Strict (enforce message signing)
+            .message_id_fn(message_id_fn) // content-address messages so that duplicates aren't propagated
+            // .do_px() // not implemented by rust-libp2p
+            .build()
+            .map_err(|e| eyre!("failed to build gossipsub config: {}", e))?;
+        let gossipsub = gossipsub::Behaviour::new(
+            gossipsub::MessageAuthenticity::Signed(local_key.clone()),
+            gossipsub_config,
+        )
+        .map_err(|e| eyre!("failed to create gossipsub behaviour: {}", e))?;
+        Ok(gossipsub)
+    }
+
+    pub fn restart_gossip(&mut self) -> Result<(), color_eyre::eyre::Error> {
+        self.swarm.behaviour_mut().gossipsub = Network::new_gossip(&self.local_key)?;
+        Ok(())
+    }
+
+    pub async fn publish(&mut self, message: Vec<u8>, topic: Sha256Topic) -> Result<MessageId> {
+        self.swarm
+            .behaviour_mut()
+            .gossipsub
+            .publish(topic, message)
+            .wrap_err("failed to publish message")
+    }
+
+    pub fn subscribe(&mut self, topic: &Sha256Topic) {
+        self.swarm
+            .behaviour_mut()
+            .gossipsub
+            .subscribe(topic)
+            .unwrap();
+    }
+
+    pub fn unsubscribe(&mut self, topic: &Sha256Topic) {
+        self.swarm
+            .behaviour_mut()
+            .gossipsub
+            .unsubscribe(topic)
+            .unwrap();
+    }
+
+    /// Bootstrap Kademlia network
+    pub fn bootstrap(&mut self) -> Result<QueryId, String> {
+        self.swarm
+            .behaviour_mut()
+            .discovery
+            .bootstrap()
+            .map_err(|e| e.to_string())
+    }
+
+    /// Get peer node numbers
+    pub fn peer_nums(&mut self) -> usize {
+        self.swarm.behaviour_mut().discovery.peers().len()
+    }
+
+    /// Get all_mesh_peers
+    pub fn all_mesh_peers(&mut self) -> usize {
+        self.swarm
+            .behaviour_mut()
+            .gossipsub
+            .all_mesh_peers()
+            .count()
+    }
+
+    /// Disconnect the highest-traffic peers, leaving seed/boot nodes connected.
+    pub fn disconnect_by_traffic(&mut self) {
+        let conn_peers = &self.swarm.behaviour().discovery.peers;
+        let mut peers = self.peer_manager.traffic.top_traffic_peers();
+        self.peer_manager.traffic.clear_peer_traffic();
+        peers.retain(|id| conn_peers.contains(id));
+        let custom_seed_peers = self.swarm.behaviour().discovery.seed_peers().clone();
+
+        for peer_id in &peers {
+            let is_seed_peer = custom_seed_peers
+                .iter()
+                .any(|(seed_peer_id, _)| seed_peer_id == peer_id);
+
+            if !is_seed_peer {
+                let addr = self
+                    .swarm
+                    .behaviour()
+                    .discovery
+                    .peer_info(peer_id)
+                    .map_or(ahash::HashSet::new(), |info| info.addresses.clone());
+                if !addr.is_empty() {
+                    info!("Disconnecting and adding to blocked list due to high traffic, Multiaddr: {:?}", addr);
+                    self.swarm.behaviour_mut().discovery.remove_peer(peer_id);
+                    self.peer_manager.peer_ban_list.insert(*peer_id, None);
+                    self.swarm.behaviour_mut().blocked_peers.block_peer(*peer_id);
+                    // let _ = self.swarm.disconnect_peer_id(*peer_id);
+                }
+            }
+        }
+    }
+
+    pub fn recover_connecting(&mut self) {
+        for (peer_id, _expire) in &self.peer_manager.peer_ban_list {
+            info!("Recover PeerId: {:?} and delete from blocked list", peer_id);
+            self.swarm.behaviour_mut().blocked_peers.unblock_peer(*peer_id);
+        }
+        self.peer_manager.peer_ban_list.clear();
+    }
+}
+
+#[derive(Debug)]
+pub enum Event {
+    NewListenAddr(Multiaddr),
+    Message(Message),
+    MdnsPeersConnected(Vec<PeerId>),
+    MdnsPeersDisconnected(Vec<PeerId>),
+    PeerConnected(PeerId),
+    PeerSubscribed(PeerId, TopicHash),
+    Bootstrap(BootstrapResult),
+    NewExternalAddrCandidate(Multiaddr),
+    ExternalAddrExpired(Multiaddr),
+}
+
+impl futures::Stream for Network {
+    type Item = Event;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        while let Poll::Ready(maybe_event) = self.swarm.poll_next_unpin(cx) {
+            let Some(event) = maybe_event else {
+                self.terminated = true;
+                return Poll::Ready(None);
+            };
+
+            match event {
+                SwarmEvent::Behaviour(GossipnetBehaviourEvent::Gossipsub(
+                    gossipsub::Event::Message {
+                        propagation_source: peer_id,
+                        message_id: id,
+                        message,
+                    },
+                )) => {
+                    debug!(
+                        "Got message: '{:?}' with id: {id} from peer: {peer_id}",
+                        message.data,
+                    );
+                    self.peer_manager.traffic.record_traffic(peer_id, message.data.len() as u64);
+                    return Poll::Ready(Some(Event::Message(message)));
+                }
+                SwarmEvent::NewListenAddr { address, ..
} => { + debug!("Local node is listening on {address}"); + return Poll::Ready(Some(Event::NewListenAddr(address))); + } + SwarmEvent::Behaviour(GossipnetBehaviourEvent::Gossipsub( + gossipsub::Event::Subscribed { peer_id, topic }, + )) => { + debug!( + "Peer {peer_id} subscribed to topic: {topic:?}", + peer_id = peer_id, + topic = topic, + ); + return Poll::Ready(Some(Event::PeerSubscribed(peer_id, topic))); + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + concurrent_dial_errors: _, + established_in: _, + connection_id: _, + } => { + debug!( + "Connection with {peer_id} & {endpoint:?} established (total: {num_established})", + peer_id = peer_id, + num_established = num_established, + ); + self.swarm + .behaviour_mut() + .gossipsub + .add_explicit_peer(&peer_id); + return Poll::Ready(Some(Event::PeerConnected(peer_id))); + } + _ => { + debug!("unhandled swarm event: {:?}", event); + trace!("Event type: {:?}", std::any::type_name_of_val(&event)); + } + } + } + + Poll::Pending + } +} + +#[cfg(test)] +mod test { + use super::*; + + use futures::{channel::oneshot, join}; + use tokio::select; + + const TEST_TOPIC: &str = "test"; + + #[tokio::test] + async fn test_gossip_two_nodes_by_dm() { + let (bootnode_tx, bootnode_rx) = oneshot::channel(); + let (alice_tx, mut alice_rx) = oneshot::channel(); + + let msg_a = b"hello world".to_vec(); + let recv_msg_a = msg_a.clone(); + let msg_b = b"i am responding".to_vec(); + let recv_msg_b = msg_b.clone(); + + let alice_handle = tokio::task::spawn(async move { + let topic = Sha256Topic::new(TEST_TOPIC); + let local_key = identity::Keypair::generate_ed25519(); + + let listen_addr = format!("/ip4/0.0.0.0/tcp/{}", 9000); + let mut alice = + Network::new(local_key, None, listen_addr, Discover::Config, 50, 1.0).unwrap(); + alice.subscribe(&topic); + + let Some(event) = alice.next().await else { + panic!("expected stream event"); + }; + + match event { + Event::NewListenAddr(addr) => { + println!("Alice listening on {:?}", addr); + bootnode_tx.send(addr.clone()).unwrap(); + } + _ => panic!("unexpected event"), + }; + + loop { + let Some(event) = alice.next().await else { + break; + }; + + match event { + Event::PeerConnected(peer_id) => { + println!("Alice connected to {:?}", peer_id); + } + Event::PeerSubscribed(peer_id, topic_hash) => { + println!("Remote peer {:?} subscribed to {:?}", peer_id, topic_hash); + alice.publish(msg_a.clone(), topic.clone()).await.unwrap(); + } + Event::Message(msg) => { + println!("Alice got message: {:?}", msg); + assert_eq!(msg.data, recv_msg_b); + alice_tx.send(()).unwrap(); + return; + } + _ => {} + } + } + }); + + let bob_handle = tokio::task::spawn(async move { + let topic = Sha256Topic::new(TEST_TOPIC); + + let bootnode = bootnode_rx.await.unwrap(); + let local_key = identity::Keypair::generate_ed25519(); + let listen_addr = format!("/ip4/0.0.0.0/tcp/{}", 9001); + let mut bob = Network::new( + local_key, + Some(vec![bootnode.to_string()]), + listen_addr, + Discover::Config, + 50, + 1.0, + ) + .unwrap(); + bob.subscribe(&topic); + + loop { + select! 
{
+                    event = bob.next() => {
+                        let Some(event) = event else {
+                            continue;
+                        };
+
+                        match event {
+                            Event::PeerConnected(peer_id) => {
+                                println!("Bob connected to {:?}", peer_id);
+                            }
+                            Event::Message(msg) => {
+                                println!("Bob got message: {:?}", msg);
+                                assert_eq!(msg.data, recv_msg_a);
+                                bob.publish(msg_b.clone(), topic.clone()).await.unwrap();
+                            }
+                            _ => {}
+                        }
+                    }
+                    _ = &mut alice_rx => {
+                        return;
+                    }
+                }
+            }
+        });
+
+        let (res_a, res_b) = join!(alice_handle, bob_handle);
+        res_a.unwrap();
+        res_b.unwrap();
+    }
+}
diff --git a/crates/gossip/src/peer_manager.rs b/crates/gossip/src/peer_manager.rs
new file mode 100644
index 0000000..22d9c7a
--- /dev/null
+++ b/crates/gossip/src/peer_manager.rs
@@ -0,0 +1,89 @@
+use crate::network::GossipnetBehaviour;
+use libp2p::{PeerId, Swarm};
+use std::{collections::HashMap, time::Instant};
+use tokio::time::{interval, Duration};
+
+pub struct PeerManager {
+    pub traffic: TrafficMonitor,
+    pub peer_ban_list: HashMap<PeerId, Option<Instant>>,
+}
+
+impl PeerManager {
+    pub fn new(fraction_to_disconnect: f64) -> Self {
+        Self {
+            traffic: TrafficMonitor::new(fraction_to_disconnect),
+            peer_ban_list: HashMap::new(),
+        }
+    }
+}
+
+pub struct TrafficMonitor {
+    peer_traffic: HashMap<PeerId, u64>,
+    fraction_to_disconnect: f64,
+}
+
+impl TrafficMonitor {
+    pub fn new(fraction_to_disconnect: f64) -> Self {
+        Self {
+            peer_traffic: HashMap::new(),
+            fraction_to_disconnect,
+        }
+    }
+
+    // Monitor peer traffic and decide whether peers need to be disconnected.
+    async fn _monitor(&mut self, swarm: &mut Swarm<GossipnetBehaviour>) {
+        let mut interval = interval(Duration::from_secs(60));
+
+        loop {
+            interval.tick().await;
+
+            if self.peer_traffic.is_empty() {
+                continue;
+            }
+
+            let mut traffic_list: Vec<_> = self.peer_traffic.iter().collect();
+            traffic_list.sort_by(|a, b| b.1.cmp(a.1));
+
+            let num_to_disconnect =
+                (traffic_list.len() as f64 * self.fraction_to_disconnect) as usize;
+
+            for (peer_id, _) in traffic_list.iter().take(num_to_disconnect) {
+                println!("Disconnecting PeerId: {:?} due to high traffic", peer_id);
+                swarm.disconnect_peer_id(**peer_id).ok();
+            }
+
+            self.peer_traffic.clear();
+        }
+    }
+
+    /// Return slice of ordered peers from the peer manager. Ordering
+    /// is based on traffic size of the peer.
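+    /// Only the top `fraction_to_disconnect` share of recorded peers is returned.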
+    pub fn top_traffic_peers(&mut self) -> Vec<PeerId> {
+        let mut result = Vec::new();
+
+        if !self.peer_traffic.is_empty() {
+            let mut traffic_list: Vec<_> = self.peer_traffic.iter().collect();
+            traffic_list.sort_by(|a, b| b.1.cmp(a.1));
+
+            let num_to_disconnect =
+                (traffic_list.len() as f64 * self.fraction_to_disconnect) as usize;
+
+            let top_peers: Vec<PeerId> = traffic_list
+                .iter()
+                .take(num_to_disconnect)
+                .map(|(&peer_id, _)| peer_id)
+                .collect();
+            result = top_peers;
+        }
+
+        result
+    }
+
+    pub fn record_traffic(&mut self, peer_id: PeerId, bytes: u64) {
+        *self.peer_traffic.entry(peer_id).or_insert(0) += bytes;
+    }
+
+    pub fn clear_peer_traffic(&mut self) {
+        self.peer_traffic.clear();
+    }
+}
diff --git a/crates/vlc/src/lib.rs b/crates/vlc/src/lib.rs
index a5f1827..67932f7 100644
--- a/crates/vlc/src/lib.rs
+++ b/crates/vlc/src/lib.rs
@@ -129,6 +129,9 @@ impl Clock {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use bincode::Options;
+    use sha2::Sha256;
+    use sha2::Digest;
 
     #[test]
     fn clock_inc() {
@@ -174,4 +177,57 @@ mod tests {
         assert_eq!(c3.partial_cmp(&c1), Some(cmp::Ordering::Less));
         assert_eq!(c1.partial_cmp(&c3), Some(cmp::Ordering::Greater));
     }
+
+    #[test]
+    #[ignore]
+    fn clock_serialize() {
+        let mut c1 = Clock::new();
+        c1.inc(0);
+        c1.inc(1);
+        c1.inc(1);
+        c1.inc(2);
+        c1.inc(3);
+        let ser1 = bincode::options().serialize(&c1).unwrap();
+
+        let mut c2 = Clock::new();
+        c2.inc(0);
+        c2.inc(1);
+        c2.inc(1);
+        c2.inc(2);
+        c2.inc(3);
+        let ser2 = bincode::options().serialize(&c2).unwrap();
+
+        println!("{:?}, {:?}", c1, c2);
+        // Equality ignores the map's (random) iteration order.
+        assert_eq!(c1, c2);
+        // The serialized bytes differ, since encoding follows that random order.
+        assert_ne!(ser1, ser2);
+    }
+
+    #[test]
+    #[ignore]
+    fn clock_sha256() {
+        let mut c1 = Clock::new();
+        c1.inc(0);
+        c1.inc(1);
+        c1.inc(1);
+        c1.inc(2);
+        let ser1 = bincode::options().serialize(&c1).unwrap();
+
+        let mut f_hasher_1 = Sha256::new();
+        f_hasher_1.update(ser1.clone());
+        let hash_1 = f_hasher_1.finalize();
+
+        let unser1 = bincode::options().deserialize::<Clock>(&ser1).unwrap();
+        // Equality ignores the map's iteration order.
+        assert_eq!(c1, unser1);
+
+        // Re-serializing the round-tripped clock yields different bytes...
+        let ser2 = bincode::options().serialize(&unser1).unwrap();
+        assert_ne!(ser1, ser2);
+
+        // ...so the two digests differ as well.
+        let mut f_hasher_2 = Sha256::new();
+        f_hasher_2.update(ser2);
+        let hash_2 = f_hasher_2.finalize();
+        assert_ne!(hash_1, hash_2);
+    }
 }
diff --git a/crates/vlc/src/ordinary_clock.rs b/crates/vlc/src/ordinary_clock.rs
index c1c8d24..ec3ae75 100644
--- a/crates/vlc/src/ordinary_clock.rs
+++ b/crates/vlc/src/ordinary_clock.rs
@@ -1,9 +1,9 @@
 //! This clock use the BTreeMap as its core data structure.
-use std::{cmp::Ordering, collections::BTreeMap}; -use serde::{Deserialize, Serialize}; -use sha2::{Sha256, Digest}; use bincode::Options; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::{cmp::Ordering, collections::BTreeMap}; pub trait Clock: PartialOrd + Clone + Send + Sync + 'static { fn reduce(&self) -> LamportClock; @@ -17,7 +17,7 @@ impl Clock for LamportClock { } } -/// clock key_id +/// clock key_id pub type KeyId = u64; #[derive( @@ -80,7 +80,9 @@ impl OrdinaryClock { pub fn calculate_sha256(&self) -> [u8; 32] { let mut hasher = Sha256::new(); - let data = bincode::options().serialize(&self.0).expect("Failed to serialize data"); + let data = bincode::options() + .serialize(&self.0) + .expect("Failed to serialize data"); // Update the hasher with the JSON string hasher.update(data); @@ -135,16 +137,23 @@ impl Clock for OrdinaryClock { } } - #[cfg(test)] mod tests { use super::*; - use std::{sync::{atomic::{AtomicUsize, Ordering}, Arc}, time::{Duration, Instant}}; - use rand::rngs::OsRng; + use crypto::{ + core::DigestHash, + recovery::{recover_public_key, sign_message_recover_pk}, + }; use futures::future::join_all; + use rand::rngs::OsRng; + use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, Instant}, + }; use tokio::runtime::Builder; - use crypto::{core::DigestHash, recovery::{recover_public_key, sign_message_recover_pk}}; - #[test] fn default_is_genesis() -> anyhow::Result<()> { @@ -158,25 +167,28 @@ mod tests { clock1.insert(1, 10); clock1.insert(2, 0); clock1.insert(3, 5); - + let mut clock2 = BTreeMap::new(); clock2.insert(1, 0); clock2.insert(2, 20); clock2.insert(3, 2); - + let mut clock3 = BTreeMap::new(); clock3.insert(1, 7); clock3.insert(2, 15); clock3.insert(4, 8); - + let oc1 = OrdinaryClock(clock1); let oc2 = OrdinaryClock(clock2); let oc3 = OrdinaryClock(clock3); - + let clocks = vec![&oc1, &oc2, &oc3]; let base_clock = OrdinaryClock::base(clocks.into_iter()); println!("{:?}", base_clock); // Should print: OrdinaryClock({1: 0, 2: 0, 3: 2, 4: 8}) - assert_eq!(base_clock, OrdinaryClock(BTreeMap::from([(1, 0), (2, 0), (3, 2), (4, 8)]))); + assert_eq!( + base_clock, + OrdinaryClock(BTreeMap::from([(1, 0), (2, 0), (3, 2), (4, 8)])) + ); Ok(()) } @@ -185,7 +197,7 @@ mod tests { let mut clock = OrdinaryClock((0..4).map(|i| (i as _, 0)).collect()); clock = clock.update(vec![OrdinaryClock::default()].iter(), 0); println!("{:?}, {:?}", clock, clock.calculate_sha256()); - + // Tips: when clock is hashmap, this serialize and sha256 can't reproduce, every time is different. Ok(()) } @@ -193,7 +205,7 @@ mod tests { #[test] #[ignore] fn hash_big_clock_sha256() -> anyhow::Result<()> { - let clock = OrdinaryClock((0..1<<27).map(|i| (i as _, 0)).collect()); + let clock = OrdinaryClock((0..1 << 27).map(|i| (i as _, 0)).collect()); let start_time = Instant::now(); let clock_hash = clock.sha256().to_fixed_bytes(); println!("{:?}, {:?}", clock_hash, start_time.elapsed()); @@ -216,7 +228,8 @@ mod tests { break; } - let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0); + let updated_clock = + current_clock.update(vec![clock.clone(); num_merged].iter(), 0); count += 1; current_clock = updated_clock; } @@ -224,22 +237,21 @@ mod tests { }; close_loops_session.await?; - println!( - "key {size},merged {num_merged}, tps {}", - count as f32 / 10. 
- ); + println!("key {size},merged {num_merged}, tps {}", count as f32 / 10.); } Ok(()) } - + #[tokio::test] #[ignore] async fn stress_raw_update_concurrency() -> anyhow::Result<()> { let core = num_cpus::get(); - let rt = Arc::new(Builder::new_multi_thread() - .worker_threads(core) - .build() - .unwrap()); + let rt = Arc::new( + Builder::new_multi_thread() + .worker_threads(core) + .build() + .unwrap(), + ); for size in (0..=12).step_by(2).map(|n| 1 << n) { let count = Arc::new(AtomicUsize::new(0)); @@ -251,7 +263,7 @@ mod tests { for size in shifts { let num_merged = 0; let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect()); - + let count_clone = Arc::clone(&count); let start_time = Instant::now(); let close_loops_session = async move { @@ -261,8 +273,9 @@ mod tests { if start_time.elapsed() >= Duration::from_secs(10) { break; } - - let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0); + + let updated_clock = + current_clock.update(vec![clock.clone(); num_merged].iter(), 0); count_clone.fetch_add(1, Ordering::Relaxed); current_clock = updated_clock; } @@ -275,7 +288,7 @@ mod tests { let clock = result?; println!("key: {}, clock: {:?}", size, clock.0.get(&0)); } - + println!( "key {}, merged 0, tps {}", size, @@ -296,15 +309,17 @@ mod tests { let secp = secp256k1::Secp256k1::new(); let (secret_key, public_key) = secp.generate_keypair(&mut OsRng); - + for size in (0..=12).step_by(2).map(|n| 1 << n) { let num_merged = 0; let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect()); let clock_hash = clock.sha256().to_fixed_bytes(); let mut count = 0; - + // sign once - let signature_recover = sign_message_recover_pk(&secp, &secret_key, &clock.sha256().to_fixed_bytes()).unwrap(); + let signature_recover = + sign_message_recover_pk(&secp, &secret_key, &clock.sha256().to_fixed_bytes()) + .unwrap(); let start_time = Instant::now(); let close_loops_session = async { @@ -313,11 +328,51 @@ mod tests { if start_time.elapsed() >= Duration::from_secs(10) { break; } - + // verify - let recover_pubkey = recover_public_key(&secp, &signature_recover, &clock_hash).unwrap(); + let recover_pubkey = + recover_public_key(&secp, &signature_recover, &clock_hash).unwrap(); assert_eq!(recover_pubkey, public_key); + // update + let updated_clock = + current_clock.update(vec![clock.clone(); num_merged].iter(), 0); + count += 1; + current_clock = updated_clock; + } + anyhow::Ok(()) + }; + + close_loops_session.await?; + println!("key {size},merged {num_merged}, tps {}", count as f32 / 10.); + } + Ok(()) + } + + #[tokio::test] + #[ignore] + async fn stress_signature_update() -> anyhow::Result<()> { + use DigestHash as _; + + let secp = secp256k1::Secp256k1::new(); + let (secret_key, _public_key) = secp.generate_keypair(&mut OsRng); + for size in (0..=12).step_by(2).map(|n| 1 << n) { + let num_merged = 0; + let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect()); + let mut count = 0; + + let start_time = Instant::now(); + let close_loops_session = async { + let mut current_clock = clock.clone(); + loop { + if start_time.elapsed() >= Duration::from_secs(10) { + break; + } + + // sign + let clock_hash = clock.sha256().to_fixed_bytes(); + sign_message_recover_pk(&secp, &secret_key, &clock_hash).unwrap(); + // update let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0); count += 1; @@ -334,7 +389,7 @@ mod tests { } Ok(()) } - + #[tokio::test] #[ignore] async fn stress_signature_verify_update() -> anyhow::Result<()> { @@ -342,7 
+397,7 @@ mod tests {
 
         let secp = secp256k1::Secp256k1::new();
         let (secret_key, public_key) = secp.generate_keypair(&mut OsRng);
-        
+
         for size in (0..=12).step_by(2).map(|n| 1 << n) {
             let num_merged = 0;
             let clock = OrdinaryClock((0..size).map(|i| (i as _, 0)).collect());
@@ -356,33 +411,36 @@ mod tests {
                     if start_time.elapsed() >= Duration::from_secs(10) {
                         break;
                     }
-                    
+
                     // verify
                     if !signatures.is_none() {
                         let clock_hash = current_clock.sha256().to_fixed_bytes();
-                        let recover_pubkey = recover_public_key(&secp, &signatures.unwrap(), &clock_hash).unwrap();
+                        let recover_pubkey =
+                            recover_public_key(&secp, &signatures.unwrap(), &clock_hash).unwrap();
                         assert_eq!(recover_pubkey, public_key);
                     }
                     // update
-                    let updated_clock = current_clock.update(vec![clock.clone(); num_merged].iter(), 0);
+                    let updated_clock =
+                        current_clock.update(vec![clock.clone(); num_merged].iter(), 0);
                     count += 1;
                     current_clock = updated_clock;
-                    
+
                    // sign
-                    let signature_recover = sign_message_recover_pk(&secp, &secret_key, &current_clock.sha256().to_fixed_bytes());
-                    signatures = Some(signature_recover.unwrap());
+                    let signature_recover = sign_message_recover_pk(
+                        &secp,
+                        &secret_key,
+                        &current_clock.sha256().to_fixed_bytes(),
+                    )
+                    .unwrap();
+                    signatures = Some(signature_recover);
                 }
                 anyhow::Ok(())
             };
 
             close_loops_session.await?;
-            println!(
-                "key {size},merged {num_merged}, tps {}",
-                count as f32 / 10.
-            );
+            println!("key {size},merged {num_merged}, tps {}", count as f32 / 10.);
         }
         Ok(())
     }
-
-}
\ No newline at end of file
+}
diff --git a/demos/README.md b/demos/README.md
index 410bc9d..f9db1bb 100644
--- a/demos/README.md
+++ b/demos/README.md
@@ -25,7 +25,7 @@ Randomness serves a vital role in nearly every aspect of current society,the i
 #### VLC & VRF Proposal
 Randomness serves a vital role in nearly every aspect of current society,the idea is to intergrate the ablility of logical clocks into random generator. To generate verifiable, fair random numbers, the proposal integrates VRF.
 
-## [tee_vlc](./tee_vlc/)
+## [TEE_VLC](./tee_vlc/)
 
 This module verifiable logic clock is an implementation of Chronos's TEE backend.
@@ -34,6 +34,14 @@
 And some features as follow:
 * Use the aws nitro enclave as its trust execution environment.
 * Support functions test and press test cases.
 
+## [Test_VLC_Net](./test_vlc_net/)
+
+This module is designed for testing the VLC network. It combines VLC and gossip protocols to construct a multi-node network.
+
+Features include:
+* Configuration: It uses various commands to define details of the VLC network.
+* Demonstration: The Test_VLC_Net will collect useful data metrics during operation.
+* Streamlined workflow: The program keeps its workflow concise and focused on core paths for easier testing; see the sketch below.
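+
+A minimal local sketch (ports and file names are illustrative; see the
+[test_vlc_net](./test_vlc_net/) README for the full multi-node walkthrough):
+
+```bash
+# Build the demo, generate a single-node config, then run it
+cargo build -p test_vlc_net
+./target/debug/test_vlc_net --server server0.json generate --host /ip4/127.0.0.1/tcp/ --port 9600 --topic vlc
+./target/debug/test_vlc_net --server server0.json run
+```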
## [Test-Conflict](./test_conflict/) diff --git a/demos/tee_vlc/Cargo.toml b/demos/tee_vlc/Cargo.toml index 44c62ac..297edbb 100644 --- a/demos/tee_vlc/Cargo.toml +++ b/demos/tee_vlc/Cargo.toml @@ -4,11 +4,11 @@ version = "0.1.0" edition = "2021" [features] -ordinary = [ - "nitro-enclaves", - "reqwest", +ordinary = ["nitro-enclaves", "reqwest"] +nitro-enclaves = [ + "aws-nitro-enclaves-nsm-api", + "aws-nitro-enclaves-attestation", ] -nitro-enclaves = ["aws-nitro-enclaves-nsm-api", "aws-nitro-enclaves-attestation"] [dependencies] @@ -20,16 +20,32 @@ derive-where = "1.2.7" tracing = "0.1.40" tracing-subscriber = "0.3.18" rand = "0.8.5" -vlc ={ path = "../../crates/vlc", version = "0.1.0"} -types ={ path = "../../crates/types", version = "0.1.0"} -crypto ={ path = "../../crates/crypto", version = "0.1.0"} -enclaves ={ path = "../../crates/enclaves", version = "0.1.0"} +vlc = { path = "../../crates/vlc", version = "0.1.0" } +types = { path = "../../crates/types", version = "0.1.0" } +crypto = { path = "../../crates/crypto", version = "0.1.0" } +enclaves = { path = "../../crates/enclaves", features = [ + "nitro-enclaves", +], version = "0.1.0" } serde = { version = "1.0.195", features = ["derive"] } nix = { version = "0.28.0", features = ["socket", "sched", "resource"] } -tikv-jemallocator = { version = "0.5.4", optional = true } -tokio = { version = "1.35.1", features = ["net", "time", "sync", "rt", "signal", "macros", "rt-multi-thread", "fs", "process", "io-util"] } +tikv-jemallocator = { version = "0.6.0", optional = true } +tokio = { version = "1.35.1", features = [ + "net", + "time", + "sync", + "rt", + "signal", + "macros", + "rt-multi-thread", + "fs", + "process", + "io-util", +] } tokio-util = "0.7.10" anyhow = { version = "1.0.79", features = ["backtrace"] } -reqwest = { version = "0.12.4", features = ["json", "multipart"], optional = true } +reqwest = { version = "0.12.4", features = [ + "json", + "multipart", +], optional = true } aws-nitro-enclaves-nsm-api = { version = "0.4.0", optional = true } aws-nitro-enclaves-attestation = { git = "https://github.com/neatsys/aws-nitro-enclaves-attestation", version = "0.1.0", optional = true } diff --git a/demos/tee_vlc/src/bin/call_vlc_client.rs b/demos/tee_vlc/src/bin/call_vlc_client.rs index 1f585c4..3b9f93e 100644 --- a/demos/tee_vlc/src/bin/call_vlc_client.rs +++ b/demos/tee_vlc/src/bin/call_vlc_client.rs @@ -14,6 +14,7 @@ use tokio::{ // tee id const CID: u32 = 16; +const INITIAL_ZERO_VALUE: u64 = 0; #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { @@ -119,7 +120,7 @@ where let clock = C::try_from(OrdinaryClock((0..size).map(|i| (i as _, 0)).collect())).map_err(Into::into)?; let start = Instant::now(); - update_sender.send(Update(clock, Default::default(), 0))?; + update_sender.send(Update(clock, Default::default(), INITIAL_ZERO_VALUE))?; let Some((_, clock, elapsed)) = update_ok_receiver.recv().await else { anyhow::bail!("missing UpdateOk") }; @@ -131,7 +132,7 @@ where for _ in 0..5 { sleep(Duration::from_millis(100)).await; - let update = Update(clock.clone(), vec![clock.clone(); num_merged], 0); + let update = Update(clock.clone(), vec![clock.clone(); num_merged], INITIAL_ZERO_VALUE); let start = Instant::now(); update_sender.send(update)?; let Some((_, clock, elapsed_in_tee)) = update_ok_receiver.recv().await else { diff --git a/demos/test_vlc_net/Cargo.toml b/demos/test_vlc_net/Cargo.toml new file mode 100644 index 0000000..2948f7a --- /dev/null +++ b/demos/test_vlc_net/Cargo.toml @@ -0,0 +1,40 @@ 
+[package]
+name = "test_vlc_net"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+gossip = { path = "../../crates/gossip" }
+vlc = { path = "../../crates/vlc" }
+libp2p = { version = "0.54.1", features = [
+    "gossipsub",
+    "mdns",
+    "noise",
+    "macros",
+    "ping",
+    "tcp",
+    "tokio",
+    "yamux",
+] }
+tokio = { version = "1.40.0", features = ["full"] }
+tokio-stream = { version = "0.1", features = ["fs", "io-util"] }
+console-subscriber = "0.4.0"
+ed25519-dalek = "2.1.1"
+bincode = "1.3.3"
+sha2 = "0.10.8"
+base58 = "0.2.0"
+base64 = "0.22"
+num_cpus = "1.16.0"
+sysinfo = "0.32.0"
+anyhow = { version = "1.0.90", features = ["backtrace"] }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+clap = { version = "4.5.17", features = ["derive"] }
+tracing = "0.1.40"
+tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+jemalloc_pprof = "0.6.0"
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = { version = "0.6.0", features = [
+    "profiling",
+    "unprefixed_malloc_on_supported_platforms",
+] }
diff --git a/demos/test_vlc_net/README.md b/demos/test_vlc_net/README.md
new file mode 100644
index 0000000..87c049d
--- /dev/null
+++ b/demos/test_vlc_net/README.md
@@ -0,0 +1,154 @@
+# Test VLC Net
+
+This module is mainly for testing the VLC network. It combines VLC and gossip to construct a multi-node network.
+
+Features include:
+- Configuration: various commands define the details of the VLC network.
+- Demonstration: test_vlc_net collects useful data metrics while running.
+- Streamlined workflow: the program keeps its workflow concise and focused on core paths for better testing.
+
+## Compile server node
+
+```bash
+# For compiling tikv-jemalloc-sys
+sudo apt-get install make
+
+# If the tokio console feature is enabled, export this env variable, then build
+export RUSTFLAGS="--cfg tokio_unstable"
+
+cargo build -p test_vlc_net
+cd target/debug
+```
+
+## Generate config files
+
+### DHT feature
+
+If using DHT as the discovery protocol, first generate the `bootnode` configuration.
+
+```bash
+./test_vlc_net --server server0.json generate --host /ip4/0.0.0.0/tcp/ --max-discover-node 4096
+cat server0.json
+```
+
+### Business node configuration
+
+Generate configurations for other business nodes:
+
+```bash
+for I in {1..4}
+do
+    PORT=$((9600 + I))
+    TOKIO_CONSOLE_PORT=$((6669 + I))
+    ./test_vlc_net --server server"$I".json generate \
+        --host /ip4/127.0.0.1/tcp/ \
+        --port "$PORT" \
+        --topic vlc \
+        --trigger-us 1000 \
+        --bootnodes "/ip4/127.0.0.1/tcp/9601" \
+        --max-sys-memory-percent 80 \
+        --max-proc-memory-mb 8192 \
+        --enable-tx-send \
+        --tokio-console-port "$TOKIO_CONSOLE_PORT"
+done
+```
+
+**Note**: Replace the `--bootnodes` value with the `multi_addr` field from the bootnode config file `server0.json`.
+**Example**: `/ip4/127.0.0.1/tcp/9600/p2p/12D3KooWQt4eiRVEZGFcThutsLvUbJS2rgLJ9QnE1ptp9ZmjRMay`
+
+### Configuration parameters explained
+
+* --server: Specifies the output config file name
+* --host: Sets the IP address and protocol for the node
+* --port: Sets the port number for the node
+* --topic: Defines the topic for the VLC network
+* --trigger-us: Sets the trigger interval in microseconds
+* --bootnodes: Specifies the address of the bootstrap node(s)
+* --enable-tx-send: Enables transaction sending capability
+* --tokio-console-port: Sets the port for the tokio console (if enabled)
+* --max-sys-memory-percent: Sets the maximum system memory usage (in percent) allowed for sending events
+* --disconnect-rate: Sets the fraction of peers to disconnect when memory is almost exhausted
+* --max-proc-memory-mb: Sets the maximum process memory usage for keeping network connections
+
+
+## Run multi-node network
+
+### Start the bootnode (if using DHT)
+
+If DHT is enabled, first start the bootnode:
+
+```bash
+nohup ./test_vlc_net --server server0.json run >>server0.log 2>&1 &
+```
+
+### Start business nodes
+
+Run the following script to start multiple business nodes:
+
+```bash
+for I in {1..4}
+do
+    nohup ./test_vlc_net --server server"$I".json run >>server.log 2>&1 &
+done
+
+# If the tokio console feature is enabled, watch debug metrics with:
+cargo install tokio-console
+tokio-console
+```
+
+### Monitor with tokio console (optional)
+
+If you've enabled the tokio console function and want to monitor runtime metrics:
+
+1. Install the tokio-console tool:
+
+```bash
+cargo install tokio-console
+```
+
+2. Run the console:
+
+```bash
+tokio-console
+```
+
+This will allow you to view detailed tokio runtime information about your VLC network nodes.
+
+## Monitor network metrics
+
+To view the log output of all nodes:
+
+```bash
+tail -f server.log
+```
+
+To filter and watch only the connected node count:
+
+```bash
+tail -F server.log | grep --line-buffered "Connected node num"
+```
+
+## Gracefully stop the network
+
+### To stop all running nodes:
+
+```bash
+pkill -f 'test_vlc_net.*run'
+```
+
+**Note**: This command will terminate all processes that match the pattern 'test_vlc_net.*run'. Ensure you don't have any other important processes matching this pattern before running it.
+
+For a more selective approach, you can stop nodes individually using their process IDs:
+
+1. Find the process IDs:
+
+```bash
+ps aux | grep '[t]est_vlc_net.*run'
+```
+
+2. Stop each process using its ID:
+
+```bash
+kill <PID>
+```
+
+Replace `<PID>` with the actual process ID from the previous command's output.
\ No newline at end of file
diff --git a/demos/test_vlc_net/scripts/multi_node_test.sh b/demos/test_vlc_net/scripts/multi_node_test.sh
new file mode 100755
index 0000000..62c74c2
--- /dev/null
+++ b/demos/test_vlc_net/scripts/multi_node_test.sh
@@ -0,0 +1,99 @@
+#!/bin/bash
+
+# ./multi_node_test.sh aws_machines.txt https://github.com/hetu-project/chronos.git
+
+if [ "$#" -ne 2 ]; then
+    echo "Usage: $0 <aws_machine_list> <git_repository_url | clean>"
+    exit 1
+fi
+
+AWS_MACHINE_LIST="$1"
+CLEAN_MODE="no"
+if [ "$2" = "clean" ]; then
+    CLEAN_MODE="yes"
+else
+    GIT_REPOSITORY_URL="$2"
+fi
+
+if [ ! -f "$AWS_MACHINE_LIST" ]; then
+    echo "File $AWS_MACHINE_LIST does not exist."
+    exit 1
+fi
+
+if [ "$CLEAN_MODE" = "yes" ]; then
+    while IFS= read -r machine_ip; do
+        echo "Clean to $machine_ip"
+
+        ssh -i ~/.ssh/vlc_net_test.pem -o "StrictHostKeyChecking=no" ubuntu@"$machine_ip" <<'EOF'
+        rm test_vlc_net server*
+        pkill -f 'test_vlc_net --server'
+EOF
+    done <"$AWS_MACHINE_LIST"
+    exit 0
+fi
+
+echo "Updating build dependencies..."
+sudo apt update -y
+sudo apt install -y build-essential
+sudo apt install -y curl
+
+echo "Install rust buildtools"
+if ! command -v rustc &>/dev/null; then
+    echo "Rust is not installed. Installing Rust..."
+    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs -o rustup.sh
+    sh rustup.sh -y
+    # shellcheck disable=SC1091
+    source "$HOME/.cargo/env"
+    rustc --version
+else
+    echo "Rust is already installed."
+fi
+
+echo "Starting local build..."
+if command -v ./test_vlc_net &>/dev/null; then
+    echo "test_vlc_net is already installed."
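+    # Reuse a binary that already sits next to this script.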
diff --git a/demos/test_vlc_net/scripts/watch_dog.py b/demos/test_vlc_net/scripts/watch_dog.py
new file mode 100644
index 0000000..13f4691
--- /dev/null
+++ b/demos/test_vlc_net/scripts/watch_dog.py
@@ -0,0 +1,65 @@
+# sudo apt install python3-pip
+# pip install watchdog --break-system-packages
+# python3 watch_dog.py /path/to/your/logfile.log --port 8000
+# Then fetch the monitored log over HTTP: curl http://<host>:8000/logs
+
+import os
+import http.server
+import socketserver
+import argparse
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+
+parser = argparse.ArgumentParser(description='Simple HTTP server to view log files.')
+parser.add_argument('log_file_path', type=str, help='Path to the log file to be monitored')
+parser.add_argument('--port', type=int, default=8000, help='Port to serve HTTP requests (default: 8000)')
+args = parser.parse_args()
+
+# Use an absolute path so the observer's watch directory and the
+# src_path comparison in on_modified() agree even for bare filenames.
+LOG_FILE_PATH = os.path.abspath(args.log_file_path)
+PORT = args.port
+
+class LogFileHandler(FileSystemEventHandler):
+    def __init__(self, file_path):
+        self.file_path = file_path
+        self._content = self._read_file()
+
+    def _read_file(self):
+        try:
+            with open(self.file_path, 'r') as file:
+                return file.read()
+        except (FileNotFoundError, PermissionError) as e:
+            print(f"Error reading log file: {e}")
+            return ""
+
+    def on_modified(self, event):
+        if event.src_path == self.file_path:
+            self._content = self._read_file()
+
+    @property
+    def content(self):
+        return self._content
+
+log_handler = LogFileHandler(LOG_FILE_PATH)
+observer = Observer()
+observer.schedule(log_handler, os.path.dirname(LOG_FILE_PATH), recursive=False)
+observer.start()
+
+class CustomHandler(http.server.SimpleHTTPRequestHandler):
+    def do_GET(self):
+        if self.path == '/logs':
+            self.send_response(200)
+            self.send_header("Content-type", "text/plain")
+            self.end_headers()
+            self.wfile.write(log_handler.content.encode())
+        else:
+            self.send_response(404)
+            self.end_headers()
+
+with socketserver.TCPServer(("", PORT), CustomHandler) as httpd:
+    print(f"Serving at port {PORT}")
+    try:
+        httpd.serve_forever()
+    except KeyboardInterrupt:
+        print("Server stopped by user")
+    finally:
+        observer.stop()
+        observer.join()
diff --git a/demos/test_vlc_net/src/main.rs b/demos/test_vlc_net/src/main.rs
new file mode 100644
index 0000000..af11667
--- /dev/null
+++ b/demos/test_vlc_net/src/main.rs
@@ -0,0 +1,191 @@
+pub mod server;
+use anyhow::anyhow;
+use clap::{Parser, Subcommand};
+use server::{generate_config, run, ServerConfig};
+use std::net::Ipv4Addr;
+use std::{fs::File, io::Write, process, sync::Arc};
+use tracing::*;
+use tracing_subscriber::{fmt, prelude::*, EnvFilter};
+
+#[derive(Parser, Debug)]
+#[command(author="Hetu Protocol", version="0.1", about="This server is used to test the VLC network.", long_about = None)]
+struct Args {
+    #[arg(short, long, default_value = "server.json")]
+    server: String,
+
+    #[command(subcommand)]
+    command: Command,
+}
+
+#[derive(Subcommand, Debug)]
+enum Command {
+    Run,
+    Generate {
+        #[arg(long, default_value = "/ip4/127.0.0.1/tcp/")]
+        host: String,
+        #[arg(long, default_value = "9600")]
+        port: u16,
+        #[arg(long, default_value = "vlc")]
+        topic: String,
+        #[arg(long)]
+        bootnodes: Option<Vec<String>>,
+        #[arg(long, default_value = "dht")]
+        discover: String,
+        #[arg(long, default_value = "5")]
+        max_discover_node: u32, // max target peer count
+        #[arg(long, default_value = "1.0")]
+        disconnect_rate: f64, // disconnect rate applied to peers when memory is almost exhausted
+        #[arg(long, default_value = "false")]
+        enable_tx_send: bool,
+        #[arg(long, default_value = "0")] // 0 means use the number of CPU cores
+        concurrent_verify: u64,
+        #[arg(long, default_value = "1")]
+        time_window_s: u64,
+        #[arg(long, default_value = "1000")]
+        trigger_us: u64,
+        #[arg(long, default_value = "80")]
+        max_sys_memory_percent: u64,
+        #[arg(long, default_value = "8192")]
+        max_proc_memory_mb: u64,
+        #[arg(long, default_value = "0")]
+        init_clock_keys: u32,
+        #[arg(long, default_value = "10")] // length of the default payload "Hello Hetu"
+        payload_bytes_len: u64,
+        #[arg(long, default_value = "false")]
+        print_vlc: bool,
+        #[arg(long, default_value = "false")]
+        tokio_console: bool,
+        #[arg(long, default_value = "6669")]
+        tokio_console_port: u16,
+        #[arg(long, default_value = "false")]
+        pprof: bool,
+    },
+}
+
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
+#[tokio::main]
+async fn main() {
+    // set default log level: INFO
+    let rust_log = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());
+    let fmt_layer = fmt::layer().with_filter(EnvFilter::new(rust_log));
+    let log_layers = tracing_subscriber::registry().with(fmt_layer);
+
+    // graceful exit process
+    let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
+    let shutdown_signal = async move {
+        tokio::signal::ctrl_c()
+            .await
+            .expect("Failed to install Ctrl+C handler");
+        shutdown_sender
+            .send(())
+            .expect("Failed to send shutdown signal");
+    };
+
+    let args = Args::parse();
+    match &args.command {
+        Command::Run => {
+            let file = File::open(&args.server)
+                .expect("Server configuration file does not exist, failed to open");
+            let reader = std::io::BufReader::new(file);
+            let config: ServerConfig = serde_json::from_reader(reader)
+                .expect("Format of configuration file is not correct");
+            if config.tokio_console {
+                let console_layer = console_subscriber::ConsoleLayer::builder()
+                    .server_addr((Ipv4Addr::LOCALHOST, config.tokio_console_port))
+                    .spawn();
+                log_layers.with(console_layer).init();
+            } else {
+                log_layers.init();
+            }
+            let is_pprof = if config.pprof {
+                // jemalloc heap profiling: `prof_active` starts sampling immediately and
+                // `lg_prof_sample:19` samples allocations roughly every 2^19 bytes (512 KiB).
+                #[allow(non_upper_case_globals)]
+                #[export_name = "malloc_conf"]
+                pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
+                info!("prof:true,prof_active:true,lg_prof_sample:19");
+                Some(true)
+            } else {
+                None
+            };
+
+            info!("start test vlc node server");
+            tokio::select! {
+                _ = shutdown_signal => {
+                    info!("Received Ctrl+C, shutting down");
+                    if let Some(_guard) = is_pprof {
+                        let mut heap_file = File::create("heap.pb.gz").expect("Failed to create jemalloc heap file");
+                        if let Ok(jemalloc) = jemalloc_activated().await {
+                            heap_file.write_all(&jemalloc).expect("Failed to write to jemalloc heap file");
+                        }
+                    }
+                }
+                _ = shutdown_receiver => {
+                    info!("Shutdown signal received");
+                }
+                result = run(Arc::new(config.clone())) => {
+                    if let Err(e) = result {
+                        error!("Server error: {:?}", e);
+                    }
+                }
+            }
+            process::exit(0);
+        }
+        Command::Generate {
+            host,
+            port,
+            topic,
+            bootnodes,
+            discover,
+            max_discover_node,
+            enable_tx_send,
+            concurrent_verify,
+            trigger_us,
+            max_sys_memory_percent,
+            max_proc_memory_mb,
+            disconnect_rate,
+            time_window_s,
+            init_clock_keys,
+            payload_bytes_len,
+            print_vlc,
+            tokio_console,
+            tokio_console_port,
+            pprof,
+            ..
+        } => {
+            info!("start generating vlc node config file");
+            generate_config(
+                host.clone(),
+                *port,
+                topic.clone(),
+                bootnodes.clone(),
+                discover.clone(),
+                *max_discover_node,
+                *enable_tx_send,
+                *concurrent_verify,
+                *trigger_us,
+                *max_sys_memory_percent,
+                *max_proc_memory_mb,
+                *disconnect_rate,
+                *time_window_s,
+                *init_clock_keys,
+                *payload_bytes_len,
+                *print_vlc,
+                *tokio_console,
+                *tokio_console_port,
+                *pprof,
+                &args.server,
+            );
+        }
+    }
+}
+
+pub async fn jemalloc_activated() -> Result<Vec<u8>, anyhow::Error> {
+    let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
+    if prof_ctl.activated() {
+        let pprof = prof_ctl.dump_pprof()?;
+        return Ok(pprof);
+    }
+    Err(anyhow!("jemalloc profiling is not activated"))
+}
diff --git a/demos/test_vlc_net/src/server.rs b/demos/test_vlc_net/src/server.rs
new file mode 100644
index 0000000..bc33cd1
--- /dev/null
+++ b/demos/test_vlc_net/src/server.rs
@@ -0,0 +1,993 @@
+use anyhow::anyhow;
+use base58::FromBase58;
+use base64::{engine::general_purpose, Engine as _};
+use bincode::Options;
+use gossip::network::{Discover, Event, Network, Sha256Topic};
+use libp2p::{
+    futures::StreamExt,
+    identity::{self, Keypair, PublicKey},
+    multiaddr, PeerId,
+};
+use serde::{Deserialize, Serialize};
+use sha2::{Digest, Sha256};
+use std::io::Write;
+use std::{
+    collections::{BTreeMap, VecDeque},
+    sync::Arc,
+};
+use std::{fs::File, str::FromStr};
+use sysinfo::{ProcessesToUpdate, System};
+use tokio::{
+    sync::{mpsc, watch, OwnedSemaphorePermit, RwLock, Semaphore},
+    time::{self, sleep, timeout, Duration},
+};
+use tracing::{debug, error, info, warn};
+use vlc::ordinary_clock::OrdinaryClock;
+
+const _DEFAULT_PAYLOAD: &str = "Hello Hetu";
+const NET_TIMEOUT: u64 = 1; // one second
+const MB_TO_BYTES: u64 = 1024 * 1024; // 1 MB = 1024 * 1024 bytes
+
+// Metric visualization can use Prometheus and Grafana toolboxes.
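+/// Message handed from the server tasks to the network task for publishing.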
+#[derive(Debug, Clone)]
+struct OuterMsg {
+    data: Vec<u8>,
+    topic: Sha256Topic,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct P2PMsg {
+    clock: OrdinaryClock,
+    data: Vec<u8>,
+    pub_key: Option<Vec<u8>>,
+    signature: Option<Vec<u8>>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Default)]
+pub struct ServerConfig {
+    pub host: String,
+    pub port: u16,
+    pub topic: String,
+    pub private_key: String,
+    pub multi_addr: String,
+    pub bootnodes: Option<Vec<String>>,
+    pub discover: Discover,
+    pub max_discover_node: u32,
+    pub enable_tx_send: bool,
+    pub concurrent_verify: u64,
+    pub trigger_us: u64,
+    pub max_sys_memory_percent: u64,
+    pub min_sys_memory_percent: u64,
+    pub max_proc_memory_mb: u64,
+    pub disconnect_rate: f64,
+    pub time_window_s: u64,
+    pub init_clock_keys: u32,
+    pub payload_bytes_len: u64,
+    pub print_vlc: bool,
+    pub tokio_console: bool,
+    pub tokio_console_port: u16,
+    pub pprof: bool,
+}
+
+pub struct Server {
+    pub config: Arc<ServerConfig>,
+    pub node_id: u64,
+    pub peer_id: PeerId,
+    pub local_key: Keypair,
+    pub state: Arc<RwLock<ServerState>>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum StateUpdate {
+    IncrementSendCount(usize),
+    IncrementReceiveCount(usize, VLCUpdate),
+    UpdateClock(VLCUpdate),
+    Exit(()),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct VLCUpdate {
+    others: Vec<OrdinaryClock>,
+    id: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MemoryWatcherConfig {
+    is_bootnode: bool,
+    is_sender: bool,
+    proc_max_mem_mb: u64,
+    sys_max_mem_usage: f64,
+    sys_min_mem_usage: f64,
+    sys_disconnect_mem: f64,
+}
+
+impl Server {
+    fn generate_data_payload(payload_bytes_len: usize) -> String {
+        if payload_bytes_len <= 10 {
+            _DEFAULT_PAYLOAD.to_string()
+        } else {
+            format!("{}{}", _DEFAULT_PAYLOAD, "A".repeat(payload_bytes_len - 10))
+        }
+    }
+
+    fn mb_to_bytes(mb: u64) -> u64 {
+        mb * MB_TO_BYTES
+    }
+
+    fn identity_to_u64(identity: &str) -> Option<u64> {
+        let bytes = identity.from_base58().ok()?;
+
+        // Hash the bytes using SHA-256
+        let hash = Sha256::digest(&bytes);
+
+        // Take the first 8 bytes of the hash and convert to u64
+        let mut u64_bytes = [0u8; 8];
+        u64_bytes.copy_from_slice(&hash[..8]);
+
+        Some(u64::from_be_bytes(u64_bytes))
+    }
+
+    fn sys_memory_usage(system_infos: &mut System) -> f64 {
+        system_infos.refresh_memory();
+
+        let total_memory = system_infos.total_memory() as f64;
+        let used_memory = system_infos.used_memory() as f64;
+        (used_memory / total_memory) * 100.0
+    }
+
+    /// Return memory usage of the process in bytes
+    fn process_memory_usage(sys: &mut System) -> u64 {
+        sys.refresh_processes(ProcessesToUpdate::All, true);
+
+        let pid = sysinfo::get_current_pid().expect("Failed to get current PID");
+
+        if let Some(process) = sys.process(pid) {
+            process.memory()
+        } else {
+            0
+        }
+    }
+
+    async fn event_trigger_interval(
+        self: Arc<Self>,
+        tx: mpsc::Sender<OuterMsg>,
+        rx_open: watch::Receiver<bool>,
+    ) {
+        if self.config.bootnodes.is_none() {
+            return;
+        }
+
+        if self.config.enable_tx_send {
+            let target_interval = Duration::from_micros(self.config.trigger_us);
+            // cap for the exponential backoff applied while the sender is paused
+            let max_backoff_interval = Duration::from_millis(1000);
+            let mut current_interval = target_interval;
+
+            let node_id = self.node_id;
+            let pub_key = self.local_key.public().encode_protobuf();
+            let topic = Sha256Topic::new(self.config.topic.clone());
+            let payload_data =
+                Server::generate_data_payload(self.config.payload_bytes_len as usize);
+
+            let mut interval = time::interval(current_interval);
+            loop {
+                interval.tick().await; // control tick time
+
+                if !*rx_open.borrow() {
+                    if current_interval < max_backoff_interval {
+                        current_interval *= 4;
+                        if current_interval > max_backoff_interval {
+                            current_interval = max_backoff_interval;
+                        }
+                        interval = time::interval(current_interval);
+                        info!("Event trigger task starts exponential damping due to high memory usage.");
+                    }
+                }
+
+                let self_clone = self.clone();
+                self_clone
+                    .event_trigger_task(node_id, &payload_data, &pub_key, &topic, &tx)
+                    .await;
+
+                if *rx_open.borrow() && current_interval > target_interval {
+                    current_interval = target_interval;
+                    interval = time::interval(current_interval);
+                }
+            }
+        }
+    }
+
+    async fn event_trigger_task(
+        self: Arc<Self>,
+        node_id: u64,
+        payload_data: &String,
+        pub_key: &Vec<u8>,
+        topic: &Sha256Topic,
+        tx: &mpsc::Sender<OuterMsg>,
+    ) -> bool {
+        // Update state and prepare message
+        let (event, message, message_len, send_counts) = match self
+            .clone()
+            .update_prepare_msg(node_id, payload_data, pub_key)
+            .await
+        {
+            Some(value) => value,
+            None => return false,
+        };
+
+        // send message with vlc
+        if send_counts % 100 == 0 {
+            debug!("Publish event msg: {:?}", event);
+        }
+        let value = OuterMsg {
+            data: message,
+            topic: topic.clone(),
+        };
+        if timeout(Duration::from_secs(NET_TIMEOUT), tx.send(value))
+            .await
+            .is_err()
+        {
+            error!("Network task has been closed, or publishing to the network timed out. Retrying in 5 seconds...");
+            sleep(Duration::from_secs(5)).await;
+            return false;
+        }
+
+        // Increment send count
+        {
+            let mut state = self.state.write().await;
+            state.metrics.send_count += 1;
+            state.metrics.total_send_bytes += message_len as u64;
+        }
+        true
+    }
+
+    async fn update_prepare_msg(
+        self: Arc<Self>,
+        node_id: u64,
+        payload_data: &String,
+        pub_key: &Vec<u8>,
+    ) -> Option<(P2PMsg, Vec<u8>, usize, u64)> {
+        let (event, message, message_len, send_counts) = {
+            // must sync update
+            let mut state = self.state.write().await;
+            let send_counts = state.metrics.send_count;
+            state.clock = state.clock.update([].iter(), node_id);
+            state.metrics.clock_update_count += 1;
+            let mut event = P2PMsg {
+                clock: state.clock.clone(),
+                data: payload_data.clone().into(),
+                pub_key: None,
+                signature: None,
+            };
+
+            // Serialize and sign the event
+            let serialized_event = match bincode::options().serialize(&event) {
+                Ok(vec) => vec,
+                Err(e) => {
+                    error!("Failed to serialize event: {}", e);
+                    return None;
+                }
+            };
+
+            let signature = match self.local_key.sign(&serialized_event) {
+                Ok(sig) => sig,
+                Err(e) => {
+                    error!("Failed to sign event: {}", e);
+                    return None;
+                }
+            };
+
+            event.signature = Some(signature);
+            event.pub_key = Some(pub_key.clone());
+            let message = match bincode::options().serialize(&event) {
+                Ok(msg) => msg,
+                Err(e) => {
+                    error!("Failed to serialize signed event: {}", e);
+                    return None;
+                }
+            };
+            let message_len = message.len();
+            (event, message, message_len, send_counts)
+        };
+        Some((event, message, message_len, send_counts))
+    }
+
+    async fn event_handler(
+        self: Arc<Self>,
+        mut rx_net: mpsc::Receiver<Event>,
+        tx_async_state: mpsc::Sender<StateUpdate>,
+    ) {
+        if self.config.bootnodes.is_none() {
+            info!("bootnode doesn't parse events");
+            return;
+        }
+        let semaphore = Arc::new(Semaphore::new(self.config.concurrent_verify as usize));
+
+        // Event loop to handle incoming messages
+        loop {
+            let event = match rx_net.recv().await {
+                Some(ev) => ev,
+                None => break, // Receiver has been closed
+            };
+
+            match event {
+                Event::PeerConnected(peer_id) => {
+                    debug!("connected to {:?}", peer_id);
+                }
+                Event::PeerSubscribed(peer_id, topic_hash) => {
+                    debug!("Remote peer {:?} subscribed to {:?}", peer_id, topic_hash);
+                }
+                Event::Message(msg) => {
+                    if self.config.concurrent_verify <= 1 {
+                        self.clone()
+                            .handle_p2p_msg(msg, tx_async_state.clone(), None)
+                            .await;
+                    } else {
+                        let self_clone = self.clone();
+                        let tx_state_clone = tx_async_state.clone();
+                        let semaphore_clone = semaphore.clone();
+                        if let Ok(permit) = semaphore_clone.acquire_owned().await {
+                            tokio::spawn(async move {
+                                self_clone
+                                    .handle_p2p_msg(msg, tx_state_clone, Some(permit))
+                                    .await;
+                            });
+                        } else {
+                            error!("Semaphore has been closed");
+                        }
+                    }
+                }
+                _ => {}
+            }
+        }
+    }
+
+    async fn handle_p2p_msg(
+        self: Arc<Self>,
+        msg: libp2p::gossipsub::Message,
+        tx_async_state: mpsc::Sender<StateUpdate>,
+        _permit: Option<OwnedSemaphorePermit>,
+    ) -> bool {
+        // Deserialize and verify
+        let msg_len = msg.data.len();
+        let received_event: P2PMsg = match bincode::options().deserialize::<P2PMsg>(&msg.data) {
+            Ok(event) => event,
+            Err(e) => {
+                error!("Failed to deserialize received message: {}", e);
+                return false;
+            }
+        };
+        debug!("Recv message: {:?}", received_event);
+        let received_clock = received_event.clock;
+        let received_signature = match received_event.signature {
+            Some(sig) => sig,
+            None => {
+                warn!("Received event without a signature.");
+                return false;
+            }
+        };
+
+        let serialized_event = match bincode::options().serialize(&P2PMsg {
+            clock: received_clock.clone(),
+            data: received_event.data.clone(),
+            pub_key: None,
+            signature: None,
+        }) {
+            Ok(vec) => vec,
+            Err(e) => {
+                error!("Failed to serialize event for verification: {}", e);
+                return false;
+            }
+        };
+
+        // Verify signature
+        if received_event.pub_key.is_some() {
+            let pubkey = match PublicKey::try_decode_protobuf(&received_event.pub_key.unwrap()) {
+                Ok(pk) => pk,
+                Err(e) => {
+                    error!("Failed to decode public key: {}", e);
+                    return false;
+                }
+            };
+            if pubkey.verify(&serialized_event, &received_signature) {
+                {
+                    if let Err(err) = tx_async_state
+                        .send(StateUpdate::IncrementReceiveCount(
+                            msg_len,
+                            VLCUpdate {
+                                others: vec![received_clock],
+                                id: self.node_id,
+                            },
+                        ))
+                        .await
+                    {
+                        error!("Update state send failed, err: {:?}", err);
+                    }
+                }
+            } else {
+                warn!("signature validation failed.")
+            }
+        } else {
+            warn!("missing pubkey, cannot verify.")
+        }
+        true
+    }
+
+    async fn start_tps_monitoring(self: Arc<Self>) {
+        if self.config.bootnodes.is_none() {
+            return;
+        }
+
+        // the default tps time window is 1s
+        let time_window = self.config.time_window_s;
+        let mut interval = time::interval(Duration::from_secs(time_window));
+        let mut prev_metrics = ServerMetric::default();
+
+        loop {
+            interval.tick().await;
+
+            let state_read = self.state.read().await;
+            let current_metrics = &state_read.metrics;
+            let node_id = &self.node_id;
+
+            let send_tps = current_metrics.send_count - prev_metrics.send_count;
+            let receive_tps = current_metrics.receive_count - prev_metrics.receive_count;
+            let clock_update_tps =
+                current_metrics.clock_update_count - prev_metrics.clock_update_count;
+            let send_bytes_tps = current_metrics.total_send_bytes - prev_metrics.total_send_bytes;
+            let recv_bytes_tps = current_metrics.total_recv_bytes - prev_metrics.total_recv_bytes;
+
+            prev_metrics = current_metrics.clone();
+
+            info!("Id {}, Send TPS: {}", node_id, send_tps / time_window);
+            info!("Id {}, Receive TPS: {}", node_id, receive_tps / time_window);
+            info!(
+                "Id {}, Clock Update TPS: {}",
+                node_id,
+                clock_update_tps / time_window
+            );
+            info!(
+                "Id {}, Send Bytes/s: {}",
+                node_id,
+                send_bytes_tps / time_window
+            );
+            info!(
+                "Id {}, Receive Bytes/s: {}",
+                node_id,
+                recv_bytes_tps / time_window
+            );
+            if self.config.print_vlc {
+                info!("Id {}, Current vlc value: {:?}", node_id, state_read.clock);
+            }
+        }
+    }
+
+    async fn async_state_update(
+        state: Arc<RwLock<ServerState>>,
+        mut rx: mpsc::Receiver<StateUpdate>,
+    ) {
+        while let Some(update) = rx.recv().await {
+            let mut state = state.write().await;
+            match update {
+                StateUpdate::IncrementSendCount(message_len) => {
+                    state.metrics.send_count += 1;
+                    state.metrics.total_send_bytes += message_len as u64;
+                }
+                StateUpdate::IncrementReceiveCount(msg_len, vlc_update) => {
+                    state.metrics.receive_count += 1;
+                    state.metrics.total_recv_bytes += msg_len as u64;
+                    state.metrics.clock_update_count += 1;
+                    state.clock = state.clock.update(vlc_update.others.iter(), vlc_update.id);
+                }
+                StateUpdate::UpdateClock(vlc_update) => {
+                    state.clock = state.clock.update(vlc_update.others.iter(), vlc_update.id);
+                    state.metrics.clock_update_count += 1;
+                }
+                StateUpdate::Exit(_) => break,
+            }
+        }
+    }
+
+    async fn network_task(
+        mut network: Network,
+        mut rx_publish: mpsc::Receiver<OuterMsg>,
+        tx_net: mpsc::Sender<Event>,
+        tx_sender_open: watch::Sender<bool>,
+        mem: MemoryWatcherConfig,
+    ) {
+        let mut check_interval = time::interval(Duration::from_secs(2));
+        let mut system_infos = System::new();
+        loop {
+            tokio::select! {
+                Some(outer_msg) = rx_publish.recv() => {
+                    let _ = network.publish(outer_msg.data, outer_msg.topic).await;
+                },
+                Some(event) = network.next() => {
+                    if timeout(Duration::from_secs(NET_TIMEOUT), tx_net.send(event)).await.is_err(){
+                        error!("send to recv handler timed out, event_handler is busy, dropping event");
+                    };
+                },
+                _ = check_interval.tick() => {
+                    info!("Connected node num: {}, addr: {:?}", network.peer_nums(), network.multiaddr);
+                    // first check process resources
+                    let now_proc_memory_bytes = Server::process_memory_usage(&mut system_infos);
+                    if now_proc_memory_bytes > Server::mb_to_bytes(mem.proc_max_mem_mb) {
+                        info!("Process memory exceeded maximum usage {:?} MB, current {:?} Bytes, auto disconnect", mem.proc_max_mem_mb, now_proc_memory_bytes);
+                        let _ = tx_sender_open.send(false); // silent send
+                        network.disconnect_by_traffic();
+                        continue;
+                    }
+                    // second check system resources
+                    let sys_memory_usage_percentage = Server::sys_memory_usage(&mut system_infos);
+                    if sys_memory_usage_percentage > mem.sys_disconnect_mem {
+                        info!("System memory usage exceeded {:?}, current usage {:?}, auto disconnect", mem.sys_disconnect_mem, sys_memory_usage_percentage);
+                        network.disconnect_by_traffic();
+                    }
+                    if !mem.is_bootnode && mem.is_sender {
+                        if sys_memory_usage_percentage > mem.sys_max_mem_usage {
+                            info!("System memory usage exceeded {:?}, current usage {:?}, pausing event trigger task...", mem.sys_max_mem_usage, sys_memory_usage_percentage);
+                            if let Err(err) = tx_sender_open.send(false) {
+                                error!("tx_sender_open send error: {:?}", err);
+                            }
+                        } else if sys_memory_usage_percentage > mem.sys_min_mem_usage {
+                            info!("System memory current usage {:?}", sys_memory_usage_percentage);
+                        } else if sys_memory_usage_percentage < mem.sys_min_mem_usage {
+                            info!("System memory usage below {:?}, current usage {:?}, proceeding with computation...", mem.sys_min_mem_usage, sys_memory_usage_percentage);
+                            network.recover_connecting();
+                            if let Err(err) = tx_sender_open.send(true) {
+                                error!("tx_sender_open send error: {:?}", err);
+                            }
+                        }
+                    }
+                },
+            }
+        }
+    }
+}
+
+pub type ServerArc = Arc<Server>;
+
+/// A cache state of a server node.
+#[derive(Debug, Clone, Default)] +pub struct ServerState { + pub clock: OrdinaryClock, + pub metrics: ServerMetric, + pub message_ids: VecDeque, + pub cache_items: BTreeMap, + pub cache_maximum: u64, +} + +#[derive(Debug, Clone, Default)] +pub struct ServerMetric { + pub send_count: u64, + pub receive_count: u64, + pub clock_update_count: u64, + pub total_send_bytes: u64, + pub total_recv_bytes: u64, +} + +impl ServerState { + /// Create a new server state. + pub fn new(cache_maximum: u64) -> Self { + Self { + clock: OrdinaryClock::default(), + message_ids: VecDeque::new(), + cache_items: BTreeMap::new(), + cache_maximum, + metrics: ServerMetric::default(), + } + } +} +pub fn generate_config( + host: String, + port: u16, + topic: String, + bootnodes: Option>, + discover: String, + max_discover_node: u32, + enable_tx_send: bool, + concurrent_verify: u64, + trigger_us: u64, + max_sys_memory_percent: u64, + max_proc_memory_mb: u64, + disconnect_rate: f64, + time_window_s: u64, + init_clock_keys: u32, + payload_bytes_len: u64, + print_vlc: bool, + tokio_console: bool, + tokio_console_port: u16, + pprof: bool, + output: &str, +) { + let discover = Discover::from_str(&discover).unwrap(); + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + let private_key = general_purpose::STANDARD.encode(local_key.to_protobuf_encoding().unwrap()); + let multi_addr = format!( + "{}{}{}", + host, + port, + multiaddr::Protocol::P2p(local_peer_id) + ); + let concurrent_verify = if concurrent_verify == 0 { + num_cpus::get() as u64 + } else { + concurrent_verify + }; + let min_sys_memory_percent = if max_sys_memory_percent > 5 { + max_sys_memory_percent - 2 + } else { + max_sys_memory_percent + }; + let config = ServerConfig { + host, + port, + topic, + private_key, + multi_addr, + bootnodes, + discover, + max_discover_node, + enable_tx_send, + concurrent_verify, + trigger_us, + max_sys_memory_percent, + min_sys_memory_percent, + max_proc_memory_mb, + disconnect_rate, + time_window_s, + init_clock_keys, + payload_bytes_len, + print_vlc, + tokio_console, + tokio_console_port, + pprof, + }; + + let config_json = serde_json::to_string_pretty(&config).unwrap(); + let mut file = File::create(output).unwrap(); + file.write_all(config_json.as_bytes()).unwrap(); +} + +pub async fn run(config: Arc) -> anyhow::Result<(), anyhow::Error> { + let local_key = if config.private_key.is_empty() { + identity::Keypair::generate_ed25519() + } else { + let decoded_key = general_purpose::STANDARD + .decode(config.private_key.clone()) + .map_err(|e| anyhow!("Failed to decode private key: {}", e))?; + identity::Keypair::from_protobuf_encoding(&decoded_key) + .map_err(|e| anyhow!("Failed to create keypair from decoded key: {}", e))? 
+ }; + let local_peer_id = PeerId::from(local_key.public()); + let node_id = Server::identity_to_u64(&local_peer_id.to_base58()); + if node_id.is_none() { + return Err(anyhow!("peer_id string is not base58 format!")); + } + let topic = Sha256Topic::new(config.topic.clone()); + let listen_addr = format!( + "{}{}{}", + config.host, + config.port, + multiaddr::Protocol::P2p(local_peer_id) + ); + info!( + "node_id={:?}, topic={}, listen_addr={}", + node_id, + config.topic.clone(), + listen_addr + ); + let mut swarm = Network::new( + local_key.clone(), + config.bootnodes.clone(), + listen_addr, + config.discover.clone(), + config.max_discover_node, + config.disconnect_rate, + ) + .map_err(|err| anyhow!(err.to_string()))?; + let is_bootnode = config.bootnodes.is_none(); + // not subscribe topic only when bootnode & DHT + if !is_bootnode || config.discover != Discover::DHT { + swarm.subscribe(&topic); + } + + let mem_conf = MemoryWatcherConfig { + is_bootnode, + is_sender: config.enable_tx_send, + proc_max_mem_mb: config.max_proc_memory_mb, + sys_max_mem_usage: config.max_sys_memory_percent as f64, + sys_min_mem_usage: config.min_sys_memory_percent as f64, + sys_disconnect_mem: 95.0, + }; + let mut state = ServerState::new(500); + state.clock = OrdinaryClock((0..config.init_clock_keys).map(|i| (i as _, 0)).collect()); + let state_arc = Arc::new(RwLock::new(state)); + let server_arc = Arc::new(Server { + config, + node_id: node_id.unwrap(), + peer_id: local_peer_id, + local_key, + state: state_arc.clone(), + }); + + let (tx_publish, rx_publish) = mpsc::channel(1000); + let (tx_sender_open, rx_sender_open) = watch::channel(true); + let (tx_net, rx_net) = mpsc::channel(1000); + let (tx_async_state, rx_async_state) = mpsc::channel(1000); + + // Periodic message sender + let message_sender = tokio::spawn( + server_arc + .clone() + .event_trigger_interval(tx_publish, rx_sender_open), + ); + let event_handler = tokio::spawn(server_arc.clone().event_handler(rx_net, tx_async_state)); + let tps_monitoring = tokio::spawn(server_arc.clone().start_tps_monitoring()); + let state_mgr = tokio::spawn(Server::async_state_update(state_arc, rx_async_state)); + let network_task = tokio::spawn(Server::network_task( + swarm, + rx_publish, + tx_net, + tx_sender_open, + mem_conf, + )); + + // Wait for all tasks to complete + let _ = tokio::join!( + event_handler, + message_sender, + tps_monitoring, + network_task, + state_mgr + ); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::Instant; + + #[test] + fn test_verify_signature() { + let local_key = identity::Keypair::generate_ed25519(); + let pub_key_bytes = local_key.public().encode_protobuf(); + let peer_id = PeerId::from(local_key.public()); + let node_id = Server::identity_to_u64(&peer_id.to_base58()).unwrap(); + let clock = OrdinaryClock::default(); + clock.update(vec![].iter(), node_id); + let event = P2PMsg { + clock, + data: "data".into(), + pub_key: None, + signature: None, + }; + + // Serialize and sign the event + let serialized_event = bincode::options().serialize(&event).unwrap(); + let signature = local_key.sign(&serialized_event); + let pubkey = PublicKey::try_decode_protobuf(&pub_key_bytes).unwrap(); + assert_eq!(true, pubkey.verify(&serialized_event, &signature.unwrap())); + } + + #[test] + fn test_stress_ser_unser() { + let local_key = identity::Keypair::generate_ed25519(); + let pub_key_bytes = local_key.public().encode_protobuf(); + let peer_id = PeerId::from(local_key.public()); + let node_id = 
Server::identity_to_u64(&peer_id.to_base58()).unwrap(); + let clock = OrdinaryClock::default(); + clock.update(vec![].iter(), node_id); + let mut event_net = P2PMsg { + clock: clock.clone(), + data: _DEFAULT_PAYLOAD.into(), + pub_key: None, + signature: None, + }; + let event_raw = event_net.clone(); + + let duration = Duration::from_secs(1); + + // Serialize and sign the event + let start = Instant::now(); + let mut serialize_count = 0; + + while start.elapsed() < duration { + let serialized_event = bincode::serialize(&event_net).unwrap(); + let signature = local_key.sign(&serialized_event); + event_net.signature = Some(signature.unwrap()); + let _ = bincode::serialize(&event_net).unwrap(); + serialize_count += 1; + } + event_net.pub_key = Some(pub_key_bytes.clone()); + println!( + "Serialization and sign performed: {} times in 1 second", + serialize_count + ); + + // Measure deserialization repetitions in 1 second + let serialized_raw = bincode::serialize(&event_raw).unwrap(); + let signature = local_key.sign(&serialized_raw).unwrap(); + let serialized_net = bincode::serialize(&event_net).unwrap(); + let start = Instant::now(); + let mut deserialize_count = 0; + while start.elapsed() < duration { + let pubkey = PublicKey::try_decode_protobuf(&pub_key_bytes).unwrap(); + let _deserialized: P2PMsg = bincode::deserialize(&serialized_net).unwrap(); + let serialized_event = bincode::serialize(&event_raw).unwrap(); + assert_eq!(true, pubkey.verify(&serialized_event, &signature)); + deserialize_count += 1; + } + println!( + "Deserialization performed: {} times in 1 second", + deserialize_count + ); + } + + pub async fn make_test_server( + listen_addr: &str, + boot_nodes: Vec, + topic_str: &str, + ) -> anyhow::Result<(Server, Network, Sha256Topic)> { + let local_key = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(local_key.public()); + let node_id = Server::identity_to_u64(&peer_id.to_base58()).unwrap(); + let mut config = ServerConfig::default(); + config.bootnodes = Some(boot_nodes); + let config = Arc::new(config); + let state = Arc::new(RwLock::new(ServerState::new(500))); + let server = Server { + local_key, + state, + config, + node_id, + peer_id, + }; + + let topic = Sha256Topic::new(topic_str); + let mut swarm = Network::new( + server.local_key.clone(), + server.config.bootnodes.clone(), + listen_addr.to_string(), + Discover::Config, + 50, + 1.0, + ) + .map_err(|err| anyhow!(err.to_string()))?; + swarm.subscribe(&topic); + return Ok((server, swarm, topic)); + } + + #[tokio::test] + async fn test_press_trigger_event() -> anyhow::Result<()> { + let listen_addr = "/ip4/127.0.0.1/tcp/9601"; + let topic_str = "test"; + + let (server, mut swarm, topic) = make_test_server(&listen_addr, vec![], &topic_str) + .await + .unwrap(); + let server_arc = Arc::new(server); + let (tx_publish, mut rx_publish) = mpsc::channel::(100); + let (tx_end, mut rx_end) = mpsc::channel(1); + + let task_net = tokio::spawn(async move { + loop { + tokio::select! 
{ + Some(outer_msg) = rx_publish.recv() => { + let _ = swarm.publish(outer_msg.data, outer_msg.topic).await; + }, + Some(_) = rx_end.recv() => { + break; + } + } + } + }); + + let node_id = server_arc.node_id; + let pub_key = server_arc.local_key.public().encode_protobuf(); + let payload_data = + Server::generate_data_payload(server_arc.config.payload_bytes_len as usize); + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(1) { + server_arc + .clone() + .event_trigger_task(node_id, &payload_data, &pub_key, &topic, &tx_publish) + .await; + } + let _ = tx_end.send(()).await; + print_metric(server_arc.state.clone(), 1).await; + let _ = tokio::join!(task_net); + Ok(()) + } + + async fn print_metric(state: Arc>, time_windows_s: u64) { + let state = state.read().await; + println!( + "Receive TPS: {}", + state.metrics.receive_count / time_windows_s + ); + println!( + "Clock Update TPS: {}", + state.metrics.clock_update_count / time_windows_s + ); + println!( + "Total Receive Bytes: {}", + state.metrics.total_recv_bytes / time_windows_s + ); + println!( + "Executed {} times in 1 second", + state.metrics.receive_count / time_windows_s + ); + } + + #[ignore] + #[tokio::test] + async fn test_press_recv_event() -> anyhow::Result<()> { + let listen_addr = "/ip4/127.0.0.1/tcp/9602"; + let topic_str = "vlc"; + // Need start node1 sender listen on /ip4/127.0.0.1/tcp/9601, + let boot_nodes = vec!["/ip4/127.0.0.1/tcp/9601".to_string()]; + let time_windows_s = 1; + let (server, mut swarm, _topic) = make_test_server(&listen_addr, boot_nodes, &topic_str) + .await + .unwrap(); + let server_arc = Arc::new(server); + let (tx_net, mut rx_net) = mpsc::channel::(100); + let (tx_state, rx_state) = mpsc::channel(100); + let (tx_end, mut rx_end) = mpsc::channel(1); + + let task_net = tokio::spawn(async move { + loop { + tokio::select! { + Some(event) = swarm.next() => { + let _ = tx_net.send(event).await; + }, + Some(_) = rx_end.recv() => { + break; + } + } + } + }); + let task_state_update = tokio::spawn(Server::async_state_update( + server_arc.state.clone(), + rx_state, + )); + + let semaphore = Arc::new(Semaphore::new(3)); + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(time_windows_s) { + let event = match rx_net.recv().await { + Some(ev) => ev, + None => break, // Receiver has been closed + }; + + match event { + Event::PeerConnected(peer_id) => { + println!("connected to {:?}", peer_id); + } + Event::Message(msg) => { + let semaphore_clone = semaphore.clone(); + let permit = semaphore_clone.acquire_owned().await.unwrap(); + server_arc + .clone() + .handle_p2p_msg(msg, tx_state.clone(), Some(permit)) + .await; + } + _ => {} + } + } + let _ = tx_end.send(()).await; + let _ = tx_state.send(StateUpdate::Exit(())).await; + // Wait for tasks to complete + if let Err(e) = task_net.await { + eprintln!("Error in task_net: {:?}", e); + } + if let Err(e) = task_state_update.await { + eprintln!("Error in task_state_update: {:?}", e); + } + print_metric(server_arc.state.clone(), time_windows_s).await; + Ok(()) + } +}
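+
+// Running the stress tests in this module (assumed invocations, not from the source):
+//   cargo test -p test_vlc_net -- --nocapture
+// `test_press_recv_event` is #[ignore]d and expects a sender already listening on
+// /ip4/127.0.0.1/tcp/9601; run it explicitly with:
+//   cargo test -p test_vlc_net test_press_recv_event -- --ignored --nocapture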