Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parachain multiplexing #295

Merged
merged 7 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions backend/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::feed::connector::{FeedConnector, Connected, FeedId};
use crate::util::DenseMap;
use crate::feed::{self, FeedMessageSerializer};
use crate::chain::{self, Chain, ChainId, Label, GetNodeNetworkState};
use crate::types::{NodeDetails, NodeId};
use crate::types::{ConnId, NodeDetails, NodeId};

pub struct Aggregator {
labels: HashMap<Label, ChainId>,
Expand Down Expand Up @@ -106,8 +106,11 @@ impl Actor for Aggregator {
#[derive(Message)]
#[rtype(result = "()")]
pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails,
pub network_id: Option<Label>,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
}

Expand Down Expand Up @@ -173,21 +176,14 @@ impl Handler<AddNode> for Aggregator {
type Result = ();

fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
let AddNode { node, network_id, rec } = msg;
let AddNode { node, conn_id, rec } = msg;

let cid = self.lazy_chain(&node.chain, &network_id, ctx);
let cid = self.lazy_chain(&node.chain, &None, ctx);
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");

if let Some(network_id) = network_id {
// Attach network id to the chain if it was not done yet
if chain.network_id.is_none() {
chain.network_id = Some(network_id.clone());
self.networks.insert(network_id, cid);
}
}

chain.addr.do_send(chain::AddNode {
node,
conn_id,
rec,
});
}
Expand Down
14 changes: 10 additions & 4 deletions backend/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}};
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
use crate::feed::{self, FeedMessageSerializer};
use crate::util::{DenseMap, NumStats, now};
use crate::types::{NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber};
use crate::types::{ConnId, NodeId, NodeDetails, NodeLocation, Block, Timestamp, BlockNumber};

const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes

