Skip to content

Commit

Permalink
*: clean up the usage of slog (#288)
Browse files Browse the repository at this point in the history
This commit brings 2 big changes to the API:

1. No more default logger. Users are required to give a logger to use
the most of structs in this library. This makes the interface simple.
2. No more tag scope value. Some APIs insert tag into KVs, some don't.
It's confusing and easily lead to duplicate KV definitions. Users are
required to insert tag into KVs before passing the logger to the
library, and there is no more explicit configuration of tag.
  • Loading branch information
BusyJay authored Sep 18, 2019
1 parent 326716a commit a82ce9f
Show file tree
Hide file tree
Showing 28 changed files with 561 additions and 610 deletions.
8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ prost-codec = ["raft-proto/prost-codec"]

# Make sure to synchronize updates with Harness.
[dependencies]
log = "0.4"
protobuf = "2"
slog = "2.2"
quick-error = "1.2.2"
Expand All @@ -35,15 +34,12 @@ rand = "0.6.5"
hashbrown = "0.5"
fail = { version = "0.3", optional = true }
getset = "0.0.7"
slog-stdlog = "4"
slog-envlogger = "2.1.0"
# We don't actually use this crate, but by pinning to 0.5.1 it forces Cargo to
# avoid 0.5.2 and thus pulling in a transitive dep with a security vulnerability.
term = "=0.5.1"

[dev-dependencies]
criterion = ">0.2.4"
regex = "1.1"
slog-stdlog = "4"
slog-envlogger = "2.1.0"
slog-async = "2.3.0"
slog-term = "2.4.0"

Expand Down
32 changes: 32 additions & 0 deletions benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,45 @@
#![allow(dead_code)] // Due to criterion we need this to avoid warnings.
#![cfg_attr(feature = "cargo-clippy", allow(clippy::let_and_return))] // Benches often artificially return values. Allow it.

#[macro_use]
extern crate slog;

use criterion::Criterion;
use std::time::Duration;

mod suites;

pub const DEFAULT_RAFT_SETS: [(usize, usize); 4] = [(0, 0), (3, 1), (5, 2), (7, 3)];

/// The default logger we fall back to when passed `None` in external facing constructors.
///
/// Currently, this is a `log` adaptor behind a `Once` to ensure there is no clobbering.
fn default_logger() -> slog::Logger {
use slog::Drain;
use std::sync::{Mutex, Once};

static LOGGER_INITIALIZED: Once = Once::new();
static mut LOGGER: Option<slog::Logger> = None;

let logger = unsafe {
LOGGER_INITIALIZED.call_once(|| {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = slog_envlogger::new(drain);
LOGGER = Some(slog::Logger::root(Mutex::new(drain).fuse(), o!()));
});
LOGGER.as_ref().unwrap()
};
let case = std::thread::current()
.name()
.unwrap()
.split(":")
.last()
.unwrap()
.to_string();
logger.new(o!("case" => case))
}

fn main() {
let mut c = Criterion::default()
// Configure defaults before overriding with args.
Expand Down
6 changes: 3 additions & 3 deletions benches/suites/progress_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn bench_progress_set(c: &mut Criterion) {
}

fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
let mut set = ProgressSet::with_capacity(voters, learners);
let mut set = ProgressSet::with_capacity(voters, learners, crate::default_logger());
(0..voters).for_each(|id| {
set.insert_voter(id as u64, Progress::new(0, 10)).ok();
});
Expand All @@ -29,7 +29,7 @@ fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
pub fn bench_progress_set_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new());
b.iter(|| ProgressSet::new(crate::default_logger()));
};

c.bench_function("ProgressSet::new", bench);
Expand All @@ -39,7 +39,7 @@ pub fn bench_progress_set_with_capacity(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::with_capacity(voters, learners));
b.iter(|| ProgressSet::with_capacity(voters, learners, crate::default_logger()));
}
};

Expand Down
43 changes: 16 additions & 27 deletions benches/suites/raft.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::DEFAULT_RAFT_SETS;
use criterion::{Bencher, Criterion};
use criterion::Criterion;
use raft::{storage::MemStorage, Config, Raft};

pub fn bench_raft(c: &mut Criterion) {
bench_raft_new(c);
bench_raft_campaign(c);
}

