Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

self-ticking logger #18

Merged
merged 4 commits into from
Feb 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "silk"
description = "A silky smooth implementation of the Loom architecture"
version = "0.2.1"
version = "0.2.2"
documentation = "https://docs.rs/silk"
homepage = "http://loomprotocol.com/"
repository = "https://github.com/loomprotocol/silk"
Expand Down
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,27 @@ Create a *Historian* and send it *events* to generate an *event log*, where each
is tagged with the historian's latest *hash*. Then ensure the order of events was not tampered
with by verifying each entry's hash can be generated from the hash in the previous entry:

![historian](https://user-images.githubusercontent.com/55449/36492930-97a572be-16eb-11e8-8289-358e9507189e.png)
![historian](https://user-images.githubusercontent.com/55449/36499105-7c8db6a0-16fd-11e8-8b88-c6e0f52d7a50.png)

```rust
extern crate silk;

use silk::historian::Historian;
use silk::log::{verify_slice, Entry, Event, Sha256Hash};
use std::{thread, time};
use std::thread::sleep;
use std::time::Duration;
use std::sync::mpsc::SendError;

fn create_log(hist: &Historian) -> Result<(), SendError<Event>> {
hist.sender.send(Event::Tick)?;
thread::sleep(time::Duration::new(0, 100_000));
sleep(Duration::from_millis(15));
hist.sender.send(Event::UserDataKey(Sha256Hash::default()))?;
thread::sleep(time::Duration::new(0, 100_000));
hist.sender.send(Event::Tick)?;
sleep(Duration::from_millis(10));
Ok(())
}

fn main() {
let seed = Sha256Hash::default();
let hist = Historian::new(&seed);
let hist = Historian::new(&seed, Some(10));
create_log(&hist).expect("send error");
drop(hist.sender);
let entries: Vec<Entry> = hist.receiver.iter().collect();
Expand Down
4 changes: 0 additions & 4 deletions diagrams/historian.msc
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
msc {
client,historian,logger;

client=>historian [ label = "Tick" ] ;
historian=>logger [ label = "Tick" ] ;
logger=>historian [ label = "e0 = Entry{hash: h0, n: 0, event: Tick}" ] ;
logger=>logger [ label = "h1 = hash(h0)" ] ;
logger=>logger [ label = "h2 = hash(h1)" ] ;
Expand All @@ -13,8 +11,6 @@ msc {
logger=>logger [ label = "h4 = hash(h3)" ] ;
logger=>logger [ label = "h5 = hash(h4)" ] ;
logger=>logger [ label = "h6 = hash(h5)" ] ;
client=>historian [ label = "Tick" ] ;
historian=>logger [ label = "Tick" ] ;
logger=>historian [ label = "e2 = Entry{hash: h6, n: 3, event: Tick}" ] ;
client=>historian [ label = "collect()" ] ;
historian=>client [ label = "entries = [e0, e1, e2]" ] ;
Expand Down
11 changes: 5 additions & 6 deletions src/bin/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ extern crate silk;

use silk::historian::Historian;
use silk::log::{verify_slice, Entry, Event, Sha256Hash};
use std::{thread, time};
use std::thread::sleep;
use std::time::Duration;
use std::sync::mpsc::SendError;

fn create_log(hist: &Historian) -> Result<(), SendError<Event>> {
hist.sender.send(Event::Tick)?;
thread::sleep(time::Duration::new(0, 100_000));
sleep(Duration::from_millis(15));
hist.sender.send(Event::UserDataKey(Sha256Hash::default()))?;
thread::sleep(time::Duration::new(0, 100_000));
hist.sender.send(Event::Tick)?;
sleep(Duration::from_millis(10));
Ok(())
}

fn main() {
let seed = Sha256Hash::default();
let hist = Historian::new(&seed);
let hist = Historian::new(&seed, Some(10));
create_log(&hist).expect("send error");
drop(hist.sender);
let entries: Vec<Entry> = hist.receiver.iter().collect();
Expand Down
88 changes: 68 additions & 20 deletions src/historian.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use std::thread::JoinHandle;
use std::sync::mpsc::{Receiver, Sender};
use std::time::{Duration, SystemTime};
use log::{extend_and_hash, hash, Entry, Event, Sha256Hash};

pub struct Historian {
Expand All @@ -20,29 +21,48 @@ pub enum ExitReason {
RecvDisconnected,
SendDisconnected,
}
fn log_event(
sender: &Sender<Entry>,
num_hashes: &mut u64,
end_hash: &mut Sha256Hash,
event: Event,
) -> Result<(), (Entry, ExitReason)> {
if let Event::UserDataKey(key) = event {
*end_hash = extend_and_hash(end_hash, &key);
}
let entry = Entry {
end_hash: *end_hash,
num_hashes: *num_hashes,
event,
};
if let Err(_) = sender.send(entry.clone()) {
return Err((entry, ExitReason::SendDisconnected));
}
*num_hashes = 0;
Ok(())
}

fn log_events(
receiver: &Receiver<Event>,
sender: &Sender<Entry>,
num_hashes: &mut u64,
end_hash: &mut Sha256Hash,
epoch: SystemTime,
num_ticks: &mut u64,
ms_per_tick: Option<u64>,
) -> Result<(), (Entry, ExitReason)> {
use std::sync::mpsc::TryRecvError;
loop {
if let Some(ms) = ms_per_tick {
let now = SystemTime::now();
if now > epoch + Duration::from_millis((*num_ticks + 1) * ms) {
log_event(sender, num_hashes, end_hash, Event::Tick)?;
*num_ticks += 1;
}
}
match receiver.try_recv() {
Ok(event) => {
if let Event::UserDataKey(key) = event {
*end_hash = extend_and_hash(end_hash, &key);
}
let entry = Entry {
end_hash: *end_hash,
num_hashes: *num_hashes,
event,
};
if let Err(_) = sender.send(entry.clone()) {
return Err((entry, ExitReason::SendDisconnected));
}
*num_hashes = 0;
log_event(sender, num_hashes, end_hash, event)?;
}
Err(TryRecvError::Empty) => {
return Ok(());
Expand All @@ -63,15 +83,26 @@ fn log_events(
/// sending back Entry messages until either the receiver or sender channel is closed.
pub fn create_logger(
start_hash: Sha256Hash,
ms_per_tick: Option<u64>,
receiver: Receiver<Event>,
sender: Sender<Entry>,
) -> JoinHandle<(Entry, ExitReason)> {
use std::thread;
thread::spawn(move || {
let mut end_hash = start_hash;
let mut num_hashes = 0;
let mut num_ticks = 0;
let epoch = SystemTime::now();
loop {
if let Err(err) = log_events(&receiver, &sender, &mut num_hashes, &mut end_hash) {
if let Err(err) = log_events(
&receiver,
&sender,
&mut num_hashes,
&mut end_hash,
epoch,
&mut num_ticks,
ms_per_tick,
) {
return err;
}
end_hash = hash(&end_hash);
Expand All @@ -81,11 +112,11 @@ pub fn create_logger(
}

impl Historian {
pub fn new(start_hash: &Sha256Hash) -> Self {
pub fn new(start_hash: &Sha256Hash, ms_per_tick: Option<u64>) -> Self {
use std::sync::mpsc::channel;
let (sender, event_receiver) = channel();
let (entry_sender, receiver) = channel();
let thread_hdl = create_logger(*start_hash, event_receiver, entry_sender);
let thread_hdl = create_logger(*start_hash, ms_per_tick, event_receiver, entry_sender);
Historian {
sender,
receiver,
Expand All @@ -98,14 +129,13 @@ impl Historian {
mod tests {
use super::*;
use log::*;
use std::thread::sleep;
use std::time::Duration;

#[test]
fn test_historian() {
use std::thread::sleep;
use std::time::Duration;

let zero = Sha256Hash::default();
let hist = Historian::new(&zero);
let hist = Historian::new(&zero, None);

hist.sender.send(Event::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
Expand All @@ -129,12 +159,30 @@ mod tests {
#[test]
fn test_historian_closed_sender() {
let zero = Sha256Hash::default();
let hist = Historian::new(&zero);
let hist = Historian::new(&zero, None);
drop(hist.receiver);
hist.sender.send(Event::Tick).unwrap();
assert_eq!(
hist.thread_hdl.join().unwrap().1,
ExitReason::SendDisconnected
);
}

#[test]
fn test_ticking_historian() {
let zero = Sha256Hash::default();
let hist = Historian::new(&zero, Some(20));
sleep(Duration::from_millis(30));
hist.sender.send(Event::UserDataKey(zero)).unwrap();
sleep(Duration::from_millis(15));
drop(hist.sender);
assert_eq!(
hist.thread_hdl.join().unwrap().1,
ExitReason::RecvDisconnected
);

let entries: Vec<Entry> = hist.receiver.iter().collect();
assert!(entries.len() > 1);
assert!(verify_slice(&entries, &zero));
}
}