Expand Down Expand Up @@ -194,7 +194,11 @@ impl Actor for Chain {
#[derive(Message)]
#[rtype(result = "()")]
pub struct AddNode {
/// Details of the node being added to the aggregator
pub node: NodeDetails,
/// Connection id used by the node connector for multiplexing parachains
pub conn_id: ConnId,
/// Recipient for the initialization message
pub rec: Recipient<Initialize>,
}

Expand Down Expand Up @@ -248,11 +252,13 @@ impl Handler<AddNode> for Chain {
type Result = ();

fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
self.increment_label_count(&msg.node.chain);
let AddNode { node, conn_id, rec } = msg;
self.increment_label_count(&node.chain);

let nid = self.nodes.add(Node::new(msg.node));
let nid = self.nodes.add(Node::new(node));
let chain = ctx.address();

if let Err(_) = msg.rec.do_send(Initialize(nid, ctx.address())) {
if let Err(_) = rec.do_send(Initialize { nid, conn_id, chain }) {
self.nodes.remove(nid);
} else if let Some(node) = self.nodes.get(nid) {
self.serializer.push(feed::AddedNode(nid, node));
Expand Down
167 changes: 116 additions & 51 deletions backend/src/node/connector.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,60 @@
use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use std::net::Ipv4Addr;
use std::mem;

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use actix::prelude::*;
use actix_web_actors::ws;
use actix_http::ws::Item;
use crate::aggregator::{Aggregator, AddNode};
use crate::chain::{Chain, UpdateNode, RemoveNode};
use crate::node::NodeId;
use crate::node::message::{NodeMessage, Details, SystemConnected};
use crate::util::LocateRequest;
use crate::types::ConnId;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// Continuation buffer limit, 10mb
const CONT_BUF_LIMIT: usize = 10 * 1024 * 1024;

pub struct NodeConnector {
/// Id of the node this connector is responsible for handling
nid: NodeId,
/// Multiplexing connections by id
multiplex: BTreeMap<ConnId, ConnMultiplex>,
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
hb: Instant,
/// Aggregator actor address
aggregator: Addr<Aggregator>,
/// Chain actor address
chain: Option<Addr<Chain>>,
/// Backlog of messages to be sent once we get a recipient handle to the chain
backlog: Vec<NodeMessage>,
/// IP address of the node this connector is responsible for
ip: Option<Ipv4Addr>,
/// Actix address of location services
locator: Recipient<LocateRequest>,
/// Buffer for constructing continuation messages
contbuf: BytesMut,
}

enum ConnMultiplex {
Connected {
/// Id of the node this multiplex connector is responsible for handling
nid: NodeId,
/// Chain address to which this multiplex connector is delegating messages
chain: Addr<Chain>,
},
Waiting {
/// Backlog of messages to be sent once we get a recipient handle to the chain
backlog: Vec<NodeMessage>,
}
}

impl Default for ConnMultiplex {
fn default() -> Self {
ConnMultiplex::Waiting {
backlog: Vec::new(),
}
}
}

impl Actor for NodeConnector {
Expand All @@ -40,23 +65,23 @@ impl Actor for NodeConnector {
}

fn stopped(&mut self, _: &mut Self::Context) {
if let Some(chain) = self.chain.as_ref() {
chain.do_send(RemoveNode(self.nid));
for mx in self.multiplex.values() {
if let ConnMultiplex::Connected { chain, nid } = mx {
chain.do_send(RemoveNode(*nid));
}
}
}
}

impl NodeConnector {
pub fn new(aggregator: Addr<Aggregator>, locator: Recipient<LocateRequest>, ip: Option<Ipv4Addr>) -> Self {
Self {
// Garbage id, will be replaced by the Initialize message
nid: !0,
multiplex: BTreeMap::new(),
hb: Instant::now(),
aggregator,
chain: None,
backlog: Vec::new(),
ip,
locator,
contbuf: BytesMut::new(),
}
}

Expand All @@ -71,58 +96,88 @@ impl NodeConnector {
}

fn handle_message(&mut self, msg: NodeMessage, data: Bytes, ctx: &mut <Self as Actor>::Context) {
if let Some(chain) = self.chain.as_ref() {
chain.do_send(UpdateNode {
nid: self.nid,
msg,
raw: Some(data)
});

return;
let conn_id = msg.id.unwrap_or(0);
dvdplm marked this conversation as resolved.
Show resolved Hide resolved

match self.multiplex.entry(conn_id).or_default() {
ConnMultiplex::Connected { nid, chain } => {
chain.do_send(UpdateNode {
nid: *nid,
msg,
raw: Some(data),
});
}
ConnMultiplex::Waiting { backlog } => {
if let Details::SystemConnected(connected) = msg.details {
let SystemConnected { network_id: _, mut node } = connected;
let rec = ctx.address().recipient();

// FIXME: Use genesis hash instead of names to avoid this mess
match &*node.chain {
"Kusama CC3" => node.chain = "Kusama".into(),
"Polkadot CC1" => node.chain = "Polkadot".into(),
_ => (),
}

self.aggregator.do_send(AddNode { rec, conn_id, node });
} else {
if backlog.len() >= 10 {
backlog.remove(0);
}

backlog.push(msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is the fix here?

Copy link
Contributor Author

@maciejhirsz maciejhirsz Nov 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue was that I had a separate backlog and Addr<Chain> per ConnId, but was using a single NodeId. Now the NodeId is tied to ConnId, everything else (using the enum for ConnMultiplex) is a refactor to make the intent clearer, and to avoid having 3 separate BTreeMaps for no good reason (one for chain address, one for backlog, one for node id).

}
}
}
}

if let Details::SystemConnected(connected) = msg.details {
let SystemConnected { network_id: _, mut node } = connected;
let rec = ctx.address().recipient();

// FIXME: mergin chains by network_id is not the way to do it.
// This will at least force all CC3 nodes to be aggregated with
// the rest.
let network_id = None; // network_id.map(Into::into);
match &*node.chain {
"Kusama CC3" => node.chain = "Kusama".into(),
"Polkadot CC1" => node.chain = "Polkadot".into(),
_ => (),
}
fn start_frame(&mut self, bytes: &[u8]) {
if !self.contbuf.is_empty() {
log::error!("Unused continuation buffer");
self.contbuf.clear();
}
self.continue_frame(bytes);
}

self.aggregator.do_send(AddNode { rec, network_id, node });
fn continue_frame(&mut self, bytes: &[u8]) {
if self.contbuf.len() + bytes.len() <= CONT_BUF_LIMIT {
self.contbuf.extend_from_slice(&bytes);
} else {
if self.backlog.len() >= 10 {
self.backlog.remove(0);
}

self.backlog.push(msg);
log::error!("Continuation buffer overflow");
self.contbuf = BytesMut::new();
}
}

fn finish_frame(&mut self) -> Bytes {
mem::replace(&mut self.contbuf, BytesMut::new()).freeze()
}
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Initialize(pub NodeId, pub Addr<Chain>);
pub struct Initialize {
pub nid: NodeId,
pub conn_id: ConnId,
pub chain: Addr<Chain>,
}

impl Handler<Initialize> for NodeConnector {
type Result = ();

fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
let Initialize(nid, chain) = msg;
let backlog = std::mem::replace(&mut self.backlog, Vec::new());
let Initialize { nid, conn_id, chain } = msg;

for msg in backlog {
chain.do_send(UpdateNode { nid, msg, raw: None });
}
let mx = self.multiplex.entry(conn_id).or_default();

if let ConnMultiplex::Waiting { backlog } = mx {
for msg in backlog.drain(..) {
chain.do_send(UpdateNode { nid, msg, raw: None });
}

self.nid = nid;
self.chain = Some(chain.clone());
*mx = ConnMultiplex::Connected {
nid,
chain: chain.clone(),
};
};

// Acquire the node's physical location
if let Some(ip) = self.ip {
Expand All @@ -148,9 +203,19 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
return;
}
Ok(ws::Message::Nop) => return,
Ok(ws::Message::Continuation(_)) => {
log::error!("Continuation not supported");
return;
Ok(ws::Message::Continuation(cont)) => match cont {
Item::FirstText(bytes) | Item::FirstBinary(bytes) => {
self.start_frame(&bytes);
return;
}
Item::Continue(bytes) => {
self.continue_frame(&bytes);
return;
}
Item::Last(bytes) => {
self.continue_frame(&bytes);
self.finish_frame()
}
}
Err(error) => {
log::error!("{:?}", error);
Expand Down
3 changes: 2 additions & 1 deletion backend/src/node/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde::de::IgnoredAny;
use crate::node::NodeDetails;
use crate::types::{Block, BlockNumber, BlockHash};
use crate::types::{Block, BlockNumber, BlockHash, ConnId};

#[derive(Deserialize, Debug, Message)]
#[rtype(result = "()")]
pub struct NodeMessage {
pub ts: DateTime<Utc>,
pub id: Option<ConnId>,
#[serde(flatten)]
pub details: Details,
}
Expand Down
1 change: 1 addition & 0 deletions backend/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::Deserialize;
use crate::util::{MeanList, now};

pub type NodeId = usize;
pub type ConnId = u64;
pub type BlockNumber = u64;
pub type Timestamp = u64;
pub type Address = Box<str>;
Expand Down