persist: only fetch Consensus in Listen::next when necessary
This adds a performance optimization: a Listener no longer fetches the
latest Consensus state if the one it currently has can serve the next
request. The same was already true of SnapshotIter, so a test covering
both is also included.
danhhz committed Jun 10, 2022
1 parent 51ae906 commit 88a2335
Showing 3 changed files with 119 additions and 23 deletions.
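
Before the diffs, here is a minimal, self-contained sketch of the pattern this commit introduces: try to serve the next batch from the state already in hand, and only re-fetch from Consensus (backing off after the first re-fetch) when that state cannot answer. This is not the persist-client API; `CachedState`, `next_batch`, and `fetch_latest_state` are illustrative placeholders, and it assumes a `tokio` runtime.

```rust
use std::time::Duration;

/// Illustrative stand-in for the locally cached copy of Consensus state.
struct CachedState {
    /// (timestamp, batch key) pairs, purely for illustration.
    batches: Vec<(u64, String)>,
}

impl CachedState {
    /// Returns the first batch at or beyond `frontier`, if the cached state has one.
    fn next_batch(&self, frontier: u64) -> Option<(u64, String)> {
        self.batches.iter().find(|(ts, _)| *ts >= frontier).cloned()
    }
}

/// Illustrative stand-in for the round-trip that fetches the latest state.
async fn fetch_latest_state() -> CachedState {
    CachedState {
        batches: vec![(0, "batch-0".to_owned()), (1, "batch-1".to_owned())],
    }
}

async fn next_listen_batch(state: &mut CachedState, frontier: u64) -> (u64, String) {
    let mut fetched_before = false;
    loop {
        // First see whether the state we already have can serve the request;
        // this is the fetch the commit now skips when possible.
        if let Some(batch) = state.next_batch(frontier) {
            return batch;
        }
        // Back off only after we have already re-fetched once: the first miss
        // may simply mean our cached state was stale.
        if fetched_before {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        *state = fetch_latest_state().await;
        fetched_before = true;
    }
}

#[tokio::main]
async fn main() {
    let mut state = CachedState {
        batches: vec![(0, "batch-0".to_owned())],
    };
    // The cached state can already answer this request, so no fetch happens.
    let (ts, key) = next_listen_batch(&mut state, 0).await;
    println!("served ({ts}, {key}) from cached state");
}
```

The real implementation below does the same thing with `Machine::fetch_and_update_state` and a `MetricsRetryStream` in place of the fixed sleep.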
56 changes: 38 additions & 18 deletions src/persist-client/src/impl/machine.rs
@@ -226,32 +226,52 @@ where
         }
     }

+    // NB: Unlike the other methods here, this one is read-only.
+    pub async fn verify_listen(&self, as_of: &Antichain<T>) -> Result<Self, Since<T>> {
+        match self.state.verify_listen(as_of) {
+            Ok(Ok(())) => Ok(self.clone()),
+            Ok(Err(Upper(_))) => {
+                // The upper may not be ready yet (maybe it would be ready if we
+                // re-fetched state), but that's okay! One way to think of
+                // Listen is as an async stream where creating the stream at any
+                // legal as_of does not block but then updates trickle in once
+                // they are available.
+                Ok(self.clone())
+            }
+            Err(Since(since)) => return Err(Since(since)),
+        }
+    }
+
     pub async fn next_listen_batch(
         &mut self,
         frontier: &Antichain<T>,
     ) -> (Vec<String>, Description<T>) {
-        // This unconditionally fetches the latest state and uses that to
-        // determine if we can serve `as_of`. TODO: We could instead check first
-        // and only fetch if necessary.
-        let mut retry = self
-            .retry_metrics
-            .next_listen_batch
-            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
+        let mut retry: Option<MetricsRetryStream> = None;
         loop {
-            self.fetch_and_update_state().await;
             if let Some((keys, desc)) = self.state.next_listen_batch(frontier) {
                 return (keys.to_owned(), desc.clone());
             }
-            // Wait a bit and try again. Intentionally don't ever log this at
-            // info level.
-            //
-            // TODO: See if we can watch for changes in Consensus to be more
-            // reactive here.
-            debug!(
-                "next_listen_batch didn't find new data, retrying in {:?}",
-                retry.next_sleep()
-            );
-            retry = retry.sleep().instrument(trace_span!("listen::sleep")).await;
+            // Only sleep after the first fetch, because the first time through
+            // maybe our state was just out of date.
+            retry = Some(match retry.take() {
+                None => self
+                    .retry_metrics
+                    .next_listen_batch
+                    .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream()),
+                Some(retry) => {
+                    // Wait a bit and try again. Intentionally don't ever log
+                    // this at info level.
+                    //
+                    // TODO: See if we can watch for changes in Consensus to be
+                    // more reactive here.
+                    debug!(
+                        "next_listen_batch didn't find new data, retrying in {:?}",
+                        retry.next_sleep()
+                    );
+                    retry.sleep().instrument(trace_span!("listen::sleep")).await
+                }
+            });
+            self.fetch_and_update_state().await;
         }
     }

12 changes: 12 additions & 0 deletions src/persist-client/src/impl/state.rs
@@ -302,6 +302,18 @@ where
         Ok(Ok(batches))
     }

+    // NB: Unlike the other methods here, this one is read-only.
+    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<Result<(), Upper<T>>, Since<T>> {
+        if PartialOrder::less_than(as_of, &self.collections.since) {
+            return Err(Since(self.collections.since.clone()));
+        }
+        let upper = self.collections.upper();
+        if PartialOrder::less_equal(&upper, as_of) {
+            return Ok(Err(Upper(upper)));
+        }
+        Ok(Ok(()))
+    }
+
     pub fn next_listen_batch(
         &self,
         frontier: &Antichain<T>,
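
As a concrete illustration of the two checks in `verify_listen` above (not part of the commit): for a shard whose since is `{3}` and whose upper is `{5}`, an as_of before the since is the hard `Since` error, an as_of at or beyond the upper is the tolerated `Upper` case that `Machine::verify_listen` treats as success, and anything in between is readable immediately. A rough sketch using timely's `Antichain` and `PartialOrder` directly (assumes only the `timely` crate):

```rust
use timely::progress::Antichain;
use timely::PartialOrder;

fn main() {
    let since = Antichain::from_elem(3u64);
    let upper = Antichain::from_elem(5u64);

    // as_of = {2} is strictly before the since: the shard may have compacted
    // past it, so verify_listen returns Err(Since(..)).
    let as_of = Antichain::from_elem(2u64);
    assert!(PartialOrder::less_than(&as_of, &since));

    // as_of = {7} is at or beyond the upper: verify_listen returns
    // Ok(Err(Upper(..))), which the caller treats as success because the
    // Listen simply waits for the upper to advance.
    let as_of = Antichain::from_elem(7u64);
    assert!(PartialOrder::less_equal(&upper, &as_of));

    // as_of = {4} sits between since and upper: Ok(Ok(())).
    let as_of = Antichain::from_elem(4u64);
    assert!(!PartialOrder::less_than(&as_of, &since));
    assert!(!PartialOrder::less_equal(&upper, &as_of));
}
```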
74 changes: 69 additions & 5 deletions src/persist-client/src/read.rs
@@ -23,7 +23,6 @@ use futures::Stream;
 use mz_ore::task::RuntimeExt;
 use serde::{Deserialize, Serialize};
 use timely::progress::{Antichain, Timestamp};
-use timely::PartialOrder;
 use tokio::runtime::Handle;
 use tracing::{debug_span, info, instrument, trace, trace_span, warn, Instrument};
 use uuid::Uuid;
@@ -335,14 +334,12 @@ where
     #[instrument(level = "debug", skip_all, fields(shard = %self.machine.shard_id()))]
     pub async fn listen(&self, as_of: Antichain<T>) -> Result<Listen<K, V, T, D>, Since<T>> {
         trace!("ReadHandle::listen as_of={:?}", as_of);
-        if PartialOrder::less_than(&as_of, &self.since) {
-            return Err(Since(self.since.clone()));
-        }
+        let machine = self.machine.verify_listen(&as_of).await?;
         Ok(Listen {
             retry_metrics: Arc::clone(&self.retry_metrics),
             as_of: as_of.clone(),
             frontier: as_of,
-            machine: self.machine.clone(),
+            machine,
             blob: Arc::clone(&self.blob),
         })
     }
@@ -645,3 +642,70 @@ where
         ret
     })
 }
+
+#[cfg(test)]
+mod tests {
+    use mz_ore::metrics::MetricsRegistry;
+    use mz_persist::location::Consensus;
+    use mz_persist::mem::{MemBlobMulti, MemBlobMultiConfig, MemConsensus};
+    use mz_persist::unreliable::{UnreliableConsensus, UnreliableHandle};
+
+    use crate::tests::all_ok;
+    use crate::{Metrics, PersistClient, PersistConfig};
+
+    use super::*;
+
+    // Verifies performance optimizations where a SnapshotIter/Listener doesn't
+    // fetch the latest Consensus state if the one it currently has can serve
+    // the next request.
+    #[tokio::test]
+    async fn skip_consensus_fetch_optimization() {
+        mz_ore::test::init_logging();
+        let data = vec![
+            (("0".to_owned(), "zero".to_owned()), 0, 1),
+            (("1".to_owned(), "one".to_owned()), 1, 1),
+            (("2".to_owned(), "two".to_owned()), 2, 1),
+        ];
+
+        let blob = Arc::new(MemBlobMulti::open(MemBlobMultiConfig::default()));
+        let consensus = Arc::new(MemConsensus::default());
+        let unreliable = UnreliableHandle::default();
+        unreliable.totally_available();
+        let consensus = Arc::new(UnreliableConsensus::new(consensus, unreliable.clone()))
+            as Arc<dyn Consensus + Send + Sync>;
+        let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
+        let (mut write, read) =
+            PersistClient::new(PersistConfig::default(), blob, consensus, metrics)
+                .await
+                .expect("client construction failed")
+                .expect_open::<String, String, u64, i64>(ShardId::new())
+                .await;
+
+        write.expect_compare_and_append(&data[0..1], 0, 1).await;
+        write.expect_compare_and_append(&data[1..2], 1, 2).await;
+        write.expect_compare_and_append(&data[2..3], 2, 3).await;
+
+        let mut snapshot = read.expect_snapshot(2).await;
+        let mut listen = read.expect_listen(0).await;
+
+        // Manually advance the listener's machine so that it has the latest
+        // state by fetching the first events from next. This is awkward but
+        // only necessary because we're about to do some weird things with
+        // unreliable.
+        let mut listen_actual = listen.next().await;
+
+        // At this point, the snapshot and listen's state should have all the
+        // writes. Test this by making consensus completely unavailable.
+        unreliable.totally_unavailable();
+        assert_eq!(snapshot.read_all().await, all_ok(&data, 2));
+        let expected_events = vec![
+            ListenEvent::Progress(Antichain::from_elem(1)),
+            ListenEvent::Updates(all_ok(&data[1..2], 1)),
+            ListenEvent::Progress(Antichain::from_elem(2)),
+            ListenEvent::Updates(all_ok(&data[2..3], 1)),
+            ListenEvent::Progress(Antichain::from_elem(3)),
+        ];
+        listen_actual.append(&mut listen.read_until(&3).await);
+        assert_eq!(listen_actual, expected_events);
+    }
+}
