Skip to content

Commit

Permalink
Lazy TempDir creation in DiskManager (#1695)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Jan 30, 2022
1 parent a7f0156 commit fecce97
Showing 1 changed file with 73 additions and 26 deletions.
99 changes: 73 additions & 26 deletions datafusion/src/execution/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
use crate::error::{DataFusionError, Result};
use log::debug;
use rand::{thread_rng, Rng};
use std::path::PathBuf;
use std::sync::Arc;
use std::{path::PathBuf, sync::Mutex};
use tempfile::{Builder, NamedTempFile, TempDir};

/// Configuration for temporary disk access
Expand Down Expand Up @@ -67,39 +67,49 @@ impl DiskManagerConfig {
/// while processing dataset larger than available memory.
#[derive(Debug)]
pub struct DiskManager {
local_dirs: Vec<TempDir>,
/// TempDirs to put temporary files in. A new OS specified
/// temporary directory will be created if this list is empty.
local_dirs: Mutex<Vec<TempDir>>,
}

impl DiskManager {
/// Create a DiskManager given the configuration
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => {
let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;

debug!(
"Created directory {:?} as DataFusion working directory",
tempdir
);
Ok(Arc::new(Self {
local_dirs: vec![tempdir],
}))
}
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(vec![]),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
debug!(
"Created local dirs {:?} as DataFusion working directory",
local_dirs
);
Ok(Arc::new(Self { local_dirs }))
Ok(Arc::new(Self {
local_dirs: Mutex::new(local_dirs),
}))
}
}
}

/// Return a temporary file from a randomized choice in the configured locations
pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
create_tmp_file(&self.local_dirs)
let mut local_dirs = self.local_dirs.lock().unwrap();

// Create a temporary directory if needed
if local_dirs.is_empty() {
let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;

debug!(
"Created directory '{:?}' as DataFusion tempfile directory",
tempdir.path().to_string_lossy()
);

local_dirs.push(tempdir);
}

create_tmp_file(&local_dirs)
}
}

Expand Down Expand Up @@ -129,10 +139,42 @@ fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {

#[cfg(test)]
mod tests {
use std::path::Path;

use super::*;
use crate::error::Result;
use tempfile::TempDir;

#[test]
fn lazy_temp_dir_creation() -> Result<()> {
// A default configuration should not create temp files until requested
let config = DiskManagerConfig::new();
let dm = DiskManager::try_new(config)?;

assert_eq!(0, local_dir_snapshot(&dm).len());

// can still create a tempfile however:
let actual = dm.create_tmp_file()?;

// Now the tempdir has been created on demand
assert_eq!(1, local_dir_snapshot(&dm).len());

// the returned tempfile file should be in the temp directory
let local_dirs = local_dir_snapshot(&dm);
assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

Ok(())
}

fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
dm.local_dirs
.lock()
.unwrap()
.iter()
.map(|p| p.path().into())
.collect()
}

#[test]
fn file_in_right_dir() -> Result<()> {
let local_dir1 = TempDir::new()?;
Expand All @@ -147,19 +189,24 @@ mod tests {
let actual = dm.create_tmp_file()?;

// the file should be in one of the specified local directories
let found = local_dirs.iter().any(|p| {
actual
.path()
assert_path_in_dirs(actual.path(), local_dirs.into_iter());

Ok(())
}

/// Asserts that `file_path` is found anywhere in any of `dir` directories
fn assert_path_in_dirs<'a>(
file_path: &'a Path,
dirs: impl Iterator<Item = &'a Path>,
) {
let dirs: Vec<&Path> = dirs.collect();

let found = dirs.iter().any(|file_path| {
file_path
.ancestors()
.any(|candidate_path| *p == candidate_path)
.any(|candidate_path| *file_path == candidate_path)
});

assert!(
found,
"Can't find {:?} in specified local dirs: {:?}",
actual, local_dirs
);

Ok(())
assert!(found, "Can't find {:?} in dirs: {:?}", file_path, dirs);
}
}

0 comments on commit fecce97

Please sign in to comment.