Skip to content

Commit

Permalink
ExternalSortExec v1
Browse files Browse the repository at this point in the history
  • Loading branch information
yjshen committed Oct 28, 2021
1 parent 1c26cd0 commit 2e6eb0c
Show file tree
Hide file tree
Showing 12 changed files with 456 additions and 307 deletions.
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ path = "src/lib.rs"
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
simd = ["arrow/simd"]
crypto_expressions = ["md-5", "sha2"]
regex_expressions = ["regex", "lazy_static"]
regex_expressions = ["regex"]
unicode_expressions = ["unicode-segmentation"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
Expand All @@ -67,7 +67,7 @@ sha2 = { version = "^0.9.1", optional = true }
ordered-float = "2.0"
unicode-segmentation = { version = "^1.7.1", optional = true }
regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0", optional = true }
lazy_static = { version = "^1.4.0"}
smallvec = { version = "1.6", features = ["union"] }
rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/execution/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
let mut hasher = DefaultHasher::new();
file_name.hash(&mut hasher);
let hash = hasher.finish();
let dir = local_dirs[hash.rem_euclid(local_dirs.len() as u64)];
let dir = &local_dirs[hash.rem_euclid(local_dirs.len() as u64) as usize];
let mut path = PathBuf::new();
path.push(dir);
path.push(file_name);
Expand All @@ -88,9 +88,9 @@ fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
fn create_tmp_file(local_dirs: &Vec<String>) -> Result<String> {
let name = Uuid::new_v4().to_string();
let mut path = get_file(&*name, local_dirs);
while path.exists() {
while Path::new(path.as_str()).exists() {
path = get_file(&*Uuid::new_v4().to_string(), local_dirs);
}
File::create(&path).map_err(|e| e.into())?;
File::create(&path)?;
Ok(path)
}
81 changes: 62 additions & 19 deletions datafusion/src/execution/memory_management/memory_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::memory_management::{MemoryConsumer, MemoryConsumerId};
use crate::physical_plan::aggregates::return_type;
use crate::execution::memory_management::MemoryConsumer;
use hashbrown::HashMap;
use log::{info, warn};
use std::cmp::{max, min};
use std::cmp::min;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Condvar, Mutex};

pub(crate) trait ExecutionMemoryPool {
pub(crate) trait ExecutionMemoryPool: Sync + Send + Debug {
fn memory_available(&self) -> usize;
fn memory_used(&self) -> usize;
fn memory_used_partition(&self, partition_id: usize) -> usize;
fn acquire_memory(&self, required: usize, consumer: &dyn MemoryConsumer) -> usize;
fn acquire_memory(
&self,
required: usize,
consumer: &Arc<dyn MemoryConsumer>,
) -> usize;
fn update_usage(
&self,
granted_size: usize,
Expand All @@ -49,6 +54,14 @@ impl DummyExecutionMemoryPool {
}
}

/// Debug formatting for the dummy pool.
impl Debug for DummyExecutionMemoryPool {
    /// Emits a struct-style representation named `DummyExecutionMemoryPool`
    /// with a single `total` field holding `self.pool_size`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("DummyExecutionMemoryPool");
        repr.field("total", &self.pool_size);
        repr.finish()
    }
}

impl ExecutionMemoryPool for DummyExecutionMemoryPool {
fn memory_available(&self) -> usize {
usize::MAX
Expand All @@ -62,7 +75,11 @@ impl ExecutionMemoryPool for DummyExecutionMemoryPool {
0
}

fn acquire_memory(&self, required: usize, _consumer: &dyn MemoryConsumer) -> usize {
fn acquire_memory(
&self,
required: usize,
_consumer: &Arc<dyn MemoryConsumer>,
) -> usize {
required
}

Expand Down Expand Up @@ -98,6 +115,15 @@ impl ConstraintExecutionMemoryPool {
}
}

/// Debug formatting for the constrained pool.
impl Debug for ConstraintExecutionMemoryPool {
    /// Emits a struct-style representation named `ConstraintExecutionMemoryPool`
    /// with two fields: `total` (`self.pool_size`) and `used` (the value
    /// returned by `self.memory_used()` at the time of formatting).
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("ConstraintExecutionMemoryPool");
        repr.field("total", &self.pool_size);
        // Queried only after the struct name and `total` have been emitted,
        // matching the order of a chained builder call.
        repr.field("used", &self.memory_used());
        repr.finish()
    }
}

impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
fn memory_available(&self) -> usize {
self.pool_size - self.memory_used()
Expand All @@ -110,10 +136,17 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {

fn memory_used_partition(&self, partition_id: usize) -> usize {
let partition_usage = self.memory_usage.lock().unwrap();
partition_usage[partition_id].unwrap_or(0)
match partition_usage.get(&partition_id) {
None => 0,
Some(v) => *v,
}
}

fn acquire_memory(&self, required: usize, consumer: &dyn MemoryConsumer) -> usize {
fn acquire_memory(
&self,
required: usize,
consumer: &Arc<dyn MemoryConsumer>,
) -> usize {
assert!(required > 0);
let partition_id = consumer.partition_id();
let mut partition_usage = self.memory_usage.lock().unwrap();
Expand All @@ -138,7 +171,7 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
Some(max_available) => min(required, max_available),
};

let total_used = partition_usage.values().sum();
let total_used: usize = partition_usage.values().sum();
let total_available = self.pool_size - total_used;
// Only grant the partition as much memory as is free, which might be none if it has already reached 1 / num_active_partition of the pool
let to_grant = min(max_grant, total_available);
Expand All @@ -147,8 +180,11 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
// if we can't give it this much now, wait for other tasks to free up memory
// (this happens if older tasks allocated lots of memory before N grew)
if to_grant < required && current_mem + to_grant < min_memory_per_partition {
info!("{} waiting for at least 1/2N of pool to be free", consumer);
self.condvar.wait(&mut partition_usage);
info!(
"{:?} waiting for at least 1/2N of pool to be free",
consumer
);
self.condvar.wait(partition_usage);
} else {
*partition_usage.entry(partition_id).or_insert(0) += to_grant;
return to_grant;
Expand All @@ -169,20 +205,24 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
} else {
let mut partition_usage = self.memory_usage.lock().unwrap();
if granted_size > real_size {
partition_usage.entry(consumer.partition_id()) -=
*partition_usage.entry(consumer.partition_id()).or_insert(0) -=
granted_size - real_size;
} else {
// TODO: if the size estimated ahead of time is much smaller than the actual allocation,
// this would already have caused an OOM
partition_usage.entry(consumer.partition_id()) +=
*partition_usage.entry(consumer.partition_id()).or_insert(0) +=
real_size - granted_size;
}
}
}

fn release_memory(&self, release_size: usize, partition_id: usize) {
let mut partition_usage = self.memory_usage.lock().unwrap();
let current_mem = partition_usage[partition_id].unwrap_or(0);
let current_mem = match partition_usage.get(&partition_id) {
None => 0,
Some(v) => *v,
};

let to_free = if current_mem < release_size {
warn!(
"Release called to free {} but partition only holds {} from the pool",
Expand All @@ -193,8 +233,9 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
release_size
};
if partition_usage.contains_key(&partition_id) {
partition_usage.entry(partition_id) -= to_free;
if partition_usage[partition_id].unwrap() == 0 {
let entry = partition_usage.entry(partition_id).or_insert(0);
*entry -= to_free;
if *entry == 0 {
partition_usage.remove(&partition_id);
}
}
Expand All @@ -203,10 +244,12 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {

fn release_all(&self, partition_id: usize) -> usize {
let mut partition_usage = self.memory_usage.lock().unwrap();
let current_mem = partition_usage[partition_id].unwrap_or(0);
if current_mem == 0 {
return 0;
let mut current_mem = 0;
match partition_usage.get(&partition_id) {
None => return 0,
Some(v) => current_mem = *v,
}

partition_usage.remove(&partition_id);
self.condvar.notify_all();
return current_mem;
Expand Down
56 changes: 29 additions & 27 deletions datafusion/src/execution/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
pub mod memory_pool;

use crate::error::DataFusionError::OutOfMemory;
use crate::error::{DataFusionError, Result};
use crate::execution::disk_manager::DiskManager;
use crate::error::Result;
use crate::execution::memory_management::memory_pool::{
ConstraintExecutionMemoryPool, DummyExecutionMemoryPool, ExecutionMemoryPool,
};
use async_trait::async_trait;
use hashbrown::{HashMap, HashSet};
use log::{debug, info};
use std::borrow::BorrowMut;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

Expand All @@ -41,21 +41,21 @@ pub struct MemoryManager {

impl MemoryManager {
pub fn new(exec_pool_size: usize) -> Self {
let pool: dyn ExecutionMemoryPool = if exec_pool_size == usize::MAX {
DummyExecutionMemoryPool::new()
let execution_pool = if exec_pool_size == usize::MAX {
Arc::new(DummyExecutionMemoryPool::new() as dyn ExecutionMemoryPool)
} else {
ConstraintExecutionMemoryPool::new(exec_pool_size)
Arc::new(ConstraintExecutionMemoryPool::new(exec_pool_size))
};
Self {
execution_pool: Arc::new(pool),
execution_pool,
partition_memory_manager: Arc::new(Mutex::new(HashMap::new())),
}
}

pub fn acquire_exec_memory(
self: Arc<Self>,
required: usize,
consumer: &dyn MemoryConsumer,
consumer: Arc<dyn MemoryConsumer>,
) -> Result<usize> {
let partition_id = consumer.partition_id();
let partition_manager = {
Expand All @@ -70,7 +70,7 @@ impl MemoryManager {
pub fn acquire_exec_pool_memory(
&self,
required: usize,
consumer: &dyn MemoryConsumer,
consumer: &Arc<dyn MemoryConsumer>,
) -> usize {
self.execution_pool.acquire_memory(required, consumer)
}
Expand Down Expand Up @@ -110,7 +110,7 @@ fn next_id() -> usize {
pub struct PartitionMemoryManager {
memory_manager: Arc<MemoryManager>,
partition_id: usize,
consumers: Arc<Mutex<HashSet<dyn MemoryConsumer>>>,
consumers: Arc<Mutex<HashSet<Arc<dyn MemoryConsumer>>>>,
}

impl PartitionMemoryManager {
Expand All @@ -125,12 +125,12 @@ impl PartitionMemoryManager {
pub fn acquire_exec_memory(
&mut self,
required: usize,
consumer: &dyn MemoryConsumer,
consumer: Arc<dyn MemoryConsumer>,
) -> Result<usize> {
let mut consumers = self.consumers.lock().unwrap();
let mut consumers = self.consumers.lock().unwrap().borrow_mut();
let mut got = self
.memory_manager
.acquire_exec_pool_memory(required, consumer);
.acquire_exec_pool_memory(required, &consumer);
if got < required {
// spill others first
}
Expand Down Expand Up @@ -162,14 +162,14 @@ impl PartitionMemoryManager {
info!(
"Consumer {} acquired {}",
c.str_repr(),
human_readable_size(cur_used)
human_readable_size(cur_used as usize)
)
}
}
let no_consumer_size = self
.memory_manager
.exec_memory_used_for_partition(self.partition_id)
- used;
- (used as usize);
info!(
"{} bytes of memory were used for partition {} without specific consumer",
human_readable_size(no_consumer_size),
Expand All @@ -178,10 +178,10 @@ impl PartitionMemoryManager {
}
}

#[derive(Debug, Clone)]
#[derive(Clone, Debug)]
pub struct MemoryConsumerId {
partition_id: usize,
id: usize,
pub partition_id: usize,
pub id: usize,
}

impl MemoryConsumerId {
Expand All @@ -198,20 +198,22 @@ impl Display for MemoryConsumerId {
}

#[async_trait]
pub trait MemoryConsumer {
pub trait MemoryConsumer: Send + Sync + Debug {
/// Display name of the consumer
fn name(&self) -> String;
/// Unique id of the consumer
fn id(&self) -> &MemoryConsumerId;

fn memory_manager(&self) -> Arc<MemoryManager>;
/// partition that the consumer belongs to
fn partition_id(&self) -> uszie {
fn partition_id(&self) -> usize {
self.id().partition_id
}
/// Try allocate `required` bytes as needed
fn allocate(&self, required: usize) -> Result<()> {
let got = self.memory_manager().acquire_exec_memory(required, self)?;
fn allocate(self: Arc<Self>, required: usize) -> Result<()> {
let got = self
.memory_manager()
.acquire_exec_memory(required, self.clone())?;
self.update_used(got as isize);
Ok(())
}
Expand Down Expand Up @@ -250,15 +252,15 @@ fn human_readable_size(size: usize) -> String {
let size = size as u64;
let (value, unit) = {
if size >= 2 * TB {
(size as f64 / TB, "TB")
(size as f64 / TB as f64, "TB")
} else if size >= 2 * GB {
(size as f64 / GB, "GB")
(size as f64 / GB as f64, "GB")
} else if size >= 2 * MB {
(size as f64 / MB, "MB")
(size as f64 / MB as f64, "MB")
} else if size >= 2 * KB {
(size as f64 / KB, "KB")
(size as f64 / KB as f64, "KB")
} else {
(size, "B")
(size as f64, "B")
}
};
format!("{:.1} {}", value, unit)
Expand Down
Loading

0 comments on commit 2e6eb0c

Please sign in to comment.