limit read_many concurrency based on inflight IO memory
Add an option to limit the number of concurrent IO requests based on
memory usage. This is a useful knob in conjunction with IO request
coalescing: it makes sense to keep many small IO requests in flight,
but less so when they are extremely large. For example, if every IO
request is 4MiB, scheduling more than a few at a time brings no
throughput benefit and could starve other concurrent IO tasks.
Conversely, 4KiB requests can be scheduled at a much higher
concurrency level.
HippoBaro committed Jan 3, 2022
1 parent 16d9166 commit 6471358
Showing 1 changed file with 68 additions and 13 deletions.
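To make the new knob concrete, here is a rough usage sketch (not part of this commit). It assumes glommio's public read_many entry point on DmaFile; the two coalescing arguments and the unwrap-based error handling are placeholders that may not match the exact signature of your glommio version. Only the with_concurrency and with_memory_limit calls come from this change.

use std::rc::Rc;

use futures_lite::{stream, StreamExt};
use glommio::io::DmaFile;

// Must be driven by a glommio executor (e.g. LocalExecutorBuilder::spawn).
async fn scan(path: &str) {
    let file = Rc::new(DmaFile::open(path).await.unwrap());

    // 1024 reads of 4KiB each, at consecutive 4KiB-aligned offsets.
    let iovs = stream::iter((0..1024u64).map(|i| (i * 4096, 4096usize)));

    let mut reads = file
        // The coalescing arguments (merged-buffer and read-amplification
        // limits) are assumptions; check read_many's docs for your version.
        .read_many(iovs, 128 << 10, Some(4096))
        // Many small requests in flight is fine...
        .with_concurrency(128)
        // ...but keep at most ~64MiB of read buffers in flight (the new
        // soft memory limit).
        .with_memory_limit(Some(64 << 20));

    while let Some(res) = reads.next().await {
        let (_iovec, _data) = res.unwrap();
        // _data holds the bytes read for this IoVec.
    }
}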
81 changes: 68 additions & 13 deletions glommio/src/io/bulk_io.rs
@@ -246,39 +246,58 @@ impl<V: IoVec + Unpin, S: Stream<Item = V> + Unpin> Stream for CoalescedReads<V,
}

#[derive(Debug)]
pub(crate) struct OrderedBulkIo<U: Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> {
pub(crate) struct OrderedBulkIo<U: IoVec + Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> {
file: Rc<DmaFile>,
iovs: S,

inflight: VecDeque<(ScheduledSource, U)>,
cap: usize,
concurrency_cap: usize,
inflight_memory: usize,
memory_cap: usize,
terminated: bool,
}

impl<U: Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> OrderedBulkIo<U, S> {
impl<U: IoVec + Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> OrderedBulkIo<U, S> {
pub(crate) fn new(file: Rc<DmaFile>, concurrency: usize, iovs: S) -> OrderedBulkIo<U, S> {
assert!(concurrency > 0);
OrderedBulkIo {
file,
iovs,
inflight: VecDeque::with_capacity(concurrency),
cap: concurrency,
concurrency_cap: concurrency,
inflight_memory: 0,
memory_cap: usize::MAX,
terminated: false,
}
}

pub(crate) fn set_concurrency(&mut self, concurrency: usize) {
assert!(concurrency > 0);
pub(crate) fn set_concurrency_limit(&mut self, limit: usize) {
assert!(limit > 0);
assert!(
!self.terminated && self.inflight.is_empty(),
"should be called before the first call to poll()"
);
self.inflight.reserve(limit);
self.concurrency_cap = limit
}

pub(crate) fn set_memory_limit(&mut self, limit: Option<usize>) {
assert!(limit.unwrap_or(usize::MAX) > 0);
assert!(
!self.terminated && self.inflight.is_empty(),
"should be called before the first call to poll()"
);
self.inflight.reserve(concurrency);
self.cap = concurrency
self.memory_cap = limit.unwrap_or(usize::MAX)
}

fn is_full(&self) -> bool {
self.inflight.len() == self.concurrency_cap || self.inflight_memory > self.memory_cap
}
}

impl<U: Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> Stream for OrderedBulkIo<U, S> {
impl<U: IoVec + Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> Stream
for OrderedBulkIo<U, S>
{
type Item = (ScheduledSource, U);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -287,7 +306,10 @@ impl<U: Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> Stream for Ordere
// Poll the underlying stream and insert the resulting source, if any, in the
// local buffer
let poll_inner = |this: &mut Self, cx: &mut Context<'_>| match this.iovs.poll_next(cx) {
Poll::Ready(Some(res)) => this.inflight.push_front(res),
Poll::Ready(Some(res)) => {
this.inflight_memory += res.1.size();
this.inflight.push_front(res)
}
Poll::Ready(None) => this.terminated = true,
_ => {}
};
@@ -302,6 +324,7 @@ impl<U: Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> Stream for Ordere
if !this.terminated {
poll_inner(this, cx);
}
this.inflight_memory -= ret.1.size();
Poll::Ready(Some(ret))
} else {
// we have a source in the buffer but it's not ready yet so we register the
@@ -315,12 +338,12 @@ impl<U: Unpin, S: Stream<Item = (ScheduledSource, U)> + Unpin> Stream for Ordere
}
};

if this.inflight.len() == this.cap || (this.terminated && !this.inflight.is_empty()) {
if this.is_full() || (this.terminated && !this.inflight.is_empty()) {
// The internal buffer is full so we consume them instead of creating new ones
poll_buffer(this, cx)
} else {
// fill the internal buffer as much as possible
while this.inflight.len() < this.cap && !this.terminated {
while !this.is_full() && !this.terminated {
poll_inner(this, cx);
}

@@ -347,6 +370,16 @@ pub struct ReadManyArgs<V: IoVec + Unpin> {
pub(crate) system_read: (u64, usize),
}

impl<V: IoVec + Unpin> IoVec for ReadManyArgs<V> {
fn pos(&self) -> u64 {
self.system_read.pos()
}

fn size(&self) -> usize {
self.system_read.size()
}
}

/// A stream of ReadResult produced asynchronously.
///
/// See [`DmaFile::read_many`] for more information
@@ -375,7 +408,29 @@ impl<V: IoVec + Unpin, S: Stream<Item = (ScheduledSource, ReadManyArgs<V>)> + Un
/// will panic otherwise.
#[must_use]
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
self.inner.set_concurrency(concurrency);
self.inner.set_concurrency_limit(concurrency);
self
}

/// Set a limit on the number of concurrent IO requests based on memory
/// usage.
///
/// This is a useful knob in conjunction with IO request coalescing: it
/// makes sense to keep many small IO requests in flight, but less so when
/// they are extremely large. For example, if every IO request is 4MiB,
/// scheduling more than a few at a time brings no throughput benefit and
/// could starve other concurrent IO tasks. Conversely, 4KiB requests can
/// be scheduled at a much higher concurrency level.
///
/// Note that this is a soft limit: no matter how small the limit, a single
/// IO request is always allowed to run. This matters when the IO merging
/// logic is configured very aggressively, since a merged request may by
/// itself exceed the limit.
///
/// Defaults to no limit.
#[must_use]
pub fn with_memory_limit(mut self, limit: Option<usize>) -> Self {
self.inner.set_memory_limit(limit);
self
}
}
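The "soft limit" note in the doc comment above falls out of how the new is_full() check is consulted: inflight memory is only compared against the cap (with a strict greater-than) before admitting the next request, so an empty window always admits at least one request, even one larger than the cap. Below is a minimal, self-contained sketch of that admission rule; the type and field names are illustrative, not glommio's.

// Illustrative admission gate mirroring the is_full() logic added in this
// commit: stop pulling new requests once either cap is hit, but never turn
// away the first request, however large.
struct AdmissionWindow {
    inflight: usize,       // requests currently in flight
    inflight_bytes: usize, // total size of their buffers
    concurrency_cap: usize,
    memory_cap: usize, // usize::MAX means "no limit"
}

impl AdmissionWindow {
    fn is_full(&self) -> bool {
        self.inflight == self.concurrency_cap || self.inflight_bytes > self.memory_cap
    }

    // Admit a request of `size` bytes if the window is not full.
    fn admit(&mut self, size: usize) -> bool {
        if self.is_full() {
            return false;
        }
        self.inflight += 1;
        self.inflight_bytes += size;
        true
    }
}

fn main() {
    let mut window = AdmissionWindow {
        inflight: 0,
        inflight_bytes: 0,
        concurrency_cap: 128,
        memory_cap: 1 << 20, // 1 MiB
    };
    // A single 4 MiB request still runs: the window was empty, so the
    // memory check had nothing to reject (that is the "soft" part).
    assert!(window.admit(4 << 20));
    // Nothing else is admitted until it completes.
    assert!(!window.admit(4096));
}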
