diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 310487802..c10b0f5c4 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -43,6 +43,9 @@ pub struct PrefetcherConfig {
     pub sequential_prefetch_multiplier: usize,
     /// Timeout to wait for a part to become available
     pub read_timeout: Duration,
+    /// The maximum distance the prefetcher will seek forwards before resetting and starting a new
+    /// S3 request
+    pub max_forward_seek_distance: u64,
 }

 impl Default for PrefetcherConfig {
@@ -61,6 +64,7 @@ impl Default for PrefetcherConfig {
             max_request_size: 2 * 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             read_timeout: Duration::from_secs(60),
+            max_forward_seek_distance: 16 * 1024 * 1024,
         }
     }
 }
@@ -180,15 +184,20 @@ where

         // Cancel and reset prefetching if this is an out-of-order read
         if self.next_sequential_read_offset != offset {
-            trace!(
-                expected = self.next_sequential_read_offset,
-                actual = offset,
-                "out-of-order read, resetting prefetch"
-            );
-            counter!("prefetch.out_of_order", 1);
-            self.reset_prefetch_to_offset(offset);
+            if self.try_seek(offset).await? {
+                trace!("seek succeeded");
+                counter!("prefetch.seek", 1);
+            } else {
+                trace!(
+                    expected = self.next_sequential_read_offset,
+                    actual = offset,
+                    "out-of-order read, resetting prefetch"
+                );
+                counter!("prefetch.out_of_order", 1);
+                self.reset_prefetch_to_offset(offset);
+            }
         }
-        debug_assert_eq!(self.next_sequential_read_offset, offset);
+        assert_eq!(self.next_sequential_read_offset, offset);

         self.prepare_requests();

@@ -301,6 +310,7 @@ where
             _task_handle: task_handle,
             total_size: size as usize,
             remaining: size as usize,
+            start_offset: start,
             part_queue,
         })
     }
@@ -318,13 +328,77 @@ where

     /// Reset this prefetch request to a new offset, clearing any existing tasks queued.
     fn reset_prefetch_to_offset(&mut self, offset: u64) {
-        // TODO see if we can reuse any inflight requests rather than dropping them immediately
         self.current_task = None;
         self.future_tasks.write().unwrap().drain(..);
         self.next_request_size = self.inner.config.first_request_size;
         self.next_sequential_read_offset = offset;
         self.next_request_offset = offset;
     }
+
+    /// Try to seek within the current inflight requests without restarting them. Returns true if
+    /// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the
+    /// new offset. If this returns false, the prefetcher is in an unknown state and must be reset.
+    async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<TaskError<Client>>> {
+        assert_ne!(offset, self.next_sequential_read_offset);
+        trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
+        if offset > self.next_sequential_read_offset {
+            let mut seek_distance = offset - self.next_sequential_read_offset;
+            let Some(current_task) = self.current_task.as_mut() else {
+                // Can't seek if there's no requests in flight at all
+                return Ok(false);
+            };
+            let future_remaining = self
+                .future_tasks
+                .read()
+                .unwrap()
+                .iter()
+                .map(|task| task.remaining)
+                .sum::<usize>() as u64;
+            if seek_distance
+                >= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance)
+            {
+                // TODO maybe adjust the next_request_size somehow if we were still within
+                // max_forward_seek_distance, so that strides > first_request_size can still get
+                // prefetched.
+                trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data");
+                return Ok(false);
+            }
+
+            // Jump ahead to the right request
+            if seek_distance >= current_task.remaining as u64 {
+                self.next_sequential_read_offset += current_task.remaining as u64;
+                self.current_task = None;
+                let mut future_tasks = self.future_tasks.write().unwrap();
+                while let Some(next_request) = future_tasks.pop_front() {
+                    if next_request.end_offset() > offset {
+                        self.current_task = Some(next_request);
+                        break;
+                    } else {
+                        self.next_sequential_read_offset = next_request.end_offset();
+                    }
+                }
+                // We checked there was an inflight task that contained the target offset, so this
+                // is impossible.
+                assert!(self.current_task.is_some());
+                seek_distance = offset - self.next_sequential_read_offset;
+            }
+
+            let current_task = self
+                .current_task
+                .as_mut()
+                .expect("a request existed that covered this seek offset");
+            while seek_distance > 0 {
+                let part = current_task.read(seek_distance as usize).await?;
+                seek_distance -= part.len() as u64;
+                self.next_sequential_read_offset += part.len() as u64;
+            }
+
+            Ok(true)
+        } else {
+            // TODO seek backwards
+            Ok(false)
+        }
+    }
 }

 /// A single GetObject request submitted to the S3 client
@@ -333,6 +407,7 @@ struct RequestTask<E> {
     /// Handle on the task/future. Future is cancelled when handle is dropped.
     _task_handle: RemoteHandle<()>,
     remaining: usize,
+    start_offset: u64,
     total_size: usize,
     part_queue: PartQueue<E>,
 }
@@ -344,6 +419,10 @@ impl<E: std::error::Error + Send + Sync> RequestTask<E> {
         self.remaining -= part.len();
         Ok(part)
     }
+
+    fn end_offset(&self) -> u64 {
+        self.start_offset + self.total_size as u64
+    }
 }

 #[derive(Debug, Error)]
@@ -386,6 +465,8 @@ mod tests {
         sequential_prefetch_multiplier: usize,
         #[proptest(strategy = "16usize..2*1024*1024")]
         client_part_size: usize,
+        #[proptest(strategy = "1u64..4*1024*1024")]
+        max_forward_seek_distance: u64,
     }

     fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) {
@@ -404,6 +485,7 @@ mod tests {
             max_request_size: test_config.max_request_size,
             sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
             read_timeout: Duration::from_secs(5),
+            max_forward_seek_distance: test_config.max_forward_seek_distance,
         };
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
         let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
@@ -431,6 +513,7 @@ mod tests {
             max_request_size: 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
         };
         run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config);
     }
@@ -442,6 +525,7 @@ mod tests {
             max_request_size: 64 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
         };
         run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config);
     }
@@ -453,6 +537,7 @@ mod tests {
             max_request_size: 64 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
         };
         run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config);
     }
@@ -513,6 +598,7 @@ mod tests {
             max_request_size: 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
         };

         let mut get_failures = HashMap::new();
@@ -554,6 +640,7 @@ mod tests {
             sequential_prefetch_multiplier: prefetch_multiplier,
             max_request_size,
             read_timeout: Duration::from_secs(60),
+            max_forward_seek_distance: 16 * 1024 * 1024,
         };
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
         let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
@@ -599,6 +686,7 @@ mod tests {
             first_request_size: test_config.first_request_size,
             max_request_size: test_config.max_request_size,
             sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
+            max_forward_seek_distance: test_config.max_forward_seek_distance,
             ..Default::default()
         };
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
@@ -661,10 +749,64 @@ mod tests {
             max_request_size: 2147621,
             sequential_prefetch_multiplier: 4,
             client_part_size: 516882,
+            max_forward_seek_distance: 16 * 1024 * 1024,
+        };
+        run_random_read_test(object_size, reads, config);
+    }
+
+    #[test]
+    fn test_random_read_regression2() {
+        let object_size = 755678;
+        let reads = vec![(0, 278499), (311250, 1)];
+        let config = TestConfig {
+            first_request_size: 556997,
+            max_request_size: 105938,
+            sequential_prefetch_multiplier: 7,
+            client_part_size: 1219731,
+            max_forward_seek_distance: 16 * 1024 * 1024,
         };
         run_random_read_test(object_size, reads, config);
     }

+    #[test_case(61; "no seek")]
+    #[test_case(70; "same part seek")]
+    #[test_case(80; "next part seek")]
+    #[test_case(100; "next request first byte seek")]
+    #[test_case(101; "next request seek")]
+    #[test_case(130; "next request next part seek")]
+    #[test_case(199; "last byte seek")]
+    fn test_forward_seek(offset: u64) {
+        const OBJECT_SIZE: usize = 200;
+        const PART_SIZE: usize = 25;
+        const FIRST_REQUEST_SIZE: usize = 100;
+        // Big enough to trigger the second request
+        const FIRST_READ_SIZE: usize = 60;
+
+        let config = MockClientConfig {
+            bucket: "test-bucket".to_string(),
+            part_size: PART_SIZE,
+        };
+        let client = MockClient::new(config);
+        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
+        let etag = object.etag();
+
+        client.add_object("hello", object);
+
+        let test_config = PrefetcherConfig {
+            first_request_size: FIRST_REQUEST_SIZE,
+            ..Default::default()
+        };
+        let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
+        let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
+
+        let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag);
+        let _first_read = block_on(request.read(0, FIRST_READ_SIZE)).unwrap();
+
+        let byte = block_on(request.read(offset, 1)).unwrap();
+        let expected = ramp_bytes((0xaa + offset) as usize, 1);
+        assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
+    }
+
     #[cfg(feature = "shuttle")]
     mod shuttle_tests {
         use super::*;
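
A standalone sketch (not part of the patch) of the guard in `try_seek` that decides between skipping forward and resetting: a forward seek is only served from inflight requests when the distance is within both the data still inflight and the configured `max_forward_seek_distance`. All names below are local to the sketch; only the 16 MiB default comes from the diff.

/// Mirrors the `seek_distance >= (remaining + future_remaining).min(max)` guard above.
fn can_seek_forward(
    seek_distance: u64,
    current_task_remaining: u64,
    future_tasks_remaining: u64,
    max_forward_seek_distance: u64,
) -> bool {
    let inflight = current_task_remaining + future_tasks_remaining;
    seek_distance < inflight.min(max_forward_seek_distance)
}

fn main() {
    let max = 16 * 1024 * 1024; // the default from PrefetcherConfig
    // A 1 MiB stride with 8 MiB inflight is served by skipping forward.
    assert!(can_seek_forward(1 << 20, 4 << 20, 4 << 20, max));
    // A seek past everything inflight forces a reset and a new request.
    assert!(!can_seek_forward(10 << 20, 1 << 20, 2 << 20, max));
    // Even with plenty inflight, a seek beyond max_forward_seek_distance resets,
    // since serving it would mean downloading and discarding too many bytes.
    assert!(!can_seek_forward(32 << 20, 64 << 20, 0, max));
}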
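
Similarly, a minimal sketch of the "jump ahead to the right request" step, using a plain VecDeque in place of the RequestTask queue: tasks that end at or before the target offset are popped and discarded, and the first task whose end offset exceeds the target becomes the current one. The `Task` type here is hypothetical, and the bookkeeping of `next_sequential_read_offset` is omitted.

use std::collections::VecDeque;

#[derive(Debug)]
struct Task {
    start_offset: u64,
    total_size: u64,
}

impl Task {
    fn end_offset(&self) -> u64 {
        self.start_offset + self.total_size
    }
}

/// Pops queued tasks until one covers `offset`, mirroring the
/// `while let Some(next_request) = future_tasks.pop_front()` loop above.
fn jump_to_covering_task(future_tasks: &mut VecDeque<Task>, offset: u64) -> Option<Task> {
    while let Some(task) = future_tasks.pop_front() {
        if task.end_offset() > offset {
            return Some(task);
        }
        // This task ends before the target; drop it and keep scanning.
    }
    None
}

fn main() {
    // Four 100-byte requests covering [0, 400).
    let mut queue: VecDeque<Task> = (0..4)
        .map(|i| Task { start_offset: i * 100, total_size: 100 })
        .collect();
    let current = jump_to_covering_task(&mut queue, 250).expect("offset 250 is inflight");
    assert_eq!(current.start_offset, 200);
    assert_eq!(queue.len(), 1); // only the [300, 400) task remains queued
    assert!(jump_to_covering_task(&mut queue, 450).is_none()); // past all inflight data
}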