diff --git a/mountpoint-s3/CHANGELOG.md b/mountpoint-s3/CHANGELOG.md index f4a6321dc..468fc5267 100644 --- a/mountpoint-s3/CHANGELOG.md +++ b/mountpoint-s3/CHANGELOG.md @@ -1,5 +1,11 @@ ## Unreleased +### Breaking changes +* ... + +### Other changes +* Fixed a bug that caused poor performance for sequential reads in some cases ([#488](https://github.com/awslabs/mountpoint-s3/pull/488)). A workaround we have previously shared for this issue (setting the `--max-threads` argument to `1`) is no longer necessary with this fix. ([#556](https://github.com/awslabs/mountpoint-s3/pull/556)) + ## v1.0.2 (September 22, 2023) ### Breaking changes diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index c10b0f5c4..588eff121 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -11,6 +11,7 @@ pub mod checksummed_bytes; mod feed; mod part; mod part_queue; +mod seek_window; use std::collections::VecDeque; use std::fmt::Debug; @@ -18,7 +19,7 @@ use std::time::Duration; use futures::future::RemoteHandle; use futures::task::{Spawn, SpawnExt}; -use metrics::counter; +use metrics::{counter, histogram}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::ObjectClient; @@ -29,7 +30,8 @@ use crate::prefetch::checksummed_bytes::{ChecksummedBytes, IntegrityError}; use crate::prefetch::feed::{ClientPartFeed, ObjectPartFeed}; use crate::prefetch::part::Part; use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue}; -use crate::sync::{Arc, RwLock}; +use crate::prefetch::seek_window::SeekWindow; +use crate::sync::Arc; type TaskError = ObjectClientError::ClientError>; @@ -46,6 +48,9 @@ pub struct PrefetcherConfig { /// The maximum distance the prefetcher will seek forwards before resetting and starting a new /// S3 request pub max_forward_seek_distance: u64, + /// The maximum distance the prefetcher will seek backwards before resetting and starting a new 
+ /// S3 request. We keep this much data in memory in addition to any inflight requests. + pub max_backward_seek_distance: u64, } impl Default for PrefetcherConfig { @@ -64,7 +69,12 @@ impl Default for PrefetcherConfig { max_request_size: 2 * 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, read_timeout: Duration::from_secs(60), + // We want these large enough to tolerate a single out-of-order Linux readahead, which + // is at most 256KiB backwards and then 512KiB forwards. For forwards seeks, we're also + // making a guess about where the optimal cut-off point is before it would be faster to + // just start a new request instead. max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 1 * 1024 * 1024, } } } @@ -115,9 +125,14 @@ where #[derive(Debug)] pub struct PrefetchGetObject { inner: Arc>, + // Invariant: the offset of the first byte in this task's part queue is always + // self.next_sequential_read_offset. current_task: Option>>, // Currently we only every spawn at most one future task (see [spawn_next_request]) - future_tasks: Arc>>>>, + future_tasks: VecDeque>>, + // Invariant: the offset of the last byte in this window is always + // self.next_sequential_read_offset - 1. + backward_seek_window: SeekWindow, bucket: String, key: String, // preferred part size in the prefetcher's part queue, not the object part @@ -140,6 +155,7 @@ where inner: inner.clone(), current_task: None, future_tasks: Default::default(), + backward_seek_window: SeekWindow::new(inner.config.max_backward_seek_distance as usize), preferred_part_size: 128 * 1024, next_request_size: inner.config.first_request_size, next_sequential_read_offset: 0, @@ -182,11 +198,11 @@ where } let mut to_read = (length as u64).min(remaining); - // Cancel and reset prefetching if this is an out-of-order read + // Try to seek if this read is not sequential, and if seeking fails, cancel and reset the + // prefetcher. 
if self.next_sequential_read_offset != offset { if self.try_seek(offset).await? { trace!("seek succeeded"); - counter!("prefetch.seek", 1); } else { trace!( expected = self.next_sequential_read_offset, @@ -217,6 +233,7 @@ where } Ok(part) => part, }; + self.backward_seek_window.push(part.clone()); let part_bytes = part.into_bytes(&self.key, self.next_sequential_read_offset).unwrap(); self.next_sequential_read_offset += part_bytes.len() as u64; @@ -236,7 +253,7 @@ where Err(e @ IntegrityError::ChecksumMismatch(_, _)) => { // cancel inflight tasks self.current_task = None; - self.future_tasks.write().unwrap().drain(..); + self.future_tasks.drain(..); return Err(e.into()); } } @@ -251,20 +268,23 @@ where let current_task = self.current_task.as_ref(); if current_task.map(|task| task.remaining == 0).unwrap_or(true) { // There's no current task, or the current task is finished. Prepare the next request. - if let Some(next_task) = self.future_tasks.write().unwrap().pop_front() { + if let Some(next_task) = self.future_tasks.pop_front() { self.current_task = Some(next_task); return; } self.current_task = self.spawn_next_request(); } else if current_task - .map(|task| task.remaining <= task.total_size / 2) + .map(|task| { + // Don't trigger prefetch if we're in a fake task created by backward streaming + task.is_streaming() && task.remaining <= task.total_size / 2 + }) .unwrap_or(false) - && self.future_tasks.read().unwrap().is_empty() + && self.future_tasks.is_empty() { // The current task is nearing completion, so pre-spawn the next request in anticipation // of it completing. 
if let Some(task) = self.spawn_next_request() { - self.future_tasks.write().unwrap().push_back(task); + self.future_tasks.push_back(task); } } } @@ -307,7 +327,7 @@ where let task_handle = self.inner.runtime.spawn_with_handle(request_task).unwrap(); Some(RequestTask { - _task_handle: task_handle, + task_handle: Some(task_handle), total_size: size as usize, remaining: size as usize, start_offset: start, @@ -329,7 +349,8 @@ where /// Reset this prefetch request to a new offset, clearing any existing tasks queued. fn reset_prefetch_to_offset(&mut self, offset: u64) { self.current_task = None; - self.future_tasks.write().unwrap().drain(..); + self.future_tasks.drain(..); + self.backward_seek_window.clear(); self.next_request_size = self.inner.config.first_request_size; self.next_sequential_read_offset = offset; self.next_request_offset = offset; @@ -342,70 +363,106 @@ where assert_ne!(offset, self.next_sequential_read_offset); trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek"); if offset > self.next_sequential_read_offset { - let mut seek_distance = offset - self.next_sequential_read_offset; - let Some(current_task) = self.current_task.as_mut() else { - // Can't seek if there's no requests in flight at all - return Ok(false); - }; - let future_remaining = self - .future_tasks - .read() - .unwrap() - .iter() - .map(|task| task.remaining) - .sum::() as u64; - if seek_distance - >= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance) - { - // TODO maybe adjust the next_request_size somehow if we were still within - // max_forward_seek_distance, so that strides > first_request_size can still get - // prefetched. 
- trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data"); - return Ok(false); - } + self.try_seek_forward(offset).await + } else { + self.try_seek_backward(offset) + } + } - // Jump ahead to the right request - if seek_distance >= current_task.remaining as u64 { - self.next_sequential_read_offset += current_task.remaining as u64; - self.current_task = None; - let mut future_tasks = self.future_tasks.write().unwrap(); - while let Some(next_request) = future_tasks.pop_front() { - if next_request.end_offset() > offset { - self.current_task = Some(next_request); - break; - } else { - self.next_sequential_read_offset = next_request.end_offset(); - } + async fn try_seek_forward(&mut self, offset: u64) -> Result>> { + assert!(offset > self.next_sequential_read_offset); + let total_seek_distance = offset - self.next_sequential_read_offset; + let Some(current_task) = self.current_task.as_mut() else { + // Can't seek if there's no requests in flight at all + return Ok(false); + }; + let future_remaining = self.future_tasks.iter().map(|task| task.remaining).sum::() as u64; + if total_seek_distance + >= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance) + { + // TODO maybe adjust the next_request_size somehow if we were still within + // max_forward_seek_distance, so that strides > first_request_size can still get + // prefetched. 
+ trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data"); + return Ok(false); + } + + // Jump ahead to the right request + if total_seek_distance >= current_task.remaining as u64 { + self.next_sequential_read_offset += current_task.remaining as u64; + self.current_task = None; + while let Some(next_request) = self.future_tasks.pop_front() { + if next_request.end_offset() > offset { + self.current_task = Some(next_request); + break; + } else { + self.next_sequential_read_offset = next_request.end_offset(); } - // We checked there was an inflight task that contained the target offset, so this - // is impossible. - assert!(self.current_task.is_some()); - seek_distance = offset - self.next_sequential_read_offset; } + // We checked there was an inflight task that contained the target offset, so this + // is impossible. + assert!(self.current_task.is_some()); + // We could try harder to preserve the backwards seek buffer if we're near the + // request boundary, but it's probably not worth the trouble. 
+ self.backward_seek_window.clear(); + } + let mut seek_distance = offset - self.next_sequential_read_offset; + + let current_task = self + .current_task + .as_mut() + .expect("a request existed that covered this seek offset"); + while seek_distance > 0 { + let part = current_task.read(seek_distance as usize).await?; + seek_distance -= part.len() as u64; + self.next_sequential_read_offset += part.len() as u64; + self.backward_seek_window.push(part); + } - let current_task = self - .current_task - .as_mut() - .expect("a request existed that covered this seek offset"); - while seek_distance > 0 { - let part = current_task.read(seek_distance as usize).await?; - seek_distance -= part.len() as u64; - self.next_sequential_read_offset += part.len() as u64; - } + histogram!("prefetch.seek_distance", total_seek_distance as f64, "dir" => "forward"); - Ok(true) - } else { - // TODO seek backwards - Ok(false) + Ok(true) + } + + fn try_seek_backward(&mut self, offset: u64) -> Result>> { + assert!(offset < self.next_sequential_read_offset); + let backwards_length_needed = self.next_sequential_read_offset - offset; + let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else { + trace!("seek failed: not enough data in backwards seek window"); + return Ok(false); + }; + // We're going to create a new fake "request" that contains the parts we read out of the + // window. That sounds a bit hacky, but it keeps all the read logic simple rather than + // needing separate paths for backwards seeks vs others. 
+ let (part_queue, part_queue_producer) = unbounded_part_queue(); + for part in parts { + part_queue_producer.push(Ok(part)); } + let request = RequestTask { + task_handle: None, + remaining: backwards_length_needed as usize, + start_offset: offset, + total_size: backwards_length_needed as usize, + part_queue, + }; + if let Some(current_task) = self.current_task.take() { + self.future_tasks.push_front(current_task); + } + self.current_task = Some(request); + self.next_sequential_read_offset = offset; + + histogram!("prefetch.seek_distance", backwards_length_needed as f64, "dir" => "backward"); + + Ok(true) } } /// A single GetObject request submitted to the S3 client #[derive(Debug)] struct RequestTask { - /// Handle on the task/future. Future is cancelled when handle is dropped. - _task_handle: RemoteHandle<()>, + /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if + /// the request is fake (created by seeking backwards in the stream) + task_handle: Option>, remaining: usize, start_offset: u64, total_size: usize, @@ -423,6 +480,12 @@ impl RequestTask { fn end_offset(&self) -> u64 { self.start_offset + self.total_size as u64 } + + /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and + /// shouldn't be counted for prefetcher progress. 
+ fn is_streaming(&self) -> bool { + self.task_handle.is_some() + } } #[derive(Debug, Error)] @@ -467,6 +530,8 @@ mod tests { client_part_size: usize, #[proptest(strategy = "1u64..4*1024*1024")] max_forward_seek_distance: u64, + #[proptest(strategy = "1u64..4*1024*1024")] + max_backward_seek_distance: u64, } fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) { @@ -486,6 +551,7 @@ mod tests { sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, read_timeout: Duration::from_secs(5), max_forward_seek_distance: test_config.max_forward_seek_distance, + max_backward_seek_distance: test_config.max_backward_seek_distance, }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); @@ -514,6 +580,7 @@ mod tests { sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config); } @@ -526,6 +593,7 @@ mod tests { sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -538,6 +606,7 @@ mod tests { sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -599,6 +668,7 @@ mod tests { sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; let mut get_failures = HashMap::new(); @@ -641,6 +711,7 @@ mod tests { max_request_size, read_timeout: Duration::from_secs(60), max_forward_seek_distance: 16 * 1024 * 1024, + 
max_backward_seek_distance: 2 * 1024 * 1024, }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); @@ -671,6 +742,21 @@ mod tests { } } + #[test] + fn test_sequential_read_regression() { + let object_size = 854966; + let read_size = 161647; + let config = TestConfig { + first_request_size: 484941, + max_request_size: 81509, + sequential_prefetch_multiplier: 1, + client_part_size: 181682, + max_forward_seek_distance: 1, + max_backward_seek_distance: 18668, + }; + run_sequential_read_test(object_size, read_size, config); + } + fn run_random_read_test(object_size: u64, reads: Vec<(u64, usize)>, test_config: TestConfig) { let config = MockClientConfig { bucket: "test-bucket".to_string(), @@ -687,6 +773,7 @@ mod tests { max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, max_forward_seek_distance: test_config.max_forward_seek_distance, + max_backward_seek_distance: test_config.max_backward_seek_distance, ..Default::default() }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -750,6 +837,7 @@ mod tests { sequential_prefetch_multiplier: 4, client_part_size: 516882, max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_random_read_test(object_size, reads, config); } @@ -764,27 +852,52 @@ mod tests { sequential_prefetch_multiplier: 7, client_part_size: 1219731, max_forward_seek_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, }; run_random_read_test(object_size, reads, config); } - #[test_case(61; "no seek")] - #[test_case(70; "same part seek")] - #[test_case(80; "next part seek")] - #[test_case(100; "next request first byte seek")] - #[test_case(101; "next request seek")] - #[test_case(130; "next request next part seek")] - #[test_case(199; "last byte seek")] - fn test_forward_seek(offset: u64) { + #[test] + fn 
test_random_read_regression3() { + let object_size = 755678; + let reads = vec![(0, 236766), (291204, 1), (280930, 36002)]; + let config = TestConfig { + first_request_size: 556997, + max_request_size: 105938, + sequential_prefetch_multiplier: 7, + client_part_size: 1219731, + max_forward_seek_distance: 2260662, + max_backward_seek_distance: 2369799, + }; + run_random_read_test(object_size, reads, config); + } + + #[test] + fn test_random_read_regression4() { + let object_size = 14201; + let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)]; + let config = TestConfig { + first_request_size: 457999, + max_request_size: 863511, + sequential_prefetch_multiplier: 5, + client_part_size: 1972409, + max_forward_seek_distance: 2810651, + max_backward_seek_distance: 3531090, + }; + run_random_read_test(object_size, reads, config); + } + + #[test_case(0, 25; "no first read")] + #[test_case(60, 25; "read beyond first part")] + #[test_case(20, 25; "read in first part")] + #[test_case(125, 110; "read in second request")] + fn test_forward_seek(first_read_size: usize, part_size: usize) { const OBJECT_SIZE: usize = 200; - const PART_SIZE: usize = 25; const FIRST_REQUEST_SIZE: usize = 100; - // Big enough to trigger the second request - const FIRST_READ_SIZE: usize = 60; let config = MockClientConfig { bucket: "test-bucket".to_string(), - part_size: PART_SIZE, + part_size, }; let client = MockClient::new(config); let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); @@ -799,12 +912,54 @@ mod tests { let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); - let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag); - let _first_read = block_on(request.read(0, FIRST_READ_SIZE)).unwrap(); + // Try every possible seek from first_read_size + for offset in first_read_size + 1..OBJECT_SIZE { + let mut request = prefetcher.get("test-bucket", "hello", 
OBJECT_SIZE as u64, etag.clone()); + if first_read_size > 0 { + let _first_read = block_on(request.read(0, first_read_size)).unwrap(); + } + + let byte = block_on(request.read(offset as u64, 1)).unwrap(); + let expected = ramp_bytes(0xaa + offset, 1); + assert_eq!(byte.into_bytes().unwrap()[..], expected[..]); + } + } + + #[test_case(60, 25; "read beyond first part")] + #[test_case(20, 25; "read in first part")] + #[test_case(125, 110; "read in second request")] + fn test_backward_seek(first_read_size: usize, part_size: usize) { + const OBJECT_SIZE: usize = 200; + const FIRST_REQUEST_SIZE: usize = 100; - let byte = block_on(request.read(offset, 1)).unwrap(); - let expected = ramp_bytes((0xaa + offset) as usize, 1); - assert_eq!(byte.into_bytes().unwrap()[..], expected[..]); + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size, + }; + let client = MockClient::new(config); + let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); + let etag = object.etag(); + + client.add_object("hello", object); + + let test_config = PrefetcherConfig { + first_request_size: FIRST_REQUEST_SIZE, + ..Default::default() + }; + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + + // Try every possible seek from first_read_size + for offset in 0..first_read_size { + let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + if first_read_size > 0 { + let _first_read = block_on(request.read(0, first_read_size)).unwrap(); + } + + let byte = block_on(request.read(offset as u64, 1)).unwrap(); + let expected = ramp_bytes(0xaa + offset, 1); + assert_eq!(byte.into_bytes().unwrap()[..], expected[..]); + } } #[cfg(feature = "shuttle")] diff --git a/mountpoint-s3/src/prefetch/checksummed_bytes.rs b/mountpoint-s3/src/prefetch/checksummed_bytes.rs index 7bf6dea13..4131c04d7 100644 --- 
a/mountpoint-s3/src/prefetch/checksummed_bytes.rs +++ b/mountpoint-s3/src/prefetch/checksummed_bytes.rs @@ -4,7 +4,7 @@ use thiserror::Error; /// A `ChecksummedBytes` is a bytes buffer that carries its checksum. /// The implementation guarantees that its integrity will be validated when data transformation occurs. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ChecksummedBytes { orig_bytes: Bytes, curr_slice: Bytes, diff --git a/mountpoint-s3/src/prefetch/part.rs b/mountpoint-s3/src/prefetch/part.rs index a2b12215a..8a8af6b25 100644 --- a/mountpoint-s3/src/prefetch/part.rs +++ b/mountpoint-s3/src/prefetch/part.rs @@ -7,7 +7,7 @@ use super::checksummed_bytes::ChecksummedBytes; // TODO this is not very efficient right now -- it forces a lot of copying around of Strings. If // that's a bottleneck, let's think about either carrying &str (hard to make lifetimes work?) or // the etag or some kind of "cookie" (like the hash of the key). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Part { key: String, offset: u64, diff --git a/mountpoint-s3/src/prefetch/seek_window.rs b/mountpoint-s3/src/prefetch/seek_window.rs new file mode 100644 index 000000000..f2b10c6ac --- /dev/null +++ b/mountpoint-s3/src/prefetch/seek_window.rs @@ -0,0 +1,78 @@ +use std::collections::VecDeque; + +use crate::prefetch::part::Part; + +/// A backwards seek window for a single prefetch stream. Parts can be pushed onto the end of the +/// window (== closest to the current offset in the stream) and older parts will be dropped to +/// remain within a maximum size. +#[derive(Debug)] +pub struct SeekWindow { + parts: VecDeque, + max_size: usize, + current_size: usize, +} + +impl SeekWindow { + pub fn new(max_size: usize) -> Self { + assert!(max_size > 0); + SeekWindow { + parts: VecDeque::new(), + max_size, + current_size: 0, + } + } + + /// Add a new part to the end of the window, and drop any parts necessary to fit the new part + /// within the maximum size. 
+ pub fn push(&mut self, part: Part) { + if part.len() > self.max_size { + self.clear(); + return; + } + + while self.max_size - self.current_size < part.len() { + let p = self + .parts + .pop_front() + .expect("window is non-empty if current size is non-zero"); + self.current_size -= p.len(); + } + + self.current_size += part.len(); + self.parts.push_back(part); + } + + /// Read off the back of the window. Returns None if there's not enough data in the window to + /// satisfy the desired length. + pub fn read_back(&mut self, mut length: usize) -> Option> { + if length > self.current_size { + return None; + } + + let mut result = VecDeque::new(); + loop { + if length == 0 { + break; + } + let mut part = self.parts.pop_back().expect("we checked that current_size >= length"); + // If we only need some of this part, split it up and put the rest back onto the window + if part.len() > length { + let back = part.split_off(part.len() - length); + self.parts.push_back(part); + part = back; + } + length -= part.len(); + self.current_size -= part.len(); + // We're walking backwards through the queue, so to keep the result in object offset + // order, push to the front. + result.push_front(part); + } + Some(result.into()) + } + + /// Reset the seek window to an empty state + pub fn clear(&mut self) { + self.parts.drain(..); + self.current_size = 0; + } +}