Skip to content

Commit

Permalink
Cluster Replicated Data Store
Browse files Browse the repository at this point in the history
Separate the data storage and merge strategy from the network IO boundary.
Implement an eager push overlay for transporting recent messages.

Simulation shows fast convergence with 20k nodes.
  • Loading branch information
aeyakovenko committed Nov 9, 2018
1 parent cf8f3bc commit 3841f2d
Show file tree
Hide file tree
Showing 31 changed files with 2,640 additions and 1,609 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ unstable = []
atty = "0.2"
bincode = "1.0.0"
bs58 = "0.2.0"
bv = { version = "0.10.0", features = ["serde"] }
byteorder = "1.2.1"
bytes = "0.4"
chrono = { version = "0.4.0", features = ["serde"] }
Expand All @@ -88,6 +89,7 @@ solana-jsonrpc-pubsub = "0.3.0"
solana-jsonrpc-ws-server = "0.3.0"
ipnetwork = "0.12.7"
itertools = "0.7.8"
indexmap = "1.0"
libc = "0.2.43"
libloading = "0.5.0"
log = "0.4.2"
Expand Down
20 changes: 7 additions & 13 deletions src/bin/bench-tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn sample_tx_count(
let mut max_tps = 0.0;
let mut total;

let log_prefix = format!("{:21}:", v.contact_info.tpu.to_string());
let log_prefix = format!("{:21}:", v.tpu.to_string());

loop {
let tx_count = client.transaction_count();
Expand Down Expand Up @@ -106,7 +106,7 @@ fn sample_tx_count(
tps: max_tps,
tx: total,
};
maxes.write().unwrap().push((v.contact_info.tpu, stats));
maxes.write().unwrap().push((v.tpu, stats));
break;
}
}
Expand Down Expand Up @@ -257,7 +257,7 @@ fn do_tx_transfers(
println!(
"Transferring 1 unit {} times... to {}",
txs0.len(),
leader.contact_info.tpu
leader.tpu
);
let tx_len = txs0.len();
let transfer_start = Instant::now();
Expand Down Expand Up @@ -377,7 +377,7 @@ fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], token
}

fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &Keypair, tx_count: i64) {
let mut drone_addr = leader.contact_info.tpu;
let mut drone_addr = leader.tpu;
drone_addr.set_port(DRONE_PORT);

let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
Expand Down Expand Up @@ -638,7 +638,7 @@ fn main() {

let leader = leader.unwrap();

println!("leader is at {} {}", leader.contact_info.rpu, leader.id);
println!("leader is at {} {}", leader.rpu, leader.id);
let mut client = mk_client(&leader);
let mut barrier_client = mk_client(&leader);

Expand Down Expand Up @@ -804,7 +804,7 @@ fn converge(
//lets spy on the network
let (node, gossip_socket) = ClusterInfo::spy_node();
let mut spy_cluster_info = ClusterInfo::new(node).expect("ClusterInfo::new");
spy_cluster_info.insert(&leader);
spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
let window = Arc::new(RwLock::new(default_window()));
Expand All @@ -818,13 +818,7 @@ fn converge(
println!("{}", spy_ref.node_info_trace());

if spy_ref.leader_data().is_some() {
v = spy_ref
.table
.values()
.filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();

v = spy_ref.rpu_peers();
if v.len() >= num_nodes {
println!("CONVERGED!");
break;
Expand Down
2 changes: 1 addition & 1 deletion src/bin/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn main() {
if let Ok(file) = File::open(path.clone()) {
let parse: serde_json::Result<Config> = serde_json::from_reader(file);
if let Ok(data) = parse {
(data.keypair(), data.node_info.contact_info.ncp)
(data.keypair(), data.node_info.ncp)
} else {
eprintln!("failed to parse {}", path);
exit(1);
Expand Down
4 changes: 2 additions & 2 deletions src/bin/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn parse_args(matches: &ArgMatches) -> Result<WalletConfig, Box<error::Error

let leader = poll_gossip_for_leader(network, timeout)?;

let mut drone_addr = leader.contact_info.tpu;
let mut drone_addr = leader.tpu;
drone_addr.set_port(DRONE_PORT);

let rpc_addr = if let Some(proxy) = matches.value_of("proxy") {
Expand All @@ -62,7 +62,7 @@ pub fn parse_args(matches: &ArgMatches) -> Result<WalletConfig, Box<error::Error
} else {
RPC_PORT
};
let mut rpc_addr = leader.contact_info.tpu;
let mut rpc_addr = leader.tpu;
rpc_addr.set_port(rpc_port);
format!("http://{}", rpc_addr.to_string())
};
Expand Down
106 changes: 106 additions & 0 deletions src/bloom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! Simple Bloom Filter
use bv::BitVec;
use rand::{self, Rng};
use std::cmp;
use std::marker::PhantomData;

/// Generate a stable hash of `self` for each `hash_index`.
///
/// Implementations should make a best effort to produce independent,
/// uniformly distributed values for distinct `hash_index` inputs, so
/// each of the filter's keys indexes an independent bit position.
pub trait BloomHashIndex {
    fn hash(&self, hash_index: u64) -> u64;
}

/// Space-efficient probabilistic set membership filter.
///
/// `contains` may report false positives but never false negatives:
/// a `false` answer means the key was definitely never added.
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct Bloom<T: BloomHashIndex> {
    // Randomized hash keys; `add` sets one bit per key.
    pub keys: Vec<u64>,
    // The filter's bit array.
    pub bits: BitVec<u8>,
    // Marks the key type without storing a value of it.
    _phantom: PhantomData<T>,
}

impl<T: BloomHashIndex> Bloom<T> {
/// create filter optimal for num size given the `false_rate`
/// the keys are randomized for picking data out of a collision resistant hash of size
/// `keysize` bytes
/// https://hur.st/bloomfilter/
pub fn random(num: usize, false_rate: f64, max_bits: usize) -> Self {
let min_num_bits = ((num as f64 * false_rate.log(2f64))
/ (1f64 / 2f64.powf(2f64.log(2f64))).log(2f64)).ceil()
as usize;
let num_bits = cmp::max(1, cmp::min(min_num_bits, max_bits));
let num_keys = ((num_bits as f64 / num as f64) * 2f64.log(2f64)).round() as usize;
let keys: Vec<u64> = (0..num_keys)
.into_iter()
.map(|_| rand::thread_rng().gen())
.collect();
let bits = BitVec::new_fill(false, num_bits as u64);
Bloom {
keys,
bits,
_phantom: Default::default(),
}
}
fn pos(&self, key: &T, k: u64) -> u64 {
key.hash(k) % self.bits.len()
}
pub fn add(&mut self, key: &T) {
for k in &self.keys {
let pos = self.pos(key, *k);
self.bits.set(pos, true);
}
}
pub fn contains(&mut self, key: &T) -> bool {
for k in &self.keys {
let pos = self.pos(key, *k);
if !self.bits.get(pos) {
return false;
}
}
return true;
}
}

#[cfg(test)]
mod test {
    use super::*;
    use hash::{hash, Hash};

    #[test]
    fn test_bloom_filter() {
        // Degenerate filter: zero expected items, no keys, a single bit.
        let empty: Bloom<Hash> = Bloom::random(0, 0.1, 100);
        assert_eq!(empty.keys.len(), 0);
        assert_eq!(empty.bits.len(), 1);

        // Typical sizing from the optimal-parameter formulas.
        let normal: Bloom<Hash> = Bloom::random(10, 0.1, 100);
        assert_eq!(normal.keys.len(), 3);
        assert_eq!(normal.bits.len(), 34);

        // max_bits caps the bit array, which shrinks the key count.
        let saturated: Bloom<Hash> = Bloom::random(100, 0.1, 100);
        assert_eq!(saturated.keys.len(), 1);
        assert_eq!(saturated.bits.len(), 100);
    }
    #[test]
    fn test_add_contains() {
        let mut bloom: Bloom<Hash> = Bloom::random(100, 0.1, 100);

        // Each key is absent before insertion and present afterwards.
        for word in [&b"hello"[..], &b"world"[..]].iter() {
            let key = hash(word);
            assert!(!bloom.contains(&key));
            bloom.add(&key);
            assert!(bloom.contains(&key));
        }
    }
    #[test]
    fn test_random() {
        // Two independently built filters should draw different keys.
        let mut first: Bloom<Hash> = Bloom::random(10, 0.1, 100);
        let mut second: Bloom<Hash> = Bloom::random(10, 0.1, 100);
        first.keys.sort();
        second.keys.sort();
        assert_ne!(first.keys, second.keys);
    }
}
7 changes: 6 additions & 1 deletion src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use packet::SharedBlobs;
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
Expand All @@ -33,6 +34,7 @@ pub enum BroadcastStageReturnType {
fn broadcast(
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
mut tick_height: u64,
leader_id: Pubkey,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
Expand Down Expand Up @@ -139,6 +141,7 @@ fn broadcast(
ClusterInfo::broadcast(
&leader_scheduler,
tick_height,
leader_id,
&node_info,
&broadcast_table,
&window,
Expand Down Expand Up @@ -208,10 +211,12 @@ impl BroadcastStage {
let mut receive_index = entry_height;
let me = cluster_info.read().unwrap().my_data().clone();
loop {
let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
let broadcast_table = cluster_info.read().unwrap().tpu_peers();
let leader_id = cluster_info.read().unwrap().leader_id();
if let Err(e) = broadcast(
leader_scheduler,
tick_height,
leader_id,
&me,
&broadcast_table,
&window,
Expand Down
Loading

0 comments on commit 3841f2d

Please sign in to comment.