Skip to content

Commit

Permalink
Merge 74f4727 into 5b8099d
Browse files Browse the repository at this point in the history
  • Loading branch information
rbtcollins authored May 8, 2021
2 parents 5b8099d + 74f4727 commit 5360c02
Show file tree
Hide file tree
Showing 15 changed files with 525 additions and 153 deletions.
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ chrono = "0.4"
clap = "2"
download = {path = "download", default-features = false}
effective-limits = "0.5.2"
enum-map = "1.1.0"
flate2 = "1"
git-testament = "0.1.4"
home = {git = "https://github.com/rbtcollins/home", rev = "a243ee2fbee6022c57d56f5aa79aefe194eabe53"}
Expand All @@ -52,6 +53,7 @@ scopeguard = "1"
semver = "0.11"
serde = {version = "1.0", features = ["derive"]}
sha2 = "0.9"
sharded-slab = "0.1.1"
strsim = "0.10"
tar = "0.4.26"
tempfile = "3.1"
Expand Down
3 changes: 3 additions & 0 deletions src/cli/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ impl NotifyOnConsole {
NotificationLevel::Error => {
err!("{}", n);
}
NotificationLevel::Debug => {
debug!("{}", n);
}
}
}
}
Expand Down
23 changes: 20 additions & 3 deletions src/diskio/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
time::Instant,
};

use super::{CompletedIo, Executor, Item};
use super::{CompletedIo, Executor, FileBuffer, Item};

