Skip to content

Commit

Permalink
feat: relay msg to peers and network tweak
Browse files Browse the repository at this point in the history
  • Loading branch information
quake authored and doitian committed Nov 19, 2018
1 parent 5032495 commit b957d2b
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions miner/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ impl MinerService {

fn announce_new_block(&self, block: &Arc<Block>) {
self.network.with_protocol_context(RELAY_PROTOCOL_ID, |nc| {
for peer in self.network.connected_peers_indexes() {
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_compact_block(fbb, &block, &HashSet::new());
fbb.finish(message, None);
for peer in nc.connected_peers() {
debug!(target: "miner", "announce new block to peer#{}, {} => {}",
peer, block.header().number(), block.header().hash());
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_compact_block(fbb, &block, &HashSet::new());
fbb.finish(message, None);
let _ = nc.send(peer, fbb.finished_data().to_vec());
}
});
Expand Down
7 changes: 7 additions & 0 deletions network/src/ckb_protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub trait CKBProtocolContext: Send {
.and_then(|session| Some((*peer_index, session)))
}).collect()
}
fn connected_peers(&self) -> Vec<PeerIndex>;
}

pub(crate) struct DefaultCKBProtocolContext {
Expand Down Expand Up @@ -147,6 +148,12 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
fn protocol_id(&self) -> ProtocolId {
self.protocol_id
}

fn connected_peers(&self) -> Vec<PeerIndex> {
let peers_registry = self.network.peers_registry().read();
let iter = peers_registry.connected_peers_indexes();
iter.collect::<Vec<_>>()
}
}

pub trait CKBProtocolHandler: Sync + Send {
Expand Down
19 changes: 1 addition & 18 deletions network/src/network_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::NetworkConfig;
use super::{Error, ErrorKind, ProtocolId};
use super::{NetworkConfig, PeerIndex};
use ckb_protocol::CKBProtocol;
use ckb_protocol_handler::CKBProtocolHandler;
use ckb_protocol_handler::{CKBProtocolContext, DefaultCKBProtocolContext};
Expand Down Expand Up @@ -45,23 +45,6 @@ impl NetworkService {
&self.network.peer_store()
}

#[cfg_attr(feature = "cargo-clippy", allow(let_and_return))]
pub fn connected_peers(&self) -> Vec<PeerId> {
let peers_registry = self.peers_registry().read();
let peers = peers_registry
.connected_peers()
.map(|peer_id| peer_id.to_owned())
.collect::<Vec<_>>();
peers
}

#[cfg_attr(feature = "cargo-clippy", allow(let_and_return))]
pub fn connected_peers_indexes(&self) -> Vec<PeerIndex> {
let peers_registry = self.peers_registry().read();
let peers = peers_registry.connected_peers_indexes().collect::<Vec<_>>();
peers
}

pub fn add_peer(&self, peer_id: PeerId, peer: PeerConnection) {
let mut peers_registry = self.peers_registry().write();
peers_registry.add_peer(peer_id, peer);
Expand Down
5 changes: 0 additions & 5 deletions network/src/peers_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,6 @@ impl PeersRegistry {
}
}

#[inline]
pub fn connected_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
Box::new(self.peer_connections.iter().map(|(k, _v)| k))
}

