Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scalable gossip library #1546

Merged
merged 7 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ unstable = []
atty = "0.2"
bincode = "1.0.0"
bs58 = "0.2.0"
bv = { version = "0.10.0", features = ["serde"] }
byteorder = "1.2.1"
bytes = "0.4"
chrono = { version = "0.4.0", features = ["serde"] }
Expand All @@ -88,6 +89,7 @@ solana-jsonrpc-pubsub = "0.3.0"
solana-jsonrpc-ws-server = "0.3.0"
ipnetwork = "0.12.7"
itertools = "0.7.8"
indexmap = "1.0"
libc = "0.2.43"
libloading = "0.5.0"
log = "0.4.2"
Expand Down
20 changes: 7 additions & 13 deletions src/bin/bench-tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn sample_tx_count(
let mut max_tps = 0.0;
let mut total;

let log_prefix = format!("{:21}:", v.contact_info.tpu.to_string());
let log_prefix = format!("{:21}:", v.tpu.to_string());

loop {
let tx_count = client.transaction_count();
Expand Down Expand Up @@ -106,7 +106,7 @@ fn sample_tx_count(
tps: max_tps,
tx: total,
};
maxes.write().unwrap().push((v.contact_info.tpu, stats));
maxes.write().unwrap().push((v.tpu, stats));
break;
}
}
Expand Down Expand Up @@ -257,7 +257,7 @@ fn do_tx_transfers(
println!(
"Transferring 1 unit {} times... to {}",
txs0.len(),
leader.contact_info.tpu
leader.tpu
);
let tx_len = txs0.len();
let transfer_start = Instant::now();
Expand Down Expand Up @@ -377,7 +377,7 @@ fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], token
}

fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &Keypair, tx_count: u64) {
let mut drone_addr = leader.contact_info.tpu;
let mut drone_addr = leader.tpu;
drone_addr.set_port(DRONE_PORT);

let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
Expand Down Expand Up @@ -638,7 +638,7 @@ fn main() {

let leader = leader.unwrap();

println!("leader RPC is at {} {}", leader.contact_info.rpc, leader.id);
println!("leader RPC is at {} {}", leader.rpc, leader.id);
let mut client = mk_client(&leader);
let mut barrier_client = mk_client(&leader);

Expand Down Expand Up @@ -804,7 +804,7 @@ fn converge(
//lets spy on the network
let (node, gossip_socket) = ClusterInfo::spy_node();
let mut spy_cluster_info = ClusterInfo::new(node).expect("ClusterInfo::new");
spy_cluster_info.insert(&leader);
spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
let window = Arc::new(RwLock::new(default_window()));
Expand All @@ -818,13 +818,7 @@ fn converge(
println!("{}", spy_ref.node_info_trace());

if spy_ref.leader_data().is_some() {
v = spy_ref
.table
.values()
.filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpc))
.cloned()
.collect();

v = spy_ref.rpc_peers();
if v.len() >= num_nodes {
println!("CONVERGED!");
break;
Expand Down
2 changes: 1 addition & 1 deletion src/bin/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn main() {
if let Ok(file) = File::open(path.clone()) {
let parse: serde_json::Result<Config> = serde_json::from_reader(file);
if let Ok(data) = parse {
(data.keypair(), data.node_info.contact_info.ncp)
(data.keypair(), data.node_info.ncp)
} else {
eprintln!("failed to parse {}", path);
exit(1);
Expand Down
4 changes: 2 additions & 2 deletions src/bin/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn main() {
if let Ok(file) = File::open(path.clone()) {
let parse: serde_json::Result<Config> = serde_json::from_reader(file);
if let Ok(data) = parse {
(data.keypair(), data.node_info.contact_info.ncp)
(data.keypair(), data.node_info.ncp)
} else {
eprintln!("failed to parse {}", path);
exit(1);
Expand Down Expand Up @@ -129,7 +129,7 @@ fn main() {

let mut client = mk_client(&leader_info);

let mut drone_addr = leader_info.contact_info.tpu;
let mut drone_addr = leader_info.tpu;
drone_addr.set_port(DRONE_PORT);
let airdrop_amount = 5;
if let Err(e) = request_airdrop(&drone_addr, &keypair.pubkey(), airdrop_amount) {
Expand Down
103 changes: 103 additions & 0 deletions src/bloom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//! Simple Bloom Filter
use bv::BitVec;
use rand::{self, Rng};
use std::cmp;
use std::marker::PhantomData;

/// Generate a stable hash of `self` for each `hash_index`.
/// Best effort can be made for uniqueness of each hash.
///
/// Implementors supply one family of hash functions, selected by
/// `hash_index`; `Bloom` calls this once per configured key on every
/// `add`/`contains`, so the result must be deterministic for a given
/// (`self`, `hash_index`) pair.
pub trait BloomHashIndex {
    /// Return the hash of `self` under hash function number `hash_index`.
    fn hash(&self, hash_index: u64) -> u64;
}

#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct Bloom<T: BloomHashIndex> {
    /// Random seeds, one per hash function; each is passed to
    /// `BloomHashIndex::hash` as the `hash_index`.
    pub keys: Vec<u64>,
    /// The filter's bit array; positions are `hash % bits.len()`.
    pub bits: BitVec<u8>,
    // Marker tying the filter to its key type without storing a T.
    _phantom: PhantomData<T>,
}

impl<T: BloomHashIndex> Bloom<T> {
    /// Create a filter sized for `num` items at the given `false_rate`,
    /// capped at `max_bits` bits of storage.
    ///
    /// The keys are randomized for picking data out of a collision resistant
    /// hash of size `keysize` bytes.
    /// Sizing derived from https://hur.st/bloomfilter/
    pub fn random(num: usize, false_rate: f64, max_bits: usize) -> Self {
        // Bit count scaled from the item count and false-positive rate
        // (base-2 logs; see the hur.st link above).
        let min_num_bits = ((num as f64 * false_rate.log(2f64))
            / (1f64 / 2f64.powf(2f64.log(2f64))).log(2f64)).ceil()
            as usize;
        // Clamp to [1, max_bits]; at least one bit keeps the modulo in
        // `pos` well-defined.
        let num_bits = cmp::max(1, cmp::min(min_num_bits, max_bits));
        // Guard num == 0: otherwise num_bits / num is +inf, and casting inf
        // to usize saturates to usize::MAX (formerly UB), which would try to
        // generate an absurd number of keys. An empty filter gets no keys.
        let num_keys = if num == 0 {
            0
        } else {
            ((num_bits as f64 / num as f64) * 2f64.log(2f64)).round() as usize
        };
        // Hoist the RNG: thread_rng() does a TLS lookup on every call.
        let mut rng = rand::thread_rng();
        let keys: Vec<u64> = (0..num_keys).map(|_| rng.gen()).collect();
        let bits = BitVec::new_fill(false, num_bits as u64);
        Bloom {
            keys,
            bits,
            _phantom: Default::default(),
        }
    }
    /// Bit position for `key` under hash function number `k`.
    fn pos(&self, key: &T, k: u64) -> u64 {
        key.hash(k) % self.bits.len()
    }
    /// Insert `key` into the filter by setting one bit per configured key.
    pub fn add(&mut self, key: &T) {
        for k in &self.keys {
            let pos = self.pos(key, *k);
            self.bits.set(pos, true);
        }
    }
    /// Membership test: may report false positives, never false negatives.
    /// Takes `&self` — querying does not mutate the filter (the original
    /// `&mut self` receiver was needlessly restrictive for callers).
    pub fn contains(&self, key: &T) -> bool {
        for k in &self.keys {
            let pos = self.pos(key, *k);
            if !self.bits.get(pos) {
                return false;
            }
        }
        true
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use hash::{hash, Hash};

    #[test]
    fn test_bloom_filter() {
        // Degenerate case: zero items still yields a well-formed filter
        // with a single bit and no keys.
        let empty: Bloom<Hash> = Bloom::random(0, 0.1, 100);
        assert_eq!(empty.keys.len(), 0);
        assert_eq!(empty.bits.len(), 1);

        // Typical sizing: 10 items at 10% false-positive rate.
        let typical: Bloom<Hash> = Bloom::random(10, 0.1, 100);
        assert_eq!(typical.keys.len(), 3);
        assert_eq!(typical.bits.len(), 34);

        // Saturated: the max_bits cap wins over the computed bit count.
        let capped: Bloom<Hash> = Bloom::random(100, 0.1, 100);
        assert_eq!(capped.keys.len(), 1);
        assert_eq!(capped.bits.len(), 100);
    }
    #[test]
    fn test_add_contains() {
        let mut bloom: Bloom<Hash> = Bloom::random(100, 0.1, 100);

        // Each key should be absent before insertion and present after.
        let hello = hash(b"hello");
        assert!(!bloom.contains(&hello));
        bloom.add(&hello);
        assert!(bloom.contains(&hello));

        let world = hash(b"world");
        assert!(!bloom.contains(&world));
        bloom.add(&world);
        assert!(bloom.contains(&world));
    }
    #[test]
    fn test_random() {
        // Two independently-built filters should draw different key seeds.
        let mut first: Bloom<Hash> = Bloom::random(10, 0.1, 100);
        let mut second: Bloom<Hash> = Bloom::random(10, 0.1, 100);
        first.keys.sort();
        second.keys.sort();
        assert_ne!(first.keys, second.keys);
    }
}
7 changes: 6 additions & 1 deletion src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use packet::{index_blobs, SharedBlobs};
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
Expand All @@ -32,6 +33,7 @@ pub enum BroadcastStageReturnType {
fn broadcast(
max_tick_height: Option<u64>,
tick_height: &mut u64,
leader_id: Pubkey,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
Expand Down Expand Up @@ -140,6 +142,7 @@ fn broadcast(
// Send blobs out from the window
ClusterInfo::broadcast(
Some(*tick_height) == max_tick_height,
leader_id,
&node_info,
&broadcast_table,
&window,
Expand Down Expand Up @@ -211,10 +214,12 @@ impl BroadcastStage {
let me = cluster_info.read().unwrap().my_data().clone();
let mut tick_height_ = tick_height;
loop {
let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
let broadcast_table = cluster_info.read().unwrap().tpu_peers();
let leader_id = cluster_info.read().unwrap().leader_id();
if let Err(e) = broadcast(
max_tick_height,
&mut tick_height_,
leader_id,
&me,
&broadcast_table,
&window,
Expand Down
Loading