This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

[wip] replicator #130

Closed
wants to merge 39 commits
39 commits
1f456c5
state replication
aeyakovenko Apr 18, 2018
efeebf5
wip
aeyakovenko Apr 18, 2018
d620ce5
update
aeyakovenko Apr 18, 2018
073e5ab
docs
aeyakovenko Apr 18, 2018
0c610c4
docs
aeyakovenko Apr 18, 2018
5e11078
Add Stephen
garious Apr 18, 2018
dea5ab2
Add erasure rust logic under feature flag
sakridge Apr 19, 2018
e949211
Merge pull request #131 from sakridge/erasure
garious Apr 19, 2018
3622533
wip
aeyakovenko Apr 19, 2018
d366a07
add gregs abstract as an intro
aeyakovenko Apr 19, 2018
903374a
Merge pull request #132 from aeyakovenko/readme
garious Apr 19, 2018
13a2f05
Remove out for immutable variable
kination Apr 19, 2018
64e2f1b
Merge pull request #133 from djKooks/rm-mut
garious Apr 19, 2018
b91f6bc
report parse errors to stderr
rleungx Apr 19, 2018
43e6741
Merge pull request #134 from rleungx/report-parse-errors-to-stderr
garious Apr 19, 2018
60015ae
report serde parse errors to stderr
rleungx Apr 19, 2018
c6048e2
Workaround linux hang
garious Apr 19, 2018
ca877e6
Merge pull request #136 from rleungx/report-errors-to-stderr
garious Apr 19, 2018
8181bc5
Add -h/--help options for client-demo and testnode
sakridge Apr 19, 2018
7390d6c
update
aeyakovenko Apr 19, 2018
89bf376
Merge pull request #138 from sakridge/help_options
garious Apr 19, 2018
10a0c47
Merge pull request #137 from garious/linux-hang
garious Apr 19, 2018
8cbb7d7
git clone instruction
Apr 20, 2018
39df21d
Merge pull request #142 from ansrivas/master
garious Apr 20, 2018
3da1fa4
improve the error messages
rleungx Apr 21, 2018
041de80
Merge pull request #144 from rleungx/improve-error-messages
garious Apr 21, 2018
3d7969d
initial crdt implementation
aeyakovenko Apr 21, 2018
55b8d0d
cleanup
aeyakovenko Apr 24, 2018
0b39c6f
Merge pull request #145 from aeyakovenko/crdt
aeyakovenko Apr 24, 2018
d69a86a
state replication
aeyakovenko Apr 18, 2018
c579f9b
wip
aeyakovenko Apr 18, 2018
3bd74cc
update
aeyakovenko Apr 18, 2018
97176dc
docs
aeyakovenko Apr 18, 2018
e493d70
docs
aeyakovenko Apr 18, 2018
37448de
wip
aeyakovenko Apr 19, 2018
9dfd18a
update
aeyakovenko Apr 19, 2018
92b8bb6
Fix some compilation issues
sakridge Apr 19, 2018
58d1ddd
Work on test_replicate to test replicate service
sakridge Apr 24, 2018
3789405
update
aeyakovenko Apr 24, 2018
10 changes: 6 additions & 4 deletions Cargo.toml
@@ -3,11 +3,12 @@ name = "solana"
description = "High Performance Blockchain"
version = "0.4.0"
documentation = "https://docs.rs/solana"
-homepage = "http://loomprotocol.com/"
+homepage = "http://solana.io/"
repository = "https://github.com/solana-labs/solana"
authors = [
-"Anatoly Yakovenko <[email protected]>",
-"Greg Fitzgerald <[email protected]>",
+"Anatoly Yakovenko <[email protected]>",
+"Greg Fitzgerald <[email protected]>",
+"Stephen Akridge <[email protected]>",
]
license = "Apache-2.0"

@@ -42,6 +43,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git
unstable = []
ipv6 = []
cuda = []
erasure = []

[dependencies]
rayon = "1.0.0"
@@ -60,4 +62,4 @@ matches = "^0.1.6"
byteorder = "^1.2.1"
libc = "^0.2.1"
getopts = "^0.2"

isatty = "0.1"
2 changes: 1 addition & 1 deletion LICENSE
@@ -1,4 +1,4 @@
-Copyright 2018 Anatoly Yakovenko <[email protected]> and Greg Fitzgerald <[email protected]>
+Copyright 2018 Anatoly Yakovenko, Greg Fitzgerald and Stephen Akridge

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
12 changes: 12 additions & 0 deletions README.md
@@ -14,6 +14,11 @@ Solana: High Performance Blockchain
Solana&trade; is a new architecture for a high performance blockchain. It aims to support
over 700 thousand transactions per second on a gigabit network.

Introduction
===

It's possible for a centralized database to process 710,000 transactions per second on a standard gigabit network if the transactions are, on average, no more than 178 bytes. A centralized database can also replicate itself and maintain high availability without significantly compromising that transaction rate using the distributed system technique known as Optimistic Concurrency Control [H. T. Kung, J. T. Robinson (1981)]. At Solana, we're demonstrating that these same theoretical limits apply just as well to blockchain on an adversarial network. The key ingredient? Finding a way to share time when nodes can't trust one another. Once nodes can trust time, suddenly ~40 years of distributed systems research becomes applicable to blockchain! Furthermore, and much to our surprise, it can be implemented using a mechanism that has existed in Bitcoin since day one. The Bitcoin feature is called nLocktime and it can be used to postdate transactions using block height instead of a timestamp. As a Bitcoin client, you'd use block height instead of a timestamp if you don't trust the network. Block height turns out to be an instance of what's being called a Verifiable Delay Function in cryptography circles. It's a cryptographically secure way to say time has passed. In Solana, we use a far more granular verifiable delay function, a SHA-256 hash chain, to checkpoint the ledger and coordinate consensus. With it, we implement Optimistic Concurrency Control and are now well en route towards that theoretical limit of 710,000 transactions per second.
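The hash chain described in the intro can be sketched as an iterated hash: each tick feeds the previous output back in, so producing N ticks takes N sequential steps, while anyone can verify a checkpoint by re-running the same steps. The sketch below is illustrative only; it uses std's `DefaultHasher` to stay dependency-free, which is not cryptographically secure and is not Solana's actual SHA-256 implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// One tick of the chain: hash the previous state to get the next.
/// (A stand-in for SHA-256; NOT cryptographically secure.)
fn tick(prev: u64) -> u64 {
    let mut h = DefaultHasher::new();
    prev.hash(&mut h);
    h.finish()
}

/// Advance the chain n ticks from a seed. The n hashes are inherently
/// sequential: each depends on the previous output.
fn advance(seed: u64, n: u64) -> u64 {
    (0..n).fold(seed, |state, _| tick(state))
}

fn main() {
    let seed = 42u64;
    let checkpoint = advance(seed, 1_000);
    // Verification re-runs the same 1_000 sequential hashes; producing
    // the checkpoint required at least that much serial work.
    assert_eq!(advance(seed, 1_000), checkpoint);
}
```

The key property is that `advance` cannot be parallelized, so a matching checkpoint is evidence that real time has passed.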

Running the demo
===

@@ -24,6 +29,13 @@ $ curl https://sh.rustup.rs -sSf | sh
$ source $HOME/.cargo/env
```

Now check out the code from GitHub:

```bash
$ git clone https://github.com/solana-labs/solana.git
$ cd solana
```

The testnode server is initialized with a ledger from stdin and
generates new ledger entries on stdout. To create the input ledger, we'll need
to create *the mint* and use it to generate a *genesis ledger*. It's done in
5 changes: 4 additions & 1 deletion build.rs
@@ -1,12 +1,15 @@
use std::env;

fn main() {
-println!("cargo:rustc-link-search=native=.");
if !env::var("CARGO_FEATURE_CUDA").is_err() {
+println!("cargo:rustc-link-search=native=.");
println!("cargo:rustc-link-lib=static=cuda_verify_ed25519");
println!("cargo:rustc-link-search=native=/usr/local/cuda/lib64");
println!("cargo:rustc-link-lib=dylib=cudart");
println!("cargo:rustc-link-lib=dylib=cuda");
println!("cargo:rustc-link-lib=dylib=cudadevrt");
}
if !env::var("CARGO_FEATURE_ERASURE").is_err() {
println!("cargo:rustc-link-lib=dylib=Jerasure");
}
}
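For the build.rs change above: Cargo exports a `CARGO_FEATURE_<NAME>` environment variable to the build script for every enabled feature, which is what the `erasure` check keys off. A minimal sketch of that detection follows, using `.is_ok()` in place of the double negative `!...is_err()`; `feature_enabled` is an illustrative helper, not part of the PR.

```rust
use std::env;

/// True when Cargo has enabled the given feature for this build.
/// Cargo upper-cases the feature name and replaces `-` with `_`
/// when forming the `CARGO_FEATURE_<NAME>` variable.
fn feature_enabled(name: &str) -> bool {
    let key = format!("CARGO_FEATURE_{}", name.to_uppercase().replace('-', "_"));
    env::var(key).is_ok()
}

fn main() {
    // Mirrors the erasure branch in the diff: emit a link directive
    // only when the feature is on.
    if feature_enabled("erasure") {
        println!("cargo:rustc-link-lib=dylib=Jerasure");
    }
    // Outside a Cargo build, no feature variables are set.
    assert!(!feature_enabled("no-such-feature"));
}
```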
153 changes: 150 additions & 3 deletions src/accountant_skel.rs
@@ -22,11 +22,15 @@ use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use subscribers::Subscribers;

use subscribers;
use std::mem::size_of;

pub struct AccountantSkel<W: Write + Send + 'static> {
acc: Accountant,
@@ -245,8 +249,32 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
Ok(())
}
/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Arc<Mutex<AccountantSkel<W>>>,
verified_receiver: &BlobReceiver,
blob_sender: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
for b in &blobs {
let entries: Vec<Entry> = b.read().unwrap().data.deserialize()?;
for e in entries {
obj.lock().unwrap().acc.process_verified_events(e.events)?;
}
//TODO respond back to leader with hash of the state
}
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}


/// Create a UDP microservice that forwards messages to the given AccountantSkel.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve(
obj: &Arc<Mutex<AccountantSkel<W>>>,
@@ -279,7 +307,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

let skel = obj.clone();
let t_server = spawn(move || loop {
-let e = AccountantSkel::process(
+let e = Self::process(
&skel,
&verified_receiver,
&blob_sender,
@@ -292,6 +320,71 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}

/// This service receives messages from a leader in the network and processes the transactions
/// on the accountant state.
/// # Arguments
/// * `obj` - The accountant state.
/// * `rsubs` - The subscribers.
/// * `exit` - The exit signal.
/// # Remarks
/// The pipeline is constructed as follows:
/// 1. receive blobs from the network; these arrive out of order
/// 2. verify blobs, PoH, signatures (TODO)
/// 3. reconstruct contiguous window
/// a. order the blobs
/// b. use erasure coding to reconstruct missing blobs
/// c. ask the network for missing blobs, if erasure coding is insufficient
/// d. make sure that the blobs PoH sequences connect (TODO)
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: Subscribers,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(rsubs.me.addr)?;
// make sure we are on the same interface
let mut local = read.local_addr()?;
local.set_port(0);
let write = UdpSocket::bind(local)?;

let blob_recycler = packet::BlobRecycler::default();
let (blob_sender, blob_receiver) = channel();
let t_blob_receiver =
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?;
let (window_sender, window_receiver) = channel();
let (retransmit_sender, retransmit_receiver) = channel();

let subs = Arc::new(RwLock::new(rsubs));
let t_retransmit = streamer::retransmitter(
write,
exit.clone(),
subs,
blob_recycler.clone(),
retransmit_receiver,
);
//TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
subs,
blob_recycler.clone(),
blob_receiver,
window_sender,
retransmit_sender,
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server])
}
}
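The replicate pipeline above (receiver, window, and server stages wired together with mpsc channels) can be sketched with plain std threads. The stage names and the `Blob` stand-in below are illustrative; this does not use the real `streamer` API.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Illustrative stand-in for a blob: (sequence number, payload).
type Blob = (u64, String);

fn main() {
    // receiver -> window: raw blobs off the network, possibly out of order.
    let (raw_tx, raw_rx) = channel::<Blob>();
    // window -> server: blobs restored to ledger order.
    let (ordered_tx, ordered_rx) = channel::<Blob>();

    // "Window" stage: collect and reorder blobs by sequence number.
    let window = thread::spawn(move || {
        let mut blobs: Vec<Blob> = raw_rx.iter().collect();
        blobs.sort_by_key(|b| b.0);
        for b in blobs {
            ordered_tx.send(b).unwrap();
        }
    });

    // "Server" stage: apply blobs to the replicated state in order.
    let server = thread::spawn(move || {
        ordered_rx
            .iter()
            .map(|(_, payload)| payload)
            .collect::<Vec<_>>()
            .join(" ")
    });

    // "Receiver" stage: blobs arrive out of order.
    let arrivals = vec![
        (1, "high".to_string()),
        (0, "hello".to_string()),
        (2, "performance".to_string()),
    ];
    for b in arrivals {
        raw_tx.send(b).unwrap();
    }
    drop(raw_tx); // closing the channel lets the window stage finish

    window.join().unwrap();
    let state = server.join().unwrap();
    assert_eq!(state, "hello high performance");
}
```

The real window stage additionally reconstructs missing blobs with erasure coding and retransmits; this sketch only shows the reordering and the channel plumbing between threads.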

#[cfg(test)]
@@ -319,7 +412,7 @@ mod tests {
use accountant_skel::{to_packets, Request};
use bincode::serialize;
use ecdsa;
-use packet::{PacketRecycler, NUM_PACKETS};
+use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
use transaction::{memfind, test_tx};

use accountant::Accountant;
@@ -339,6 +432,12 @@ mod tests {
use std::time::Duration;
use transaction::Transaction;

use subscribers::{Node, Subscribers};
use streamer;
use std::sync::mpsc::channel;
use std::collections::VecDeque;
use packet::{PACKET_DATA_SIZE};

#[test]
fn test_layout() {
let tr = test_tx();
@@ -443,6 +542,54 @@ mod tests {
exit.store(true, Ordering::Relaxed);
}

#[test]
fn test_replicate() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));

let node_me = Node::default();
let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap());
let subs = Subscribers::new(node_me, node_leader, &[]);

let recv_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap();
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder);

let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let historian = Historian::new(&alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
alice.last_id(),
sink(),
historian,
)));

let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();

let mut msgs = VecDeque::new();
for i in 0..10 {
let b = resp_recycler.allocate();
let b_ = b.clone();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
msgs.push_back(b_);
}
s_responder.send(msgs).expect("send");

exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
}

}

#[cfg(all(feature = "unstable", test))]
43 changes: 40 additions & 3 deletions src/bin/client-demo.rs
@@ -1,20 +1,32 @@
extern crate getopts;
extern crate isatty;
extern crate rayon;
extern crate serde_json;
extern crate solana;

use getopts::Options;
use isatty::stdin_isatty;
use rayon::prelude::*;
use solana::accountant_stub::AccountantStub;
use solana::mint::Mint;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Transaction;
use std::env;
-use std::io::stdin;
+use std::io::{stdin, Read};
use std::net::UdpSocket;
use std::process::exit;
use std::thread::sleep;
use std::time::{Duration, Instant};

fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <mint.json> | {} [options]\n\n", program);
brief += " Solana client demo creates a number of transactions and\n";
brief += " sends them to a target node.";
brief += " Takes json formatted mint file to stdin.";

print!("{}", opts.usage(&brief));
}

fn main() {
let mut threads = 4usize;
let mut addr: String = "127.0.0.1:8000".to_string();
@@ -24,12 +36,21 @@ fn main() {
opts.optopt("s", "", "server address", "host:port");
opts.optopt("c", "", "client address", "host:port");
opts.optopt("t", "", "number of threads", "4");
opts.optflag("h", "help", "print help");
let args: Vec<String> = env::args().collect();
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
-Err(f) => panic!(f.to_string()),
+Err(e) => {
+eprintln!("{}", e);
+exit(1);
+}
};

if matches.opt_present("h") {
let program = args[0].clone();
print_usage(&program, opts);
return;
}
if matches.opt_present("s") {
addr = matches.opt_str("s").unwrap();
}
@@ -39,7 +60,23 @@ fn main() {
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}
-let mint: Mint = serde_json::from_reader(stdin()).unwrap();

if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
exit(1);
}

let mut buffer = String::new();
let num_bytes = stdin().read_to_string(&mut buffer).unwrap();
if num_bytes == 0 {
eprintln!("empty file on stdin, expected a json file");
exit(1);
}

let mint: Mint = serde_json::from_str(&buffer).unwrap_or_else(|e| {
eprintln!("failed to parse json: {}", e);
exit(1);
});
let mint_keypair = mint.keypair();
let mint_pubkey = mint.pubkey();

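The stdin handling added to client-demo (reject empty input, report errors to stderr instead of panicking) can be factored over any `Read`, which makes the error path testable without piping a real file. `read_input` is an illustrative name, not a helper in this PR.

```rust
use std::io::{self, Read};

/// Read all input, rejecting empty streams. Returns Err with a
/// human-readable message, mirroring the eprintln!/exit(1) pattern
/// in the diff above.
fn read_input<R: Read>(mut reader: R) -> Result<String, String> {
    let mut buffer = String::new();
    let n = reader
        .read_to_string(&mut buffer)
        .map_err(|e| format!("failed to read stdin: {}", e))?;
    if n == 0 {
        return Err("empty file on stdin, expected a json file".to_string());
    }
    Ok(buffer)
}

fn main() {
    // Empty input is rejected, mirroring the demo's exit(1) path.
    assert!(read_input(io::empty()).is_err());
    // Non-empty input is returned intact for JSON parsing.
    assert_eq!(read_input("{}".as_bytes()).unwrap(), "{}");
}
```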