From 1625c125fa392224bf32d8ad38656c3723e5d435 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 16:08:53 -0700 Subject: [PATCH 1/9] generic array fail case --- src/bin/fullnode.rs | 21 ++++++++----- src/crdt.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++- src/server.rs | 7 ++--- 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 5bddad530ec121..38a43504d2386d 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -36,7 +36,12 @@ fn main() { let mut opts = Options::new(); opts.optflag("h", "help", "print help"); opts.optopt("l", "", "run with the identity found in FILE", "FILE"); - opts.optopt("v", "", "validate; find leader's identity in FILE", "FILE"); + opts.optopt( + "t", + "", + "testnet; connec to the network at this gossip entry point", + "host:port", + ); opts.optopt( "o", "", @@ -119,14 +124,14 @@ fn main() { } let exit = Arc::new(AtomicBool::new(false)); - let threads = if matches.opt_present("v") { - let path = matches.opt_str("v").unwrap(); + let threads = if matches.opt_present("t") { + let testnet = matches.opt_str("t").unwrap(); eprintln!( - "starting validator... {} using {}", - repl_data.requests_addr, path + "starting validator... {} connecting to {}", + repl_data.requests_addr, testnet ); - let file = File::open(path.clone()).expect(&format!("file not found: {}", path)); - let leader = serde_json::from_reader(file).expect("parse"); + let taddr = testnet.parse().unwrap(); + let entry = ReplicatedData::new_entry_point(taddr); let s = Server::new_validator( bank, repl_data.clone(), @@ -135,7 +140,7 @@ fn main() { UdpSocket::bind(repl_data.replicate_addr).unwrap(), UdpSocket::bind(repl_data.gossip_addr).unwrap(), UdpSocket::bind(repl_data.repair_addr).unwrap(), - leader, + entry, exit.clone(), ); s.thread_hdls diff --git a/src/crdt.rs b/src/crdt.rs index 483a73bd8537e3..97ceda7ead84e7 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -551,6 +551,33 @@ impl Crdt { blob_sender.send(q)?; Ok(()) } + /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection + fn top_leader(&self) -> Option { + let mut table = HashMap::new(); + let def = PublicKey::default(); + let cur = self.table.values().filter(|x| x.current_leader_id != def); + for v in cur { + let cnt = table.entry(&v.current_leader_id).or_insert(0); + //let cnt = table.get_mut(&v.current_leader_id).unwrap(); + *cnt += 1; + println!("leader {:?} {}", &v.current_leader_id[..4], *cnt); + } + let mut sorted: Vec<_> = table.iter().collect(); + sorted.sort_by_key(|a| a.1); + sorted.last().map(|a| *(*(*a).0)) + } + + /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection + /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet + fn update_leader(&mut self) { + if let Some(lid) = self.top_leader() { + if self.my_data().current_leader_id != lid { + if self.table.get(&lid).is_some() { + self.set_leader(lid); + } + } + } + } /// Apply updates that we received from the identity `from` /// # Arguments @@ -577,6 +604,7 @@ impl Crdt { Builder::new() .name("solana-gossip".to_string()) .spawn(move || loop { + let start = timestamp(); let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); obj.write().unwrap().purge(timestamp()); if exit.load(Ordering::Relaxed) { @@ -584,7 +612,12 @@ impl Crdt { } //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep - sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); + let _ = obj.write().unwrap().update_leader(); + let elapsed = timestamp() - start; + if GOSSIP_SLEEP_MILLIS > elapsed { + let left = GOSSIP_SLEEP_MILLIS - elapsed; + sleep(Duration::from_millis(left)); + } }) .unwrap() } @@ -825,6 +858,7 @@ mod tests { use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; + use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1166,4 +1200,40 @@ mod tests { assert_eq!(blob.get_id().unwrap(), id); } } + /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection, + /// delete this test after leader selection is correctly implemented + #[test] + fn test_update_leader() { + let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let lead = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let lead2 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let mut crdt = Crdt::new(me.clone()); + assert_matches!(crdt.top_leader(), None); + crdt.set_leader(lead.id); + assert_eq!(crdt.top_leader().unwrap(), lead.id); + //add a bunch of nodes with a new leader + for _ in 0..10 { + let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + dum.current_leader_id = lead2.id; + crdt.insert(&dum); + } + assert_eq!(crdt.top_leader().unwrap(), lead2.id); + crdt.update_leader(); + assert_eq!(crdt.my_data().current_leader_id, lead.id); + crdt.insert(&lead2); + crdt.update_leader(); + assert_eq!(crdt.my_data().current_leader_id, lead2.id); + } + #[test] + fn test_update_leader_pubkeys() { + let key1 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + let key2 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + let mut table = HashMap::new(); + table.entry(&key1.current_leader_id).or_insert(0); + for _ in 0..1000 { + let a = table.entry(&key2.current_leader_id).or_insert(0); + *a += 1; + } + assert_eq!(table.len(), 2); + } } diff --git a/src/server.rs b/src/server.rs index 821ebcd78ee01a..65752ef1b3bb36 100644 --- a/src/server.rs +++ b/src/server.rs @@ -134,7 +134,7 @@ impl Server { replicate_socket: UdpSocket, gossip_listen_socket: UdpSocket, repair_socket: UdpSocket, - leader_repl_data: ReplicatedData, + entry_point: ReplicatedData, exit: Arc, ) -> Self { let bank = Arc::new(bank); @@ -143,12 +143,9 @@ impl Server { thread_hdls.extend(rpu.thread_hdls); let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write() - .expect("'crdt' write lock in pub fn replicate") - .set_leader(leader_repl_data.id); crdt.write() .expect("'crdt' write lock before insert() in pub fn replicate") - .insert(&leader_repl_data); + .insert(&entry_point); let window = streamer::default_window(); let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let retransmit_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); From c819150f4644de5179d9a1df0097bce802f9a371 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 16:30:40 -0700 Subject: [PATCH 2/9] tests --- src/crdt.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 97ceda7ead84e7..0a57c549b423f1 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -857,8 +857,10 @@ mod tests { use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; use packet::BlobRecycler; use result::Error; + use signature::PublicKey; use signature::{KeyPair, KeyPairUtil}; use std::collections::HashMap; + use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1224,14 +1226,28 @@ mod tests { crdt.update_leader(); assert_eq!(crdt.my_data().current_leader_id, lead2.id); } + struct TestArrays { + pub v1: SocketAddr, + pub key2: PublicKey, + pub v3: SocketAddr, + } + #[test] - fn test_update_leader_pubkeys() { - let key1 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); - let key2 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + fn test_update_leader_keys() { + let key1 = TestArrays { + v1: "127.0.0.1:4124".parse().unwrap(), + key2: KeyPair::new().pubkey(), + v3: "224.245.124.012:4124".parse().unwrap(), + }; + let key2 = TestArrays { + v1: "134.245.124.012:4000".parse().unwrap(), + key2: KeyPair::new().pubkey(), + v3: "244.245.124.124:4124".parse().unwrap(), + }; let mut table = HashMap::new(); - table.entry(&key1.current_leader_id).or_insert(0); + table.entry(&key1.key2).or_insert(0); for _ in 0..1000 { - let a = table.entry(&key2.current_leader_id).or_insert(0); + let a = table.entry(&key2.key2).or_insert(0); *a += 1; } assert_eq!(table.len(), 2); From 7cd5b3fd2f48ec77418d22088d025412269455b4 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 16:34:59 -0700 Subject: [PATCH 3/9] get rid of dummy test --- src/crdt.rs | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 0a57c549b423f1..eae363c61eaada 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -857,10 +857,7 @@ mod tests { use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; use packet::BlobRecycler; use result::Error; - use signature::PublicKey; use signature::{KeyPair, KeyPairUtil}; - use std::collections::HashMap; - use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1226,30 +1223,4 @@ mod tests { crdt.update_leader(); assert_eq!(crdt.my_data().current_leader_id, lead2.id); } - struct TestArrays { - pub v1: SocketAddr, - pub key2: PublicKey, - pub v3: SocketAddr, - } - - #[test] - fn test_update_leader_keys() { - let key1 = TestArrays { - v1: "127.0.0.1:4124".parse().unwrap(), - key2: KeyPair::new().pubkey(), - v3: "224.245.124.012:4124".parse().unwrap(), - }; - let key2 = TestArrays { - v1: "134.245.124.012:4000".parse().unwrap(), - key2: KeyPair::new().pubkey(), - v3: "244.245.124.124:4124".parse().unwrap(), - }; - let mut table = HashMap::new(); - table.entry(&key1.key2).or_insert(0); - for _ in 0..1000 { - let a = table.entry(&key2.key2).or_insert(0); - *a += 1; - } - assert_eq!(table.len(), 2); - } } From db7658376a7253e63c3f400cca03a427bc9ef17f Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 16:38:49 -0700 Subject: [PATCH 4/9] fix logs --- src/crdt.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index eae363c61eaada..9838977b3beefa 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -558,9 +558,8 @@ impl Crdt { let cur = self.table.values().filter(|x| x.current_leader_id != def); for v in cur { let cnt = table.entry(&v.current_leader_id).or_insert(0); - //let cnt = table.get_mut(&v.current_leader_id).unwrap(); *cnt += 1; - println!("leader {:?} {}", &v.current_leader_id[..4], *cnt); + trace!("leader {:?} {}", &v.current_leader_id[..4], *cnt); } let mut sorted: Vec<_> = table.iter().collect(); sorted.sort_by_key(|a| a.1); @@ -855,6 +854,7 @@ impl TestNode { #[cfg(test)] mod tests { use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; + use logger; use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil}; @@ -1203,6 +1203,7 @@ mod tests { /// delete this test after leader selection is correctly implemented #[test] fn test_update_leader() { + logger::setup(); let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let lead = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let lead2 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); From 710297376aa973211277c667d6001e898619d7f6 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 16:43:26 -0700 Subject: [PATCH 5/9] fix docs --- src/bin/fullnode.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 38a43504d2386d..4910c1641eae01 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -40,7 +40,7 @@ fn main() { "t", "", "testnet; connec to the network at this gossip entry point", - "host:port", + "HOST:PORT", ); opts.optopt( "o", From 301f22f9e74bf2ea96c2a2acedd64ebe72eb99ac Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 16:44:56 -0700 Subject: [PATCH 6/9] nits --- src/bin/fullnode.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 4910c1641eae01..05bd7815a6e734 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -39,7 +39,7 @@ fn main() { opts.optopt( "t", "", - "testnet; connec to the network at this gossip entry point", + "testnet; connect to the network at this gossip entry point", "HOST:PORT", ); opts.optopt( From ecb39ae67a93b4612a76cd5ac0ed9159c9618e0d Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 16:51:39 -0700 Subject: [PATCH 7/9] fixed! --- src/crdt.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/crdt.rs b/src/crdt.rs index 9838977b3beefa..9c865a3edc7c1d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1214,6 +1214,7 @@ mod tests { //add a bunch of nodes with a new leader for _ in 0..10 { let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + dum.id = KeyPair::new().pubkey(); dum.current_leader_id = lead2.id; crdt.insert(&dum); } From 60ce78910bf0f87b4ab918e48f13365321124592 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 17:01:38 -0700 Subject: [PATCH 8/9] comments --- src/bin/fullnode.rs | 10 +++++----- src/crdt.rs | 38 +++++++++++++++++++------------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 05bd7815a6e734..b1acf1bdd4f6f6 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -125,13 +125,13 @@ fn main() { let exit = Arc::new(AtomicBool::new(false)); let threads = if matches.opt_present("t") { - let testnet = matches.opt_str("t").unwrap(); + let testnet_address_string = matches.opt_str("t").unwrap(); eprintln!( "starting validator... {} connecting to {}", - repl_data.requests_addr, testnet + repl_data.requests_addr, testnet_address_string ); - let taddr = testnet.parse().unwrap(); - let entry = ReplicatedData::new_entry_point(taddr); + let testnet_addr = testnet_address_string.parse().unwrap(); + let newtwork_entry_point = ReplicatedData::new_entry_point(testnet_addr); let s = Server::new_validator( bank, repl_data.clone(), @@ -140,7 +140,7 @@ fn main() { UdpSocket::bind(repl_data.replicate_addr).unwrap(), UdpSocket::bind(repl_data.gossip_addr).unwrap(), UdpSocket::bind(repl_data.repair_addr).unwrap(), - entry, + newtwork_entry_point, exit.clone(), ); s.thread_hdls diff --git a/src/crdt.rs b/src/crdt.rs index 9c865a3edc7c1d..3c8b13f69af5ea 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -551,7 +551,7 @@ impl Crdt { blob_sender.send(q)?; Ok(()) } - /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection + /// TODO: This is obviously the wrong way to do this. Need to implement leader selection fn top_leader(&self) -> Option { let mut table = HashMap::new(); let def = PublicKey::default(); @@ -566,13 +566,13 @@ impl Crdt { sorted.last().map(|a| *(*(*a).0)) } - /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection + /// TODO: This is obviously the wrong way to do this. Need to implement leader selection /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet fn update_leader(&mut self) { - if let Some(lid) = self.top_leader() { - if self.my_data().current_leader_id != lid { - if self.table.get(&lid).is_some() { - self.set_leader(lid); + if let Some(leader_id) = self.top_leader() { + if self.my_data().current_leader_id != leader_id { + if self.table.get(&leader_id).is_some() { + self.set_leader(leader_id); } } } @@ -614,8 +614,8 @@ impl Crdt { let _ = obj.write().unwrap().update_leader(); let elapsed = timestamp() - start; if GOSSIP_SLEEP_MILLIS > elapsed { - let left = GOSSIP_SLEEP_MILLIS - elapsed; - sleep(Duration::from_millis(left)); + let time_left = GOSSIP_SLEEP_MILLIS - elapsed; + sleep(Duration::from_millis(time_left)); } }) .unwrap() @@ -1199,30 +1199,30 @@ mod tests { assert_eq!(blob.get_id().unwrap(), id); } } - /// FIXME: This is obviously the wrong way to do this. Need to implement leader selection, + /// TODO: This is obviously the wrong way to do this. Need to implement leader selection, /// delete this test after leader selection is correctly implemented #[test] fn test_update_leader() { logger::setup(); let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let lead = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let lead2 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let leader0 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let leader1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()); - assert_matches!(crdt.top_leader(), None); - crdt.set_leader(lead.id); - assert_eq!(crdt.top_leader().unwrap(), lead.id); + assert_eq!(crdt.top_leader(), None); + crdt.set_leader(leader0.id); + assert_eq!(crdt.top_leader().unwrap(), leader0.id); //add a bunch of nodes with a new leader for _ in 0..10 { let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); dum.id = KeyPair::new().pubkey(); - dum.current_leader_id = lead2.id; + dum.current_leader_id = leader1.id; crdt.insert(&dum); } - assert_eq!(crdt.top_leader().unwrap(), lead2.id); + assert_eq!(crdt.top_leader().unwrap(), leader1.id); crdt.update_leader(); - assert_eq!(crdt.my_data().current_leader_id, lead.id); - crdt.insert(&lead2); + assert_eq!(crdt.my_data().current_leader_id, leader0.id); + crdt.insert(&leader1); crdt.update_leader(); - assert_eq!(crdt.my_data().current_leader_id, lead2.id); + assert_eq!(crdt.my_data().current_leader_id, leader1.id); } } From b32b0aa9a813ead3feec18d6e828c6cbb14a638c Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 23 Jun 2018 17:13:45 -0700 Subject: [PATCH 9/9] borrow checker --- src/crdt.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 3c8b13f69af5ea..307d80a2e4eefb 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -561,9 +561,9 @@ impl Crdt { *cnt += 1; trace!("leader {:?} {}", &v.current_leader_id[..4], *cnt); } - let mut sorted: Vec<_> = table.iter().collect(); + let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect(); sorted.sort_by_key(|a| a.1); - sorted.last().map(|a| *(*(*a).0)) + sorted.last().map(|a| *a.0) } /// TODO: This is obviously the wrong way to do this. Need to implement leader selection