diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs
index 31a3a632e..6aaa89acd 100644
--- a/mountpoint-s3/src/fuse.rs
+++ b/mountpoint-s3/src/fuse.rs
@@ -6,7 +6,7 @@ use std::ffi::OsStr;
 use std::path::Path;
 use std::time::{Duration, SystemTime};
 use time::OffsetDateTime;
-use tracing::{instrument, Instrument};
+use tracing::{instrument, warn, Instrument};
 
 use crate::fs::{self, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno};
 use crate::prefix::Prefix;
@@ -69,6 +69,12 @@ where
 {
     #[instrument(level="warn", skip_all, fields(req=_req.unique()))]
     fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> {
+        if let Err(closest_valid) = config.set_max_readahead(0) {
+            warn!(
+                "failed to configure readahead to zero, closest accepted value is {:?}",
+                closest_valid,
+            );
+        }
         block_on(self.fs.init(config).in_current_span())
     }
 
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 96fe01be1..822f690cc 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -51,15 +51,7 @@ impl Default for PrefetcherConfig {
     fn default() -> Self {
         #[allow(clippy::identity_op)]
         Self {
-            // This is a weird looking number! We really want our first request size to be 1MiB,
-            // which is a common IO size. But Linux's readahead will try to read an extra 128k on
-            // top of a 1MiB read, which we'd have to wait for a second request to service. Because
-            // FUSE doesn't know the difference between regular reads and readahead reads, it will
-            // send us a READ request for that 128k, so we'll have to block waiting for it even if
-            // the application doesn't want it. This is all in the noise for sequential IO, but
-            // waiting for the readahead hurts random IO. So we add 128k to the first request size
-            // to avoid the latency hit of the second request.
-            first_request_size: 1 * 1024 * 1024 + 128 * 1024,
+            first_request_size: 1 * 1024 * 1024,
             max_request_size: 2 * 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             read_timeout: Duration::from_secs(60),
@@ -113,12 +105,12 @@ pub struct PrefetchGetObject<Client: ObjectClient, Runtime> {
     future_tasks: Arc<RwLock<VecDeque<RequestTask<TaskError<Client>>>>>,
     bucket: String,
     key: String,
-    // preferred part size in the prefetcher's part queue, not the object part
+    /// Preferred part size in the prefetcher's part queue, not the object part size
     preferred_part_size: usize,
     next_sequential_read_offset: u64,
     next_request_size: usize,
     next_request_offset: u64,
-    size: u64,
+    object_size: u64,
     etag: ETag,
 }
 
@@ -128,7 +120,13 @@ where
     Runtime: Spawn,
 {
     /// Create and spawn a new prefetching request for an object
-    fn new(inner: Arc<PrefetcherInner<Client, Runtime>>, bucket: &str, key: &str, size: u64, etag: ETag) -> Self {
+    fn new(
+        inner: Arc<PrefetcherInner<Client, Runtime>>,
+        bucket: &str,
+        key: &str,
+        object_size: u64,
+        etag: ETag,
+    ) -> Self {
         PrefetchGetObject {
             inner: inner.clone(),
             current_task: None,
@@ -139,7 +137,7 @@ where
             next_request_offset: 0,
             bucket: bucket.to_owned(),
             key: key.to_owned(),
-            size,
+            object_size,
             etag,
         }
     }
@@ -159,15 +157,14 @@
             "read"
         );
 
-        // Currently, we set preferred part size to the current read size.
-        //
-        // We initialize this value to 128k as it is Linux's readahead size
-        // and it can also be used as a lower bound in case the read size is too small.
-        // The upper bound is 1MiB since it should be a common IO size.
-        let max_preferred_part_size = 1024 * 1024;
-        self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);
-
-        let remaining = self.size.saturating_sub(offset);
+        // We set preferred part size to the largest read size we've seen so far,
+        // bounded by 128k at minimum (typical readahead size for Linux when not disabled) and 1MiB at maximum.
+        //
+        // Our assumption is that the read size will be the same for most sequential
+        // reads, and it can be aligned to the size of prefetched chunks.
+        self.preferred_part_size = self.preferred_part_size.max(length).min(1024 * 1024);
+
+        let remaining = self.object_size.saturating_sub(offset);
         if remaining == 0 {
             return Ok(ChecksummedBytes::default());
         }
@@ -260,9 +255,9 @@ where
     /// Spawn the next required request
     fn spawn_next_request(&mut self) -> Option<RequestTask<TaskError<Client>>> {
         let start = self.next_request_offset;
-        let end = (start + self.next_request_size as u64).min(self.size);
+        let end = (start + self.next_request_size as u64).min(self.object_size);
 
-        if start >= self.size {
+        if start >= self.object_size {
             return None;
         }
 
@@ -389,6 +384,9 @@ struct RequestTask<E> {
 }
 
 impl<E> RequestTask<E> {
+    /// Read up to `length` bytes from the part queue. The returned part may be smaller than `length`.
+    ///
+    /// The task must be discarded if an [Err] is returned.
     async fn read(&mut self, length: usize) -> Result<Part, PrefetchReadError<E>> {
         let part = self.part_queue.read(length).await?;
         debug_assert!(part.len() <= self.remaining);
diff --git a/vendor/fuser/src/lib.rs b/vendor/fuser/src/lib.rs
index 15207252b..cffa33fa8 100644
--- a/vendor/fuser/src/lib.rs
+++ b/vendor/fuser/src/lib.rs
@@ -214,9 +214,6 @@ impl KernelConfig {
     ///
     /// On success returns the previous value. On error returns the nearest value which will succeed
    pub fn set_max_readahead(&mut self, value: u32) -> Result<u32, u32> {
-        if value == 0 {
-            return Err(1);
-        }
         if value > self.max_max_readahead {
             return Err(self.max_max_readahead);
         }
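
A note on the `preferred_part_size` update in `read` above: the grow-then-cap form `.max(length).min(1024 * 1024)` matters, because `Ord::clamp(length, 1024 * 1024)` requires `min <= max` and would panic on any single read larger than 1MiB. Below is a minimal standalone sketch of the policy under that assumption; the function name `update_preferred_part_size` and the `main` driver are illustrative only, not part of the crate.

/// Illustrative sketch (not crate API): grow the preferred part size to the
/// largest read seen so far, capped at 1MiB. Written as `.max().min()` rather
/// than `clamp(length, MAX)` because `Ord::clamp` panics when `min > max`,
/// i.e. whenever a single read exceeds the 1MiB cap.
fn update_preferred_part_size(preferred: usize, length: usize) -> usize {
    const MAX_PREFERRED_PART_SIZE: usize = 1024 * 1024;
    preferred.max(length).min(MAX_PREFERRED_PART_SIZE)
}

fn main() {
    // Start at 128k, the typical Linux readahead size used as the lower bound.
    let mut preferred = 128 * 1024;
    for length in [4 * 1024, 256 * 1024, 2 * 1024 * 1024] {
        preferred = update_preferred_part_size(preferred, length);
        println!("read of {length} bytes -> preferred part size {preferred}");
    }
    assert_eq!(preferred, 1024 * 1024); // capped at 1MiB even after a 2MiB read
}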