Fastcdc #11

Closed
wants to merge 5 commits

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -12,5 +12,11 @@ license = "BSD-3-Clause"
default = []
bench = []

[profile.bench]
opt-level = 3
debug = true
lto = true
debug-assertions = false

[dev-dependencies]
rand = "0.3"
281 changes: 281 additions & 0 deletions src/fastcdc.rs
@@ -0,0 +1,281 @@
use super::Engine;
use std::default::Default;
use std::cmp;
use std::mem;
use Gear;

pub struct FastCDC {
    current_chunk_size: u64,
    gear: Gear,
}

impl Default for FastCDC {
    fn default() -> Self {
        FastCDC {
            current_chunk_size: 0,
            gear: Gear::default(),
        }
    }
}


impl Engine for FastCDC {
    type Digest = u64;
Review comment:
This really emphasises that trait Engine is an incorrect abstraction for CDC. FastCDC doesn't actually have a Digest, nor can you roll a single byte.

(Similarly, AE and MAXP can't even pretend to have a digest, because they're not even approximately hash-based.)

dpc (Collaborator, Author):
True. However, Engine is all we have ATM. :)

Owner:
Interesting point. I had only been looking at rolling checksums (hence the crate name) and hadn't really thought about non-checksum-based alternatives for doing chunking.
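
To make the objection concrete: a chunking-first trait could drop the digest entirely. The following is a hypothetical sketch for illustration only; the `Chunker` name and its methods are invented here, not proposed in this PR:

// Hypothetical sketch, not part of this PR: a trait that AE- and
// MAXP-style algorithms could implement without pretending to expose
// a rolling digest.
pub trait Chunker {
    /// Consume `buf` and return the offset just past the first chunk
    /// edge, or `None` if the buffer ends mid-chunk (state is retained
    /// for the next call).
    fn find_chunk_edge(&mut self, buf: &[u8]) -> Option<usize>;

    /// Reset internal state between independent inputs.
    fn reset(&mut self);
}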


    #[inline(always)]
    fn roll_byte(&mut self, b: u8) {
        self.gear.roll_byte(b);
    }

    #[inline(always)]
    fn digest(&self) -> u64 {
        self.gear.digest()
    }

    #[inline]
    fn reset(&mut self) {
        self.gear.reset();
        self.current_chunk_size = 0;
    }
}

impl FastCDC {
    /// Create a new FastCDC engine with default chunking settings.
    pub fn new() -> Self {
        Default::default()
    }

    /// Create a new `FastCDC` engine with custom chunking settings.
    ///
    /// `chunk_bits` is the number of bits that need to match in
    /// the edge condition. The `CHUNK_BITS` constant is the default.
    pub fn new_with_chunk_bits(chunk_bits: u32) -> Self {
        Self {
            current_chunk_size: 0,
            gear: Gear::new_with_chunk_bits(chunk_bits),
        }
    }

    /// Find chunk edge using `FastCDC` defaults.
    ///
    /// See `Engine::find_chunk_edge_cond`.
    pub fn find_chunk_edge(&mut self, mut buf: &[u8]) -> Option<(usize, u64)> {
        const DIGEST_SIZE: usize = 64;
        debug_assert_eq!(
            mem::size_of::<<Self as Engine>::Digest>() * 8,
            DIGEST_SIZE
        );

        const SPREAD_BITS: u32 = 3;
        const WINDOW_SIZE: usize = 64;

        let min_shift = DIGEST_SIZE as u32 - self.gear.chunk_bits - SPREAD_BITS;
        let max_shift = DIGEST_SIZE as u32 - self.gear.chunk_bits + SPREAD_BITS;
        let min_size = (1 << (self.gear.chunk_bits - SPREAD_BITS)) as u64;
        let ignore_size = min_size - WINDOW_SIZE as u64;
        let avg_size = (1 << self.gear.chunk_bits) as u64;
        let max_size = (1 << (self.gear.chunk_bits + SPREAD_BITS)) as u64;
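
        // With the default chunk_bits = 13 these evaluate to:
        //   min_shift = 64 - 13 - 3 = 48: an edge needs the top 16 digest
        //     bits to be zero, so early edges fire with probability 2^-16;
        //   max_shift = 64 - 13 + 3 = 54: only the top 10 bits, so late
        //     edges fire with probability 2^-10;
        //   min_size = 2^10 = 1 KiB, avg_size = 2^13 = 8 KiB,
        //   max_size = 2^16 = 64 KiB, ignore_size = 1024 - 64 = 960 bytes.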

        let mut cur_offset = 0usize;

        loop {
Review comment:
What is this loop for? As far as I can tell it will loop exactly once, because all codepaths return?

            debug_assert!(self.current_chunk_size < max_size);
            debug_assert!(cur_offset < max_size as usize);

            if buf.is_empty() {
                return None;
            }

            // ignore bytes that are not going to influence the digest
            if self.current_chunk_size < ignore_size {
                let skip_bytes = cmp::min(ignore_size - self.current_chunk_size, buf.len() as u64);
dpc (Collaborator, Author) · Aug 3, 2017:
Doing these mins outside of the condition might perform better, like here: https://github.com/dswd/zvault/blob/master/chunking/src/fastcdc.rs#L99
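
The shape being suggested looks roughly like this, adapted to this PR's field names (a sketch of the idea, not the linked zvault code):

// Compute the clamp unconditionally; past ignore_size the subtraction
// saturates to 0 and the advance below becomes a no-op, so the hot
// loop carries one branch fewer.
let skip_bytes = cmp::min(
    ignore_size.saturating_sub(self.current_chunk_size),
    buf.len() as u64,
);
self.current_chunk_size += skip_bytes;
cur_offset += skip_bytes as usize;
buf = &buf[skip_bytes as usize..];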

                self.current_chunk_size += skip_bytes;
                cur_offset += skip_bytes as usize;
                buf = &buf[skip_bytes as usize..];
            }

            // ignore edges in bytes that are smaller than min_size
            if self.current_chunk_size < min_size {
                let roll_bytes = cmp::min(min_size - self.current_chunk_size,
                                          buf.len() as u64);
                self.gear.roll(&buf[..roll_bytes as usize]);
                self.current_chunk_size += roll_bytes;
                cur_offset += roll_bytes as usize;
                buf = &buf[roll_bytes as usize..];
            }

            // roll through early bytes with smaller probability
            if self.current_chunk_size < avg_size {
                let roll_bytes = cmp::min(avg_size - self.current_chunk_size,
                                          buf.len() as u64);
                let result = self.gear.find_chunk_edge_cond(buf, |e: &Gear| (e.digest() >> min_shift) == 0);
Review comment:
This is missing the padding-zeros optimisation for deduplication efficiency (likewise the large-chunk calculation).

dpc (Collaborator, Author):
I don't get the padding zeros. Since the underlying Gear (as described in the Ddelta paper) is taking the most significant bits, isn't that the ultimate padding? I was so confused by this.

Review comment:
Hm, you're quite right. I've misread this; the window here is essentially the width of the digest: 64 bytes.

The authors of the paper describe the mask they use in the algorithm as being empirically derived, but infuriatingly give no details about it. You'd think that taking the largest window would be best, but apparently not? Apparently it works best when the contributing bits are spread approximately uniformly across the 64-bit digest?

dpc (Collaborator, Author) · Aug 3, 2017:
> it works best when the contributing bits are spread approximately uniformly across the 64-bit digest?

But why? :D I am not a very academically minded person, but I found this paper rather confusing in many places: a lot of repeating the obvious, and glossing over the important details.

Well, @dswd has the correct implementation here: https://github.com/dswd/zvault/blob/master/chunking/src/fastcdc.rs, so we can just use it. :)
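
For context, the normalized chunking described in the paper gates edges with two masks whose set bits are spread across the digest, instead of a shift. A sketch (the mask values are the ones listed in the FastCDC paper for its 8 KiB configuration):

// Two spread-bit masks instead of `digest >> shift == 0`:
// MASK_S has 15 set bits and gates edges before the average size,
// MASK_L has 11 set bits and gates edges after it.
const MASK_S: u64 = 0x0003590703530000; // edge probability 2^-15
const MASK_L: u64 = 0x0000d90003530000; // edge probability 2^-11

#[inline]
fn is_edge(digest: u64, mask: u64) -> bool {
    // An edge fires when all masked bits are zero; fewer mask bits make
    // edges likelier, which shapes the chunk-size distribution.
    digest & mask == 0
}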


                if let Some((offset, digest)) = result {
                    self.reset();
                    return Some((cur_offset + offset, digest));
                }

                self.current_chunk_size += roll_bytes;
                cur_offset += roll_bytes as usize;
                buf = &buf[roll_bytes as usize..];
            }

            // roll through late bytes with higher probability
            if self.current_chunk_size < max_size {
                let roll_bytes = cmp::min(max_size - self.current_chunk_size,
                                          buf.len() as u64);
                let result = self.gear.find_chunk_edge_cond(buf, |e: &Gear| (e.digest() >> max_shift) == 0);

                if let Some((offset, digest)) = result {
                    self.reset();
                    return Some((cur_offset + offset, digest));
                }

                self.current_chunk_size += roll_bytes;
                cur_offset += roll_bytes as usize;
                buf = &buf[roll_bytes as usize..];
            }

            if self.current_chunk_size >= max_size {
                debug_assert_eq!(self.current_chunk_size, max_size);
                let result = (cur_offset, self.gear.digest());
                self.reset();
                return Some(result);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{FastCDC, Engine};

    #[test]
    fn effective_window_size() {
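        // Gear's digest is a left-shifting 64-bit accumulator, so only the
        // last 64 input bytes can still contribute bits to it. Rolling the
        // same trailing bytes over two different prefixes must therefore
        // converge on the same digest after 64 bytes, which this test checks.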
        let ones = vec![0x1; 1024];
        let zeroes = vec![0x0; 1024];

        let mut gear = FastCDC::new();
        gear.roll(&ones);
        let digest = gear.digest();

        let mut gear = FastCDC::new();
        gear.roll(&zeroes);

        for (i, &b) in ones.iter().enumerate() {
            gear.roll_byte(b);
            if gear.digest() == digest {
                assert_eq!(i, 63);
                return;
            }
        }

        panic!("matching digest not found");
    }

    #[cfg(feature = "bench")]
    mod bench {
        use test::Bencher;
        use super::*;

        use tests::test_data_1mb;

        #[bench]
        fn perf_1mb(b: &mut Bencher) {
            let v = test_data_1mb();

            b.iter(|| {
                let mut gear = FastCDC::new();
                let mut i = 0;
                while let Some((new_i, _)) = gear.find_chunk_edge(&v[i..v.len()]) {
                    i += new_i;
                    if i == v.len() {
                        break;
                    }
                }
            });
        }

        #[bench]
        fn perf_1mb_16k_chunks(b: &mut Bencher) {
            let v = test_data_1mb();

            b.iter(|| {
                let mut gear = FastCDC::new_with_chunk_bits(14);
                let mut i = 0;
                while let Some((new_i, _)) = gear.find_chunk_edge(&v[i..v.len()]) {
                    i += new_i;
                    if i == v.len() {
                        break;
                    }
                }
            });
        }

        #[bench]
        fn perf_1mb_64k_chunks(b: &mut Bencher) {
            let v = test_data_1mb();

            b.iter(|| {
                let mut gear = FastCDC::new_with_chunk_bits(16);
                let mut i = 0;
                while let Some((new_i, _)) = gear.find_chunk_edge(&v[i..v.len()]) {
                    i += new_i;
                    if i == v.len() {
                        break;
                    }
                }
            });
        }

        #[bench]
        fn perf_1mb_128k_chunks(b: &mut Bencher) {
            let v = test_data_1mb();

            b.iter(|| {
                let mut gear = FastCDC::new_with_chunk_bits(17);
                let mut i = 0;
                while let Some((new_i, _)) = gear.find_chunk_edge(&v[i..v.len()]) {
                    i += new_i;
                    if i == v.len() {
                        break;
                    }
                }
            });
        }

        #[bench]
        fn perf_1mb_256k_chunks(b: &mut Bencher) {
            let v = test_data_1mb();

            b.iter(|| {
                let mut gear = FastCDC::new_with_chunk_bits(18);
                let mut i = 0;
                while let Some((new_i, _)) = gear.find_chunk_edge(&v[i..v.len()]) {
                    i += new_i;
                    if i == v.len() {
                        break;
                    }
                }
            });
        }

        #[bench]
        fn perf_1mb_512k_chunks(b: &mut Bencher) {
            let v = test_data_1mb();

            b.iter(|| {
                let mut gear = FastCDC::new_with_chunk_bits(19);
                let mut i = 0;
                while let Some((new_i, _)) = gear.find_chunk_edge(&v[i..v.len()]) {
                    i += new_i;
                    if i == v.len() {
                        break;
                    }
                }
            });
        }
    }
}
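
Putting the benchmarks' inner loop into context, splitting a whole buffer into chunks looks roughly like this. A sketch: it assumes the crate is consumed as `rollsum`, and mirrors the bench loop above rather than being a canonical API example:

extern crate rollsum;

use rollsum::FastCDC;

fn main() {
    let data = vec![0u8; 1 << 20];
    let mut chunker = FastCDC::new();
    let mut edges = Vec::new();
    let mut i = 0;
    while let Some((new_i, _digest)) = chunker.find_chunk_edge(&data[i..]) {
        i += new_i; // returned offsets are relative to the slice passed in
        edges.push(i);
        if i == data.len() {
            break;
        }
    }
    // Bytes from the last edge up to data.len() form a trailing partial chunk.
    println!("found {} edges", edges.len());
}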
19 changes: 7 additions & 12 deletions src/gear.rs
@@ -12,7 +12,7 @@ pub const CHUNK_BITS: u32 = 13;

 pub struct Gear {
     digest: Wrapping<u64>,
-    chunk_bits: u32,
+    pub chunk_bits: u32,
 }

impl Default for Gear {
@@ -111,19 +111,14 @@ mod tests {

     #[cfg(feature = "bench")]
     mod bench {
-        use test::Bencher;
-        use rand::{Rng, SeedableRng, StdRng};
-        use super::*;
+        use test::Bencher;
+        use super::*;

-        #[bench]
-        fn gear_perf_1mb(b: &mut Bencher) {
-            let mut v = vec![0x0; 1024 * 1024];
+        use tests::test_data_1mb;

-            let seed: &[_] = &[1, 2, 3, 4];
-            let mut rng: StdRng = SeedableRng::from_seed(seed);
-            for i in 0..v.len() {
-                v[i] = rng.gen();
-            }
+        #[bench]
+        fn perf_1mb(b: &mut Bencher) {
+            let v = test_data_1mb();

             b.iter(|| {
                 let mut gear = Gear::new();
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -9,28 +9,34 @@ extern crate test;
 /// Rolling sum and chunk splitting used by
 /// `bup` - https://github.com/bup/bup/
 pub mod bup;
+pub use bup::Bup;

 pub mod gear;
 pub use gear::Gear;

-pub use bup::Bup;
+pub mod fastcdc;
+pub use fastcdc::FastCDC;

 /// Rolling sum engine trait
 pub trait Engine {
     type Digest;

     /// Roll over one byte
     #[inline(always)]
     fn roll_byte(&mut self, byte: u8);

     /// Roll over a slice of bytes
     #[inline(always)]
     fn roll(&mut self, buf: &[u8]) {
         buf.iter().map(|&b| self.roll_byte(b)).count();
     }

     /// Return current rolling sum digest
     #[inline(always)]
     fn digest(&self) -> Self::Digest;

+    /// Resets the internal state
+    #[inline(always)]
+    fn reset(&mut self);
+
     /// Find the end of the chunk.
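
To illustrate the surface this trait demands (the one the review thread above calls a poor fit for CDC), here is a toy implementation. A hypothetical sketch: the `ByteSum` type is invented, and it assumes the chunk-edge helpers elided below the fold are provided methods with default bodies:

// Hypothetical toy engine, not in the crate: a plain additive checksum.
struct ByteSum {
    sum: u64,
}

impl Engine for ByteSum {
    type Digest = u64;

    fn roll_byte(&mut self, byte: u8) {
        self.sum = self.sum.wrapping_add(byte as u64);
    }

    fn digest(&self) -> u64 {
        self.sum
    }

    fn reset(&mut self) {
        self.sum = 0;
    }
}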
12 changes: 12 additions & 0 deletions src/tests.rs
@@ -35,3 +35,15 @@ fn bup_selftest()
    assert_eq!(sum2a, sum2b);
    assert_eq!(sum3a, sum3b);
}

pub fn test_data_1mb() -> Vec<u8> {
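    // Fixed seed so every engine's benchmark sees byte-identical pseudorandom input.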
    let mut v = vec![0x0; 1024 * 1024];

    let seed: &[_] = &[2, 1, 255, 70];
    let mut rng: StdRng = SeedableRng::from_seed(seed);
    for i in 0..v.len() {
        v[i] = rng.gen();
    }

    v
}