Skip to content

Commit

Permalink
Initial rayon support: ParallelExtend + FromParallelIterator (#89)
Browse files Browse the repository at this point in the history
This was a straightforward step towards #14
  • Loading branch information
Others authored Jun 15, 2020
1 parent 448d9f6 commit f623ac9
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 6 deletions.
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# Don't include build output
/target
**/*.rs.bk
# We're a library--don't include our lockfile
Cargo.lock
# Don't include rustfmt backup files
**/*.rs.bk

# Don't include IDE configuration files
.idea
**/.vscode/*
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sanitize = ['crossbeam-epoch/sanitize']
crossbeam-epoch = "0.8.2"
parking_lot = "0.10"
num_cpus = "1.12.0"
rayon = {version = "1.3", optional = true}
serde = {version = "1.0.105", optional = true}

[dependencies.ahash]
Expand Down
4 changes: 2 additions & 2 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
displayName: Enable debug symbols
# only --lib --tests b/c of https://github.com/rust-lang/rust/issues/53945
- script: |
env ASAN_OPTIONS="detect_odr_violation=0" RUSTFLAGS="-Z sanitizer=address" cargo test --lib --tests --features sanitize --target x86_64-unknown-linux-gnu
env ASAN_OPTIONS="detect_odr_violation=0" RUSTFLAGS="-Z sanitizer=address" cargo test --lib --tests --all-features --target x86_64-unknown-linux-gnu
displayName: cargo -Z sanitizer=address test
- job: lsan
dependsOn: deny
Expand All @@ -91,7 +91,7 @@ jobs:
cat Cargo.toml
displayName: Enable debug symbols
- script: |
env RUSTFLAGS="-Z sanitizer=leak" cargo test --features sanitize --target x86_64-unknown-linux-gnu
env RUSTFLAGS="-Z sanitizer=leak" cargo test --all-features --target x86_64-unknown-linux-gnu
displayName: cargo -Z sanitizer=leak test
- template: coverage.yml@templates
parameters:
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ mod raw;
mod set;
mod set_ref;

#[cfg(feature = "rayon")]
mod rayon_impls;

#[cfg(feature = "serde")]
mod serde_impls;

Expand Down
2 changes: 1 addition & 1 deletion src/map_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::ops::Index;
/// The current thread will be pinned for the duration of this reference.
/// Keep in mind that this prevents the collection of garbage generated by the map.
pub struct HashMapRef<'map, K, V, S = crate::DefaultHashBuilder> {
map: &'map HashMap<K, V, S>,
pub(crate) map: &'map HashMap<K, V, S>,
guard: GuardRef<'map>,
}

Expand Down
320 changes: 320 additions & 0 deletions src/rayon_impls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
use crate::{HashMap, HashMapRef, HashSet, HashSetRef};
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
use std::hash::{BuildHasher, Hash};

/// Collects a rayon parallel iterator of `(key, value)` pairs into a freshly
/// created `HashMap` that uses `S::default()` as its hasher.
impl<K, V, S> FromParallelIterator<(K, V)> for HashMap<K, V, S>
where
    K: Clone + Hash + Ord + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Default + Sync,
{
    /// Creates an empty map and fills it through this type's
    /// `ParallelExtend` implementation.
    fn from_par_iter<I>(par_iter: I) -> Self
    where
        I: IntoParallelIterator<Item = (K, V)>,
    {
        let hasher = S::default();
        let mut map = Self::with_hasher(hasher);
        // All insertion logic lives in `par_extend`; this is only the constructor.
        map.par_extend(par_iter);
        map
    }
}

/// Extends an owned `HashMap` from a rayon parallel iterator of
/// `(key, value)` pairs.
impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S>
where
    K: Clone + Hash + Ord + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Sync,
{
    /// Forwards to the `ParallelExtend` implementation for `&HashMap`.
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = (K, V)>,
    {
        // Downgrade to a shared reference so that the call below resolves to the
        // `&HashMap` implementation rather than recursing into this one.
        let mut shared: &Self = self;
        shared.par_extend(par_iter);
    }
}

/// Extends a `HashMap` through a shared reference from a rayon parallel
/// iterator of `(key, value)` pairs.
impl<K, V, S> ParallelExtend<(K, V)> for &HashMap<K, V, S>
where
    K: Clone + Hash + Ord + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Sync,
{
    /// Inserts every pair produced by `par_iter` into the map.
    ///
    /// The guard needed by `insert` is produced by the init closure of
    /// `for_each_init`, so it is created inside the parallel computation rather
    /// than once up front.
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = (K, V)>,
    {
        // Copy the shared reference out of `&mut &HashMap` so both closures
        // capture a plain `&HashMap`.
        let map: &HashMap<K, V, S> = *self;
        let pairs = par_iter.into_par_iter();
        pairs.for_each_init(
            || map.guard(),
            |guard, (key, value)| {
                // The value returned by `insert` is intentionally discarded.
                map.insert(key, value, &*guard);
            },
        );
    }
}

impl<'map, K, V, S> ParallelExtend<(K, V)> for HashMapRef<'map, K, V, S>
where
K: Clone + Hash + Ord + Send + Sync + 'static,
V: Send + Sync + 'static,
S: BuildHasher + Sync,
{
fn par_extend<I>(&mut self, par_iter: I)
where
I: IntoParallelIterator<Item = (K, V)>,
{
self.map.par_extend(par_iter);
}
}

/// Collects a rayon parallel iterator of keys into a freshly created
/// `HashSet` that uses `S::default()` as its hasher.
impl<K, S> FromParallelIterator<K> for HashSet<K, S>
where
    K: Clone + Hash + Ord + Send + Sync + 'static,
    S: BuildHasher + Default + Sync,
{
    /// Creates an empty set and fills it through this type's
    /// `ParallelExtend` implementation.
    fn from_par_iter<I>(par_iter: I) -> Self
    where
        I: IntoParallelIterator<Item = K>,
    {
        let hasher = S::default();
        let mut set = Self::with_hasher(hasher);
        // All insertion logic lives in `par_extend`; this is only the constructor.
        set.par_extend(par_iter);
        set
    }
}

/// Extends an owned `HashSet` from a rayon parallel iterator of keys.
impl<K, S> ParallelExtend<K> for HashSet<K, S>
where
    K: Clone + Hash + Ord + Send + Sync + 'static,
    S: BuildHasher + Sync,
{
    /// Forwards to the `ParallelExtend` implementation for `&HashSet`.
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = K>,
    {
        // Downgrade to a shared reference so that the call below resolves to the
        // `&HashSet` implementation rather than recursing into this one.
        let mut shared: &Self = self;
        shared.par_extend(par_iter);
    }
}

impl<K, S> ParallelExtend<K> for &HashSet<K, S>
where
K: Clone + Hash + Ord + Send + Sync + 'static,
S: BuildHasher + Sync,
{
fn par_extend<I>(&mut self, par_iter: I)
where
I: IntoParallelIterator<Item = K>,
{
let tuple_iter = par_iter.into_par_iter().map(|k| (k, ()));
(&self.map).par_extend(tuple_iter);
}
}

impl<'set, K, S> ParallelExtend<K> for HashSetRef<'set, K, S>
where
K: Clone + Hash + Ord + Send + Sync + 'static,
S: BuildHasher + Sync,
{
fn par_extend<I>(&mut self, par_iter: I)
where
I: IntoParallelIterator<Item = K>,
{
self.set.par_extend(par_iter);
}
}

/// Tests for the rayon integration: building maps/sets from parallel iterators
/// and extending owned collections as well as pinned references in parallel.
#[cfg(test)]
mod test {
    use crate::{HashMap, HashSet};
    use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend};

    /// Returns the 100 pairs `(100, 0), (101, 10), ..., (199, 990)` used as the
    /// non-empty input of the tests below.
    fn sample_pairs() -> Vec<(i32, i32)> {
        (0..100).map(|i| (i + 100, i * 10)).collect()
    }

    #[test]
    fn hm_from_empty_parallel_iter() {
        let to_create_from: Vec<(i32, i32)> = Vec::new();
        let created_map: HashMap<i32, i32> = HashMap::from_par_iter(to_create_from.into_par_iter());
        assert_eq!(created_map.len(), 0);
    }

    #[test]
    fn hm_from_large_parallel_iter() {
        let to_create_from = sample_pairs();
        let created_map: HashMap<i32, i32> = HashMap::from_par_iter(to_create_from.into_par_iter());
        assert_eq!(created_map.len(), 100);

        let guard = created_map.guard();
        assert_eq!(created_map.get(&100, &guard), Some(&0));
        assert_eq!(created_map.get(&199, &guard), Some(&990));
    }

    #[test]
    fn hs_from_empty_parallel_iter() {
        let to_create_from: Vec<i32> = Vec::new();
        let created_set: HashSet<i32> = HashSet::from_par_iter(to_create_from.into_par_iter());
        assert_eq!(created_set.len(), 0);
    }

    #[test]
    fn hs_from_large_parallel_iter() {
        let to_create_from = sample_pairs();
        let created_set: HashSet<(i32, i32)> =
            HashSet::from_par_iter(to_create_from.into_par_iter());
        assert_eq!(created_set.len(), 100);

        let guard = created_set.guard();
        assert!(created_set.contains(&(100, 0), &guard));
        assert!(!created_set.contains(&(100, 10000), &guard));
    }

    #[test]
    fn hm_parallel_extend_by_nothing() {
        let to_extend_with = Vec::new();

        let mut map = HashMap::new();
        let guard = map.guard();
        map.insert(1, 2, &guard);
        map.insert(3, 4, &guard);

        map.par_extend(to_extend_with.into_par_iter());

        assert_eq!(map.len(), 2);

        assert_eq!(map.get(&1, &guard), Some(&2));
        assert_eq!(map.get(&3, &guard), Some(&4));
    }

    #[test]
    fn hm_parallel_extend_by_a_bunch() {
        let to_extend_with = sample_pairs();

        let mut map = HashMap::new();
        let guard = map.guard();
        map.insert(1, 2, &guard);
        map.insert(3, 4, &guard);

        map.par_extend(to_extend_with.into_par_iter());
        assert_eq!(map.len(), 102);

        assert_eq!(map.get(&1, &guard), Some(&2));
        assert_eq!(map.get(&3, &guard), Some(&4));
        assert_eq!(map.get(&100, &guard), Some(&0));
        assert_eq!(map.get(&199, &guard), Some(&990));
    }

    #[test]
    fn hm_ref_parallel_extend_by_nothing() {
        let to_extend_with = Vec::new();

        let map = HashMap::new();
        let guard = map.guard();
        map.insert(1, 2, &guard);
        map.insert(3, 4, &guard);

        map.pin().par_extend(to_extend_with.into_par_iter());

        assert_eq!(map.len(), 2);

        assert_eq!(map.get(&1, &guard), Some(&2));
        assert_eq!(map.get(&3, &guard), Some(&4));
    }

    #[test]
    fn hm_ref_parallel_extend_by_a_bunch() {
        let to_extend_with = sample_pairs();

        let map = HashMap::new();
        let guard = map.guard();
        map.insert(1, 2, &guard);
        map.insert(3, 4, &guard);

        map.pin().par_extend(to_extend_with.into_par_iter());
        assert_eq!(map.len(), 102);

        assert_eq!(map.get(&1, &guard), Some(&2));
        assert_eq!(map.get(&3, &guard), Some(&4));
        assert_eq!(map.get(&100, &guard), Some(&0));
        assert_eq!(map.get(&199, &guard), Some(&990));
    }

    #[test]
    fn hs_parallel_extend_by_nothing() {
        let to_extend_with = Vec::new();

        let mut set = HashSet::new();
        let guard = set.guard();
        set.insert(1, &guard);
        set.insert(3, &guard);

        set.par_extend(to_extend_with.into_par_iter());

        assert_eq!(set.len(), 2);

        assert!(set.contains(&1, &guard));
        assert!(!set.contains(&17, &guard));
    }

    #[test]
    fn hs_parallel_extend_by_a_bunch() {
        let to_extend_with = sample_pairs();

        let mut set = HashSet::new();
        let guard = set.guard();
        set.insert((1, 2), &guard);
        set.insert((3, 4), &guard);

        set.par_extend(to_extend_with.into_par_iter());
        assert_eq!(set.len(), 102);

        assert!(set.contains(&(1, 2), &guard));
        assert!(set.contains(&(199, 990), &guard));
        assert!(!set.contains(&(199, 167), &guard));
    }

    #[test]
    fn hs_ref_parallel_extend_by_nothing() {
        let to_extend_with = Vec::new();

        // Go through `pin()` so that this exercises the `HashSetRef`
        // implementation rather than the one for the owned `HashSet`.
        let set = HashSet::new();
        let mut set_ref = set.pin();
        set_ref.insert((1, 2));
        set_ref.insert((3, 4));

        set_ref.par_extend(to_extend_with.into_par_iter());
        assert_eq!(set.len(), 2);

        assert!(set_ref.contains(&(1, 2)));
        assert!(!set_ref.contains(&(199, 990)));
        assert!(!set_ref.contains(&(199, 167)));
    }

    #[test]
    fn hs_ref_parallel_extend_by_a_bunch() {
        let to_extend_with = sample_pairs();

        let set = HashSet::new();
        let mut set_ref = set.pin();
        set_ref.insert((1, 2));
        set_ref.insert((3, 4));

        set_ref.par_extend(to_extend_with.into_par_iter());
        assert_eq!(set.len(), 102);

        assert!(set_ref.contains(&(1, 2)));
        assert!(set_ref.contains(&(199, 990)));
        assert!(!set_ref.contains(&(199, 167)));
    }
}
2 changes: 1 addition & 1 deletion src/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::iter::FromIterator;
/// }
/// ```
pub struct HashSet<T, S = crate::DefaultHashBuilder> {
map: HashMap<T, (), S>,
pub(crate) map: HashMap<T, (), S>,
}

impl<T> HashSet<T, crate::DefaultHashBuilder> {
Expand Down
2 changes: 1 addition & 1 deletion src/set_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::hash::{BuildHasher, Hash};
/// The current thread will be pinned for the duration of this reference.
/// Keep in mind that this prevents the collection of garbage generated by the set.
pub struct HashSetRef<'set, T, S = crate::DefaultHashBuilder> {
set: &'set HashSet<T, S>,
pub(crate) set: &'set HashSet<T, S>,
guard: GuardRef<'set>,
}

Expand Down

0 comments on commit f623ac9

Please sign in to comment.