Skip to content

Commit

Permalink
feat: simplify rename_no_replace
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Jun 4, 2022
1 parent 92c35d8 commit 7364ea8
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 198 deletions.
12 changes: 1 addition & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 0 additions & 17 deletions python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,6 @@
_has_pandas = True


def _is_old_glibc_version():
if "CS_GNU_LIBC_VERSION" in os.confstr_names:
version = os.confstr("CS_GNU_LIBC_VERSION").split(" ")[1]
return version < "2.28"
else:
return False


if sys.platform == "win32":
pytest.skip("Writer isn't yet supported on Windows", allow_module_level=True)

if _is_old_glibc_version():
pytest.skip(
"Writer isn't yet supported on Linux with glibc < 2.28", allow_module_level=True
)


@pytest.fixture()
def sample_data():
nrows = 5
Expand Down
4 changes: 0 additions & 4 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ description = "Native Delta Lake implementation in Rust"
edition = "2021"

[dependencies]
libc = ">=0.2.90,<1"
errno = "0.2"
thiserror = "1"
serde = { version = "1", features = ["derive"] }
Expand Down Expand Up @@ -108,9 +107,6 @@ gcs = ["async-stream", "tame-gcs", "tame-oauth", "reqwest"]
glue = ["s3", "rusoto_glue"]
python = ["arrow/pyarrow"]

[build-dependencies]
glibc_version = "0"

[dependencies.dynamodb_lock]
path = "../dynamodb_lock"
version = "0"
Expand Down
30 changes: 0 additions & 30 deletions rust/build.rs

This file was deleted.

156 changes: 20 additions & 136 deletions rust/src/storage/file/rename.rs
Original file line number Diff line number Diff line change
@@ -1,123 +1,27 @@
use crate::StorageError;

#[cfg(windows)]
mod imp {
use super::*;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::sync::Mutex;

// async rename under Windows is flaky due to no native rename if not exists support.
// Use a shared lock to prevent threads renaming at the same time.
lazy_static! {
static ref SHARED_LOCK: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
}

pub async fn rename_noreplace(from: &str, to: &str) -> Result<(), StorageError> {
let from_path = String::from(from);
let to_path = String::from(to);

// rename in Windows already set the MOVEFILE_REPLACE_EXISTING flag
// it should always succeed no matter destination filconcurrent_writes_teste exists or not
tokio::task::spawn_blocking(move || {
let lock = Arc::clone(&SHARED_LOCK);
let mut data = lock.lock().unwrap();
*data += 1;

// doing best effort in windows since there is no native rename if not exists support
let to_exists = std::fs::metadata(&to_path).is_ok();
if to_exists {
return Err(StorageError::AlreadyExists(to_path));
}
std::fs::rename(&from_path, &to_path).map_err(|e| {
let to_exists = std::fs::metadata(&to_path).is_ok();
if to_exists {
return StorageError::AlreadyExists(to_path);
}

StorageError::other_std_io_err(format!(
"failed to rename {} to {}: {}",
from_path, to_path, e
))
})
})
.await
.unwrap()
}
}

#[cfg(unix)]
mod imp {
use super::*;
use std::ffi::CString;

fn to_c_string(p: &str) -> Result<CString, StorageError> {
CString::new(p).map_err(|e| StorageError::Generic(format!("{}", e)))
}

pub async fn rename_noreplace(from: &str, to: &str) -> Result<(), StorageError> {
let cs_from = to_c_string(from)?;
let cs_to = to_c_string(to)?;

let ret = unsafe {
tokio::task::spawn_blocking(move || {
let ret = platform_specific_rename(cs_from.as_ptr(), cs_to.as_ptr());
if ret != 0 {
Err(errno::errno())
} else {
Ok(())
}
})
.await
.unwrap()
};

match ret {
Err(e) => {
if let libc::EEXIST = e.0 {
return Err(StorageError::AlreadyExists(String::from(to)));
}
if let libc::EINVAL = e.0 {
return Err(StorageError::Generic(format!(
"rename_noreplace failed with message '{}'",
e
)));
}
return Err(StorageError::other_std_io_err(format!(
"failed to rename {} to {}: {}",
from, to, e
)));
}
Ok(_) => Ok(()),
}
}

#[allow(unused_variables)]
unsafe fn platform_specific_rename(from: *const libc::c_char, to: *const libc::c_char) -> i32 {
cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", target_env = "gnu"))] {
cfg_if::cfg_if! {
if #[cfg(glibc_renameat2)] {
libc::renameat2(libc::AT_FDCWD, from, libc::AT_FDCWD, to, libc::RENAME_NOREPLACE)
} else {
// target has old glibc (< 2.28), we would need to invoke syscall manually
unimplemented!()
}
}
} else if #[cfg(target_os = "macos")] {
libc::renamex_np(from, to, libc::RENAME_EXCL)
} else {
unimplemented!()
}
}
}
}

/// Atomically renames `from` to `to`.
/// `from` has to exist, but `to` is not, otherwise the operation will fail.
#[inline]
pub async fn rename_noreplace(from: &str, to: &str) -> Result<(), StorageError> {
imp::rename_noreplace(from, to).await
let from_path = String::from(from);
let to_path = String::from(to);

tokio::task::spawn_blocking(move || {
std::fs::hard_link(&from_path, &to_path).map_err(|err| {
if err.kind() == std::io::ErrorKind::AlreadyExists {
StorageError::AlreadyExists(to_path)
} else {
err.into()
}
})?;

std::fs::remove_file(from_path)?;

Ok(())
})
.await
.unwrap()
}

#[cfg(test)]
Expand All @@ -136,28 +40,8 @@ mod tests {

// unsuccessful move not_exists to C, not_exists is missing
match rename_noreplace("not_exists", c.to_str().unwrap()).await {
Err(StorageError::Io { source: e }) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "windows")] {
assert_eq!(
e.to_string(),
format!(
"failed to rename not_exists to {}: The system cannot find the file specified. (os error 2)",
c.to_str().unwrap()
)
);
} else {
assert_eq!(
e.to_string(),
format!(
"failed to rename not_exists to {}: No such file or directory",
c.to_str().unwrap()
)
);
}
}
}
Err(e) => panic!("expect std::io::Error, got: {:#}", e),
Err(StorageError::NotFound) => {}
Err(e) => panic!("expect StorageError::NotFound, got: {:#}", e),
Ok(()) => panic!("{}", "expect rename to fail with Err, but got Ok"),
}

Expand Down

0 comments on commit 7364ea8

Please sign in to comment.