Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: clean up the usage of slog #288

Merged
merged 2 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ prost-codec = ["raft-proto/prost-codec"]

# Make sure to synchronize updates with Harness.
[dependencies]
log = "0.4"
protobuf = "2"
slog = "2.2"
quick-error = "1.2.2"
Expand All @@ -35,15 +34,12 @@ rand = "0.6.5"
hashbrown = "0.5"
fail = { version = "0.3", optional = true }
getset = "0.0.7"
slog-stdlog = "4"
slog-envlogger = "2.1.0"
# We don't actually use this crate, but by pinning to 0.5.1 it forces Cargo to
# avoid 0.5.2 and thus pulling in a transitive dep with a security vulnerability.
term = "=0.5.1"

[dev-dependencies]
criterion = ">0.2.4"
regex = "1.1"
slog-stdlog = "4"
slog-envlogger = "2.1.0"
slog-async = "2.3.0"
slog-term = "2.4.0"

Expand Down
32 changes: 32 additions & 0 deletions benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,45 @@
#![allow(dead_code)] // Due to criterion we need this to avoid warnings.
#![cfg_attr(feature = "cargo-clippy", allow(clippy::let_and_return))] // Benches often artificially return values. Allow it.

#[macro_use]
extern crate slog;

use criterion::Criterion;
use std::time::Duration;

mod suites;

pub const DEFAULT_RAFT_SETS: [(usize, usize); 4] = [(0, 0), (3, 1), (5, 2), (7, 3)];

/// The default logger we fall back to when passed `None` in external facing constructors.
///
/// Currently, this is a `log` adaptor behind a `Once` to ensure there is no clobbering.
fn default_logger() -> slog::Logger {
use slog::Drain;
use std::sync::{Mutex, Once};

static LOGGER_INITIALIZED: Once = Once::new();
static mut LOGGER: Option<slog::Logger> = None;

let logger = unsafe {
LOGGER_INITIALIZED.call_once(|| {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = slog_envlogger::new(drain);
LOGGER = Some(slog::Logger::root(Mutex::new(drain).fuse(), o!()));
});
LOGGER.as_ref().unwrap()
};
let case = std::thread::current()
.name()
.unwrap()
.split(":")
.last()
.unwrap()
.to_string();
logger.new(o!("case" => case))
}

fn main() {
let mut c = Criterion::default()
// Configure defaults before overriding with args.
Expand Down
6 changes: 3 additions & 3 deletions benches/suites/progress_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn bench_progress_set(c: &mut Criterion) {
}

fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
let mut set = ProgressSet::with_capacity(voters, learners);
let mut set = ProgressSet::with_capacity(voters, learners, crate::default_logger());
(0..voters).for_each(|id| {
set.insert_voter(id as u64, Progress::new(0, 10)).ok();
});
Expand All @@ -29,7 +29,7 @@ fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
pub fn bench_progress_set_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new());
b.iter(|| ProgressSet::new(crate::default_logger()));
};

c.bench_function("ProgressSet::new", bench);
Expand All @@ -39,7 +39,7 @@ pub fn bench_progress_set_with_capacity(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::with_capacity(voters, learners));
b.iter(|| ProgressSet::with_capacity(voters, learners, crate::default_logger()));
}
};

Expand Down
43 changes: 16 additions & 27 deletions benches/suites/raft.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::DEFAULT_RAFT_SETS;
use criterion::{Bencher, Criterion};
use criterion::Criterion;
use raft::{storage::MemStorage, Config, Raft};

pub fn bench_raft(c: &mut Criterion) {
bench_raft_new(c);
bench_raft_campaign(c);
}

