Skip to content

Commit

Permalink
feat: node multi addrs (#98)
Browse files Browse the repository at this point in the history
* refactor NodeAddr and connect-to mechanism

* dynamic detect local ip in udp, tcp

* added compose transport macro
  • Loading branch information
giangndm authored Dec 21, 2023
1 parent e7e43a3 commit 8901f24
Show file tree
Hide file tree
Showing 62 changed files with 1,015 additions and 3,120 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ members = [
"packages/network",
"packages/core/utils",
"packages/core/identity",
"packages/core/multiaddr",
"packages/core/router",
"packages/routers/layers_spread_router",
"packages/services/dht_discovery",
Expand All @@ -22,6 +21,7 @@ members = [
"packages/transports/vnet",
"packages/transports/tcp",
"packages/transports/udp",
"packages/transports/compose",
"packages/apps/redis",
"packages/runner",
"examples",
Expand Down
12 changes: 6 additions & 6 deletions examples/examples/benchmark_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use atm0s_sdn::{KeyValueBehavior, KeyValueBehaviorEvent, KeyValueHandlerEvent, K
use atm0s_sdn::{LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent, LayersSpreadRouterSyncHandlerEvent};
use atm0s_sdn::{ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent};
use atm0s_sdn::{NetworkPlane, NetworkPlaneConfig};
use atm0s_sdn::{NodeAddr, NodeAddrBuilder, NodeId, Protocol, UdpTransport};
use atm0s_sdn::{NodeAddr, NodeAddrBuilder, NodeId, UdpTransport};
use atm0s_sdn::{PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent};
use atm0s_sdn::{SystemTimer, Timer};
use bytes::Bytes;
Expand Down Expand Up @@ -34,15 +34,15 @@ enum ImplSdkEvent {

async fn run_node(node_id: NodeId, seeds: Vec<NodeAddr>) -> (PubsubSdk, NodeAddr) {
log::info!("Run node {} connect to {:?}", node_id, seeds);
let node_addr = Arc::new(NodeAddrBuilder::default());
node_addr.add_protocol(Protocol::P2p(node_id));
let transport = Box::new(UdpTransport::new(node_id, 0, node_addr.clone()).await);
let mut node_addr_builder = NodeAddrBuilder::new(node_id);
let socket = UdpTransport::prepare(0, &mut node_addr_builder).await;
let transport = Box::new(UdpTransport::new(node_addr_builder.addr(), socket));
let timer = Arc::new(SystemTimer());

let router = SharedRouter::new(node_id);
let manual = ManualBehavior::new(ManualBehaviorConf {
node_id,
node_addr: node_addr.addr(),
node_addr: node_addr_builder.addr(),
seeds,
local_tags: vec![],
connect_tags: vec![],
Expand All @@ -67,7 +67,7 @@ async fn run_node(node_id: NodeId, seeds: Vec<NodeAddr>) -> (PubsubSdk, NodeAddr
plane.stopped();
});

(pubsub_sdk, node_addr.addr())
(pubsub_sdk, node_addr_builder.addr())
}

#[async_std::main]
Expand Down
12 changes: 6 additions & 6 deletions examples/examples/benchmark_pubsub_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use atm0s_sdn::{KeyValueBehavior, KeyValueBehaviorEvent, KeyValueHandlerEvent, K
use atm0s_sdn::{LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent, LayersSpreadRouterSyncHandlerEvent};
use atm0s_sdn::{ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent};
use atm0s_sdn::{NetworkPlane, NetworkPlaneConfig};
use atm0s_sdn::{NodeAddr, NodeAddrBuilder, NodeId, Protocol, UdpTransport};
use atm0s_sdn::{NodeAddr, NodeAddrBuilder, NodeId, UdpTransport};
use atm0s_sdn::{PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent};
use atm0s_sdn::{SystemTimer, Timer};
use bytes::Bytes;
Expand Down Expand Up @@ -34,15 +34,15 @@ enum ImplSdkEvent {

async fn run_node(node_id: NodeId, seeds: Vec<NodeAddr>) -> (PubsubSdk, NodeAddr) {
log::info!("Run node {} connect to {:?}", node_id, seeds);
let node_addr = Arc::new(NodeAddrBuilder::default());
node_addr.add_protocol(Protocol::P2p(node_id));
let transport = Box::new(UdpTransport::new(node_id, 0, node_addr.clone()).await);
let mut node_addr_builder = NodeAddrBuilder::new(node_id);
let socket = UdpTransport::prepare(0, &mut node_addr_builder).await;
let transport = Box::new(UdpTransport::new(node_addr_builder.addr(), socket));
let timer = Arc::new(SystemTimer());

let router = SharedRouter::new(node_id);
let manual = ManualBehavior::new(ManualBehaviorConf {
node_id,
node_addr: node_addr.addr(),
node_addr: node_addr_builder.addr(),
seeds,
local_tags: vec![],
connect_tags: vec![],
Expand All @@ -67,7 +67,7 @@ async fn run_node(node_id: NodeId, seeds: Vec<NodeAddr>) -> (PubsubSdk, NodeAddr
plane.stopped();
});

(pubsub_sdk, node_addr.addr())
(pubsub_sdk, node_addr_builder.addr())
}

#[async_std::main]
Expand Down
25 changes: 12 additions & 13 deletions examples/examples/benchmark_transport.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use atm0s_sdn::ErrorUtils;
use atm0s_sdn::RouteRule;
use atm0s_sdn::{ConnectionEvent, Transport, TransportEvent, TransportMsg};
use atm0s_sdn::{NodeAddrBuilder, UdpTransport};

#[async_std::main]
async fn main() {
env_logger::builder().format_timestamp_millis().filter_level(log::LevelFilter::Info).init();
let node_addr1 = Arc::new(NodeAddrBuilder::default());
let mut transport1 = UdpTransport::new(1, 0, node_addr1.clone()).await;
let mut node_addr_builder1 = NodeAddrBuilder::new(1);
let socket1 = UdpTransport::prepare(0, &mut node_addr_builder1).await;
let mut transport1 = Box::new(UdpTransport::new(node_addr_builder1.addr(), socket1));

let node_addr2 = Arc::new(NodeAddrBuilder::default());
let mut transport2 = UdpTransport::new(2, 0, node_addr2.clone()).await;
let mut node_addr_builder2 = NodeAddrBuilder::new(2);
let socket2 = UdpTransport::prepare(0, &mut node_addr_builder2).await;
let mut transport2 = Box::new(UdpTransport::new(node_addr_builder2.addr(), socket2));

let task = async_std::task::spawn(async move {
loop {
Expand All @@ -39,17 +40,15 @@ async fn main() {
});

async_std::task::sleep(Duration::from_secs(1)).await;
log::info!("Connect to {}", node_addr2.addr());
log::info!("Connect to {}", node_addr_builder2.addr());

transport1.connector().connect_to(100, 2, node_addr2.addr()).print_error("Should connect");
for conn in transport1.connector().create_pending_outgoing(node_addr_builder2.addr()) {
transport1.connector().continue_pending_outgoing(conn);
}

loop {
match transport1.recv().await {
Ok(TransportEvent::OutgoingRequest(_, _, acceptor, _)) => {
log::info!("[Transport1] OutgoingRequest");
acceptor.accept();
}
Ok(TransportEvent::Outgoing(trans1_sender, mut trans1_receiver, _)) => {
Ok(TransportEvent::Outgoing(trans1_sender, mut trans1_receiver)) => {
let mut msg_count = 0;
trans1_sender.send(TransportMsg::build(0, 0, RouteRule::Direct, 0, 0, &[0; 10]));
let mut last_send = std::time::Instant::now();
Expand Down
8 changes: 4 additions & 4 deletions examples/examples/chat_example.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use atm0s_sdn::SharedRouter;
use atm0s_sdn::SystemTimer;
use atm0s_sdn::{convert_enum, NetworkPlane, NetworkPlaneConfig};
use atm0s_sdn::{KeyValueBehavior, KeyValueSdk, NodeAddr, NodeAddrBuilder, Protocol, PubsubServiceBehaviour, UdpTransport};
use atm0s_sdn::{KeyValueBehavior, KeyValueSdk, NodeAddr, NodeAddrBuilder, PubsubServiceBehaviour, UdpTransport};
use atm0s_sdn::{KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueSdkEvent};
use atm0s_sdn::{LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent, LayersSpreadRouterSyncHandlerEvent};
use atm0s_sdn::{ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent};
Expand Down Expand Up @@ -119,14 +119,14 @@ async fn main() {
// The multiaddr is composed of multiple protocols, each of which is identified by a code.
// example in our case: /p2p/0/ip4/127.0.0.1/udp/50000
// You can find more information about multiaddr here: https://multiformats.io/multiaddr/
let node_addr_builder = Arc::new(NodeAddrBuilder::default());
node_addr_builder.add_protocol(Protocol::P2p(args.node_id));
let mut node_addr_builder = NodeAddrBuilder::new(args.node_id);

// Create a transport layer, which is used to send and receive messages.
// In this example, we use the UDP transport layer.
// There are also other transport layers, such as TCP and VNET, others are still in progress.
// The port number is 50000 + node_id.
let transport = UdpTransport::new(args.node_id, 50000 + args.node_id as u16, node_addr_builder.clone()).await;
let socket = UdpTransport::prepare(50000 + args.node_id as u16, &mut node_addr_builder).await;
let transport = UdpTransport::new(node_addr_builder.addr(), socket);
let node_addr = node_addr_builder.addr();
println!("Listening on addr {}", node_addr);

Expand Down
10 changes: 5 additions & 5 deletions examples/examples/discovery_manual_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use atm0s_sdn::SharedRouter;
use atm0s_sdn::SystemTimer;
use atm0s_sdn::{
LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent, LayersSpreadRouterSyncHandlerEvent, ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent, NodeAddr,
NodeAddrBuilder, Protocol, UdpTransport,
NodeAddrBuilder, UdpTransport,
};
use atm0s_sdn::{NetworkPlane, NetworkPlaneConfig};
use clap::Parser;
Expand Down Expand Up @@ -49,9 +49,9 @@ struct Args {
async fn main() {
env_logger::init();
let args: Args = Args::parse();
let node_addr_builder = Arc::new(NodeAddrBuilder::default());
node_addr_builder.add_protocol(Protocol::P2p(args.node_id));
let transport = UdpTransport::new(args.node_id, 0, node_addr_builder.clone()).await;
let mut node_addr_builder = NodeAddrBuilder::new(args.node_id);
let socket = UdpTransport::prepare(0, &mut node_addr_builder).await;
let transport = Box::new(UdpTransport::new(node_addr_builder.addr(), socket));
let node_addr = node_addr_builder.addr();
log::info!("Listen on addr {}", node_addr);

Expand All @@ -71,7 +71,7 @@ async fn main() {
node_id: args.node_id,
tick_ms: 1000,
behaviors: vec![Box::new(spreads_layer_router), Box::new(key_value), Box::new(manual)],
transport: Box::new(transport),
transport,
timer: Arc::new(SystemTimer()),
router: Arc::new(router),
});
Expand Down
18 changes: 14 additions & 4 deletions examples/examples/manual_node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use atm0s_sdn::compose_transport;
use atm0s_sdn::convert_enum;
use atm0s_sdn::SharedRouter;
use atm0s_sdn::SystemTimer;
use atm0s_sdn::{KeyValueBehavior, KeyValueSdk, NodeAddr, NodeAddrBuilder, Protocol, UdpTransport};
use atm0s_sdn::TcpTransport;
use atm0s_sdn::{KeyValueBehavior, KeyValueSdk, NodeAddr, NodeAddrBuilder, UdpTransport};
use atm0s_sdn::{KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueSdkEvent};
use atm0s_sdn::{LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent, LayersSpreadRouterSyncHandlerEvent};
use atm0s_sdn::{ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent};
Expand Down Expand Up @@ -62,13 +64,21 @@ struct Args {
redis_addr: Option<SocketAddr>,
}

compose_transport!(UdpTcpTransport, udp: UdpTransport, tcp: TcpTransport);

#[async_std::main]
async fn main() {
env_logger::builder().format_timestamp_millis().init();
let args: Args = Args::parse();
let node_addr_builder = Arc::new(NodeAddrBuilder::default());
node_addr_builder.add_protocol(Protocol::P2p(args.node_id));
let transport = UdpTransport::new(args.node_id, 50000 + args.node_id as u16, node_addr_builder.clone()).await;
let mut node_addr_builder = NodeAddrBuilder::new(args.node_id);
let udp_socket = UdpTransport::prepare(50000 + args.node_id as u16, &mut node_addr_builder).await;
let tcp_listener = TcpTransport::prepare(50000 + args.node_id as u16, &mut node_addr_builder).await;

let udp = UdpTransport::new(node_addr_builder.addr(), udp_socket);
let tcp = TcpTransport::new(node_addr_builder.addr(), tcp_listener);

let transport = UdpTcpTransport::new(udp, tcp);

let node_addr = node_addr_builder.addr();
log::info!("Listen on addr {}", node_addr);

Expand Down
3 changes: 1 addition & 2 deletions packages/core/identity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
atm0s-sdn-multiaddr = { path = "../multiaddr", version = "0.1.2" }
multiaddr = "0.18.1"
rand = "0.8"
parking_lot = { workspace = true }
serde = { workspace = true }
2 changes: 1 addition & 1 deletion packages/core/identity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ mod node_addr;
mod node_id;

pub use conn_id::{ConnDirection, ConnId};
pub use node_addr::{NodeAddr, NodeAddrBuilder, NodeAddrType, Protocol};
pub use node_addr::{NodeAddr, NodeAddrBuilder, Protocol};
pub use node_id::{NodeId, NodeIdType, NodeSegment};
110 changes: 89 additions & 21 deletions packages/core/identity/src/node_addr.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,110 @@
use serde::{Deserialize, Serialize};
use std::{fmt::Display, str::FromStr};

use crate::node_id::NodeId;
use parking_lot::Mutex;
pub type NodeAddr = atm0s_sdn_multiaddr::Multiaddr;
pub use atm0s_sdn_multiaddr::Protocol;
pub use multiaddr::Protocol;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeAddr(NodeId, multiaddr::Multiaddr);

impl NodeAddr {
pub fn empty(node_id: NodeId) -> Self {
Self(node_id, multiaddr::Multiaddr::empty())
}

pub fn node_id(&self) -> NodeId {
self.0
}

pub fn multiaddr(&self) -> &multiaddr::Multiaddr {
&self.1
}

pub fn from_iter<'a>(node_id: NodeId, iter: impl IntoIterator<Item = Protocol<'a>>) -> Self {
Self(node_id, multiaddr::Multiaddr::from_iter(iter))
}

pub fn to_vec(&self) -> Vec<u8> {
let mut buf = self.0.to_be_bytes().to_vec();
buf.extend(self.1.to_vec());
buf
}

pub fn from_vec(buf: &[u8]) -> Option<Self> {
let node_id = NodeId::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
let multiaddr = multiaddr::Multiaddr::try_from(buf[4..].to_vec()).ok()?;
Some(Self(node_id, multiaddr))
}
}

pub trait NodeAddrType {
fn node_id(&self) -> Option<NodeId>;
impl FromStr for NodeAddr {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut split = s.split('@');
let node_id = split.next().ok_or("Missing NodeId".to_string())?.parse::<NodeId>().map_err(|e| e.to_string())?;
let multiaddr = split.next().unwrap_or("").parse::<multiaddr::Multiaddr>().map_err(|e| e.to_string())?;
Ok(Self(node_id, multiaddr))
}
}

impl NodeAddrType for NodeAddr {
fn node_id(&self) -> Option<NodeId> {
for protocol in self.iter() {
if let Protocol::P2p(node_id) = protocol {
return Some(node_id);
}
impl Display for NodeAddr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.1.is_empty() {
write!(f, "{}", self.0)
} else {
write!(f, "{}@{}", self.0, self.1)
}
None
}
}

/// A builder for creating `NodeAddr` instances.
pub struct NodeAddrBuilder {
addr: Mutex<NodeAddr>,
node_id: NodeId,
addr: multiaddr::Multiaddr,
}

impl Default for NodeAddrBuilder {
fn default() -> Self {
Self { addr: Mutex::new(NodeAddr::empty()) }
impl NodeAddrBuilder {
pub fn new(node_id: NodeId) -> Self {
Self {
node_id,
addr: multiaddr::Multiaddr::empty(),
}
}

pub fn node_id(&self) -> NodeId {
self.node_id
}
}

impl NodeAddrBuilder {
/// Adds a protocol to the node address.
pub fn add_protocol(&self, protocol: Protocol) {
self.addr.lock().push(protocol);
pub fn add_protocol(&mut self, protocol: Protocol) {
self.addr.push(protocol);
}

/// Get the node address.
pub fn addr(&self) -> NodeAddr {
(*self.addr.lock()).clone()
NodeAddr(self.node_id, self.addr.clone())
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use multiaddr::Multiaddr;

#[test]
fn test_to_from_str() {
let addr = super::NodeAddr::from_str("1@/ip4/127.0.0.1").unwrap();
assert_eq!(addr, super::NodeAddr(1, "/ip4/127.0.0.1".parse().unwrap()));
assert_eq!(addr.to_string(), "1@/ip4/127.0.0.1");
}

#[test]
fn test_empty() {
let addr = super::NodeAddr::from_str("1").unwrap();
assert_eq!(addr, super::NodeAddr(1, Multiaddr::empty()));
assert_eq!(addr, super::NodeAddr::empty(1));
assert_eq!(addr.to_string(), "1");
}
}
3 changes: 0 additions & 3 deletions packages/core/multiaddr/.gitignore

This file was deleted.

Loading

0 comments on commit 8901f24

Please sign in to comment.