Skip to content

Commit

Permalink
StorageStage now sends transactions at the local TPU
Browse files Browse the repository at this point in the history
  • Loading branch information
mvines committed Mar 8, 2019
1 parent 7bd0929 commit 12f3fd7
Showing 1 changed file with 36 additions and 35 deletions.
71 changes: 36 additions & 35 deletions core/src/storage_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,10 @@ impl StorageStage {
.spawn(move || loop {
match tx_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(mut tx) => {
if Self::send_tx(&cluster_info0, &mut tx, &exit1, &keypair1, None).is_ok() {
debug!("sent tx: {:?}", tx);
if Self::send_transaction(&cluster_info0, &mut tx, &exit1, &keypair1, None)
.is_ok()
{
debug!("sent transaction: {:?}", tx);
}
}
Err(e) => match e {
Expand All @@ -218,58 +220,57 @@ impl StorageStage {
}
}

fn send_tx(
fn send_transaction(
cluster_info: &Arc<RwLock<ClusterInfo>>,
tx: &mut Transaction,
transaction: &mut Transaction,
exit: &Arc<AtomicBool>,
keypair: &Arc<Keypair>,
account_to_create: Option<Pubkey>,
) -> io::Result<()> {
if let Some(leader_info) = cluster_info.read().unwrap().leader_data() {
let mut client = mk_client_with_timeout(leader_info, Duration::from_secs(5));
let node_info = cluster_info.read().unwrap().my_data();
let mut client = mk_client_with_timeout(&node_info, Duration::from_secs(5));

if let Some(account) = account_to_create {
if client.get_account_userdata(&account).is_ok() {
return Ok(());
}
if let Some(account) = account_to_create {
if client.get_account_userdata(&account).is_ok() {
return Ok(());
}
}

let mut blockhash = None;
for _ in 0..10 {
if let Some(new_blockhash) = client.try_get_recent_blockhash(1) {
blockhash = Some(new_blockhash);
break;
}

if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
let mut blockhash = None;
for _ in 0..10 {
if let Some(new_blockhash) = client.try_get_recent_blockhash(1) {
blockhash = Some(new_blockhash);
break;
}

if let Some(blockhash) = blockhash {
tx.sign(&[keypair.as_ref()], blockhash);
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
}

if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
if let Some(blockhash) = blockhash {
transaction.sign(&[keypair.as_ref()], blockhash);

if let Ok(signature) = client.transfer_signed(&tx) {
for _ in 0..10 {
if client.check_signature(&signature) {
return Ok(());
}
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}

if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
if let Ok(signature) = client.transfer_signed(&transaction) {
for _ in 0..10 {
if client.check_signature(&signature) {
return Ok(());
}

sleep(Duration::from_millis(200));
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}

sleep(Duration::from_millis(200));
}
}
}

Err(io::Error::new(io::ErrorKind::Other, "leader not found"))
Err(io::Error::new(io::ErrorKind::Other, "other failure"))
}

pub fn process_entry_crossing(
Expand Down

0 comments on commit 12f3fd7

Please sign in to comment.