fn quick_raft(voters: usize, learners: usize) -> Raft<MemStorage> {
fn quick_raft(voters: usize, learners: usize, logger: &slog::Logger) -> Raft<MemStorage> {
let id = 1;
let storage = MemStorage::default();
let config = Config::new(id);
let mut raft = Raft::new(&config, storage).unwrap();
let mut raft = Raft::new(&config, storage, logger).unwrap();
(0..voters).for_each(|id| {
raft.add_node(id as u64).unwrap();
});
Expand All @@ -22,47 +22,36 @@ fn quick_raft(voters: usize, learners: usize) -> Raft<MemStorage> {
}

pub fn bench_raft_new(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| quick_raft(voters, learners));
}
};

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("Raft::new ({}, {})", voters, learners),
bench(*voters, *learners),
);
c.bench_function(&format!("Raft::new ({}, {})", voters, learners), move |b| {
let logger = crate::default_logger();
b.iter(|| quick_raft(*voters, *learners, &logger))
});
});
}

pub fn bench_raft_campaign(c: &mut Criterion) {
let bench = |voters, learners, variant| {
move |b: &mut Bencher| {
b.iter(|| {
// TODO: Make raft clone somehow.
let mut raft = quick_raft(voters, learners);
raft.campaign(variant)
})
}
};

DEFAULT_RAFT_SETS
.iter()
.skip(1)
.for_each(|(voters, learners)| {
// We don't want to make `raft::raft` public at this point.
let msgs = [
let msgs = &[
"CampaignPreElection",
"CampaignElection",
"CampaignTransfer",
];
// Skip the first since it's 0,0
for msg in &msgs {
for msg in msgs {
c.bench_function(
&format!("Raft::campaign ({}, {}, {})", voters, learners, msg),
bench(*voters, *learners, msg.as_bytes()),
move |b| {
let logger = crate::default_logger();
b.iter(|| {
let mut raft = quick_raft(*voters, *learners, &logger);
raft.campaign(msg.as_bytes());
})
},
);
}
});
Expand Down
8 changes: 4 additions & 4 deletions benches/suites/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ pub fn bench_raw_node(c: &mut Criterion) {
bench_raw_node_new(c);
}

fn quick_raw_node() -> RawNode<MemStorage> {
fn quick_raw_node(logger: &slog::Logger) -> RawNode<MemStorage> {
let id = 1;
let storage = MemStorage::default();
let config = Config::new(id);
RawNode::new(&config, storage).unwrap()
RawNode::new(&config, storage, logger).unwrap()
}

pub fn bench_raw_node_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(quick_raw_node);
let logger = crate::default_logger();
b.iter(|| quick_raw_node(&logger));
};

c.bench_function("RawNode::new", bench);
Expand Down
65 changes: 50 additions & 15 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

#[macro_use]
extern crate slog;

use slog::Drain;
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TryRecvError};
use std::sync::{Arc, Mutex};
Expand All @@ -23,6 +28,15 @@ use raft::{prelude::*, StateRole};
use regex::Regex;

fn main() {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain)
.chan_size(4096)
.overflow_strategy(slog_async::OverflowStrategy::Block)
.build()
.fuse();
let logger = slog::Logger::root(drain, o!());

const NUM_NODES: u32 = 5;
// Create 5 mailboxes to send/receive messages. Every node holds a `Receiver` to receive
// messages from others, and uses the respective `Sender` to send messages to others.
Expand All @@ -46,7 +60,7 @@ fn main() {
let mailboxes = (1..6u64).zip(tx_vec.iter().cloned()).collect();
let mut node = match i {
// Peer 1 is the leader.
0 => Node::create_raft_leader(1, rx, mailboxes),
0 => Node::create_raft_leader(1, rx, mailboxes, &logger),
// Other peers are followers.
_ => Node::create_raft_follower(rx, mailboxes),
};
Expand All @@ -57,13 +71,14 @@ fn main() {

// Clone the stop receiver
let rx_stop_clone = Arc::clone(&rx_stop);
let logger = logger.clone();
// Here we spawn the node on a new thread and keep a handle so we can join on them later.
let handle = thread::spawn(move || loop {
thread::sleep(Duration::from_millis(10));
loop {
// Step raft messages.
match node.my_mailbox.try_recv() {
Ok(msg) => node.step(msg),
Ok(msg) => node.step(msg, &logger),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return,
}
Expand Down Expand Up @@ -91,7 +106,13 @@ fn main() {
}

// Handle readies from the raft.
on_ready(raft_group, &mut node.kv_pairs, &node.mailboxes, &proposals);
on_ready(
raft_group,
&mut node.kv_pairs,
&node.mailboxes,
&proposals,
&logger,
);

// Check control signals from
if check_signals(&rx_stop_clone) {
Expand All @@ -105,7 +126,10 @@ fn main() {
add_all_followers(proposals.as_ref());

// Put 100 key-value pairs.
println!("We get a 5 nodes Raft cluster now, now propose 100 proposals");
info!(
logger,
"We get a 5 nodes Raft cluster now, now propose 100 proposals"
);
(0..100u16)
.filter(|i| {
let (proposal, rx) = Proposal::normal(*i, "hello, world".to_owned());
Expand All @@ -116,7 +140,7 @@ fn main() {
})
.count();

println!("Propose 100 proposals success!");
info!(logger, "Propose 100 proposals success!");

// Send terminate signals
for _ in 0..NUM_NODES {
Expand Down Expand Up @@ -159,13 +183,14 @@ impl Node {
id: u64,
my_mailbox: Receiver<Message>,
mailboxes: HashMap<u64, Sender<Message>>,
logger: &slog::Logger,
) -> Self {
let mut cfg = example_config();
cfg.id = id;
cfg.tag = format!("peer_{}", id);
let logger = logger.new(o!("tag" => format!("peer_{}", id)));

let storage = MemStorage::new_with_conf_state(ConfState::from((vec![id], vec![])));
let raft_group = Some(RawNode::new(&cfg, storage).unwrap());
let raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
Node {
raft_group,
my_mailbox,
Expand All @@ -188,22 +213,22 @@ impl Node {
}

// Initialize raft for followers.
fn initialize_raft_from_message(&mut self, msg: &Message) {
fn initialize_raft_from_message(&mut self, msg: &Message, logger: &slog::Logger) {
if !is_initial_msg(msg) {
return;
}
let mut cfg = example_config();
cfg.id = msg.to;
cfg.tag = format!("peer_{}", msg.to);
let logger = logger.new(o!("tag" => format!("peer_{}", msg.to)));
let storage = MemStorage::new();
self.raft_group = Some(RawNode::new(&cfg, storage).unwrap());
self.raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
}

// Step a raft message, initialize the raft if need.
fn step(&mut self, msg: Message) {
fn step(&mut self, msg: Message, logger: &slog::Logger) {
if self.raft_group.is_none() {
if is_initial_msg(&msg) {
self.initialize_raft_from_message(&msg);
self.initialize_raft_from_message(&msg, &logger);
} else {
return;
}
Expand All @@ -218,6 +243,7 @@ fn on_ready(
kv_pairs: &mut HashMap<u16, String>,
mailboxes: &HashMap<u64, Sender<Message>>,
proposals: &Mutex<VecDeque<Proposal>>,
logger: &slog::Logger,
) {
if !raft_group.has_ready() {
return;
Expand All @@ -230,15 +256,21 @@ fn on_ready(
// Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize
// raft logs to the latest position.
if let Err(e) = store.wl().append(ready.entries()) {
eprintln!("persist raft log fail: {:?}, need to retry or panic", e);
error!(
logger,
"persist raft log fail: {:?}, need to retry or panic", e
);
return;
}

// Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot.
if *ready.snapshot() != Snapshot::default() {
let s = ready.snapshot().clone();
if let Err(e) = store.wl().apply_snapshot(s) {
eprintln!("apply snapshot fail: {:?}, need to retry or panic", e);
error!(
logger,
"apply snapshot fail: {:?}, need to retry or panic", e
);
return;
}
}
Expand All @@ -247,7 +279,10 @@ fn on_ready(
for msg in ready.messages.drain(..) {
let to = msg.to;
if mailboxes[&to].send(msg).is_err() {
eprintln!("send raft message to {} fail, let Raft retry it", to);
error!(
logger,
"send raft message to {} fail, let Raft retry it", to
);
}
}

Expand Down
Loading