Skip to content

Commit

Permalink
Update documentation and simplify implementation (#265)
Browse files Browse the repository at this point in the history
The documentation for consolidate resulted in surprises elsewhere when the method did not fulfill a very natural expectation (that it would filter supplied zero diff elements). The behavior was improved in #264, but going forward it would be best to manage the expectations about these methods.
  • Loading branch information
frankmcsherry authored Mar 28, 2020
1 parent 11fb737 commit 2fbea46
Showing 1 changed file with 95 additions and 85 deletions.
180 changes: 95 additions & 85 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,28 @@
//! Often we find ourselves with collections of records with associated weights (often
//! integers) where we want to reduce the collection to the point that each record occurs
//! at most once, with the accumulated weights. These methods supply that functionality.
//!
//! Importantly, these methods are used internally by differential dataflow, but are made
//! public for the convenience of others. Their precise behavior is driven by the needs of
//! differential dataflow (chiefly: canonicalizing sequences of non-zero updates); should
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.

use crate::difference::Semigroup;

/// Sorts and consolidates `vec`.
///
/// This method will sort `vec` and then consolidate runs of more than one entry with
/// identical first elements by accumulating the second elements of the pairs. Should the final
/// accumulation be zero, the element is discarded.
///
/// This is a convenience wrapper that calls [`consolidate_from`] with an offset of zero,
/// so the whole of `vec` is processed.
pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
// An offset of zero selects `vec[0..]`, i.e. every element of `vec`.
consolidate_from(vec, 0);
}

/// Sorts and consolidates `vec[offset..]`.
///
/// This method will sort `vec[offset..]` and then consolidate runs with the same first
/// element of a pair by accumulating the second elements of the pairs. Should the final
/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
/// identical first elements by accumulating the second elements of the pairs. Should the final
/// accumulation be zero, the element is discarded.
pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
let length = consolidate_slice(&mut vec[offset..]);
Expand All @@ -26,62 +36,60 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {

// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
if slice.len() > 1 {

slice.sort_by(|x,y| x.0.cmp(&y.0));

let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

assert!(offset < index);

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 {
(*ptr1).1 += &(*ptr2).1;
}
else {
if !(*ptr1).1.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
slice.sort_by(|x,y| x.0.cmp(&y.0));

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

assert!(offset < index);

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 {
(*ptr1).1 += &(*ptr2).1;
}
else {
if !(*ptr1).1.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
}
}
if !slice[offset].1.is_zero() {
offset += 1;
}

offset
} else if slice.len() == 1 && !slice[0].1.is_zero() {
1
} else {
0
}
if offset < slice.len() && !slice[offset].1.is_zero() {
offset += 1;
}

offset
}

/// Sorts and consolidates `vec`.
///
/// This method will sort `vec` and then consolidate runs of more than one entry with
/// identical first two elements by accumulating the third elements of the triples. Should the final
/// accumulation be zero, the element is discarded.
///
/// This is a convenience wrapper that calls [`consolidate_updates_from`] with an offset of
/// zero, so the whole of `vec` is processed.
pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
// An offset of zero selects `vec[0..]`, i.e. every element of `vec`.
consolidate_updates_from(vec, 0);
}

/// Sorts and consolidates `vec[offset..]`.
///
/// This method will sort `vec[offset..]` and then consolidate runs with the same first
/// element of a pair by accumulating the second elements of the pairs. Should the final
/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
/// identical first two elements by accumulating the third elements of the triples. Should the final
/// accumulation be zero, the element is discarded.
pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
let length = consolidate_updates_slice(&mut vec[offset..]);
Expand All @@ -93,50 +101,44 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,

// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
if slice.len() > 1 {

slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));

let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 {
(*ptr1).2 += &(*ptr2).2;
}
else {
if !(*ptr1).2.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 {
(*ptr1).2 += &(*ptr2).2;
}
else {
if !(*ptr1).2.is_zero() {
offset += 1;
}

let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
}
}
if !slice[offset].2.is_zero() {
offset += 1;
}

offset
} else if slice.len() == 1 && !slice[0].2.is_zero() {
1
} else {
0
}
}
if offset < slice.len() && !slice[offset].2.is_zero() {
offset += 1;
}

offset
}

#[cfg(test)]
Expand All @@ -158,6 +160,10 @@ mod tests {
vec![("a", 0)],
vec![],
),
(
vec![("a", 0), ("b", 0)],
vec![],
),
];

for (mut input, output) in test_cases {
Expand All @@ -182,6 +188,10 @@ mod tests {
vec![("a", 1, 0)],
vec![],
),
(
vec![("a", 1, 0), ("b", 1, 0)],
vec![],
),
];

for (mut input, output) in test_cases {
Expand Down

0 comments on commit 2fbea46

Please sign in to comment.