#[derive(Debug)]
pub struct _IncrementalFileState {
Expand Down Expand Up @@ -70,7 +70,11 @@ impl Executor for ImmediateUnpacker {
item.result = match &mut item.kind {
super::Kind::Directory => super::create_dir(&item.full_path),
super::Kind::File(ref contents) => {
super::write_file(&item.full_path, &contents, item.mode)
if let super::FileBuffer::Immediate(ref contents) = &contents {
super::write_file(&item.full_path, contents, item.mode)
} else {
unreachable!()
}
}
super::Kind::IncrementalFile(_incremental_file) => {
return {
Expand Down Expand Up @@ -124,6 +128,14 @@ impl Executor for ImmediateUnpacker {
super::IncrementalFileState::Immediate(self.incremental_state.clone())
}
}

fn get_buffer(&mut self, capacity: usize) -> super::FileBuffer {
super::FileBuffer::Immediate(Vec::with_capacity(capacity))
}

fn buffer_available(&self, _len: usize) -> bool {
true
}
}

/// The non-shared state for writing a file incrementally
Expand Down Expand Up @@ -160,10 +172,15 @@ impl IncrementalFileWriter {
})
}

pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
pub fn chunk_submit(&mut self, chunk: FileBuffer) -> bool {
if (self.state.lock().unwrap()).is_none() {
return false;
}
let chunk = if let FileBuffer::Immediate(v) = chunk {
v
} else {
unreachable!()
};
match self.write(chunk) {
Ok(v) => v,
Err(e) => {
Expand Down
101 changes: 91 additions & 10 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod test;
pub mod threaded;

use std::io::{self, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};
Expand All @@ -66,12 +67,73 @@ use anyhow::{Context, Result};

use crate::process;
use crate::utils::notifications::Notification;
use threaded::PoolReference;

/// Carries the implementation specific data for complete file transfers into the executor.
#[derive(Debug)]
pub enum FileBuffer {
Immediate(Vec<u8>),
// A reference to the object in the pool, and a handle to write to it
Threaded(PoolReference),
}

impl FileBuffer {
/// All the buffers space to be re-used when the last reference to it is dropped.
pub(crate) fn clear(&mut self) {
if let FileBuffer::Threaded(ref mut contents) = self {
contents.clear()
}
}

pub(crate) fn len(&self) -> usize {
match self {
FileBuffer::Immediate(ref vec) => vec.len(),
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned.len(),
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable.len(),
}
}

pub(crate) fn finished(self) -> Self {
match self {
FileBuffer::Threaded(PoolReference::Mut(mutable, pool)) => {
FileBuffer::Threaded(PoolReference::Owned(mutable.downgrade(), pool))
}
_ => self,
}
}
}

impl Deref for FileBuffer {
type Target = Vec<u8>;

fn deref(&self) -> &Self::Target {
match self {
FileBuffer::Immediate(ref vec) => &vec,
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned,
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
}
}
}

impl DerefMut for FileBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
FileBuffer::Immediate(ref mut vec) => vec,
FileBuffer::Threaded(PoolReference::Owned(_, _)) => {
unimplemented!()
}
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
}
}
}

pub(crate) const IO_CHUNK_SIZE: usize = 16_777_216;

/// Carries the implementation specific channel data into the executor.
#[derive(Debug)]
pub enum IncrementalFile {
ImmediateReceiver,
ThreadedReceiver(Receiver<Vec<u8>>),
ThreadedReceiver(Receiver<FileBuffer>),
}

// The basic idea is that in single threaded mode we get this pattern:
Expand Down Expand Up @@ -116,7 +178,7 @@ pub enum IncrementalFile {
#[derive(Debug)]
pub enum Kind {
Directory,
File(Vec<u8>),
File(FileBuffer),
IncrementalFile(IncrementalFile),
}

Expand Down Expand Up @@ -160,7 +222,7 @@ impl Item {
}
}

pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
pub fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self {
let len = content.len();
Self {
full_path,
Expand All @@ -177,7 +239,7 @@ impl Item {
full_path: PathBuf,
mode: u32,
state: IncrementalFileState,
) -> Result<(Self, Box<dyn FnMut(Vec<u8>) -> bool + 'a>)> {
) -> Result<(Self, Box<dyn FnMut(FileBuffer) -> bool + 'a>)> {
let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?;
let result = Self {
full_path,
Expand Down Expand Up @@ -210,19 +272,19 @@ impl IncrementalFileState {
&self,
path: &Path,
mode: u32,
) -> Result<(Box<dyn FnMut(Vec<u8>) -> bool>, IncrementalFile)> {
) -> Result<(Box<dyn FnMut(FileBuffer) -> bool>, IncrementalFile)> {
use std::sync::mpsc::channel;
match *self {
IncrementalFileState::Threaded => {
let (tx, rx) = channel::<Vec<u8>>();
let (tx, rx) = channel::<FileBuffer>();
let content_callback = IncrementalFile::ThreadedReceiver(rx);
let chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();
let chunk_submit = move |chunk: FileBuffer| tx.send(chunk).is_ok();
Ok((Box::new(chunk_submit), content_callback))
}
IncrementalFileState::Immediate(ref state) => {
let content_callback = IncrementalFile::ImmediateReceiver;
let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?;
let chunk_submit = move |chunk: Vec<u8>| writer.chunk_submit(chunk);
let chunk_submit = move |chunk: FileBuffer| writer.chunk_submit(chunk);
Ok((Box::new(chunk_submit), content_callback))
}
}
Expand Down Expand Up @@ -258,6 +320,14 @@ pub trait Executor {

/// Get any state needed for incremental file processing
fn incremental_file_state(&self) -> IncrementalFileState;

/// Get a disk buffer E.g. this gets the right sized pool object for
/// optimized situations, or just a malloc when optimisations are off etc
/// etc.
fn get_buffer(&mut self, len: usize) -> FileBuffer;

/// Query the memory budget to see if a particular size buffer is available
fn buffer_available(&self, len: usize) -> bool;
}

/// Trivial single threaded IO to be used from executors.
Expand All @@ -267,7 +337,17 @@ pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
// Files, write them.
item.result = match &mut item.kind {
Kind::Directory => create_dir(&item.full_path),
Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
Kind::File(ref mut contents) => {
contents.clear();
match contents {
FileBuffer::Immediate(ref contents) => {
write_file(&item.full_path, &contents, item.mode)
}
FileBuffer::Threaded(ref mut contents) => {
write_file(&item.full_path, &contents, item.mode)
}
}
}
Kind::IncrementalFile(incremental_file) => write_file_incremental(
&item.full_path,
incremental_file,
Expand Down Expand Up @@ -367,6 +447,7 @@ pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
/// Get the executor for disk IO.
pub fn get_executor<'a>(
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
ram_budget: usize,
) -> Result<Box<dyn Executor + 'a>> {
// If this gets lots of use, consider exposing via the config file.
let thread_count = match process().var("RUSTUP_IO_THREADS") {
Expand All @@ -377,6 +458,6 @@ pub fn get_executor<'a>(
};
Ok(match thread_count {
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
n => Box::new(threaded::Threaded::new(notify_handler, n)),
n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)),
})
}
Loading

0 comments on commit 5360c02

Please sign in to comment.