Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better names #10

Merged
merged 3 commits into from
Feb 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions src/historian.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,69 @@
//! The `historian` crate provides a microservice for generating a Proof-of-History.
//! It logs EventData items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an EventData item. It
//! tags each EventData with an Event and sends it back. The Event includes the
//! EventData, the latest hash, and the number of hashes since the last event.
//! The resulting Event stream represents ordered events in time.
//! It logs Event items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Event item. It
//! tags each Event with an Entry and sends it back. The Entry includes the
//! Event, the latest hash, and the number of hashes since the last event.
//! The resulting stream of entries represents ordered events in time.

use std::thread::JoinHandle;
use std::sync::mpsc::{Receiver, Sender};
use event::{Event, EventData};
use log::{Entry, Event};

pub struct Historian {
pub sender: Sender<EventData>,
pub receiver: Receiver<Event>,
pub thread_hdl: JoinHandle<(Event, EventThreadExitReason)>,
pub sender: Sender<Event>,
pub receiver: Receiver<Entry>,
pub thread_hdl: JoinHandle<(Entry, ExitReason)>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum EventThreadExitReason {
pub enum ExitReason {
RecvDisconnected,
SendDisconnected,
}

fn drain_queue(
receiver: &Receiver<EventData>,
sender: &Sender<Event>,
fn log_events(
receiver: &Receiver<Event>,
sender: &Sender<Entry>,
num_hashes: u64,
end_hash: u64,
) -> Result<u64, (Event, EventThreadExitReason)> {
) -> Result<u64, (Entry, ExitReason)> {
use std::sync::mpsc::TryRecvError;
let mut num_hashes = num_hashes;
loop {
match receiver.try_recv() {
Ok(data) => {
let e = Event {
Ok(event) => {
let entry = Entry {
end_hash,
num_hashes,
data,
event,
};
if let Err(_) = sender.send(e.clone()) {
return Err((e, EventThreadExitReason::SendDisconnected));
if let Err(_) = sender.send(entry.clone()) {
return Err((entry, ExitReason::SendDisconnected));
}
num_hashes = 0;
}
Err(TryRecvError::Empty) => {
return Ok(num_hashes);
}
Err(TryRecvError::Disconnected) => {
let e = Event {
let entry = Entry {
end_hash,
num_hashes,
data: EventData::Tick,
event: Event::Tick,
};
return Err((e, EventThreadExitReason::RecvDisconnected));
return Err((entry, ExitReason::RecvDisconnected));
}
}
}
}

/// A background thread that will continue tagging received EventData messages and
/// sending back Event messages until either the receiver or sender channel is closed.
pub fn event_stream(
/// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn create_logger(
start_hash: u64,
receiver: Receiver<EventData>,
sender: Sender<Event>,
) -> JoinHandle<(Event, EventThreadExitReason)> {
receiver: Receiver<Event>,
sender: Sender<Entry>,
) -> JoinHandle<(Entry, ExitReason)> {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::thread;
Expand All @@ -72,9 +72,9 @@ pub fn event_stream(
let mut hasher = DefaultHasher::new();
let mut num_hashes = 0;
loop {
match drain_queue(&receiver, &sender, num_hashes, end_hash) {
match log_events(&receiver, &sender, num_hashes, end_hash) {
Ok(n) => num_hashes = n,
Err(e) => return e,
Err(err) => return err,
}
end_hash.hash(&mut hasher);
end_hash = hasher.finish();
Expand All @@ -86,9 +86,9 @@ pub fn event_stream(
impl Historian {
pub fn new(start_hash: u64) -> Self {
use std::sync::mpsc::channel;
let (sender, event_data_receiver) = channel();
let (event_sender, receiver) = channel();
let thread_hdl = event_stream(start_hash, event_data_receiver, event_sender);
let (sender, event_receiver) = channel();
let (entry_sender, receiver) = channel();
let thread_hdl = create_logger(start_hash, event_receiver, entry_sender);
Historian {
sender,
receiver,
Expand All @@ -100,39 +100,39 @@ impl Historian {
#[cfg(test)]
mod tests {
use super::*;
use event::*;
use log::*;

#[test]
fn test_historian() {
let hist = Historian::new(0);

let data = EventData::Tick;
hist.sender.send(data.clone()).unwrap();
let e0 = hist.receiver.recv().unwrap();
assert_eq!(e0.data, data);
let event = Event::Tick;
hist.sender.send(event.clone()).unwrap();
let entry0 = hist.receiver.recv().unwrap();
assert_eq!(entry0.event, event);

let data = EventData::UserDataKey(0xdeadbeef);
hist.sender.send(data.clone()).unwrap();
let e1 = hist.receiver.recv().unwrap();
assert_eq!(e1.data, data);
let event = Event::UserDataKey(0xdeadbeef);
hist.sender.send(event.clone()).unwrap();
let entry1 = hist.receiver.recv().unwrap();
assert_eq!(entry1.event, event);

drop(hist.sender);
assert_eq!(
hist.thread_hdl.join().unwrap().1,
EventThreadExitReason::RecvDisconnected
ExitReason::RecvDisconnected
);

verify_slice(&[e0, e1], 0);
verify_slice(&[entry0, entry1], 0);
}

#[test]
fn test_historian_closed_sender() {
let hist = Historian::new(0);
drop(hist.receiver);
hist.sender.send(EventData::Tick).unwrap();
hist.sender.send(Event::Tick).unwrap();
assert_eq!(
hist.thread_hdl.join().unwrap().1,
EventThreadExitReason::SendDisconnected
ExitReason::SendDisconnected
);
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod event;
pub mod log;
pub mod historian;
extern crate itertools;
extern crate rayon;
66 changes: 33 additions & 33 deletions src/event.rs → src/log.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! The `event` crate provides the foundational data structures for Proof-of-History
//! The `log` crate provides the foundational data structures for Proof-of-History,
//! an ordered log of events in time.

/// A Proof-of-History is an ordered log of events in time. Each entry contains three
/// pieces of data. The 'num_hashes' field is the number of hashes performed since the previous
/// entry. The 'end_hash' field is the result of hashing 'end_hash' from the previous entry
/// 'num_hashes' times. The 'data' field is an optional foreign key (a hash) pointing to some
/// arbitrary data that a client is looking to associate with the entry.
/// Each log entry contains three pieces of data. The 'num_hashes' field is the number
/// of hashes performed since the previous entry. The 'end_hash' field is the result
/// of hashing 'end_hash' from the previous entry 'num_hashes' times. The 'event'
/// field points to an Event that took place shortly after 'end_hash' was generated.
///
/// If you divide 'num_hashes' by the amount of time it takes to generate a new hash, you
/// get a duration estimate since the last event. Since processing power increases
Expand All @@ -13,32 +13,32 @@
/// fastest processor. Duration should therefore be estimated by assuming that the hash
/// was generated by the fastest processor at the time the entry was logged.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Event {
pub struct Entry {
pub num_hashes: u64,
pub end_hash: u64,
pub data: EventData,
pub event: Event,
}

/// When 'data' is Tick, the event represents a simple clock tick, and exists for the
/// When 'event' is Tick, the event represents a simple clock tick, and exists for the
/// sole purpose of improving the performance of event log verification. A tick can
/// be generated in 'num_hashes' hashes and verified in 'num_hashes' hashes. By logging
/// a hash alongside the tick, each tick and be verified in parallel using the 'end_hash'
/// of the preceding tick to seed its hashing.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum EventData {
pub enum Event {
Tick,
UserDataKey(u64),
}

impl Event {
/// Creates an Event from the number of hashes 'num_hashes' since the previous event
impl Entry {
/// Creates a Entry from the number of hashes 'num_hashes' since the previous event
/// and that resulting 'end_hash'.
pub fn new_tick(num_hashes: u64, end_hash: u64) -> Self {
let data = EventData::Tick;
Event {
let event = Event::Tick;
Entry {
num_hashes,
end_hash,
data,
event,
}
}

Expand All @@ -48,8 +48,8 @@ impl Event {
}
}

/// Creates the next Tick Event 'num_hashes' after 'start_hash'.
pub fn next_tick(start_hash: u64, num_hashes: u64) -> Event {
/// Creates the next Tick Entry 'num_hashes' after 'start_hash'.
pub fn next_tick(start_hash: u64, num_hashes: u64) -> Entry {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut end_hash = start_hash;
Expand All @@ -58,26 +58,26 @@ pub fn next_tick(start_hash: u64, num_hashes: u64) -> Event {
end_hash.hash(&mut hasher);
end_hash = hasher.finish();
}
Event::new_tick(num_hashes, end_hash)
Entry::new_tick(num_hashes, end_hash)
}

/// Verifies the hashes and counts of a slice of events are all consistent.
pub fn verify_slice(events: &[Event], start_hash: u64) -> bool {
pub fn verify_slice(events: &[Entry], start_hash: u64) -> bool {
use rayon::prelude::*;
let genesis = [Event::new_tick(0, start_hash)];
let genesis = [Entry::new_tick(0, start_hash)];
let event_pairs = genesis.par_iter().chain(events).zip(events);
event_pairs.all(|(x0, x1)| x1.verify(x0.end_hash))
}

/// Verifies the hashes and events serially. Exists only for reference.
pub fn verify_slice_seq(events: &[Event], start_hash: u64) -> bool {
let genesis = [Event::new_tick(0, start_hash)];
pub fn verify_slice_seq(events: &[Entry], start_hash: u64) -> bool {
let genesis = [Entry::new_tick(0, start_hash)];
let mut event_pairs = genesis.iter().chain(events).zip(events);
event_pairs.all(|(x0, x1)| x1.verify(x0.end_hash))
}

/// Create a vector of Ticks of length 'len' from 'start_hash' hash and 'num_hashes'.
pub fn create_ticks(start_hash: u64, num_hashes: u64, len: usize) -> Vec<Event> {
pub fn create_ticks(start_hash: u64, num_hashes: u64, len: usize) -> Vec<Entry> {
use itertools::unfold;
let mut events = unfold(start_hash, |state| {
let event = next_tick(*state, num_hashes);
Expand All @@ -93,8 +93,8 @@ mod tests {

#[test]
fn test_event_verify() {
assert!(Event::new_tick(0, 0).verify(0)); // base case
assert!(!Event::new_tick(0, 0).verify(1)); // base case, bad
assert!(Entry::new_tick(0, 0).verify(0)); // base case
assert!(!Entry::new_tick(0, 0).verify(1)); // base case, bad
assert!(next_tick(0, 1).verify(0)); // inductive step
assert!(!next_tick(0, 1).verify(1)); // inductive step, bad
}
Expand All @@ -104,10 +104,10 @@ mod tests {
assert_eq!(next_tick(0, 1).num_hashes, 1)
}

fn verify_slice_generic(verify_slice: fn(&[Event], u64) -> bool) {
fn verify_slice_generic(verify_slice: fn(&[Entry], u64) -> bool) {
assert!(verify_slice(&vec![], 0)); // base case
assert!(verify_slice(&vec![Event::new_tick(0, 0)], 0)); // singleton case 1
assert!(!verify_slice(&vec![Event::new_tick(0, 0)], 1)); // singleton case 2, bad
assert!(verify_slice(&vec![Entry::new_tick(0, 0)], 0)); // singleton case 1
assert!(!verify_slice(&vec![Entry::new_tick(0, 0)], 1)); // singleton case 2, bad
assert!(verify_slice(&create_ticks(0, 0, 2), 0)); // inductive step

let mut bad_ticks = create_ticks(0, 0, 2);
Expand All @@ -131,23 +131,23 @@ mod tests {
mod bench {
extern crate test;
use self::test::Bencher;
use event;
use log::*;

#[bench]
fn event_bench(bencher: &mut Bencher) {
let start_hash = 0;
let events = event::create_ticks(start_hash, 100_000, 8);
let events = create_ticks(start_hash, 100_000, 8);
bencher.iter(|| {
assert!(event::verify_slice(&events, start_hash));
assert!(verify_slice(&events, start_hash));
});
}

#[bench]
fn event_bench_seq(bencher: &mut Bencher) {
let start_hash = 0;
let events = event::create_ticks(start_hash, 100_000, 8);
let events = create_ticks(start_hash, 100_000, 8);
bencher.iter(|| {
assert!(event::verify_slice_seq(&events, start_hash));
assert!(verify_slice_seq(&events, start_hash));
});
}
}