#[inline]
pub fn connected_peers_indexes<'a>(&'a self) -> impl Iterator<Item = PeerIndex> + 'a {
Box::new(
Expand Down
1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde_json = "1.0"
log = "0.4"
crossbeam-channel = "0.2"
fnv = "1.0.3"
flatbuffers = "0.5.0"

[dev-dependencies]
ckb-db = { path = "../db" }
Expand Down
25 changes: 16 additions & 9 deletions rpc/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ use bigint::H256;
use ckb_pow::Clicker;
use core::header::{BlockNumber, Header};
use core::transaction::{OutPoint, Transaction};
use flatbuffers::FlatBufferBuilder;
use jsonrpc_core::{Error, IoHandler, Result};
use jsonrpc_http_server::ServerBuilder;
use jsonrpc_server_utils::cors::AccessControlAllowOrigin;
use jsonrpc_server_utils::hosts::DomainsValidation;
use network::NetworkService;
use pool::txs_pool::TransactionPoolController;
use protocol::RelayMessage;
use shared::index::ChainIndex;
use shared::shared::{ChainProvider, Shared};
use std::sync::Arc;
use sync::RELAY_PROTOCOL_ID;

//TODO: build_rpc_trait! do not surppot trait bounds
build_rpc_trait! {
Expand Down Expand Up @@ -74,17 +77,21 @@ impl<CI: ChainIndex + 'static> IntegrationTestRpc for RpcImpl<CI> {
}

fn send_transaction(&self, tx: Transaction) -> Result<H256> {
let indexed_tx: Transaction = tx.into();
let result = indexed_tx.hash();
let pool_result = self.tx_pool.add_transaction(indexed_tx.clone());
let tx_hash = tx.hash();
let pool_result = self.tx_pool.add_transaction(tx.clone());
debug!(target: "rpc", "send_transaction add to pool result: {:?}", pool_result);

// TODO PENDING new api NetworkContext#connected_peers
// for peer_id in self.nc.connected_peers() {
// let data = builde_transaction(indexed_tx);
// self.nc.send(peer_id, 0, data.to_vec());
// }
Ok(result)
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_transaction(fbb, &tx);
fbb.finish(message, None);

self.network.with_protocol_context(RELAY_PROTOCOL_ID, |nc| {
for peer in nc.connected_peers() {
debug!(target: "rpc", "relay transaction {} to peer#{}", tx_hash, peer);
let _ = nc.send(peer, fbb.finished_data().to_vec());
}
});
Ok(tx_hash)
}

fn get_block(&self, hash: H256) -> Result<Option<BlockWithHash>> {
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
extern crate bigint;
extern crate flatbuffers;
extern crate jsonrpc_core;
#[macro_use]
extern crate jsonrpc_macros;
Expand All @@ -12,7 +13,9 @@ extern crate ckb_db as db;
extern crate ckb_network as network;
extern crate ckb_notify as notify;
extern crate ckb_pool as pool;
extern crate ckb_protocol as protocol;
extern crate ckb_shared as shared;
extern crate ckb_sync as sync;
extern crate ckb_time;
#[cfg(test)]
extern crate ckb_verification as verification;
Expand Down
31 changes: 20 additions & 11 deletions rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ use super::{BlockWithHash, CellOutputWithOutPoint, Config, TransactionWithHash};
use bigint::H256;
use core::header::{BlockNumber, Header};
use core::transaction::{OutPoint, Transaction};
use flatbuffers::FlatBufferBuilder;
use jsonrpc_core::{Error, IoHandler, Result};
use jsonrpc_http_server::ServerBuilder;
use jsonrpc_server_utils::cors::AccessControlAllowOrigin;
use jsonrpc_server_utils::hosts::DomainsValidation;
use network::NetworkService;
use pool::txs_pool::TransactionPoolController;
use protocol::RelayMessage;
use shared::index::ChainIndex;
use shared::shared::{ChainProvider, Shared};
use std::sync::Arc;
use sync::RELAY_PROTOCOL_ID;

build_rpc_trait! {
pub trait Rpc {
Expand Down Expand Up @@ -46,23 +49,29 @@ build_rpc_trait! {
}

struct RpcImpl<CI> {
_network: Arc<NetworkService>,
network: Arc<NetworkService>,
shared: Shared<CI>,
tx_pool: TransactionPoolController,
controller: RpcController,
}

impl<CI: ChainIndex + 'static> Rpc for RpcImpl<CI> {
fn send_transaction(&self, tx: Transaction) -> Result<H256> {
let result = tx.hash();
let pool_result = self.tx_pool.add_transaction(tx);
let tx_hash = tx.hash();
let pool_result = self.tx_pool.add_transaction(tx.clone());
debug!(target: "rpc", "send_transaction add to pool result: {:?}", pool_result);
// TODO PENDING new api NetworkContext#connected_peers
// for peer_id in self.nc.connected_peers() {
// let data = builde_transaction(indexed_tx);
// self.nc.send(peer_id, 0, data.to_vec());
// }
Ok(result)

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_transaction(fbb, &tx);
fbb.finish(message, None);

self.network.with_protocol_context(RELAY_PROTOCOL_ID, |nc| {
for peer in nc.connected_peers() {
debug!(target: "rpc", "relay transaction {} to peer#{}", tx_hash, peer);
let _ = nc.send(peer, fbb.finished_data().to_vec());
}
});
Ok(tx_hash)
}

fn get_block(&self, hash: H256) -> Result<Option<BlockWithHash>> {
Expand Down Expand Up @@ -130,7 +139,7 @@ pub struct RpcServer {
impl RpcServer {
pub fn start<CI: ChainIndex + 'static>(
&self,
_network: Arc<NetworkService>,
network: Arc<NetworkService>,
shared: Shared<CI>,
tx_pool: TransactionPoolController,
controller: RpcController,
Expand All @@ -140,7 +149,7 @@ impl RpcServer {
let mut io = IoHandler::new();
io.extend_with(
RpcImpl {
_network,
network,
shared,
tx_pool,
controller,
Expand Down
21 changes: 13 additions & 8 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ckb_shared::index::ChainIndex;
use flatbuffers::FlatBufferBuilder;
use network::{CKBProtocolContext, PeerIndex};
use relayer::Relayer;
use std::collections::HashSet;

pub struct CompactBlockProcess<'a, CI: ChainIndex + 'a> {
message: &'a FbsCompactBlock<'a>,
Expand Down Expand Up @@ -45,14 +46,18 @@ where

match self.relayer.reconstruct_block(&compact_block, Vec::new()) {
(Some(block), _) => {
let _ = self.relayer.accept_block(self.peer, block);
// TODO PENDING new api NetworkContext#connected_peers
// for peer_id in self.nc.connected_peers() {
// let fbb = &mut FlatBufferBuilder::new();
// let message = RelayMessage::build_compact_block(fbb, &block, &HashSet::new());
// fbb.finish(message, None);
// self.nc.send(peer_id, 0, fbb.finished_data().to_vec());
// }
if self.relayer.accept_block(self.peer, block.clone()).is_ok() {
let fbb = &mut FlatBufferBuilder::new();
let message =
RelayMessage::build_compact_block(fbb, &block, &HashSet::new());
fbb.finish(message, None);

for peer_id in self.nc.connected_peers() {
if peer_id != self.peer {
let _ = self.nc.send(peer_id, fbb.finished_data().to_vec());
}
}
}
}
(_, Some(missing_indexes)) => {
let hash = compact_block.header.hash();
Expand Down
29 changes: 17 additions & 12 deletions sync/src/relayer/transaction_process.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use ckb_protocol::Transaction;
use ckb_protocol::{RelayMessage, Transaction as FbsTransaction};
use ckb_shared::index::ChainIndex;
use core::transaction::Transaction;
use flatbuffers::FlatBufferBuilder;
use network::{CKBProtocolContext, PeerIndex};
use relayer::Relayer;

// TODO PENDING remove this attribute later
#[allow(dead_code)]
pub struct TransactionProcess<'a, CI: ChainIndex + 'a> {
message: &'a Transaction<'a>,
message: &'a FbsTransaction<'a>,
relayer: &'a Relayer<CI>,
peer: PeerIndex,
nc: &'a CKBProtocolContext,
Expand All @@ -17,7 +17,7 @@ where
CI: ChainIndex + 'static,
{
pub fn new(
message: &'a Transaction,
message: &'a FbsTransaction,
relayer: &'a Relayer<CI>,
peer: PeerIndex,
nc: &'a CKBProtocolContext,
Expand All @@ -31,12 +31,17 @@ where
}

pub fn execute(self) {
let tx = (*self.message).into();
let _ = self.relayer.tx_pool.add_transaction(tx);
// TODO PENDING new api NetworkContext#connected_peers
// for peer_id in self.nc.connected_peers() {
// let data = builde_transaction(indexed_tx);
// self.nc.send(peer_id, 0, data.to_vec());
// }
let tx: Transaction = (*self.message).into();
let _ = self.relayer.tx_pool.add_transaction(tx.clone());

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_transaction(fbb, &tx);
fbb.finish(message, None);

for peer_id in self.nc.connected_peers() {
if peer_id != self.peer {
let _ = self.nc.send(peer_id, fbb.finished_data().to_vec());
}
}
}
}
4 changes: 4 additions & 0 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,10 @@ mod tests {
fn protocol_id(&self) -> ProtocolId {
unimplemented!();
}

fn connected_peers(&self) -> Vec<PeerIndex> {
unimplemented!();
}
}

fn mock_network_context(peer_num: usize) -> DummyNetworkContext {
Expand Down
5 changes: 5 additions & 0 deletions sync/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ impl NetworkContext for TestNetworkContext {
None
}

fn connected_peers(&self) -> Vec<PeerIndex> {
self.msg_senders.keys().map(|k| k.1).collect::<Vec<_>>()
}
}

fn subprotocol_name(&self) -> ProtocolId {
[1, 1, 1]
}
Expand Down

0 comments on commit b957d2b

Please sign in to comment.