Skip to content

Commit

Permalink
Merge pull request #217 from garious/add-historian-stage
Browse files Browse the repository at this point in the history
Add record_stage to pipeline
  • Loading branch information
garious authored May 14, 2018
2 parents a578c1a + 7736b9c commit a604dcb
Show file tree
Hide file tree
Showing 16 changed files with 400 additions and 447 deletions.
244 changes: 111 additions & 133 deletions src/accountant.rs → src/bank.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/bin/genesis-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extern crate untrusted;

use isatty::stdin_isatty;
use rayon::prelude::*;
use solana::accountant::MAX_ENTRY_IDS;
use solana::bank::MAX_ENTRY_IDS;
use solana::entry::{create_entry, next_entry};
use solana::event::Event;
use solana::mint::MintDemo;
Expand Down
19 changes: 9 additions & 10 deletions src/bin/testnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ extern crate solana;

use getopts::Options;
use isatty::stdin_isatty;
use solana::accountant::Accountant;
use solana::bank::Bank;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::event_processor::EventProcessor;
use solana::rpu::Rpu;
use solana::signature::{KeyPair, KeyPairUtil};
use std::env;
Expand All @@ -19,6 +18,7 @@ use std::net::UdpSocket;
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
Expand Down Expand Up @@ -92,32 +92,31 @@ fn main() {
None
};

eprintln!("creating accountant...");
eprintln!("creating bank...");

let accountant = Accountant::new_from_deposit(&deposit.unwrap());
accountant.register_entry_id(&entry0.id);
accountant.register_entry_id(&entry1.id);
let bank = Bank::new_from_deposit(&deposit.unwrap());
bank.register_entry_id(&entry0.id);
bank.register_entry_id(&entry1.id);

eprintln!("processing entries...");

let mut last_id = entry1.id;
for entry in entries {
last_id = entry.id;
let results = accountant.process_verified_events(entry.events);
let results = bank.process_verified_events(entry.events);
for result in results {
if let Err(e) = result {
eprintln!("failed to process event {:?}", e);
exit(1);
}
}
accountant.register_entry_id(&last_id);
bank.register_entry_id(&last_id);
}

eprintln!("creating networking stack...");

let event_processor = EventProcessor::new(accountant, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(event_processor);
let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000)));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
//! * layer 1 - As many nodes as we can fit
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Accountant needs to provide an interface for us to query the stake weight
//! Bank needs to provide an interface for us to query the stake weight
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
Expand Down
12 changes: 6 additions & 6 deletions src/entry_writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The `entry_writer` module helps implement the TPU's write stage.
use accountant::Accountant;
use bank::Bank;
use entry::Entry;
use ledger;
use packet;
Expand All @@ -15,18 +15,18 @@ use std::time::Duration;
use streamer;

pub struct EntryWriter<'a> {
accountant: &'a Accountant,
bank: &'a Bank,
}

impl<'a> EntryWriter<'a> {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(accountant: &'a Accountant) -> Self {
EntryWriter { accountant }
/// Create a new Tpu that wraps the given Bank.
pub fn new(bank: &'a Bank) -> Self {
EntryWriter { bank }
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry");
self.accountant.register_entry_id(&entry.id);
self.bank.register_entry_id(&entry.id);
writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}",
Expand Down
167 changes: 0 additions & 167 deletions src/event_processor.rs

This file was deleted.

5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod accountant;
pub mod bank;
pub mod crdt;
pub mod ecdsa;
pub mod entry;
pub mod entry_writer;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod event;
pub mod event_processor;
pub mod hash;
pub mod historian;
pub mod ledger;
pub mod logger;
pub mod mint;
pub mod packet;
pub mod plan;
pub mod record_stage;
pub mod recorder;
pub mod request;
pub mod request_processor;
Expand Down
Loading

0 comments on commit a604dcb

Please sign in to comment.