Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Jan 19, 2019
1 parent 8ee8f1a commit 91b2e55
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl Accounts {
}
/// accounts starts with an empty data structure for every fork
/// self is trunk, merge the fork into self
pub fn merge_into_trunk(&mut self, other: Self) {
pub fn merge_into_trunk(&self, other: Self) {
assert!(other.account_locks.lock().unwrap().is_empty());
let db = other.accounts_db.into_inner().unwrap();
let mut mydb = self.accounts_db.write().unwrap();
Expand Down
10 changes: 0 additions & 10 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,6 @@ impl Bank {
self.forks.read().unwrap().trunk_fork()
}

#[must_use]
fn process_and_record_transactions_on_fork(
&self,
fork: u64,
txs: &[Transaction],
poh: Option<&PohRecorder>,
) -> Result<(Vec<Result<()>>)> {
let state = self.forks.read().unwrap().bank_state(fork);
state.process_and_record_transactions(Some(&self.subscriptions), txs, poh)
}
#[must_use]
pub fn process_and_record_transactions(
&self,
Expand Down
31 changes: 17 additions & 14 deletions src/bank_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ pub struct BankCheckpoint {
/// status cache
status_cache: RwLock<StatusCache>,
finalized: AtomicBool,
pub fork_id: u64,
fork_id: AtomicUsize,
}

impl std::fmt::Debug for BankCheckpoint {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "BankCheckpoint {{ fork_id: {} }}", self.fork_id)
write!(f, "BankCheckpoint {{ fork_id: {} }}", self.fork_id())
}
}

Expand All @@ -48,7 +48,7 @@ impl BankCheckpoint {
entry_q: RwLock::new(EntryQueue::default()),
status_cache: RwLock::new(StatusCache::new(last_id)),
finalized: AtomicBool::new(false),
fork_id,
fork_id: AtomicUsize::new(fork_id as usize),
}
}
/// Create an Bank using a deposit.
Expand Down Expand Up @@ -189,19 +189,22 @@ impl BankCheckpoint {
self.accounts.hash_internal_state()
}

