From 39823c5d4cba7ae5d69472b4643c51ee7fdb7e16 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Thu, 12 Oct 2023 20:42:32 +0000 Subject: [PATCH] Allow seeking forwards within the prefetch stream Right now we reset the prefetcher any time it seeks forwards, even if the distance it's seeking could be handled by inflight requests (in the worst case, the bytes are already in our buffers, and we just throw them away). That's expensive and slow! This change allows us to seek forwards a limited distance into the prefetch stream. When we see a seek of an acceptable distance, we fast-forward through the stream to the desired target offset, dropping the skipped bytes on the floor. We enforce a maximum seek distance, which is a trade-off between streaming a lot of unnecessary bytes versus an extra request's latency. I haven't put any careful thought into the number. This commit also sets us up to support backwards seeking, which will come in the future. Signed-off-by: James Bornholt --- mountpoint-s3/src/prefetch.rs | 160 ++++++++++++++++++++++++++++++++-- 1 file changed, 151 insertions(+), 9 deletions(-) diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 310487802..c10b0f5c4 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -43,6 +43,9 @@ pub struct PrefetcherConfig { pub sequential_prefetch_multiplier: usize, /// Timeout to wait for a part to become available pub read_timeout: Duration, + /// The maximum distance the prefetcher will seek forwards before resetting and starting a new + /// S3 request + pub max_forward_seek_distance: u64, } impl Default for PrefetcherConfig { @@ -61,6 +64,7 @@ impl Default for PrefetcherConfig { max_request_size: 2 * 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, read_timeout: Duration::from_secs(60), + max_forward_seek_distance: 16 * 1024 * 1024, } } } @@ -180,15 +184,20 @@ where // Cancel and reset prefetching if this is an out-of-order read if self.next_sequential_read_offset != offset { - trace!( - expected = self.next_sequential_read_offset, - actual = offset, - "out-of-order read, resetting prefetch" - ); - counter!("prefetch.out_of_order", 1); - self.reset_prefetch_to_offset(offset); + if self.try_seek(offset).await? { + trace!("seek succeeded"); + counter!("prefetch.seek", 1); + } else { + trace!( + expected = self.next_sequential_read_offset, + actual = offset, + "out-of-order read, resetting prefetch" + ); + counter!("prefetch.out_of_order", 1); + self.reset_prefetch_to_offset(offset); + } } - debug_assert_eq!(self.next_sequential_read_offset, offset); + assert_eq!(self.next_sequential_read_offset, offset); self.prepare_requests(); @@ -301,6 +310,7 @@ where _task_handle: task_handle, total_size: size as usize, remaining: size as usize, + start_offset: start, part_queue, }) } @@ -318,13 +328,77 @@ where /// Reset this prefetch request to a new offset, clearing any existing tasks queued. fn reset_prefetch_to_offset(&mut self, offset: u64) { - // TODO see if we can reuse any inflight requests rather than dropping them immediately self.current_task = None; self.future_tasks.write().unwrap().drain(..); self.next_request_size = self.inner.config.first_request_size; self.next_sequential_read_offset = offset; self.next_request_offset = offset; } + + /// Try to seek within the current inflight requests without restarting them. Returns true if + /// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the + /// new offset. If this returns false, the prefetcher is in an unknown state and must be reset. + async fn try_seek(&mut self, offset: u64) -> Result>> { + assert_ne!(offset, self.next_sequential_read_offset); + trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek"); + if offset > self.next_sequential_read_offset { + let mut seek_distance = offset - self.next_sequential_read_offset; + let Some(current_task) = self.current_task.as_mut() else { + // Can't seek if there's no requests in flight at all + return Ok(false); + }; + let future_remaining = self + .future_tasks + .read() + .unwrap() + .iter() + .map(|task| task.remaining) + .sum::() as u64; + if seek_distance + >= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance) + { + // TODO maybe adjust the next_request_size somehow if we were still within + // max_forward_seek_distance, so that strides > first_request_size can still get + // prefetched. + trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data"); + return Ok(false); + } + + // Jump ahead to the right request + if seek_distance >= current_task.remaining as u64 { + self.next_sequential_read_offset += current_task.remaining as u64; + self.current_task = None; + let mut future_tasks = self.future_tasks.write().unwrap(); + while let Some(next_request) = future_tasks.pop_front() { + if next_request.end_offset() > offset { + self.current_task = Some(next_request); + break; + } else { + self.next_sequential_read_offset = next_request.end_offset(); + } + } + // We checked there was an inflight task that contained the target offset, so this + // is impossible. + assert!(self.current_task.is_some()); + seek_distance = offset - self.next_sequential_read_offset; + } + + let current_task = self + .current_task + .as_mut() + .expect("a request existed that covered this seek offset"); + while seek_distance > 0 { + let part = current_task.read(seek_distance as usize).await?; + seek_distance -= part.len() as u64; + self.next_sequential_read_offset += part.len() as u64; + } + + Ok(true) + } else { + // TODO seek backwards + Ok(false) + } + } } /// A single GetObject request submitted to the S3 client @@ -333,6 +407,7 @@ struct RequestTask { /// Handle on the task/future. Future is cancelled when handle is dropped. _task_handle: RemoteHandle<()>, remaining: usize, + start_offset: u64, total_size: usize, part_queue: PartQueue, } @@ -344,6 +419,10 @@ impl RequestTask { self.remaining -= part.len(); Ok(part) } + + fn end_offset(&self) -> u64 { + self.start_offset + self.total_size as u64 + } } #[derive(Debug, Error)] @@ -386,6 +465,8 @@ mod tests { sequential_prefetch_multiplier: usize, #[proptest(strategy = "16usize..2*1024*1024")] client_part_size: usize, + #[proptest(strategy = "1u64..4*1024*1024")] + max_forward_seek_distance: u64, } fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) { @@ -404,6 +485,7 @@ mod tests { max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, read_timeout: Duration::from_secs(5), + max_forward_seek_distance: test_config.max_forward_seek_distance, }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); @@ -431,6 +513,7 @@ mod tests { max_request_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, }; run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config); } @@ -442,6 +525,7 @@ mod tests { max_request_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, }; run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -453,6 +537,7 @@ mod tests { max_request_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, }; run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -513,6 +598,7 @@ mod tests { max_request_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, + max_forward_seek_distance: 16 * 1024 * 1024, }; let mut get_failures = HashMap::new(); @@ -554,6 +640,7 @@ mod tests { sequential_prefetch_multiplier: prefetch_multiplier, max_request_size, read_timeout: Duration::from_secs(60), + max_forward_seek_distance: 16 * 1024 * 1024, }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); @@ -599,6 +686,7 @@ mod tests { first_request_size: test_config.first_request_size, max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, + max_forward_seek_distance: test_config.max_forward_seek_distance, ..Default::default() }; let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -661,10 +749,64 @@ mod tests { max_request_size: 2147621, sequential_prefetch_multiplier: 4, client_part_size: 516882, + max_forward_seek_distance: 16 * 1024 * 1024, + }; + run_random_read_test(object_size, reads, config); + } + + #[test] + fn test_random_read_regression2() { + let object_size = 755678; + let reads = vec![(0, 278499), (311250, 1)]; + let config = TestConfig { + first_request_size: 556997, + max_request_size: 105938, + sequential_prefetch_multiplier: 7, + client_part_size: 1219731, + max_forward_seek_distance: 16 * 1024 * 1024, }; run_random_read_test(object_size, reads, config); } + #[test_case(61; "no seek")] + #[test_case(70; "same part seek")] + #[test_case(80; "next part seek")] + #[test_case(100; "next request first byte seek")] + #[test_case(101; "next request seek")] + #[test_case(130; "next request next part seek")] + #[test_case(199; "last byte seek")] + fn test_forward_seek(offset: u64) { + const OBJECT_SIZE: usize = 200; + const PART_SIZE: usize = 25; + const FIRST_REQUEST_SIZE: usize = 100; + // Big enough to trigger the second request + const FIRST_READ_SIZE: usize = 60; + + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: PART_SIZE, + }; + let client = MockClient::new(config); + let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); + let etag = object.etag(); + + client.add_object("hello", object); + + let test_config = PrefetcherConfig { + first_request_size: FIRST_REQUEST_SIZE, + ..Default::default() + }; + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + + let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag); + let _first_read = block_on(request.read(0, FIRST_READ_SIZE)).unwrap(); + + let byte = block_on(request.read(offset, 1)).unwrap(); + let expected = ramp_bytes((0xaa + offset) as usize, 1); + assert_eq!(byte.into_bytes().unwrap()[..], expected[..]); + } + #[cfg(feature = "shuttle")] mod shuttle_tests { use super::*;