diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index d5144062878555..c7d3f47774cd56 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -8,7 +8,7 @@ extern crate solana; use bincode::serialize; use clap::{App, Arg}; use rayon::prelude::*; -use solana::crdt::{Crdt, ReplicatedData}; +use solana::crdt::{Crdt, NodeInfo}; use solana::drone::DroneRequest; use solana::fullnode::Config; use solana::hash::Hash; @@ -38,7 +38,7 @@ fn sample_tx_count( exit: Arc, maxes: Arc>>, first_count: u64, - v: ReplicatedData, + v: NodeInfo, sample_period: u64, ) { let mut client = mk_client(&v); @@ -79,7 +79,7 @@ fn generate_and_send_txs( tx_clients: &Vec, id: &Mint, keypairs: &Vec, - leader: &ReplicatedData, + leader: &NodeInfo, txs: i64, last_id: &mut Hash, threads: usize, @@ -185,12 +185,12 @@ fn main() { ) .get_matches(); - let leader: ReplicatedData; + let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { leader = read_leader(l.to_string()).node_info; } else { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - leader = ReplicatedData::new_leader(&server_addr); + leader = NodeInfo::new_leader(&server_addr); }; let id: Mint; @@ -319,7 +319,7 @@ fn main() { } } -fn mk_client(r: &ReplicatedData) -> ThinClient { +fn mk_client(r: &NodeInfo) -> ThinClient { let requests_socket = udp_random_bind(8000, 10000, 5).unwrap(); let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap(); @@ -335,11 +335,11 @@ fn mk_client(r: &ReplicatedData) -> ThinClient { ) } -fn spy_node() -> (ReplicatedData, UdpSocket) { +fn spy_node() -> (NodeInfo, UdpSocket) { let gossip_socket_pair = udp_public_bind("gossip", 8000, 10000); let pubkey = KeyPair::new().pubkey(); let daddr = "0.0.0.0:0".parse().unwrap(); - let node = ReplicatedData::new( + let node = NodeInfo::new( pubkey, //gossip.local_addr().unwrap(), gossip_socket_pair.addr, @@ -352,11 +352,11 @@ fn spy_node() -> (ReplicatedData, UdpSocket) { } fn converge( - leader: &ReplicatedData, + leader: &NodeInfo, exit: Arc, num_nodes: usize, threads: &mut Vec>, -) -> Vec { +) -> Vec { //lets spy on the network let daddr = "0.0.0.0:0".parse().unwrap(); let (spy, spy_gossip) = spy_node(); @@ -376,7 +376,7 @@ fn converge( let mut rv = vec![]; //wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { - let v: Vec = spy_ref + let v: Vec = spy_ref .read() .unwrap() .table diff --git a/src/bin/drone.rs b/src/bin/drone.rs index 8b8bc815d02e7a..76b3c93414d376 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -9,7 +9,7 @@ extern crate tokio_io; use bincode::deserialize; use clap::{App, Arg}; -use solana::crdt::ReplicatedData; +use solana::crdt::NodeInfo; use solana::drone::{Drone, DroneRequest}; use solana::fullnode::Config; use solana::mint::Mint; @@ -60,12 +60,12 @@ fn main() { ) .get_matches(); - let leader: ReplicatedData; + let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { leader = read_leader(l.to_string()).node_info; } else { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - leader = ReplicatedData::new_leader(&server_addr); + leader = NodeInfo::new_leader(&server_addr); }; let mint: Mint; diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 26f90ba24f8195..556241592e9bbf 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -8,7 +8,7 @@ extern crate solana; use atty::{is, Stream}; use clap::{App, Arg}; -use solana::crdt::{ReplicatedData, TestNode}; +use solana::crdt::{NodeInfo, TestNode}; use solana::fullnode::{Config, FullNode, InFile, OutFile}; use solana::service::Service; use solana::signature::{KeyPair, KeyPairUtil}; @@ -52,7 +52,7 @@ fn main() -> () { let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let mut keypair = KeyPair::new(); - let mut repl_data = ReplicatedData::new_leader_with_pubkey(keypair.pubkey(), &bind_addr); + let mut repl_data = NodeInfo::new_leader_with_pubkey(keypair.pubkey(), &bind_addr); if let Some(l) = matches.value_of("identity") { let path = l.to_string(); if let Ok(file) = File::open(path.clone()) { @@ -82,7 +82,7 @@ fn main() -> () { None, ) } else { - node.data.current_leader_id = node.data.id.clone(); + node.data.leader_id = node.data.id.clone(); let outfile = if let Some(o) = matches.value_of("output") { OutFile::Path(o.to_string()) diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 818e3b5a8da46f..97be24a9a6bab2 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -8,7 +8,7 @@ extern crate solana; use bincode::serialize; use clap::{App, Arg, SubCommand}; -use solana::crdt::ReplicatedData; +use solana::crdt::NodeInfo; use solana::drone::DroneRequest; use solana::fullnode::Config; use solana::mint::Mint; @@ -56,7 +56,7 @@ impl error::Error for WalletError { } struct WalletConfig { - leader: ReplicatedData, + leader: NodeInfo, id: Mint, drone_addr: SocketAddr, command: WalletCommand, @@ -66,7 +66,7 @@ impl Default for WalletConfig { fn default() -> WalletConfig { let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); WalletConfig { - leader: ReplicatedData::new_leader(&default_addr.clone()), + leader: NodeInfo::new_leader(&default_addr.clone()), id: Mint::new(0), drone_addr: default_addr.clone(), command: WalletCommand::Balance, @@ -141,12 +141,12 @@ fn parse_args() -> Result> { .subcommand(SubCommand::with_name("address").about("Get your public key")) .get_matches(); - let leader: ReplicatedData; + let leader: NodeInfo; if let Some(l) = matches.value_of("leader") { leader = read_leader(l.to_string()).node_info; } else { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); - leader = ReplicatedData::new_leader(&server_addr); + leader = NodeInfo::new_leader(&server_addr); }; let id: Mint; @@ -298,7 +298,7 @@ fn read_mint(path: String) -> Result> { Ok(mint) } -fn mk_client(r: &ReplicatedData) -> io::Result { +fn mk_client(r: &NodeInfo) -> io::Result { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); requests_socket diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs index 7a0850ca286840..fd235387978daa 100644 --- a/src/choose_gossip_peer_strategy.rs +++ b/src/choose_gossip_peer_strategy.rs @@ -1,4 +1,4 @@ -use crdt::{CrdtError, ReplicatedData}; +use crdt::{CrdtError, NodeInfo}; use rand::distributions::{Distribution, Weighted, WeightedChoice}; use rand::thread_rng; use result::Result; @@ -9,7 +9,7 @@ use std::collections::HashMap; pub const DEFAULT_WEIGHT: u32 = 1; pub trait ChooseGossipPeerStrategy { - fn choose_peer<'a>(&self, options: Vec<&'a ReplicatedData>) -> Result<&'a ReplicatedData>; + fn choose_peer<'a>(&self, options: Vec<&'a NodeInfo>) -> Result<&'a NodeInfo>; } pub struct ChooseRandomPeerStrategy<'a> { @@ -27,7 +27,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> { } impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { - fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { + fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> { if options.is_empty() { Err(CrdtError::TooSmall)?; } @@ -172,7 +172,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { } impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { - fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { + fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> { if options.len() < 1 { Err(CrdtError::TooSmall)?; } diff --git a/src/crdt.rs b/src/crdt.rs index cb20bf8f1f73f7..77a2cee98e0d84 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -108,26 +108,32 @@ pub struct ContactInfo { /// destined to the replciate_addr pub tvu_window: SocketAddr, /// if this struture changes update this value as well - /// Always update `ReplicatedData` version too + /// Always update `NodeInfo` version too /// This separate version for addresses allows us to use the `Vote` - /// as means of updating the `ReplicatedData` table without touching the + /// as means of updating the `NodeInfo` table without touching the /// addresses if they haven't changed. pub version: u64, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct ReplicatedData { +pub struct LedgerState { + /// last verified hash that was submitted to the leader + pub last_id: Hash, + /// last verified entry count, always increasing + pub entry_height: u64, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct NodeInfo { pub id: PublicKey, /// If any of the bits change, update increment this value pub version: u64, /// network addresses pub contact_info: ContactInfo, /// current leader identity - pub current_leader_id: PublicKey, - /// last verified hash that was submitted to the leader - last_verified_id: Hash, - /// last verified count, always increasing - last_verified_height: u64, + pub leader_id: PublicKey, + /// information about the state of the ledger + ledger_state: LedgerState, } fn make_debug_id(buf: &[u8]) -> u64 { @@ -136,7 +142,7 @@ fn make_debug_id(buf: &[u8]) -> u64 { .expect("rdr.read_u64 in fn debug_id") } -impl ReplicatedData { +impl NodeInfo { pub fn new( id: PublicKey, ncp: SocketAddr, @@ -144,8 +150,8 @@ impl ReplicatedData { rpu: SocketAddr, tpu: SocketAddr, tvu_window: SocketAddr, - ) -> ReplicatedData { - ReplicatedData { + ) -> NodeInfo { + NodeInfo { id, version: 0, contact_info: ContactInfo { @@ -156,9 +162,11 @@ impl ReplicatedData { tvu_window, version: 0, }, - current_leader_id: PublicKey::default(), - last_verified_id: Hash::default(), - last_verified_height: 0, + leader_id: PublicKey::default(), + ledger_state: LedgerState { + last_id: Hash::default(), + entry_height: 0, + }, } } pub fn debug_id(&self) -> u64 { @@ -175,7 +183,7 @@ impl ReplicatedData { let replicate_addr = Self::next_port(&bind_addr, 2); let requests_addr = Self::next_port(&bind_addr, 3); let repair_addr = Self::next_port(&bind_addr, 4); - ReplicatedData::new( + NodeInfo::new( pubkey, gossip_addr, replicate_addr, @@ -190,7 +198,7 @@ impl ReplicatedData { } pub fn new_entry_point(gossip_addr: SocketAddr) -> Self { let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap(); - ReplicatedData::new( + NodeInfo::new( PublicKey::default(), gossip_addr, daddr.clone(), @@ -201,9 +209,9 @@ impl ReplicatedData { } } -/// `Crdt` structure keeps a table of `ReplicatedData` structs +/// `Crdt` structure keeps a table of `NodeInfo` structs /// # Properties -/// * `table` - map of public id's to versioned and signed ReplicatedData structs +/// * `table` - map of public id's to versioned and signed NodeInfo structs /// * `local` - map of public id's to what `self.update_index` `self.table` was updated /// * `remote` - map of public id's to the `remote.update_index` was sent /// * `update_index` - my update index @@ -214,7 +222,7 @@ impl ReplicatedData { /// No attempt to keep track of timeouts or dropped requests is made, or should be. pub struct Crdt { /// table of everyone in the network - pub table: HashMap, + pub table: HashMap, /// Value of my update index when entry in table was updated. /// Nodes will ask for updates since `update_index`, and this node /// should respond with all the identities that are greater then the @@ -238,17 +246,17 @@ enum Protocol { /// this doesn't update the `remote` update index, but it allows the /// recepient of this request to add knowledge of this node to the network /// (last update index i saw from you, my replicated data) - RequestUpdates(u64, ReplicatedData), + RequestUpdates(u64, NodeInfo), //TODO might need a since? - /// from id, form's last update index, ReplicatedData - ReceiveUpdates(PublicKey, u64, Vec, Vec<(PublicKey, u64)>), + /// from id, form's last update index, NodeInfo + ReceiveUpdates(PublicKey, u64, Vec, Vec<(PublicKey, u64)>), /// ask for a missing index /// (my replicated data to keep alive, missing window index) - RequestWindowIndex(ReplicatedData, u64), + RequestWindowIndex(NodeInfo, u64), } impl Crdt { - pub fn new(me: ReplicatedData) -> Crdt { + pub fn new(me: NodeInfo) -> Crdt { assert_eq!(me.version, 0); let mut g = Crdt { table: HashMap::new(), @@ -266,11 +274,11 @@ impl Crdt { pub fn debug_id(&self) -> u64 { make_debug_id(&self.me) } - pub fn my_data(&self) -> &ReplicatedData { + pub fn my_data(&self) -> &NodeInfo { &self.table[&self.me] } - pub fn leader_data(&self) -> Option<&ReplicatedData> { - self.table.get(&(self.table[&self.me].current_leader_id)) + pub fn leader_data(&self) -> Option<&NodeInfo> { + self.table.get(&(self.table[&self.me].leader_id)) } pub fn set_leader(&mut self, key: PublicKey) -> () { @@ -279,9 +287,9 @@ impl Crdt { "{:x}: LEADER_UPDATE TO {:x} from {:x}", me.debug_id(), make_debug_id(&key), - make_debug_id(&me.current_leader_id), + make_debug_id(&me.leader_id), ); - me.current_leader_id = key; + me.leader_id = key; me.version += 1; self.insert(&me); } @@ -321,7 +329,7 @@ impl Crdt { } else { let mut data = self.table[pubkey].clone(); data.version = v.version; - data.last_verified_id = last_id; + data.ledger_state.last_id = last_id; debug!( "{:x}: INSERTING VOTE! for {:x}", self.debug_id(), @@ -349,7 +357,7 @@ impl Crdt { self.insert_vote(&v.0, &v.1, v.2); } } - pub fn insert(&mut self, v: &ReplicatedData) { + pub fn insert(&mut self, v: &NodeInfo) { // TODO check that last_verified types are always increasing //update the peer table if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { @@ -454,7 +462,7 @@ impl Crdt { } pub fn index_blobs( - me: &ReplicatedData, + me: &NodeInfo, blobs: &Vec, receive_index: &mut u64, ) -> Result<()> { @@ -472,12 +480,12 @@ impl Crdt { } /// compute broadcast table /// # Remarks - pub fn compute_broadcast_table(&self) -> Vec { + pub fn compute_broadcast_table(&self) -> Vec { let live: Vec<_> = self.alive.iter().collect(); //thread_rng().shuffle(&mut live); let daddr = "0.0.0.0:0".parse().unwrap(); let me = &self.table[&self.me]; - let cloned_table: Vec = live.iter() + let cloned_table: Vec = live.iter() .map(|x| &self.table[x.0]) .filter(|v| { if me.id == v.id { @@ -509,8 +517,8 @@ impl Crdt { /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( - me: &ReplicatedData, - broadcast_table: &Vec, + me: &NodeInfo, + broadcast_table: &Vec, window: &Window, s: &UdpSocket, transmit_index: &mut u64, @@ -540,7 +548,7 @@ impl Crdt { .into_iter() .map(|(b, v)| { // only leader should be broadcasting - assert!(me.current_leader_id != v.id); + assert!(me.leader_id != v.id); let bl = b.unwrap(); let blob = bl.read().expect("blob read lock in streamer::broadcast"); //TODO profile this, may need multiple sockets for par_iter @@ -580,7 +588,7 @@ impl Crdt { /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { - let (me, table): (ReplicatedData, Vec) = { + let (me, table): (NodeInfo, Vec) = { // copy to avoid locking during IO let s = obj.read().expect("'obj' read lock in pub fn retransmit"); (s.table[&s.me].clone(), s.table.values().cloned().collect()) @@ -596,7 +604,7 @@ impl Crdt { .filter(|v| { if me.id == v.id { false - } else if me.current_leader_id == v.id { + } else if me.leader_id == v.id { trace!("skip retransmit to leader {:?}", v.id); false } else if v.contact_info.tvu == daddr { @@ -646,7 +654,7 @@ impl Crdt { 1.0 } - fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { + fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); let data = self.table .values() @@ -715,8 +723,8 @@ impl Crdt { let mut me = self.my_data().clone(); let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone(); me.version += 1; - me.last_verified_id = last_id; - me.last_verified_height = height; + me.ledger_state.last_id = last_id; + me.ledger_state.entry_height = height; let vote = Vote { version: me.version, contact_info_version: me.contact_info.version, @@ -752,11 +760,11 @@ impl Crdt { fn top_leader(&self) -> Option { let mut table = HashMap::new(); let def = PublicKey::default(); - let cur = self.table.values().filter(|x| x.current_leader_id != def); + let cur = self.table.values().filter(|x| x.leader_id != def); for v in cur { - let cnt = table.entry(&v.current_leader_id).or_insert(0); + let cnt = table.entry(&v.leader_id).or_insert(0); *cnt += 1; - trace!("leader {:x} {}", make_debug_id(&v.current_leader_id), *cnt); + trace!("leader {:x} {}", make_debug_id(&v.leader_id), *cnt); } let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect(); let my_id = self.debug_id(); @@ -776,7 +784,7 @@ impl Crdt { /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet fn update_leader(&mut self) { if let Some(leader_id) = self.top_leader() { - if self.my_data().current_leader_id != leader_id { + if self.my_data().leader_id != leader_id { if self.table.get(&leader_id).is_some() { self.set_leader(leader_id); } @@ -793,7 +801,7 @@ impl Crdt { &mut self, from: PublicKey, update_index: u64, - data: &[ReplicatedData], + data: &[NodeInfo], external_liveness: &[(PublicKey, u64)], ) { trace!("got updates {}", data.len()); @@ -857,8 +865,8 @@ impl Crdt { } fn run_window_request( window: &Window, - me: &ReplicatedData, - from: &ReplicatedData, + me: &NodeInfo, + from: &NodeInfo, ix: u64, blob_recycler: &BlobRecycler, ) -> Option { @@ -877,7 +885,7 @@ impl Crdt { // Allow retransmission of this response if the node // is the leader and the number of repair requests equals // a power of two - if me.current_leader_id == me.id + if me.leader_id == me.id && (num_retransmits == 0 || num_retransmits.is_power_of_two()) { sender_id = me.id @@ -1079,7 +1087,7 @@ pub struct Sockets { } pub struct TestNode { - pub data: ReplicatedData, + pub data: NodeInfo, pub sockets: Sockets, } @@ -1098,7 +1106,7 @@ impl TestNode { let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); - let data = ReplicatedData::new( + let data = NodeInfo::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), @@ -1121,7 +1129,7 @@ impl TestNode { }, } } - pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode { + pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode { let mut local_gossip_addr = bind_addr.clone(); local_gossip_addr.set_port(data.contact_info.ncp.port()); @@ -1171,7 +1179,7 @@ impl TestNode { #[cfg(test)] mod tests { use crdt::{ - parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_PURGE_MILLIS, + parse_port_or_addr, Crdt, CrdtError, NodeInfo, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; use hash::Hash; @@ -1198,7 +1206,7 @@ mod tests { } #[test] fn insert_test() { - let mut d = ReplicatedData::new( + let mut d = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1218,11 +1226,11 @@ mod tests { } #[test] fn test_new_vote() { - let d = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); assert_eq!(crdt.table[&d.id].version, 0); - let leader = ReplicatedData::new_leader(&"127.0.0.2:1235".parse().unwrap()); + let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap()); assert_ne!(d.id, leader.id); assert_matches!( crdt.new_vote(0, Hash::default()).err(), @@ -1245,7 +1253,7 @@ mod tests { #[test] fn test_insert_vote() { - let d = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); assert_eq!(crdt.table[&d.id].version, 0); @@ -1277,10 +1285,10 @@ mod tests { fn test_insert_vote_leader_liveness() { logger::setup(); // TODO: remove this test once leaders vote - let d = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); - let leader = ReplicatedData::new_leader(&"127.0.0.2:1235".parse().unwrap()); + let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap()); assert_ne!(d.id, leader.id); crdt.insert(&leader); crdt.set_leader(leader.id); @@ -1300,7 +1308,7 @@ mod tests { assert!(updated > live); } - fn sorted(ls: &Vec) -> Vec { + fn sorted(ls: &Vec) -> Vec { let mut copy: Vec<_> = ls.iter().cloned().collect(); copy.sort_by(|x, y| x.id.cmp(&y.id)); copy @@ -1308,7 +1316,7 @@ mod tests { #[test] fn replicated_data_new_leader_with_pubkey() { let kp = KeyPair::new(); - let d1 = ReplicatedData::new_leader_with_pubkey( + let d1 = NodeInfo::new_leader_with_pubkey( kp.pubkey().clone(), &"127.0.0.1:1234".parse().unwrap(), ); @@ -1324,7 +1332,7 @@ mod tests { } #[test] fn update_test() { - let d1 = ReplicatedData::new( + let d1 = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1332,7 +1340,7 @@ mod tests { "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); - let d2 = ReplicatedData::new( + let d2 = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1340,7 +1348,7 @@ mod tests { "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); - let d3 = ReplicatedData::new( + let d3 = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1376,14 +1384,14 @@ mod tests { sorted(&crdt2.table.values().map(|x| x.clone()).collect()), sorted(&crdt.table.values().map(|x| x.clone()).collect()) ); - let d4 = ReplicatedData::new_entry_point("127.0.0.4:1234".parse().unwrap()); + let d4 = NodeInfo::new_entry_point("127.0.0.4:1234".parse().unwrap()); crdt.insert(&d4); let (_key, _ix, ups) = crdt.get_updates_since(0); assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); } #[test] fn window_index_request() { - let me = ReplicatedData::new( + let me = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1394,7 +1402,7 @@ mod tests { let mut crdt = Crdt::new(me.clone()); let rv = crdt.window_index_request(0); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); - let nxt = ReplicatedData::new( + let nxt = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1405,7 +1413,7 @@ mod tests { crdt.insert(&nxt); let rv = crdt.window_index_request(0); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); - let nxt = ReplicatedData::new( + let nxt = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1418,7 +1426,7 @@ mod tests { assert_eq!(nxt.contact_info.ncp, "127.0.0.2:1234".parse().unwrap()); assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); - let nxt = ReplicatedData::new( + let nxt = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.3:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1445,7 +1453,7 @@ mod tests { /// test that gossip requests are eventually generated for all nodes #[test] fn gossip_request() { - let me = ReplicatedData::new( + let me = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1456,7 +1464,7 @@ mod tests { let mut crdt = Crdt::new(me.clone()); let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); - let nxt1 = ReplicatedData::new( + let nxt1 = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1470,7 +1478,7 @@ mod tests { let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt1.contact_info.ncp); - let nxt2 = ReplicatedData::new_entry_point("127.0.0.3:1234".parse().unwrap()); + let nxt2 = NodeInfo::new_entry_point("127.0.0.3:1234".parse().unwrap()); crdt.insert(&nxt2); // check that the service works // and that it eventually produces a request for both nodes @@ -1511,9 +1519,9 @@ mod tests { #[test] fn purge_test() { logger::setup(); - let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()); - let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt.id); crdt.set_leader(me.id); crdt.insert(&nxt); @@ -1532,7 +1540,7 @@ mod tests { let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); - let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); + let nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt2.id); assert_ne!(nxt.id, nxt2.id); crdt.insert(&nxt2); @@ -1555,7 +1563,7 @@ mod tests { #[test] fn run_window_request() { let window = default_window(); - let me = ReplicatedData::new( + let me = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), @@ -1584,10 +1592,10 @@ mod tests { fn run_window_request_with_backoff() { let window = default_window(); - let mut me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); - me.current_leader_id = me.id; + let mut me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + me.leader_id = me.id; - let mock_peer = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let mock_peer = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let recycler = BlobRecycler::default(); @@ -1620,25 +1628,25 @@ mod tests { #[test] fn test_update_leader() { logger::setup(); - let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let leader0 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); - let leader1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let leader0 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let leader1 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()); assert_eq!(crdt.top_leader(), None); crdt.set_leader(leader0.id); assert_eq!(crdt.top_leader().unwrap(), leader0.id); //add a bunch of nodes with a new leader for _ in 0..10 { - let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap()); + let mut dum = NodeInfo::new_entry_point("127.0.0.1:1234".parse().unwrap()); dum.id = KeyPair::new().pubkey(); - dum.current_leader_id = leader1.id; + dum.leader_id = leader1.id; crdt.insert(&dum); } assert_eq!(crdt.top_leader().unwrap(), leader1.id); crdt.update_leader(); - assert_eq!(crdt.my_data().current_leader_id, leader0.id); + assert_eq!(crdt.my_data().leader_id, leader0.id); crdt.insert(&leader1); crdt.update_leader(); - assert_eq!(crdt.my_data().current_leader_id, leader1.id); + assert_eq!(crdt.my_data().leader_id, leader1.id); } } diff --git a/src/erasure.rs b/src/erasure.rs index c560116cf2a5ff..449361edc71914 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -523,7 +523,7 @@ mod test { erasure::add_coding_blobs(blob_recycler, &mut blobs, offset as u64); let blobs_len = blobs.len(); - let d = crdt::ReplicatedData::new( + let d = crdt::NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), diff --git a/src/fullnode.rs b/src/fullnode.rs index 800bcf942d78fb..3df3fd75c7201b 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -1,7 +1,7 @@ //! The `fullnode` module hosts all the fullnode microservices. use bank::Bank; -use crdt::{Crdt, ReplicatedData, TestNode}; +use crdt::{Crdt, NodeInfo, TestNode}; use entry::Entry; use entry_writer; use ledger::Block; @@ -44,7 +44,7 @@ pub enum OutFile { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] /// Fullnode configuration to be stored in file pub struct Config { - pub node_info: ReplicatedData, + pub node_info: NodeInfo, pkcs8: Vec, } @@ -58,7 +58,7 @@ impl Config { let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in fullnode::Config new"); let pubkey = keypair.pubkey(); - let node_info = ReplicatedData::new_leader_with_pubkey(pubkey, bind_addr); + let node_info = NodeInfo::new_leader_with_pubkey(pubkey, bind_addr); Config { node_info, pkcs8 } } pub fn keypair(&self) -> KeyPair { @@ -104,7 +104,7 @@ impl FullNode { if !leader { let testnet_addr = network_entry_for_validator.expect("validator requires entry"); - let network_entry_point = ReplicatedData::new_entry_point(testnet_addr); + let network_entry_point = NodeInfo::new_entry_point(testnet_addr); let keypair = keypair_for_validator.expect("validator requires keypair"); let server = FullNode::new_validator( keypair, @@ -121,7 +121,7 @@ impl FullNode { ); server } else { - node.data.current_leader_id = node.data.id.clone(); + node.data.leader_id = node.data.id.clone(); let outfile_for_leader: Box = match outfile_for_leader { Some(OutFile::Path(file)) => Box::new( OpenOptions::new() @@ -285,7 +285,7 @@ impl FullNode { entry_height: u64, ledger_tail: Option>, node: TestNode, - entry_point: ReplicatedData, + entry_point: NodeInfo, exit: Arc, ) -> Self { let bank = Arc::new(bank); diff --git a/src/streamer.rs b/src/streamer.rs index c7634fa1bbdef0..d2cbda902b311e 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,7 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! use counter::Counter; -use crdt::{Crdt, CrdtError, ReplicatedData}; +use crdt::{Crdt, CrdtError, NodeInfo}; #[cfg(feature = "erasure")] use erasure; use packet::{ @@ -262,7 +262,7 @@ fn recv_window( ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; - let maybe_leader: Option = crdt.read() + let maybe_leader: Option = crdt.read() .expect("'crdt' read lock in fn recv_window") .leader_data() .cloned(); @@ -574,8 +574,8 @@ pub fn window( } fn broadcast( - me: &ReplicatedData, - broadcast_table: &Vec, + me: &NodeInfo, + broadcast_table: &Vec, window: &Window, recycler: &BlobRecycler, r: &BlobReceiver, diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 5da5fbe4c0822e..77ce210599568d 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -31,8 +31,8 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { } /// Test that the network converges. -/// Run until every node in the network has a full ReplicatedData set. -/// Check that nodes stop sending updates after all the ReplicatedData has been shared. +/// Run until every node in the network has a full NodeInfo set. +/// Check that nodes stop sending updates after all the NodeInfo has been shared. /// tests that actually use this function are below fn run_gossip_topo(topo: F) where diff --git a/tests/multinode.rs b/tests/multinode.rs index 5059a56f07663e..b0979eb14f324e 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -5,7 +5,7 @@ extern crate serde_json; extern crate solana; use solana::crdt::TestNode; -use solana::crdt::{Crdt, ReplicatedData}; +use solana::crdt::{Crdt, NodeInfo}; use solana::entry_writer::EntryWriter; use solana::fullnode::{FullNode, InFile, OutFile}; use solana::logger; @@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; -fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { +fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { //lets spy on the network let exit = Arc::new(AtomicBool::new(false)); let mut spy = TestNode::new(); @@ -46,7 +46,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { let mut rv = vec![]; for _ in 0..30 { let num = spy_ref.read().unwrap().convergence(); - let mut v: Vec = spy_ref + let mut v: Vec = spy_ref .read() .unwrap() .table @@ -284,7 +284,7 @@ fn test_boot_validator_from_file() { std::fs::remove_file(ledger_path).unwrap(); } -fn create_leader(ledger_path: &str) -> (ReplicatedData, FullNode) { +fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) { let leader = TestNode::new(); let leader_data = leader.data.clone(); let leader_fullnode = FullNode::new( @@ -399,7 +399,7 @@ fn test_multi_node_dynamic_network() { send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); assert_eq!(leader_balance, 1000); - let validators: Vec<(ReplicatedData, FullNode)> = (0..N) + let validators: Vec<(NodeInfo, FullNode)> = (0..N) .into_iter() .map(|n| { let keypair = KeyPair::new(); @@ -475,7 +475,7 @@ fn test_multi_node_dynamic_network() { std::fs::remove_file(ledger_path).unwrap(); } -fn mk_client(leader: &ReplicatedData) -> ThinClient { +fn mk_client(leader: &NodeInfo) -> ThinClient { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); requests_socket .set_read_timeout(Some(Duration::new(1, 0))) @@ -514,7 +514,7 @@ fn retry_get_balance( } fn send_tx_and_retry_get_balance( - leader: &ReplicatedData, + leader: &NodeInfo, alice: &Mint, bob_pubkey: &PublicKey, expected: Option,