Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
rebase builds
Browse files Browse the repository at this point in the history
Implement an mt safe bank and batch programs into single atomic transactions.

fix entry processing

fmt

once!

cleanup and remove clone

recv poh until response arrives

handle recverrors

fix benches

get rid of answer, use 10 threads because math is simpler

coalesce the extra thread switch

optimizing

cleanup

register faster

cleanup

fixed test

sdk serialize

fixed benches

rebase

program counter

fmt
  • Loading branch information
aeyakovenko committed Sep 26, 2018
1 parent 07ddca8 commit 3be9e66
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 31 deletions.
28 changes: 12 additions & 16 deletions src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::thread::sleep;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
Expand All @@ -30,7 +30,7 @@ pub struct BankingStage {
/// Handle to the stage's thread.
thread_hdls: Vec<JoinHandle<()>>,
}

pub enum Config {
/// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before transmitting a new entry.
Tick(usize),
Expand Down Expand Up @@ -64,18 +64,18 @@ impl BankingStage {
// Once an entry has been recorded, its last_id is registered with the bank.
let tick_producer = Builder::new()
.name("solana-banking-stage-tick_producer".to_string())
.spawn(move ||
if let Err(e) = Self::tick_producer(
tick_poh,
config,
) {
.spawn(move || {
if let Err(e) = Self::tick_producer(tick_poh, config) {
match e {
Error::RecvError(_) => (),
Error::SendError => (),
_ => error!("solana-banking-stage-tick_producer unexpected error {:?}", e),
_ => error!(
"solana-banking-stage-tick_producer unexpected error {:?}",
e
),
}
}
).unwrap();
}).unwrap();

// Many banks that process transactions in parallel.
let mut thread_hdls: Vec<JoinHandle<()>> = (0..NUM_THREADS)
Expand Down Expand Up @@ -106,7 +106,6 @@ impl BankingStage {
(BankingStage { thread_hdls }, entry_receiver)
}


/// Convert the transactions from a blob of binary data to a vector of transactions and
/// an unused `SocketAddr` that could be used to send a response.
fn deserialize_transactions(p: &Packets) -> Vec<Option<(Transaction, SocketAddr)>> {
Expand All @@ -119,11 +118,8 @@ impl BankingStage {
}).collect()
}

fn tick_producer(
poh: PohService,
config: Config,
) -> Result<()> {
loop {
fn tick_producer(poh: PohService, config: Config) -> Result<()> {
loop {
match config {
Config::Tick(num) => {
for _ in 0..num {
Expand All @@ -135,7 +131,7 @@ impl BankingStage {
}
}
poh.tick()?;
}
}
}

fn process_transactions(
Expand Down
26 changes: 11 additions & 15 deletions src/poh_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
//!
//! The resulting stream of Hashes represents ordered events in time.
//!
use hash::Hash;
use poh::{Poh};
use entry::Entry;
use transaction::Transaction;
use bank::Bank;
use entry::Entry;
use hash::Hash;
use poh::Poh;
use result::Result;
use std::sync::mpsc::{Sender};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use transaction::Transaction;

#[derive(Clone)]
pub struct PohService {
Expand All @@ -31,11 +31,7 @@ impl PohService {
/// `tick_duration`.
pub fn new(bank: Arc<Bank>, sender: Sender<Vec<Entry>>) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(bank.last_id())));
PohService {
poh,
bank,
sender,
}
PohService { poh, bank, sender }
}

pub fn hash(&self) {
Expand All @@ -44,7 +40,7 @@ impl PohService {
let mut poh = self.poh.lock().unwrap();
poh.hash()
}

pub fn tick(&self) -> Result<()> {
// Register and send the entry out while holding the lock.
// This guarantees PoH order and Entry production and banks LastId queue is the same
Expand All @@ -55,8 +51,8 @@ impl PohService {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: vec![],
};
self.sender.send(vec![entry])?;
};
self.sender.send(vec![entry])?;
Ok(())
}

Expand All @@ -70,8 +66,8 @@ impl PohService {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: txs,
};
self.sender.send(vec![entry])?;
};
self.sender.send(vec![entry])?;
Ok(())
}
}
Expand Down

0 comments on commit 3be9e66

Please sign in to comment.