fn quick_raft(voters: usize, learners: usize) -> Raft<MemStorage> {
fn quick_raft(voters: usize, learners: usize, logger: &slog::Logger) -> Raft<MemStorage> {
let id = 1;
let storage = MemStorage::default();
let config = Config::new(id);
let mut raft = Raft::new(&config, storage).unwrap();
let mut raft = Raft::new(&config, storage, logger).unwrap();
(0..voters).for_each(|id| {
raft.add_node(id as u64).unwrap();
});
Expand All @@ -22,47 +22,36 @@ fn quick_raft(voters: usize, learners: usize) -> Raft<MemStorage> {
}

pub fn bench_raft_new(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| quick_raft(voters, learners));
}
};

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("Raft::new ({}, {})", voters, learners),
bench(*voters, *learners),
);
c.bench_function(&format!("Raft::new ({}, {})", voters, learners), move |b| {
let logger = crate::default_logger();
b.iter(|| quick_raft(*voters, *learners, &logger))
});
});
}

pub fn bench_raft_campaign(c: &mut Criterion) {
let bench = |voters, learners, variant| {
move |b: &mut Bencher| {
b.iter(|| {
// TODO: Make raft clone somehow.
let mut raft = quick_raft(voters, learners);
raft.campaign(variant)
})
}
};

DEFAULT_RAFT_SETS
.iter()
.skip(1)
.for_each(|(voters, learners)| {
// We don't want to make `raft::raft` public at this point.
let msgs = [
let msgs = &[
"CampaignPreElection",
"CampaignElection",
"CampaignTransfer",
];
// Skip the first since it's 0,0
for msg in &msgs {
for msg in msgs {
c.bench_function(
&format!("Raft::campaign ({}, {}, {})", voters, learners, msg),
bench(*voters, *learners, msg.as_bytes()),
move |b| {
let logger = crate::default_logger();
b.iter(|| {
let mut raft = quick_raft(*voters, *learners, &logger);
raft.campaign(msg.as_bytes());
})
},
);
}
});
Expand Down
8 changes: 4 additions & 4 deletions benches/suites/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ pub fn bench_raw_node(c: &mut Criterion) {
bench_raw_node_new(c);
}

fn quick_raw_node() -> RawNode<MemStorage> {
fn quick_raw_node(logger: &slog::Logger) -> RawNode<MemStorage> {
let id = 1;
let storage = MemStorage::default();
let config = Config::new(id);
RawNode::new(&config, storage).unwrap()
RawNode::new(&config, storage, logger).unwrap()
}

pub fn bench_raw_node_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(quick_raw_node);
let logger = crate::default_logger();
b.iter(|| quick_raw_node(&logger));
};

c.bench_function("RawNode::new", bench);
Expand Down
65 changes: 50 additions & 15 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

#[macro_use]
extern crate slog;

use slog::Drain;
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TryRecvError};
use std::sync::{Arc, Mutex};
Expand All @@ -23,6 +28,15 @@ use raft::{prelude::*, StateRole};
use regex::Regex;

fn main() {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain)
.chan_size(4096)
.overflow_strategy(slog_async::OverflowStrategy::Block)
.build()
.fuse();
let logger = slog::Logger::root(drain, o!());

const NUM_NODES: u32 = 5;
// Create 5 mailboxes to send/receive messages. Every node holds a `Receiver` to receive
// messages from others, and uses the respective `Sender` to send messages to others.
Expand All @@ -46,7 +60,7 @@ fn main() {
let mailboxes = (1..6u64).zip(tx_vec.iter().cloned()).collect();
let mut node = match i {
// Peer 1 is the leader.
0 => Node::create_raft_leader(1, rx, mailboxes),
0 => Node::create_raft_leader(1, rx, mailboxes, &logger),
// Other peers are followers.
_ => Node::create_raft_follower(rx, mailboxes),
};
Expand All @@ -57,13 +71,14 @@ fn main() {

// Clone the stop receiver
let rx_stop_clone = Arc::clone(&rx_stop);
let logger = logger.clone();
// Here we spawn the node on a new thread and keep a handle so we can join on them later.
let handle = thread::spawn(move || loop {
thread::sleep(Duration::from_millis(10));
loop {
// Step raft messages.
match node.my_mailbox.try_recv() {
Ok(msg) => node.step(msg),
Ok(msg) => node.step(msg, &logger),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return,
}
Expand Down Expand Up @@ -91,7 +106,13 @@ fn main() {
}

// Handle readies from the raft.
on_ready(raft_group, &mut node.kv_pairs, &node.mailboxes, &proposals);
on_ready(
raft_group,
&mut node.kv_pairs,
&node.mailboxes,
&proposals,
&logger,
);

// Check control signals from
if check_signals(&rx_stop_clone) {
Expand All @@ -105,7 +126,10 @@ fn main() {
add_all_followers(proposals.as_ref());

// Put 100 key-value pairs.
println!("We get a 5 nodes Raft cluster now, now propose 100 proposals");
info!(
logger,
"We get a 5 nodes Raft cluster now, now propose 100 proposals"
);
(0..100u16)
.filter(|i| {
let (proposal, rx) = Proposal::normal(*i, "hello, world".to_owned());
Expand All @@ -116,7 +140,7 @@ fn main() {
})
.count();

println!("Propose 100 proposals success!");
info!(logger, "Propose 100 proposals success!");

// Send terminate signals
for _ in 0..NUM_NODES {
Expand Down Expand Up @@ -159,13 +183,14 @@ impl Node {
id: u64,
my_mailbox: Receiver<Message>,
mailboxes: HashMap<u64, Sender<Message>>,
logger: &slog::Logger,
) -> Self {
let mut cfg = example_config();
cfg.id = id;
cfg.tag = format!("peer_{}", id);
let logger = logger.new(o!("tag" => format!("peer_{}", id)));

let storage = MemStorage::new_with_conf_state(ConfState::from((vec![id], vec![])));
let raft_group = Some(RawNode::new(&cfg, storage).unwrap());
let raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
Node {
raft_group,
my_mailbox,
Expand All @@ -188,22 +213,22 @@ impl Node {
}

// Initialize raft for followers.
fn initialize_raft_from_message(&mut self, msg: &Message) {
fn initialize_raft_from_message(&mut self, msg: &Message, logger: &slog::Logger) {
if !is_initial_msg(msg) {
return;
}
let mut cfg = example_config();
cfg.id = msg.to;
cfg.tag = format!("peer_{}", msg.to);
let logger = logger.new(o!("tag" => format!("peer_{}", msg.to)));
let storage = MemStorage::new();
self.raft_group = Some(RawNode::new(&cfg, storage).unwrap());
self.raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
}

// Step a raft message, initialize the raft if need.
fn step(&mut self, msg: Message) {
fn step(&mut self, msg: Message, logger: &slog::Logger) {
if self.raft_group.is_none() {
if is_initial_msg(&msg) {
self.initialize_raft_from_message(&msg);
self.initialize_raft_from_message(&msg, &logger);
} else {
return;
}
Expand All @@ -218,6 +243,7 @@ fn on_ready(
kv_pairs: &mut HashMap<u16, String>,
mailboxes: &HashMap<u64, Sender<Message>>,
proposals: &Mutex<VecDeque<Proposal>>,
logger: &slog::Logger,
) {
if !raft_group.has_ready() {
return;
Expand All @@ -230,15 +256,21 @@ fn on_ready(
// Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize
// raft logs to the latest position.
if let Err(e) = store.wl().append(ready.entries()) {
eprintln!("persist raft log fail: {:?}, need to retry or panic", e);
error!(
logger,
"persist raft log fail: {:?}, need to retry or panic", e
);
return;
}

// Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot.
if *ready.snapshot() != Snapshot::default() {
let s = ready.snapshot().clone();
if let Err(e) = store.wl().apply_snapshot(s) {
eprintln!("apply snapshot fail: {:?}, need to retry or panic", e);
error!(
logger,
"apply snapshot fail: {:?}, need to retry or panic", e
);
return;
}
}
Expand All @@ -247,7 +279,10 @@ fn on_ready(
for msg in ready.messages.drain(..) {
let to = msg.to;
if mailboxes[&to].send(msg).is_err() {
eprintln!("send raft message to {} fail, let Raft retry it", to);
error!(
logger,
"send raft message to {} fail, let Raft retry it", to
);
}
}

Expand Down
Loading

0 comments on commit a82ce9f

Please sign in to comment.