pub fn fork_id(&self) -> u64 {
self.fork_id.load(Ordering::Relaxed) as u64
}
/// create a new fork for the bank state
pub fn fork(&self, fork_id: u64, last_id: &Hash) -> Self {
Self {
accounts: Accounts::default(),
entry_q: RwLock::new(self.entry_q.read().unwrap().fork()),
status_cache: RwLock::new(StatusCache::new(last_id)),
finalized: AtomicBool::new(false),
fork_id,
fork_id: AtomicUsize::new(fork_id as usize),
}
}
/// consume the checkpoint into the trunk state
/// self becomes the new trunk and its fork_id is updated
pub fn merge_into_trunk(&mut self, other: Self) {
pub fn merge_into_trunk(&self, other: Self) {
let (accounts, entry_q, status_cache, fork_id) = {
(
other.accounts,
Expand All @@ -219,7 +222,8 @@ impl BankCheckpoint {
.write()
.unwrap()
.merge_into_trunk(status_cache.into_inner().unwrap());
self.fork_id = fork_id;
self.fork_id
.store(fork_id.load(Ordering::Relaxed), Ordering::Relaxed);
}
}

Expand Down Expand Up @@ -265,7 +269,7 @@ impl BankState {
mut results: Vec<Result<()>>,
error_counters: &mut ErrorCounters,
) -> Vec<Result<()>> {
for c in self.checkpoints {
for c in &self.checkpoints {
let status_cache = c.status_cache.read().unwrap();
for (i, tx) in txs.iter().enumerate() {
if results[i] == Ok(()) && status_cache.has_signature(&tx.signatures[0]) {
Expand Down Expand Up @@ -489,7 +493,7 @@ impl BankState {
//TODO: this sanity check needs to be fixed once forks contain the PoH ticks that
//connect them to the previous fork. We need a way to identify the fork from the
//entry itself, or have that information passed through.
assert_eq!(e.tick_height / BLOCK_TICK_COUNT, head.fork_id);
assert_eq!(e.tick_height / BLOCK_TICK_COUNT, head.fork_id());
let results = self.execute_and_commit_transactions(
None,
&e.transactions,
Expand Down Expand Up @@ -543,8 +547,8 @@ impl BankState {
let finish = entries.last().unwrap().tick_height;
// Fork sanity check
//TODO: same as the other fork sanity check
assert_eq!(finish / BLOCK_TICK_COUNT, head.fork_id);
if (finish + 1) / BLOCK_TICK_COUNT != head.fork_id {
assert_eq!(finish / BLOCK_TICK_COUNT, head.fork_id());
if (finish + 1) / BLOCK_TICK_COUNT != head.fork_id() {
head.finalize();
}
}
Expand Down Expand Up @@ -580,15 +584,15 @@ impl BankState {
Ok(())
}
pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
for c in self.checkpoints {
for c in &self.checkpoints {
if let Some(status) = c.get_signature_status(signature) {
return Some(status);
}
}
None
}
pub fn has_signature(&self, signature: &Signature) -> bool {
for c in self.checkpoints {
for c in &self.checkpoints {
if c.has_signature(signature) {
return true;
}
Expand Down Expand Up @@ -623,7 +627,7 @@ mod test {
}

fn new_state(mint: &Keypair, tokens: u64, last_id: &Hash) -> BankState {
let accounts = [(mint.pubkey(), Account::new(3, 0, Pubkey::default()))];
let accounts = [(mint.pubkey(), Account::new(tokens, 0, Pubkey::default()))];
let bank = Arc::new(BankCheckpoint::new_from_accounts(0, &accounts, &last_id));
BankState {
checkpoints: vec![bank],
Expand All @@ -636,7 +640,6 @@ mod test {
let mint = Keypair::new();
let alice = Keypair::new();
let bob = Keypair::new();
let accounts = [(mint.pubkey(), Account::new(3, 0, Pubkey::default()))];
let bank = new_state(&mint, 3, &last_id);

let tx1 = Transaction::system_new(&mint, alice.pubkey(), 1, last_id);
Expand Down
13 changes: 7 additions & 6 deletions src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Checkpoints<T> {
pub latest: HashSet<u64>,
}

impl<T> Checkpoints<T> {
impl<T: Clone> Checkpoints<T> {
pub fn is_empty(&self) -> bool {
self.checkpoints.is_empty()
}
Expand Down Expand Up @@ -68,11 +68,12 @@ impl<T> Checkpoints<T> {
break;
}
let trunk = queue.pop_front().unwrap();
let new_data = self.load(trunk).expect("load from inverse").clone();
new.store(trunk, new_data.0, new_data.1);
let children = inverse.get(&trunk).unwrap_or(&HashSet::new());
let mut next = children.into_iter().map(|x| *x).collect();
queue.append(&mut next);
let (data, prev) = self.load(trunk).expect("load from inverse").clone();
new.store(trunk, data.clone(), prev);
if let Some(children) = inverse.get(&trunk) {
let mut next = children.into_iter().map(|x| *x).collect();
queue.append(&mut next);
}
}
new
}
Expand Down
12 changes: 6 additions & 6 deletions src/forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ impl Forks {
let len = states.len();
let old_trunk = states[len - 1].clone();
let merged = states[len - 2].clone();
(old_trunk.1, merged.1, merged.0)
(old_trunk.1.clone(), merged.1.clone(), merged.0)
};
let idag = self.checkpoints.invert();
let new_checkpoints = self.checkpoints.prune(new_trunk_id, &idag);
let old_id = old.fork_id;
let old_id = old.fork_id();
self.checkpoints = new_checkpoints;
self.trunk_fork = new_trunk_id;
// old should have been pruned
Expand All @@ -80,17 +80,17 @@ impl Forks {
// merge all the new changes into the old instance under the new id
// this should consume `new`
// new should have no other references
let new: BankCheckpoint = Arc::try_unwrap(*new).unwrap();
let new: BankCheckpoint = Arc::try_unwrap(new).unwrap();
old.merge_into_trunk(new);
assert_eq!(old.fork_id, new_trunk_id);
assert_eq!(old.fork_id(), new_trunk_id);
Ok(new_trunk_id)
}

/// Initialize the first trunk
pub fn init_trunk_fork(&mut self, checkpoint: BankCheckpoint) {
assert!(self.checkpoints.is_empty());
self.live_fork = checkpoint.fork_id;
self.trunk_fork = checkpoint.fork_id;
self.live_fork = checkpoint.fork_id();
self.trunk_fork = checkpoint.fork_id();
//TODO: using u64::MAX as the impossible checkpoint
//this should be a None instead
self.checkpoints
Expand Down
10 changes: 5 additions & 5 deletions src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ReplayStage {
if blocks.last().unwrap().1 != current {
blocks.push((vec![], current, prev));
}
blocks.last().unwrap().0.push(e);
blocks.last_mut().unwrap().0.push(e);
}
blocks
}
Expand All @@ -87,8 +87,8 @@ impl ReplayStage {
_keypair: &Arc<Keypair>,
_vote_account_keypair: &Arc<Keypair>,
_vote_blob_sender: Option<&BlobSender>,
ledger_entry_sender: &EntrySender,
entry_height: &mut u64,
_ledger_entry_sender: &EntrySender,
_entry_height: &mut u64,
last_entry_id: &mut Hash,
) -> Result<()> {
let timer = Duration::new(1, 0);
Expand Down Expand Up @@ -139,8 +139,8 @@ impl ReplayStage {
let now = Instant::now();
// TODO: make this work
// let options = vec![];
// for c in bank.checkpoints.live {
// let state = bank.collect(c);
// for c in bank.live_forks() {
// let state = bank.bank_state(c);
// if is_valid_vote(state.entry_height()) {
// options.push(state);
// }
Expand Down

0 comments on commit 91b2e55

Please sign in to comment.