From fd6c31f17f2772ef29516636af76038e5392ae13 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Sat, 18 May 2019 12:59:34 +1200 Subject: [PATCH 1/2] Customisable tracker units Files are in bytes, but other things we may track are not. --- src/cli/download_tracker.rs | 48 +++++++++++++++++++++++++++---------- src/utils/notifications.rs | 10 ++++++++ 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/src/cli/download_tracker.rs b/src/cli/download_tracker.rs index e53d9ca1bd..4288dd30aa 100644 --- a/src/cli/download_tracker.rs +++ b/src/cli/download_tracker.rs @@ -42,6 +42,8 @@ pub struct DownloadTracker { /// If we have displayed progress, this is the number of characters we /// rendered, so we can erase it cleanly. displayed_charcount: Option, + /// What units to show progress in + units: Vec, } impl DownloadTracker { @@ -56,6 +58,7 @@ impl DownloadTracker { last_sec: None, term: term2::stdout(), displayed_charcount: None, + units: vec!["B".into(); 1], } } @@ -76,6 +79,15 @@ impl DownloadTracker { self.download_finished(); true } + Notification::Install(In::Utils(Un::DownloadPushUnits(units))) => { + self.push_units(units.into()); + true + } + Notification::Install(In::Utils(Un::DownloadPopUnits)) => { + self.pop_units(); + true + } + _ => false, } } @@ -139,11 +151,13 @@ impl DownloadTracker { } /// Display the tracked download information to the terminal. fn display(&mut self) { - let total_h = Size(self.total_downloaded); + // Panic if someone pops the default bytes unit... + let units = &self.units.last().unwrap(); + let total_h = Size(self.total_downloaded, units); let sum = self.downloaded_last_few_secs.iter().fold(0, |a, &v| a + v); let len = self.downloaded_last_few_secs.len(); let speed = if len > 0 { sum / len } else { 0 }; - let speed_h = Size(speed); + let speed_h = Size(speed, units); let elapsed_h = Duration(precise_time_s() - self.start_sec); // First, move to the start of the current line and clear it. @@ -163,7 +177,7 @@ impl DownloadTracker { let output = match self.content_len { Some(content_len) => { - let content_len_h = Size(content_len); + let content_len_h = Size(content_len, units); let content_len = content_len as f64; let percent = (self.total_downloaded as f64 / content_len) * 100.; let remaining = content_len - self.total_downloaded as f64; @@ -184,6 +198,14 @@ impl DownloadTracker { let _ = self.term.flush(); self.displayed_charcount = Some(output.chars().count()); } + + pub fn push_units(&mut self, new_units: String) { + self.units.push(new_units); + } + + pub fn pop_units(&mut self) { + self.units.pop(); + } } /// Human readable representation of duration(seconds). @@ -207,21 +229,21 @@ impl fmt::Display for Duration { } } -/// Human readable size (bytes) -struct Size(usize); +/// Human readable size (some units) +struct Size<'a>(usize, &'a str); -impl fmt::Display for Size { +impl<'a> fmt::Display for Size<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - const KIB: f64 = 1024.0; - const MIB: f64 = KIB * KIB; + const KI: f64 = 1024.0; + const MI: f64 = KI * KI; let size = self.0 as f64; - if size >= MIB { - write!(f, "{:5.1} MiB", size / MIB) // XYZ.P MiB - } else if size >= KIB { - write!(f, "{:5.1} KiB", size / KIB) + if size >= MI { + write!(f, "{:5.1} Mi{}", size / MI, self.1) // XYZ.P Mi + } else if size >= KI { + write!(f, "{:5.1} Ki{}", size / KI, self.1) } else { - write!(f, "{:3.0} B", size) + write!(f, "{:3.0} {}", size, self.1) } } } diff --git a/src/utils/notifications.rs b/src/utils/notifications.rs index 71ecfbac58..4e9edf6c66 100644 --- a/src/utils/notifications.rs +++ b/src/utils/notifications.rs @@ -18,6 +18,12 @@ pub enum Notification<'a> { DownloadDataReceived(&'a [u8]), /// Download has finished. DownloadFinished, + /// This thins we're tracking is not counted in bytes. + /// Must be paired with a pop-units; our other calls are not + /// setup to guarantee this any better. + DownloadPushUnits(&'a str), + /// finish using an unusual unit. + DownloadPopUnits, NoCanonicalPath(&'a Path), ResumingPartialDownload, UsingCurl, @@ -34,6 +40,8 @@ impl<'a> Notification<'a> { | DownloadingFile(_, _) | DownloadContentLengthReceived(_) | DownloadDataReceived(_) + | DownloadPushUnits(_) + | DownloadPopUnits | DownloadFinished | ResumingPartialDownload | UsingCurl @@ -58,6 +66,8 @@ impl<'a> Display for Notification<'a> { DownloadingFile(url, _) => write!(f, "downloading file from: '{}'", url), DownloadContentLengthReceived(len) => write!(f, "download size is: '{}'", len), DownloadDataReceived(data) => write!(f, "received some data of size {}", data.len()), + DownloadPushUnits(_) => Ok(()), + DownloadPopUnits => Ok(()), DownloadFinished => write!(f, "download finished"), NoCanonicalPath(path) => write!(f, "could not canonicalize path: '{}'", path.display()), ResumingPartialDownload => write!(f, "resuming partial download"), From ff0dc0dfe9eab265e81e77c28fb554e4af6f381c Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Sun, 12 May 2019 20:55:38 +1200 Subject: [PATCH 2/2] Avoid blocking on CloseHandle On Windows closing a file involves CloseHandle, which can be quite slow (6+ms) for various reasons, including circumstances outside our control such as virus scanners, content indexing, device driver cache write-back synchronisation and so forth. Rather than having a super long list of all the things users need to do to optimise the performance of CloseHandle, use a small threadpool to avoid blocking package extraction when closing file handles. This does run a risk of resource exhaustion, but as we only have 20k files in the largest package today that should not be a problem in practice. https://www.mercurial-scm.org/pipermail/mercurial-devel/2016-January/078404.html provided inspiration for this. My benchmark system went from 21/22s to 11s with this change with both 4 or 8 threads. --- Cargo.lock | 23 ++++-- Cargo.toml | 3 +- src/dist/component/package.rs | 147 ++++++++++++++++++++++++++++++++-- src/dist/manifestation.rs | 11 +-- src/install.rs | 3 +- 5 files changed, 167 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 820bf9c1a0..cb37fa82b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,12 +412,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "filetime" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.54 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1251,9 +1252,10 @@ dependencies = [ "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "strsim 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tar 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)", + "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "term 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1460,10 +1462,10 @@ dependencies = [ [[package]] name = "tar" -version = "0.4.24" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "filetime 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", + "filetime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.54 (registry+https://github.com/rust-lang/crates.io-index)", "redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)", "xattr 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1544,6 +1546,14 @@ dependencies = [ "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "threadpool" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.42" @@ -1937,7 +1947,7 @@ dependencies = [ "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -"checksum filetime 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2f8c63033fcba1f51ef744505b3cad42510432b904c062afa67ad7ece008429d" +"checksum filetime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "450537dc346f0c4d738dda31e790da1da5d4bd12145aad4da0d03d713cb3794f" "checksum flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f87e68aa82b2de08a6e037f1385455759df6e445a8df5e005b4297191dbf18aa" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" @@ -2054,7 +2064,7 @@ dependencies = [ "checksum strsim 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "032c03039aae92b350aad2e3779c352e104d919cb192ba2fabbd7b831ce4f0f6" "checksum syn 0.15.33 (registry+https://github.com/rust-lang/crates.io-index)" = "ec52cd796e5f01d0067225a5392e70084acc4c0013fa71d55166d38a8b307836" "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" -"checksum tar 0.4.24 (registry+https://github.com/rust-lang/crates.io-index)" = "2dd071a2604c8fd8902ca42908856821ed7a06e3cd846f84d75873c978dec7fb" +"checksum tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)" = "b3196bfbffbba3e57481b6ea32249fbaf590396a52505a2615adbb79d9d826d3" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" "checksum term 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5e6b677dd1e8214ea1ef4297f85dbcbed8e8cdddb561040cc998ca2551c37561" @@ -2063,6 +2073,7 @@ dependencies = [ "checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03" "checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5" "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" +"checksum threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e2f0c90a5f3459330ac8bc0d2f879c693bb7a2f59689c1083fc4ef83834da865" "checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "cec6c34409089be085de9403ba2010b80e36938c9ca992c4f67f407bb13db0b1" "checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443" diff --git a/Cargo.toml b/Cargo.toml index 86a30d5401..448eb14395 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,10 +43,11 @@ scopeguard = "1" semver = "0.9" sha2 = "0.8" strsim = "0.9.1" -tar = "0.4" +tar = "0.4.26" tempdir = "0.3.4" # FIXME(issue #1788) term = "=0.5.1" +threadpool = "1" time = "0.1.34" toml = "0.5" url = "1" diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index b5504dce11..0c5fc644d6 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -7,6 +7,7 @@ use crate::dist::component::transaction::*; use crate::dist::temp; use crate::errors::*; +use crate::utils::notifications::Notification; use crate::utils::utils; use std::collections::HashSet; @@ -194,13 +195,17 @@ fn set_file_perms(_dest_path: &Path, _src_path: &Path) -> Result<()> { pub struct TarPackage<'a>(DirectoryPackage, temp::Dir<'a>); impl<'a> TarPackage<'a> { - pub fn new(stream: R, temp_cfg: &'a temp::Cfg) -> Result { + pub fn new( + stream: R, + temp_cfg: &'a temp::Cfg, + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + ) -> Result { let temp_dir = temp_cfg.new_directory()?; let mut archive = tar::Archive::new(stream); // The rust-installer packages unpack to a directory called // $pkgname-$version-$target. Skip that directory when // unpacking. - unpack_without_first_dir(&mut archive, &*temp_dir)?; + unpack_without_first_dir(&mut archive, &*temp_dir, notify_handler)?; Ok(TarPackage( DirectoryPackage::new(temp_dir.to_owned(), false)?, @@ -209,11 +214,122 @@ impl<'a> TarPackage<'a> { } } -fn unpack_without_first_dir(archive: &mut tar::Archive, path: &Path) -> Result<()> { +#[cfg(windows)] +mod unpacker { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use threadpool; + + use crate::utils::notifications::Notification; + + pub struct Unpacker<'a> { + n_files: Arc, + pool: threadpool::ThreadPool, + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + } + + impl<'a> Unpacker<'a> { + pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker { + // Defaults to hardware thread count threads; this is suitable for + // our needs as IO bound operations tend to show up as write latencies + // rather than close latencies, so we don't need to look at + // more threads to get more IO dispatched at this stage in the process. + let pool = threadpool::Builder::new() + .thread_name("CloseHandle".into()) + .build(); + Unpacker { + n_files: Arc::new(AtomicUsize::new(0)), + pool: pool, + notify_handler: notify_handler, + } + } + + pub fn handle(&mut self, unpacked: tar::Unpacked) { + if let tar::Unpacked::File(f) = unpacked { + self.n_files.fetch_add(1, Ordering::Relaxed); + let n_files = self.n_files.clone(); + self.pool.execute(move || { + drop(f); + n_files.fetch_sub(1, Ordering::Relaxed); + }); + } + } + } + + impl<'a> Drop for Unpacker<'a> { + fn drop(&mut self) { + // Some explanation is in order. Even though the tar we are reading from (if + // any) will have had its FileWithProgress download tracking + // completed before we hit drop, that is not true if we are unwinding due to a + // failure, where the logical ownership of the progress bar is + // ambiguous, and as the tracker itself is abstracted out behind + // notifications etc we cannot just query for that. So: we assume no + // more reads of the underlying tar will take place: either the + // error unwinding will stop reads, or we completed; either way, we + // notify finished to the tracker to force a reset to zero; we set + // the units to files, show our progress, and set our units back + // afterwards. The largest archives today - rust docs - have ~20k + // items, and the download tracker's progress is confounded with + // actual handling of data today, we synthesis a data buffer and + // pretend to have bytes to deliver. + self.notify_handler + .map(|handler| handler(Notification::DownloadFinished)); + self.notify_handler + .map(|handler| handler(Notification::DownloadPushUnits("handles"))); + let mut prev_files = self.n_files.load(Ordering::Relaxed); + self.notify_handler.map(|handler| { + handler(Notification::DownloadContentLengthReceived( + prev_files as u64, + )) + }); + if prev_files > 50 { + println!("Closing {} deferred file handles", prev_files); + } + let buf: Vec = vec![0; prev_files]; + assert!(32767 > prev_files); + let mut current_files = prev_files; + while current_files != 0 { + use std::thread::sleep; + sleep(std::time::Duration::from_millis(100)); + prev_files = current_files; + current_files = self.n_files.load(Ordering::Relaxed); + let step_count = prev_files - current_files; + self.notify_handler.map(|handler| { + handler(Notification::DownloadDataReceived(&buf[0..step_count])) + }); + } + self.pool.join(); + self.notify_handler + .map(|handler| handler(Notification::DownloadFinished)); + self.notify_handler + .map(|handler| handler(Notification::DownloadPopUnits)); + } + } +} + +#[cfg(not(windows))] +mod unpacker { + use crate::utils::notifications::Notification; + pub struct Unpacker {} + impl Unpacker { + pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker { + Unpacker {} + } + pub fn handle(&mut self, _unpacked: tar::Unpacked) {} + } +} + +fn unpack_without_first_dir<'a, R: Read>( + archive: &mut tar::Archive, + path: &Path, + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, +) -> Result<()> { + let mut unpacker = unpacker::Unpacker::new(notify_handler); let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?; let mut checked_parents: HashSet = HashSet::new(); + for entry in entries { let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?; let relpath = { @@ -249,6 +365,7 @@ fn unpack_without_first_dir(archive: &mut tar::Archive, path: &Path) entry.set_preserve_mtime(false); entry .unpack(&full_path) + .map(|unpacked| unpacker.handle(unpacked)) .chain_err(|| ErrorKind::ExtractingPackage)?; } @@ -277,9 +394,17 @@ impl<'a> Package for TarPackage<'a> { pub struct TarGzPackage<'a>(TarPackage<'a>); impl<'a> TarGzPackage<'a> { - pub fn new(stream: R, temp_cfg: &'a temp::Cfg) -> Result { + pub fn new( + stream: R, + temp_cfg: &'a temp::Cfg, + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + ) -> Result { let stream = flate2::read::GzDecoder::new(stream); - Ok(TarGzPackage(TarPackage::new(stream, temp_cfg)?)) + Ok(TarGzPackage(TarPackage::new( + stream, + temp_cfg, + notify_handler, + )?)) } } @@ -305,9 +430,17 @@ impl<'a> Package for TarGzPackage<'a> { pub struct TarXzPackage<'a>(TarPackage<'a>); impl<'a> TarXzPackage<'a> { - pub fn new(stream: R, temp_cfg: &'a temp::Cfg) -> Result { + pub fn new( + stream: R, + temp_cfg: &'a temp::Cfg, + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + ) -> Result { let stream = xz2::read::XzDecoder::new(stream); - Ok(TarXzPackage(TarPackage::new(stream, temp_cfg)?)) + Ok(TarXzPackage(TarPackage::new( + stream, + temp_cfg, + notify_handler, + )?)) } } diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 6583739e27..b4f4998c20 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -204,20 +204,20 @@ impl Manifestation { component.target.as_ref(), )); - let gz; - let xz; let notification_converter = |notification: crate::utils::Notification<'_>| { notify_handler(notification.into()); }; + let gz; + let xz; let reader = utils::FileReaderWithProgress::new_file(&installer_file, ¬ification_converter)?; let package: &dyn Package = match format { Format::Gz => { - gz = TarGzPackage::new(reader, temp_cfg)?; + gz = TarGzPackage::new(reader, temp_cfg, Some(¬ification_converter))?; &gz } Format::Xz => { - xz = TarXzPackage::new(reader, temp_cfg)?; + xz = TarXzPackage::new(reader, temp_cfg, Some(¬ification_converter))?; &xz } }; @@ -407,7 +407,8 @@ impl Manifestation { }; let reader = utils::FileReaderWithProgress::new_file(&installer_file, ¬ification_converter)?; - let package: &dyn Package = &TarGzPackage::new(reader, temp_cfg)?; + let package: &dyn Package = + &TarGzPackage::new(reader, temp_cfg, Some(¬ification_converter))?; for component in package.components() { tx = package.install(&self.installation, &component, None, tx)?; diff --git a/src/install.rs b/src/install.rs index cf10fdcd54..3b8a17ea58 100644 --- a/src/install.rs +++ b/src/install.rs @@ -89,7 +89,8 @@ impl<'a> InstallMethod<'a> { notify_handler(notification.into()); }; let reader = utils::FileReaderWithProgress::new_file(&src, ¬ification_converter)?; - let package: &dyn Package = &TarGzPackage::new(reader, temp_cfg)?; + let package: &dyn Package = + &TarGzPackage::new(reader, temp_cfg, Some(¬ification_converter))?; let mut tx = Transaction::new(prefix.clone(), temp_cfg, notify_handler);