Allow seeking forwards within the prefetch stream
Right now we reset the prefetcher any time a read seeks forwards, even if
the seek distance could be covered by inflight requests (in the worst
case, the bytes are already in our buffers, and we just throw them away).
That's expensive and slow!

This change allows us to seek forwards a limited distance into the
prefetch stream. When we see a seek of an acceptable distance, we
fast-forward through the stream to the desired target offset, dropping
the skipped bytes on the floor. We enforce a maximum seek distance, which
trades off streaming unnecessary bytes against paying the latency of an
extra request. I haven't put any careful thought into the number.
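
As a rough way to think about the cap (illustrative numbers, not
measurements from this change): fast-forwarding costs distance divided by
per-request throughput, while resetting costs roughly one request's
time-to-first-byte, so the breakeven distance is their product. A minimal
sketch, assuming ~100 MiB/s per request and ~50 ms time-to-first-byte:

    // Hypothetical back-of-envelope; both inputs are assumptions, not measurements.
    fn breakeven_seek_distance(throughput_bytes_per_sec: f64, first_byte_latency_secs: f64) -> f64 {
        throughput_bytes_per_sec * first_byte_latency_secs
    }

    fn main() {
        let breakeven = breakeven_seek_distance(100.0 * 1024.0 * 1024.0, 0.050);
        // ~5 MiB with these numbers; a 16 MiB cap leans towards avoiding extra requests.
        println!("breakeven seek distance: {:.1} MiB", breakeven / (1024.0 * 1024.0));
    }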

This commit also sets us up to support backwards seeking, which will
come in the future.

Signed-off-by: James Bornholt <[email protected]>
jamesbornholt committed Oct 16, 2023
1 parent 8490e8b commit 39823c5
Showing 1 changed file: mountpoint-s3/src/prefetch.rs (151 additions, 9 deletions)
@@ -43,6 +43,9 @@ pub struct PrefetcherConfig {
pub sequential_prefetch_multiplier: usize,
/// Timeout to wait for a part to become available
pub read_timeout: Duration,
/// The maximum distance the prefetcher will seek forwards before resetting and starting a new
/// S3 request
pub max_forward_seek_distance: u64,
}

impl Default for PrefetcherConfig {
@@ -61,6 +64,7 @@ impl Default for PrefetcherConfig {
max_request_size: 2 * 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
read_timeout: Duration::from_secs(60),
max_forward_seek_distance: 16 * 1024 * 1024,
}
}
}
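
(Not part of this diff: a caller that wants a different cap can override
just this field with struct-update syntax, as the tests below do for other
fields. A hypothetical override:)

    let config = PrefetcherConfig {
        max_forward_seek_distance: 64 * 1024 * 1024, // hypothetical larger cap
        ..Default::default()
    };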
@@ -180,15 +184,20 @@ where

         // Cancel and reset prefetching if this is an out-of-order read
         if self.next_sequential_read_offset != offset {
-            trace!(
-                expected = self.next_sequential_read_offset,
-                actual = offset,
-                "out-of-order read, resetting prefetch"
-            );
-            counter!("prefetch.out_of_order", 1);
-            self.reset_prefetch_to_offset(offset);
+            if self.try_seek(offset).await? {
+                trace!("seek succeeded");
+                counter!("prefetch.seek", 1);
+            } else {
+                trace!(
+                    expected = self.next_sequential_read_offset,
+                    actual = offset,
+                    "out-of-order read, resetting prefetch"
+                );
+                counter!("prefetch.out_of_order", 1);
+                self.reset_prefetch_to_offset(offset);
+            }
         }
-        debug_assert_eq!(self.next_sequential_read_offset, offset);
+        assert_eq!(self.next_sequential_read_offset, offset);

self.prepare_requests();

@@ -301,6 +310,7 @@ where
_task_handle: task_handle,
total_size: size as usize,
remaining: size as usize,
start_offset: start,
part_queue,
})
}
@@ -318,13 +328,77 @@

/// Reset this prefetch request to a new offset, clearing any existing tasks queued.
fn reset_prefetch_to_offset(&mut self, offset: u64) {
// TODO see if we can reuse any inflight requests rather than dropping them immediately
self.current_task = None;
self.future_tasks.write().unwrap().drain(..);
self.next_request_size = self.inner.config.first_request_size;
self.next_sequential_read_offset = offset;
self.next_request_offset = offset;
}

/// Try to seek within the current inflight requests without restarting them. Returns true if
/// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the
/// new offset. If this returns false, the prefetcher is in an unknown state and must be reset.
async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<TaskError<Client>>> {
assert_ne!(offset, self.next_sequential_read_offset);
trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
if offset > self.next_sequential_read_offset {
let mut seek_distance = offset - self.next_sequential_read_offset;
let Some(current_task) = self.current_task.as_mut() else {
// Can't seek if there are no requests in flight at all
return Ok(false);
};
let future_remaining = self
.future_tasks
.read()
.unwrap()
.iter()
.map(|task| task.remaining)
.sum::<usize>() as u64;
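// We can only fast-forward through bytes that have already been requested (the
// current task plus queued future tasks), and we cap how far we're willing to
// stream-and-discard at max_forward_seek_distance.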
if seek_distance
>= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance)
{
// TODO maybe adjust the next_request_size somehow if we were still within
// max_forward_seek_distance, so that strides > first_request_size can still get
// prefetched.
trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data");
return Ok(false);
}

// Jump ahead to the right request
if seek_distance >= current_task.remaining as u64 {
self.next_sequential_read_offset += current_task.remaining as u64;
self.current_task = None;
let mut future_tasks = self.future_tasks.write().unwrap();
while let Some(next_request) = future_tasks.pop_front() {
if next_request.end_offset() > offset {
self.current_task = Some(next_request);
break;
} else {
self.next_sequential_read_offset = next_request.end_offset();
}
}
// We checked above that an inflight task contains the target offset, so we
// must have found one here.
assert!(self.current_task.is_some());
seek_distance = offset - self.next_sequential_read_offset;
}

let current_task = self
.current_task
.as_mut()
.expect("a request existed that covered this seek offset");
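// Fast-forward by reading parts and dropping them until we've consumed
// seek_distance bytes.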
while seek_distance > 0 {
let part = current_task.read(seek_distance as usize).await?;
seek_distance -= part.len() as u64;
self.next_sequential_read_offset += part.len() as u64;
}

Ok(true)
} else {
// TODO seek backwards
Ok(false)
}
}
}
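
To make try_seek concrete, a hypothetical walkthrough (offsets invented): suppose the reader is at offset 60, the current task covers bytes [0, 100) with 40 bytes remaining, and one future task covers [100, 200). A seek to 130 is 70 bytes forwards, well within the inflight 140 bytes, so the first branch drops the current task (advancing next_sequential_read_offset to 100) and pops the future task, and the final loop reads and discards 30 bytes, leaving the stream positioned exactly at 130.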

/// A single GetObject request submitted to the S3 client
@@ -333,6 +407,7 @@ struct RequestTask<E> {
/// Handle on the task/future. Future is cancelled when handle is dropped.
_task_handle: RemoteHandle<()>,
remaining: usize,
start_offset: u64,
total_size: usize,
part_queue: PartQueue<E>,
}
@@ -344,6 +419,10 @@ impl<E: std::error::Error + Send + Sync> RequestTask<E> {
self.remaining -= part.len();
Ok(part)
}

fn end_offset(&self) -> u64 {
self.start_offset + self.total_size as u64
}
}

#[derive(Debug, Error)]
@@ -386,6 +465,8 @@ mod tests {
sequential_prefetch_multiplier: usize,
#[proptest(strategy = "16usize..2*1024*1024")]
client_part_size: usize,
#[proptest(strategy = "1u64..4*1024*1024")]
max_forward_seek_distance: u64,
}

fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) {
@@ -404,6 +485,7 @@ mod tests {
max_request_size: test_config.max_request_size,
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
read_timeout: Duration::from_secs(5),
max_forward_seek_distance: test_config.max_forward_seek_distance,
};
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
@@ -431,6 +513,7 @@ mod tests {
max_request_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
};
run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config);
}
@@ -442,6 +525,7 @@ mod tests {
max_request_size: 64 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
};
run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config);
}
@@ -453,6 +537,7 @@ mod tests {
max_request_size: 64 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
};
run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config);
}
@@ -513,6 +598,7 @@ mod tests {
max_request_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
};

let mut get_failures = HashMap::new();
@@ -554,6 +640,7 @@ mod tests {
sequential_prefetch_multiplier: prefetch_multiplier,
max_request_size,
read_timeout: Duration::from_secs(60),
max_forward_seek_distance: 16 * 1024 * 1024,
};
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
@@ -599,6 +686,7 @@ mod tests {
first_request_size: test_config.first_request_size,
max_request_size: test_config.max_request_size,
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
max_forward_seek_distance: test_config.max_forward_seek_distance,
..Default::default()
};
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
@@ -661,10 +749,64 @@ mod tests {
max_request_size: 2147621,
sequential_prefetch_multiplier: 4,
client_part_size: 516882,
max_forward_seek_distance: 16 * 1024 * 1024,
};
run_random_read_test(object_size, reads, config);
}

#[test]
fn test_random_read_regression2() {
let object_size = 755678;
let reads = vec![(0, 278499), (311250, 1)];
let config = TestConfig {
first_request_size: 556997,
max_request_size: 105938,
sequential_prefetch_multiplier: 7,
client_part_size: 1219731,
max_forward_seek_distance: 16 * 1024 * 1024,
};
run_random_read_test(object_size, reads, config);
}

#[test_case(61; "no seek")]
#[test_case(70; "same part seek")]
#[test_case(80; "next part seek")]
#[test_case(100; "next request first byte seek")]
#[test_case(101; "next request seek")]
#[test_case(130; "next request next part seek")]
#[test_case(199; "last byte seek")]
fn test_forward_seek(offset: u64) {
const OBJECT_SIZE: usize = 200;
const PART_SIZE: usize = 25;
const FIRST_REQUEST_SIZE: usize = 100;
// Big enough to trigger the second request
const FIRST_READ_SIZE: usize = 60;

let config = MockClientConfig {
bucket: "test-bucket".to_string(),
part_size: PART_SIZE,
};
let client = MockClient::new(config);
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();

client.add_object("hello", object);

let test_config = PrefetcherConfig {
first_request_size: FIRST_REQUEST_SIZE,
..Default::default()
};
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);

let mut request = prefetcher.get("test-bucket", "hello", OBJECT_SIZE as u64, etag);
let _first_read = block_on(request.read(0, FIRST_READ_SIZE)).unwrap();

let byte = block_on(request.read(offset, 1)).unwrap();
let expected = ramp_bytes((0xaa + offset) as usize, 1);
assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
}

#[cfg(feature = "shuttle")]
mod shuttle_tests {
use super::*;
