Skip to content

Commit

Permalink
Merge pull request #9 from garious/add-historian
Browse files Browse the repository at this point in the history
Add historian
  • Loading branch information
garious authored Feb 18, 2018
2 parents 3550f70 + 4c94754 commit 6ec0e58
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
/// Though processing power varies across nodes, the network gives priority to the
/// fastest processor. Duration should therefore be estimated by assuming that the hash
/// was generated by the fastest processor at the time the entry was logged.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Event {
pub num_hashes: u64,
pub end_hash: u64,
Expand All @@ -23,6 +24,7 @@ pub struct Event {
/// be generated in 'num_hashes' hashes and verified in 'num_hashes' hashes. By logging
/// a hash alongside the tick, each tick and be verified in parallel using the 'end_hash'
/// of the preceding tick to seed its hashing.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum EventData {
Tick,
UserDataKey(u64),
Expand Down
138 changes: 138 additions & 0 deletions src/historian.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//! The `historian` crate provides a microservice for generating a Proof-of-History.
//! It logs EventData items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an EventData item. It
//! tags each EventData with an Event and sends it back. The Event includes the
//! EventData, the latest hash, and the number of hashes since the last event.
//! The resulting Event stream represents ordered events in time.
use std::thread::JoinHandle;
use std::sync::mpsc::{Receiver, Sender};
use event::{Event, EventData};

pub struct Historian {
pub sender: Sender<EventData>,
pub receiver: Receiver<Event>,
pub thread_hdl: JoinHandle<(Event, EventThreadExitReason)>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum EventThreadExitReason {
RecvDisconnected,
SendDisconnected,
}

fn drain_queue(
receiver: &Receiver<EventData>,
sender: &Sender<Event>,
num_hashes: u64,
end_hash: u64,
) -> Result<u64, (Event, EventThreadExitReason)> {
use std::sync::mpsc::TryRecvError;
let mut num_hashes = num_hashes;
loop {
match receiver.try_recv() {
Ok(data) => {
let e = Event {
end_hash,
num_hashes,
data,
};
if let Err(_) = sender.send(e.clone()) {
return Err((e, EventThreadExitReason::SendDisconnected));
}
num_hashes = 0;
}
Err(TryRecvError::Empty) => {
return Ok(num_hashes);
}
Err(TryRecvError::Disconnected) => {
let e = Event {
end_hash,
num_hashes,
data: EventData::Tick,
};
return Err((e, EventThreadExitReason::RecvDisconnected));
}
}
}
}

/// A background thread that will continue tagging received EventData messages and
/// sending back Event messages until either the receiver or sender channel is closed.
pub fn event_stream(
start_hash: u64,
receiver: Receiver<EventData>,
sender: Sender<Event>,
) -> JoinHandle<(Event, EventThreadExitReason)> {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::thread;
thread::spawn(move || {
let mut end_hash = start_hash;
let mut hasher = DefaultHasher::new();
let mut num_hashes = 0;
loop {
match drain_queue(&receiver, &sender, num_hashes, end_hash) {
Ok(n) => num_hashes = n,
Err(e) => return e,
}
end_hash.hash(&mut hasher);
end_hash = hasher.finish();
num_hashes += 1;
}
})
}

impl Historian {
pub fn new(start_hash: u64) -> Self {
use std::sync::mpsc::channel;
let (sender, event_data_receiver) = channel();
let (event_sender, receiver) = channel();
let thread_hdl = event_stream(start_hash, event_data_receiver, event_sender);
Historian {
sender,
receiver,
thread_hdl,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use event::*;

#[test]
fn test_historian() {
let hist = Historian::new(0);

let data = EventData::Tick;
hist.sender.send(data.clone()).unwrap();
let e0 = hist.receiver.recv().unwrap();
assert_eq!(e0.data, data);

let data = EventData::UserDataKey(0xdeadbeef);
hist.sender.send(data.clone()).unwrap();
let e1 = hist.receiver.recv().unwrap();
assert_eq!(e1.data, data);

drop(hist.sender);
assert_eq!(
hist.thread_hdl.join().unwrap().1,
EventThreadExitReason::RecvDisconnected
);

verify_slice(&[e0, e1], 0);
}

#[test]
fn test_historian_closed_sender() {
let hist = Historian::new(0);
drop(hist.receiver);
hist.sender.send(EventData::Tick).unwrap();
assert_eq!(
hist.thread_hdl.join().unwrap().1,
EventThreadExitReason::SendDisconnected
);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![cfg_attr(feature = "unstable", feature(test))]
pub mod event;
pub mod historian;
extern crate itertools;
extern crate rayon;

0 comments on commit 6ec0e58

Please sign in to comment.