diff --git a/src/accounts.rs b/src/accounts.rs index 9537019305b36f..71f0707db88743 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -337,7 +337,7 @@ impl Accounts { } /// accounts starts with an empty data structure for every fork /// self is trunk, merge the fork into self - pub fn merge_into_trunk(&mut self, other: Self) { + pub fn merge_into_trunk(&self, other: Self) { assert!(other.account_locks.lock().unwrap().is_empty()); let db = other.accounts_db.into_inner().unwrap(); let mut mydb = self.accounts_db.write().unwrap(); diff --git a/src/bank.rs b/src/bank.rs index 856dab7a748ba9..3e03ac2ee73e83 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -313,16 +313,6 @@ impl Bank { self.forks.read().unwrap().trunk_fork() } - #[must_use] - fn process_and_record_transactions_on_fork( - &self, - fork: u64, - txs: &[Transaction], - poh: Option<&PohRecorder>, - ) -> Result<(Vec>)> { - let state = self.forks.read().unwrap().bank_state(fork); - state.process_and_record_transactions(Some(&self.subscriptions), txs, poh) - } #[must_use] pub fn process_and_record_transactions( &self, diff --git a/src/bank_state.rs b/src/bank_state.rs index f3fe05a036465b..d7e95025acf587 100644 --- a/src/bank_state.rs +++ b/src/bank_state.rs @@ -31,12 +31,12 @@ pub struct BankCheckpoint { /// status cache status_cache: RwLock, finalized: AtomicBool, - pub fork_id: u64, + fork_id: AtomicUsize, } impl std::fmt::Debug for BankCheckpoint { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "BankCheckpoint {{ fork_id: {} }}", self.fork_id) + write!(f, "BankCheckpoint {{ fork_id: {} }}", self.fork_id()) } } @@ -48,7 +48,7 @@ impl BankCheckpoint { entry_q: RwLock::new(EntryQueue::default()), status_cache: RwLock::new(StatusCache::new(last_id)), finalized: AtomicBool::new(false), - fork_id, + fork_id: AtomicUsize::new(fork_id as usize), } } /// Create an Bank using a deposit. @@ -189,6 +189,9 @@ impl BankCheckpoint { self.accounts.hash_internal_state() } + pub fn fork_id(&self) -> u64 { + self.fork_id.load(Ordering::Relaxed) as u64 + } /// create a new fork for the bank state pub fn fork(&self, fork_id: u64, last_id: &Hash) -> Self { Self { @@ -196,12 +199,12 @@ impl BankCheckpoint { entry_q: RwLock::new(self.entry_q.read().unwrap().fork()), status_cache: RwLock::new(StatusCache::new(last_id)), finalized: AtomicBool::new(false), - fork_id, + fork_id: AtomicUsize::new(fork_id as usize), } } /// consume the checkpoint into the trunk state /// self becomes the new trunk and its fork_id is updated - pub fn merge_into_trunk(&mut self, other: Self) { + pub fn merge_into_trunk(&self, other: Self) { let (accounts, entry_q, status_cache, fork_id) = { ( other.accounts, @@ -219,7 +222,8 @@ impl BankCheckpoint { .write() .unwrap() .merge_into_trunk(status_cache.into_inner().unwrap()); - self.fork_id = fork_id; + self.fork_id + .store(fork_id.load(Ordering::Relaxed), Ordering::Relaxed); } } @@ -265,7 +269,7 @@ impl BankState { mut results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { - for c in self.checkpoints { + for c in &self.checkpoints { let status_cache = c.status_cache.read().unwrap(); for (i, tx) in txs.iter().enumerate() { if results[i] == Ok(()) && status_cache.has_signature(&tx.signatures[0]) { @@ -489,7 +493,7 @@ impl BankState { //TODO: this sanity check needs to be fixed once forks contain the PoH ticks that //connect them to the previous fork. We need a way to identify the fork from the //entry itself, or have that information passed through. - assert_eq!(e.tick_height / BLOCK_TICK_COUNT, head.fork_id); + assert_eq!(e.tick_height / BLOCK_TICK_COUNT, head.fork_id()); let results = self.execute_and_commit_transactions( None, &e.transactions, @@ -543,8 +547,8 @@ impl BankState { let finish = entries.last().unwrap().tick_height; // Fork sanity check //TODO: same as the other fork sanity check - assert_eq!(finish / BLOCK_TICK_COUNT, head.fork_id); - if (finish + 1) / BLOCK_TICK_COUNT != head.fork_id { + assert_eq!(finish / BLOCK_TICK_COUNT, head.fork_id()); + if (finish + 1) / BLOCK_TICK_COUNT != head.fork_id() { head.finalize(); } } @@ -580,7 +584,7 @@ impl BankState { Ok(()) } pub fn get_signature_status(&self, signature: &Signature) -> Option> { - for c in self.checkpoints { + for c in &self.checkpoints { if let Some(status) = c.get_signature_status(signature) { return Some(status); } @@ -588,7 +592,7 @@ impl BankState { None } pub fn has_signature(&self, signature: &Signature) -> bool { - for c in self.checkpoints { + for c in &self.checkpoints { if c.has_signature(signature) { return true; } @@ -623,7 +627,7 @@ mod test { } fn new_state(mint: &Keypair, tokens: u64, last_id: &Hash) -> BankState { - let accounts = [(mint.pubkey(), Account::new(3, 0, Pubkey::default()))]; + let accounts = [(mint.pubkey(), Account::new(tokens, 0, Pubkey::default()))]; let bank = Arc::new(BankCheckpoint::new_from_accounts(0, &accounts, &last_id)); BankState { checkpoints: vec![bank], @@ -636,7 +640,6 @@ mod test { let mint = Keypair::new(); let alice = Keypair::new(); let bob = Keypair::new(); - let accounts = [(mint.pubkey(), Account::new(3, 0, Pubkey::default()))]; let bank = new_state(&mint, 3, &last_id); let tx1 = Transaction::system_new(&mint, alice.pubkey(), 1, last_id); diff --git a/src/checkpoints.rs b/src/checkpoints.rs index da6dabb6d3e66b..7d04fa4b75a763 100644 --- a/src/checkpoints.rs +++ b/src/checkpoints.rs @@ -15,7 +15,7 @@ pub struct Checkpoints { pub latest: HashSet, } -impl Checkpoints { +impl Checkpoints { pub fn is_empty(&self) -> bool { self.checkpoints.is_empty() } @@ -68,11 +68,12 @@ impl Checkpoints { break; } let trunk = queue.pop_front().unwrap(); - let new_data = self.load(trunk).expect("load from inverse").clone(); - new.store(trunk, new_data.0, new_data.1); - let children = inverse.get(&trunk).unwrap_or(&HashSet::new()); - let mut next = children.into_iter().map(|x| *x).collect(); - queue.append(&mut next); + let (data, prev) = self.load(trunk).expect("load from inverse").clone(); + new.store(trunk, data.clone(), prev); + if let Some(children) = inverse.get(&trunk) { + let mut next = children.into_iter().map(|x| *x).collect(); + queue.append(&mut next); + } } new } diff --git a/src/forks.rs b/src/forks.rs index c5ea29acbb92ea..5d9f7d06032d4b 100644 --- a/src/forks.rs +++ b/src/forks.rs @@ -61,11 +61,11 @@ impl Forks { let len = states.len(); let old_trunk = states[len - 1].clone(); let merged = states[len - 2].clone(); - (old_trunk.1, merged.1, merged.0) + (old_trunk.1.clone(), merged.1.clone(), merged.0) }; let idag = self.checkpoints.invert(); let new_checkpoints = self.checkpoints.prune(new_trunk_id, &idag); - let old_id = old.fork_id; + let old_id = old.fork_id(); self.checkpoints = new_checkpoints; self.trunk_fork = new_trunk_id; // old should have been pruned @@ -80,17 +80,17 @@ impl Forks { // merge all the new changes into the old instance under the new id // this should consume `new` // new should have no other references - let new: BankCheckpoint = Arc::try_unwrap(*new).unwrap(); + let new: BankCheckpoint = Arc::try_unwrap(new).unwrap(); old.merge_into_trunk(new); - assert_eq!(old.fork_id, new_trunk_id); + assert_eq!(old.fork_id(), new_trunk_id); Ok(new_trunk_id) } /// Initialize the first trunk pub fn init_trunk_fork(&mut self, checkpoint: BankCheckpoint) { assert!(self.checkpoints.is_empty()); - self.live_fork = checkpoint.fork_id; - self.trunk_fork = checkpoint.fork_id; + self.live_fork = checkpoint.fork_id(); + self.trunk_fork = checkpoint.fork_id(); //TODO: using u64::MAX as the impossible checkpoint //this should be a None instead self.checkpoints diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 3154036d9fe818..0c143c25a62ecd 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -74,7 +74,7 @@ impl ReplayStage { if blocks.last().unwrap().1 != current { blocks.push((vec![], current, prev)); } - blocks.last().unwrap().0.push(e); + blocks.last_mut().unwrap().0.push(e); } blocks } @@ -87,8 +87,8 @@ impl ReplayStage { _keypair: &Arc, _vote_account_keypair: &Arc, _vote_blob_sender: Option<&BlobSender>, - ledger_entry_sender: &EntrySender, - entry_height: &mut u64, + _ledger_entry_sender: &EntrySender, + _entry_height: &mut u64, last_entry_id: &mut Hash, ) -> Result<()> { let timer = Duration::new(1, 0); @@ -139,8 +139,8 @@ impl ReplayStage { let now = Instant::now(); // TODO: make this work // let options = vec![]; - // for c in bank.checkpoints.live { - // let state = bank.collect(c); + // for c in bank.live_forks() { + // let state = bank.bank_state(c); // if is_valid_vote(state.entry_height()) { // options.push(state); // }