diff --git a/mountpoint-s3/CHANGELOG.md b/mountpoint-s3/CHANGELOG.md
index f4a6321dc..468fc5267 100644
--- a/mountpoint-s3/CHANGELOG.md
+++ b/mountpoint-s3/CHANGELOG.md
@@ -1,5 +1,11 @@
 ## Unreleased
 
+### Breaking changes
+* ...
+
+### Other changes
+* Fixed a bug that caused poor performance for sequential reads in some cases ([#488](https://github.com/awslabs/mountpoint-s3/pull/488)). A workaround we have previously shared for this issue (setting the `--max-threads` argument to `1`) is no longer necessary with this fix. ([#556](https://github.com/awslabs/mountpoint-s3/pull/556))
+
 ## v1.0.2 (September 22, 2023)
 
 ### Breaking changes
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 310487802..b4717bc0b 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -11,6 +11,7 @@ pub mod checksummed_bytes;
 mod feed;
 mod part;
 mod part_queue;
+mod seek_window;
 
 use std::collections::VecDeque;
 use std::fmt::Debug;
@@ -18,7 +19,7 @@ use std::time::Duration;
 
 use futures::future::RemoteHandle;
 use futures::task::{Spawn, SpawnExt};
-use metrics::counter;
+use metrics::{counter, histogram};
 use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
 use mountpoint_s3_client::types::ETag;
 use mountpoint_s3_client::ObjectClient;
@@ -29,7 +30,8 @@ use crate::prefetch::checksummed_bytes::{ChecksummedBytes, IntegrityError};
 use crate::prefetch::feed::{ClientPartFeed, ObjectPartFeed};
 use crate::prefetch::part::Part;
 use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue};
-use crate::sync::{Arc, RwLock};
+use crate::prefetch::seek_window::SeekWindow;
+use crate::sync::Arc;
 
 type TaskError<Client> = ObjectClientError<GetObjectError, <Client as ObjectClient>::ClientError>;
 
@@ -43,6 +45,12 @@ pub struct PrefetcherConfig {
     pub sequential_prefetch_multiplier: usize,
     /// Timeout to wait for a part to become available
     pub read_timeout: Duration,
+    /// The maximum distance the prefetcher will seek forwards before resetting and starting a new
+    /// S3 request
+    pub max_forward_seek_distance: u64,
+    /// The maximum distance the prefetcher will seek backwards before resetting and starting a new
+    /// S3 request. We keep this much data in memory in addition to any inflight requests.
+    pub max_backward_seek_distance: u64,
 }
 
 impl Default for PrefetcherConfig {
@@ -61,6 +69,12 @@
             max_request_size: 2 * 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             read_timeout: Duration::from_secs(60),
+            // We want these large enough to tolerate a single out-of-order Linux readahead, which
+            // is at most 256KiB backwards and then 512KiB forwards. For forwards seeks, we're also
+            // making a guess about where the optimal cut-off point is before it would be faster to
+            // just start a new request instead.
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 1 * 1024 * 1024,
         }
     }
 }
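As a quick, standalone sanity check on those defaults (illustrative only, not part of the patch), they do cover the readahead pattern the comment above describes:

```rust
// Sanity check: the default seek windows comfortably cover a single
// out-of-order Linux readahead of up to 256KiB backwards and 512KiB forwards.
const KIB: u64 = 1024;
const MIB: u64 = 1024 * KIB;

fn main() {
    let (max_forward_seek_distance, max_backward_seek_distance) = (16 * MIB, MIB);
    assert!(512 * KIB <= max_forward_seek_distance, "forward readahead fits");
    assert!(256 * KIB <= max_backward_seek_distance, "backward readahead fits");
}
```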
@@ -111,9 +125,14 @@ where
 #[derive(Debug)]
 pub struct PrefetchGetObject<Client: ObjectClient, Runtime> {
     inner: Arc<PrefetcherInner<Client, Runtime>>,
+    // Invariant: the offset of the first byte in this task's part queue is always
+    // self.next_sequential_read_offset.
     current_task: Option<RequestTask<TaskError<Client>>>,
     // Currently we only every spawn at most one future task (see [spawn_next_request])
-    future_tasks: Arc<RwLock<VecDeque<RequestTask<TaskError<Client>>>>>,
+    future_tasks: VecDeque<RequestTask<TaskError<Client>>>,
+    // Invariant: the offset of the last byte in this window is always
+    // self.next_sequential_read_offset - 1.
+    backward_seek_window: SeekWindow,
     bucket: String,
     key: String,
     // preferred part size in the prefetcher's part queue, not the object part
@@ -136,6 +155,7 @@
             inner: inner.clone(),
             current_task: None,
             future_tasks: Default::default(),
+            backward_seek_window: SeekWindow::new(inner.config.max_backward_seek_distance as usize),
             preferred_part_size: 128 * 1024,
             next_request_size: inner.config.first_request_size,
             next_sequential_read_offset: 0,
@@ -178,17 +198,22 @@
         }
         let mut to_read = (length as u64).min(remaining);
 
-        // Cancel and reset prefetching if this is an out-of-order read
+        // Try to seek if this read is not sequential, and if seeking fails, cancel and reset the
+        // prefetcher.
         if self.next_sequential_read_offset != offset {
-            trace!(
-                expected = self.next_sequential_read_offset,
-                actual = offset,
-                "out-of-order read, resetting prefetch"
-            );
-            counter!("prefetch.out_of_order", 1);
-            self.reset_prefetch_to_offset(offset);
+            if self.try_seek(offset).await? {
+                trace!("seek succeeded");
+            } else {
+                trace!(
+                    expected = self.next_sequential_read_offset,
+                    actual = offset,
+                    "out-of-order read, resetting prefetch"
+                );
+                counter!("prefetch.out_of_order", 1);
+                self.reset_prefetch_to_offset(offset);
+            }
         }
-        debug_assert_eq!(self.next_sequential_read_offset, offset);
+        assert_eq!(self.next_sequential_read_offset, offset);
 
         self.prepare_requests();
 
@@ -208,6 +233,7 @@
                 }
                 Ok(part) => part,
             };
+            self.backward_seek_window.push(part.clone());
             let part_bytes = part.into_bytes(&self.key, self.next_sequential_read_offset).unwrap();
 
             self.next_sequential_read_offset += part_bytes.len() as u64;
@@ -227,7 +253,7 @@
                 Err(e @ IntegrityError::ChecksumMismatch(_, _)) => {
                     // cancel inflight tasks
                     self.current_task = None;
-                    self.future_tasks.write().unwrap().drain(..);
+                    self.future_tasks.drain(..);
                     return Err(e.into());
                 }
             }
@@ -242,20 +268,23 @@
         let current_task = self.current_task.as_ref();
         if current_task.map(|task| task.remaining == 0).unwrap_or(true) {
             // There's no current task, or the current task is finished. Prepare the next request.
-            if let Some(next_task) = self.future_tasks.write().unwrap().pop_front() {
+            if let Some(next_task) = self.future_tasks.pop_front() {
                 self.current_task = Some(next_task);
                 return;
             }
             self.current_task = self.spawn_next_request();
         } else if current_task
-            .map(|task| task.remaining <= task.total_size / 2)
+            .map(|task| {
+                // Don't trigger prefetch if we're in a fake task created by backward streaming
+                task.is_streaming() && task.remaining <= task.total_size / 2
+            })
             .unwrap_or(false)
-            && self.future_tasks.read().unwrap().is_empty()
+            && self.future_tasks.is_empty()
         {
             // The current task is nearing completion, so pre-spawn the next request in anticipation
             // of it completing.
             if let Some(task) = self.spawn_next_request() {
-                self.future_tasks.write().unwrap().push_back(task);
+                self.future_tasks.push_back(task);
             }
         }
     }
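To see the new pre-spawn trigger in `prepare_requests` in isolation, here is a minimal standalone model of it (hypothetical types, not the crate's own):

```rust
// Model of the trigger condition above: only a real streaming request that is
// at least half consumed, with nothing already queued, pre-spawns the next one.
struct Task {
    is_streaming: bool,
    remaining: usize,
    total_size: usize,
}

fn should_spawn_next(task: &Task, future_tasks_empty: bool) -> bool {
    task.is_streaming && task.remaining <= task.total_size / 2 && future_tasks_empty
}

fn main() {
    // A half-consumed streaming request triggers the pre-spawn...
    assert!(should_spawn_next(&Task { is_streaming: true, remaining: 5, total_size: 10 }, true));
    // ...but a fake task created by a backward seek never does, since its
    // remaining bytes drain quickly and would spawn spurious requests.
    assert!(!should_spawn_next(&Task { is_streaming: false, remaining: 0, total_size: 10 }, true));
}
```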
@@ -298,9 +327,10 @@
         let task_handle = self.inner.runtime.spawn_with_handle(request_task).unwrap();
 
         Some(RequestTask {
-            _task_handle: task_handle,
+            task_handle: Some(task_handle),
             total_size: size as usize,
             remaining: size as usize,
+            start_offset: start,
             part_queue,
         })
     }
@@ -308,7 +338,11 @@
     /// Suggest next request size.
     /// The next request size is the current request size multiplied by sequential prefetch multiplier.
     fn get_next_request_size(&self) -> usize {
-        // calculate next request size
+        // TODO: this logic doesn't work well right now in the case where part_size <
+        // first_request_size and sequential_prefetch_multiplier = 1. It ends up just repeatedly
+        // shrinking the request size until it reaches 1. But this isn't a configuration we
+        // currently expect to ever run in (part_size will always be >= 5MB for MPU reasons, and a
+        // prefetcher with multiplier 1 is not very good).
         let next_request_size = (self.next_request_size * self.inner.config.sequential_prefetch_multiplier)
             .min(self.inner.config.max_request_size);
         self.inner
@@ -318,21 +352,123 @@
 
     /// Reset this prefetch request to a new offset, clearing any existing tasks queued.
     fn reset_prefetch_to_offset(&mut self, offset: u64) {
-        // TODO see if we can reuse any inflight requests rather than dropping them immediately
         self.current_task = None;
-        self.future_tasks.write().unwrap().drain(..);
+        self.future_tasks.drain(..);
+        self.backward_seek_window.clear();
         self.next_request_size = self.inner.config.first_request_size;
         self.next_sequential_read_offset = offset;
         self.next_request_offset = offset;
     }
+
+    /// Try to seek within the current inflight requests without restarting them. Returns true if
+    /// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the
+    /// new offset. If this returns false, the prefetcher is in an unknown state and must be reset.
+    async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<TaskError<Client>>> {
+        assert_ne!(offset, self.next_sequential_read_offset);
+        trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
+        if offset > self.next_sequential_read_offset {
+            self.try_seek_forward(offset).await
+        } else {
+            self.try_seek_backward(offset)
+        }
+    }
+
+    async fn try_seek_forward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<TaskError<Client>>> {
+        assert!(offset > self.next_sequential_read_offset);
+        let total_seek_distance = offset - self.next_sequential_read_offset;
+        let Some(current_task) = self.current_task.as_mut() else {
+            // Can't seek if there's no requests in flight at all
+            return Ok(false);
+        };
+        let future_remaining = self.future_tasks.iter().map(|task| task.remaining).sum::<usize>() as u64;
+        if total_seek_distance
+            >= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance)
+        {
+            // TODO maybe adjust the next_request_size somehow if we were still within
+            // max_forward_seek_distance, so that strides > first_request_size can still get
+            // prefetched.
+            trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data");
+            return Ok(false);
+        }
+
+        // Jump ahead to the right request
+        if total_seek_distance >= current_task.remaining as u64 {
+            self.next_sequential_read_offset += current_task.remaining as u64;
+            self.current_task = None;
+            while let Some(next_request) = self.future_tasks.pop_front() {
+                if next_request.end_offset() > offset {
+                    self.current_task = Some(next_request);
+                    break;
+                } else {
+                    self.next_sequential_read_offset = next_request.end_offset();
+                }
+            }
+            // We checked there was an inflight task that contained the target offset, so this
+            // is impossible.
+            assert!(self.current_task.is_some());
+            // We could try harder to preserve the backwards seek buffer if we're near the
+            // request boundary, but it's probably not worth the trouble.
+            self.backward_seek_window.clear();
+        }
+        let mut seek_distance = offset - self.next_sequential_read_offset;
+
+        let current_task = self
+            .current_task
+            .as_mut()
+            .expect("a request existed that covered this seek offset");
+        while seek_distance > 0 {
+            let part = current_task.read(seek_distance as usize).await?;
+            seek_distance -= part.len() as u64;
+            self.next_sequential_read_offset += part.len() as u64;
+            self.backward_seek_window.push(part);
+        }
+
+        histogram!("prefetch.seek_distance", total_seek_distance as f64, "dir" => "forward");
+
+        Ok(true)
+    }
+
+    fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<TaskError<Client>>> {
+        assert!(offset < self.next_sequential_read_offset);
+        let backwards_length_needed = self.next_sequential_read_offset - offset;
+        let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else {
+            trace!("seek failed: not enough data in backwards seek window");
+            return Ok(false);
+        };
+        // We're going to create a new fake "request" that contains the parts we read out of the
+        // window. That sounds a bit hacky, but it keeps all the read logic simple rather than
+        // needing separate paths for backwards seeks vs others.
+        let (part_queue, part_queue_producer) = unbounded_part_queue();
+        for part in parts {
+            part_queue_producer.push(Ok(part));
+        }
+        let request = RequestTask {
+            task_handle: None,
+            remaining: backwards_length_needed as usize,
+            start_offset: offset,
+            total_size: backwards_length_needed as usize,
+            part_queue,
+        };
+        if let Some(current_task) = self.current_task.take() {
+            self.future_tasks.push_front(current_task);
+        }
+        self.current_task = Some(request);
+        self.next_sequential_read_offset = offset;
+
+        histogram!("prefetch.seek_distance", backwards_length_needed as f64, "dir" => "backward");
+
+        Ok(true)
+    }
 }
 
 /// A single GetObject request submitted to the S3 client
 #[derive(Debug)]
 struct RequestTask<E: std::error::Error> {
-    /// Handle on the task/future. Future is cancelled when handle is dropped.
-    _task_handle: RemoteHandle<()>,
+    /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if
+    /// the request is fake (created by seeking backwards in the stream)
+    task_handle: Option<RemoteHandle<()>>,
     remaining: usize,
+    start_offset: u64,
     total_size: usize,
     part_queue: PartQueue<E>,
 }
@@ -344,6 +480,16 @@ impl<E: std::error::Error> RequestTask<E> {
         self.remaining -= part.len();
         Ok(part)
     }
+
+    fn end_offset(&self) -> u64 {
+        self.start_offset + self.total_size as u64
+    }
+
+    /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and
+    /// shouldn't be counted for prefetcher progress.
+    fn is_streaming(&self) -> bool {
+        self.task_handle.is_some()
+    }
 }
 
 #[derive(Debug, Error)]
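The feasibility check at the top of `try_seek_forward` distills to the following (a standalone sketch with a hypothetical helper, not code from the patch): the target must lie strictly within the data already inflight, and within `max_forward_seek_distance`.

```rust
// Hypothetical distillation of the try_seek_forward feasibility check.
fn can_seek_forward(
    seek_distance: u64,
    current_remaining: u64,
    future_remaining: u64,
    max_forward_seek_distance: u64,
) -> bool {
    seek_distance < (current_remaining + future_remaining).min(max_forward_seek_distance)
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    // 3MiB left in the current request plus 8MiB queued: seeks under 11MiB succeed.
    assert!(can_seek_forward(10 * MIB, 3 * MIB, 8 * MIB, 16 * MIB));
    // Beyond the inflight data (or the configured cap), the prefetcher resets instead.
    assert!(!can_seek_forward(12 * MIB, 3 * MIB, 8 * MIB, 16 * MIB));
}
```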
@@ -386,6 +532,10 @@ mod tests {
         sequential_prefetch_multiplier: usize,
         #[proptest(strategy = "16usize..2*1024*1024")]
         client_part_size: usize,
+        #[proptest(strategy = "1u64..4*1024*1024")]
+        max_forward_seek_distance: u64,
+        #[proptest(strategy = "1u64..4*1024*1024")]
+        max_backward_seek_distance: u64,
     }
 
     fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) {
@@ -404,6 +554,8 @@
             max_request_size: test_config.max_request_size,
             sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
             read_timeout: Duration::from_secs(5),
+            max_forward_seek_distance: test_config.max_forward_seek_distance,
+            max_backward_seek_distance: test_config.max_backward_seek_distance,
         };
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
         let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
@@ -431,6 +583,8 @@
             max_request_size: 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
         };
         run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config);
     }
@@ -442,6 +596,8 @@
             max_request_size: 64 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
         };
         run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config);
     }
@@ -453,6 +609,8 @@
             max_request_size: 64 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
         };
         run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config);
     }
@@ -513,6 +671,8 @@
             max_request_size: 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
         };
 
         let mut get_failures = HashMap::new();
@@ -554,6 +714,8 @@
             sequential_prefetch_multiplier: prefetch_multiplier,
             max_request_size,
             read_timeout: Duration::from_secs(60),
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
         };
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
         let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
@@ -584,6 +746,21 @@
         }
     }
 
+    #[test]
+    fn test_sequential_read_regression() {
+        let object_size = 854966;
+        let read_size = 161647;
+        let config = TestConfig {
+            first_request_size: 484941,
+            max_request_size: 81509,
+            sequential_prefetch_multiplier: 1,
+            client_part_size: 181682,
+            max_forward_seek_distance: 1,
+            max_backward_seek_distance: 18668,
+        };
+        run_sequential_read_test(object_size, read_size, config);
+    }
+
     fn run_random_read_test(object_size: u64, reads: Vec<(u64, usize)>, test_config: TestConfig) {
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
@@ -599,6 +776,8 @@
             first_request_size: test_config.first_request_size,
             max_request_size: test_config.max_request_size,
             sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
+            max_forward_seek_distance: test_config.max_forward_seek_distance,
+            max_backward_seek_distance: test_config.max_backward_seek_distance,
             ..Default::default()
         };
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
@@ -661,10 +840,132 @@
             max_request_size: 2147621,
             sequential_prefetch_multiplier: 4,
             client_part_size: 516882,
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
+        };
+        run_random_read_test(object_size, reads, config);
+    }
+
+    #[test]
+    fn test_random_read_regression2() {
+        let object_size = 755678;
+        let reads = vec![(0, 278499), (311250, 1)];
+        let config = TestConfig {
+            first_request_size: 556997,
+            max_request_size: 105938,
+            sequential_prefetch_multiplier: 7,
+            client_part_size: 1219731,
+            max_forward_seek_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
+        };
+        run_random_read_test(object_size, reads, config);
+    }
+
+    #[test]
+    fn test_random_read_regression3() {
+        let object_size = 755678;
+        let reads = vec![(0, 236766), (291204, 1), (280930, 36002)];
+        let config = TestConfig {
+            first_request_size: 556997,
+            max_request_size: 105938,
+            sequential_prefetch_multiplier: 7,
+            client_part_size: 1219731,
+            max_forward_seek_distance: 2260662,
+            max_backward_seek_distance: 2369799,
         };
         run_random_read_test(object_size, reads, config);
     }
 
+    #[test]
+    fn test_random_read_regression4() {
+        let object_size = 14201;
+        let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)];
+        let config = TestConfig {
+            first_request_size: 457999,
+            max_request_size: 863511,
+            sequential_prefetch_multiplier: 5,
+            client_part_size: 1972409,
+            max_forward_seek_distance: 2810651,
+            max_backward_seek_distance: 3531090,
+        };
+        run_random_read_test(object_size, reads, config);
+    }
+
+    #[test_case(0, 25; "no first read")]
+    #[test_case(60, 25; "read beyond first part")]
+    #[test_case(20, 25; "read in first part")]
+    #[test_case(125, 110; "read in second request")]
+    fn test_forward_seek(first_read_size: usize, part_size: usize) {
+        const OBJECT_SIZE: usize = 200;
+        const FIRST_REQUEST_SIZE: usize = 100;
+
+        let config = MockClientConfig {
+            bucket: "test-bucket".to_string(),
+            part_size,
+        };
+        let client = MockClient::new(config);
+        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
+        let etag = object.etag();
+
+        client.add_object("hello", object);
+
+        let test_config = PrefetcherConfig {
+            first_request_size: FIRST_REQUEST_SIZE,
+            ..Default::default()
+        };
+        let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
+        let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
+
+        // Try every possible seek from first_read_size
+        for offset in first_read_size + 1..OBJECT_SIZE {
+            let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag.clone());
+            if first_read_size > 0 {
+                let _first_read = block_on(request.read(0, first_read_size)).unwrap();
+            }
+
+            let byte = block_on(request.read(offset as u64, 1)).unwrap();
+            let expected = ramp_bytes(0xaa + offset, 1);
+            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
+        }
+    }
+
+    #[test_case(60, 25; "read beyond first part")]
+    #[test_case(20, 25; "read in first part")]
+    #[test_case(125, 110; "read in second request")]
+    fn test_backward_seek(first_read_size: usize, part_size: usize) {
+        const OBJECT_SIZE: usize = 200;
+        const FIRST_REQUEST_SIZE: usize = 100;
+
+        let config = MockClientConfig {
+            bucket: "test-bucket".to_string(),
+            part_size,
+        };
+        let client = MockClient::new(config);
+        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
+        let etag = object.etag();
+
+        client.add_object("hello", object);
+
+        let test_config = PrefetcherConfig {
+            first_request_size: FIRST_REQUEST_SIZE,
+            ..Default::default()
+        };
+        let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
+        let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
+
+        // Try every possible seek from first_read_size
+        for offset in 0..first_read_size {
+            let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag.clone());
+            if first_read_size > 0 {
+                let _first_read = block_on(request.read(0, first_read_size)).unwrap();
+            }
+
+            let byte = block_on(request.read(offset as u64, 1)).unwrap();
+            let expected = ramp_bytes(0xaa + offset, 1);
+            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
+        }
+    }
+
     #[cfg(feature = "shuttle")]
     mod shuttle_tests {
         use super::*;
@@ -686,8 +987,10 @@
             let object_size = rng.gen_range(1u64..1 * 1024 * 1024);
             let first_request_size = rng.gen_range(16usize..1 * 1024 * 1024);
             let max_request_size = rng.gen_range(16usize..1 * 1024 * 1024);
-            let sequential_prefetch_multiplier = rng.gen_range(1usize..16);
-            let part_size = rng.gen_range(16usize..2 * 1024 * 1024);
+            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
+            let part_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024);
+            let max_forward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
+            let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
 
             let config = MockClientConfig {
                 bucket: "test-bucket".to_string(),
@@ -703,6 +1006,8 @@
                 first_request_size,
                 max_request_size,
                 sequential_prefetch_multiplier,
+                max_forward_seek_distance,
+                max_backward_seek_distance,
                 ..Default::default()
             };
@@ -739,8 +1044,10 @@
             // under Shuttle (lots of concurrent tasks)
             let max_object_size = first_request_size.min(max_request_size) * 20;
             let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
-            let sequential_prefetch_multiplier = rng.gen_range(1usize..16);
+            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
             let part_size = rng.gen_range(16usize..128 * 1024);
+            let max_forward_seek_distance = rng.gen_range(16u64..192 * 1024);
+            let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024);
 
             let config = MockClientConfig {
                 bucket: "test-bucket".to_string(),
@@ -756,6 +1063,8 @@
                 first_request_size,
                 max_request_size,
                 sequential_prefetch_multiplier,
+                max_forward_seek_distance,
+                max_backward_seek_distance,
                 ..Default::default()
             };
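Taken together, the behavior these tests pin down looks roughly like the sketch below. It reuses the test module's helpers (`MockClient`, `MockObject`, `block_on`, and friends are assumed in scope), so treat it as an illustration rather than a drop-in test:

```rust
// Sketch: a short backward seek after a sequential read is now served from the
// backward seek window instead of cancelling the inflight GetObject request.
let config = MockClientConfig {
    bucket: "test-bucket".to_string(),
    part_size: 8 * 1024 * 1024,
};
let client = MockClient::new(config);
let object = MockObject::ramp(0xaa, 2 * 1024 * 1024, ETag::for_tests());
let etag = object.etag();
client.add_object("hello", object);

let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let prefetcher = Prefetcher::new(Arc::new(client), runtime, PrefetcherConfig::default());

let mut request = prefetcher.get("test-bucket", "hello", 2 * 1024 * 1024, etag);
let _head = block_on(request.read(0, 256 * 1024)).unwrap();
// Seek 128KiB back: well within the default 1MiB max_backward_seek_distance,
// so this read completes without resetting the prefetcher.
let _back = block_on(request.read(128 * 1024, 1024)).unwrap();
```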
diff --git a/mountpoint-s3/src/prefetch/checksummed_bytes.rs b/mountpoint-s3/src/prefetch/checksummed_bytes.rs
index 7bf6dea13..4131c04d7 100644
--- a/mountpoint-s3/src/prefetch/checksummed_bytes.rs
+++ b/mountpoint-s3/src/prefetch/checksummed_bytes.rs
@@ -4,7 +4,7 @@ use thiserror::Error;
 
 /// A `ChecksummedBytes` is a bytes buffer that carries its checksum.
 /// The implementation guarantees that its integrity will be validated when data transformation occurs.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ChecksummedBytes {
     orig_bytes: Bytes,
     curr_slice: Bytes,
diff --git a/mountpoint-s3/src/prefetch/part.rs b/mountpoint-s3/src/prefetch/part.rs
index a2b12215a..8a8af6b25 100644
--- a/mountpoint-s3/src/prefetch/part.rs
+++ b/mountpoint-s3/src/prefetch/part.rs
@@ -7,7 +7,7 @@ use super::checksummed_bytes::ChecksummedBytes;
 // TODO this is not very efficient right now -- it forces a lot of copying around of Strings. If
 // that's a bottleneck, let's think about either carrying &str (hard to make lifetimes work?) or
 // the etag or some kind of "cookie" (like the hash of the key).
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Part {
     key: String,
     offset: u64,
diff --git a/mountpoint-s3/src/prefetch/seek_window.rs b/mountpoint-s3/src/prefetch/seek_window.rs
new file mode 100644
index 000000000..f2b10c6ac
--- /dev/null
+++ b/mountpoint-s3/src/prefetch/seek_window.rs
@@ -0,0 +1,78 @@
+use std::collections::VecDeque;
+
+use crate::prefetch::part::Part;
+
+/// A backwards seek window for a single prefetch stream. Parts can be pushed onto the end of the
+/// window (== closest to the current offset in the stream) and older parts will be dropped to
+/// remain within a maximum size.
+#[derive(Debug)]
+pub struct SeekWindow {
+    parts: VecDeque<Part>,
+    max_size: usize,
+    current_size: usize,
+}
+
+impl SeekWindow {
+    pub fn new(max_size: usize) -> Self {
+        assert!(max_size > 0);
+        SeekWindow {
+            parts: VecDeque::new(),
+            max_size,
+            current_size: 0,
+        }
+    }
+
+    /// Add a new part to the front of the window, and drop any parts necessary to fit the new part
+    /// within the maximum size.
+    pub fn push(&mut self, part: Part) {
+        if part.len() > self.max_size {
+            self.clear();
+            return;
+        }
+
+        while self.max_size - self.current_size < part.len() {
+            let p = self
+                .parts
+                .pop_front()
+                .expect("window is non-empty if current size is non-zero");
+            self.current_size -= p.len();
+        }
+
+        self.current_size += part.len();
+        self.parts.push_back(part);
+    }
+
+    /// Read off the back of the window. Returns None if there's not enough data in the window to
+    /// satisfy the desired length.
+    pub fn read_back(&mut self, mut length: usize) -> Option<Vec<Part>> {
+        if length > self.current_size {
+            return None;
+        }
+
+        let mut result = VecDeque::new();
+        loop {
+            if length == 0 {
+                break;
+            }
+            let mut part = self.parts.pop_back().expect("we checked that current_size >= length");
+            // If we only need some of this part, split it up and put the rest back onto the window
+            if part.len() > length {
+                let back = part.split_off(part.len() - length);
+                self.parts.push_back(part);
+                part = back;
+            }
+            length -= part.len();
+            self.current_size -= part.len();
+            // We're walking backwards through the queue, so to keep the result in object offset
+            // order, push to the front.
+            result.push_front(part);
+        }
+        Some(result.into())
+    }
+
+    /// Reset the seek window to an empty state
+    pub fn clear(&mut self) {
+        self.parts.drain(..);
+        self.current_size = 0;
+    }
+}
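To make the window semantics concrete: pushes evict from the oldest end to stay within `max_size`, and `read_back` consumes and splits parts so exactly the requested trailing bytes come back in offset order. A hypothetical walkthrough (`part_with_len` is an assumed helper for building a contiguous `Part`, not part of this patch):

```rust
// Hypothetical walkthrough of the SeekWindow contract.
let mut window = SeekWindow::new(4);
window.push(part_with_len(2)); // window holds 2 bytes
window.push(part_with_len(3)); // the 2-byte part is evicted to fit: window now holds 3 bytes
assert!(window.read_back(4).is_none()); // more than the window holds: caller must reset instead
let parts = window.read_back(2).unwrap(); // splits the 3-byte part, returns the trailing 2 bytes
assert_eq!(parts.iter().map(|p| p.len()).sum::<usize>(), 2);
assert!(window.read_back(2).is_none()); // read_back consumed data: only 1 byte remains
```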