Skip to content

Commit

Permalink
mononoke/rendezvous: introduce query batching
Browse files Browse the repository at this point in the history
Summary:
This introduces a basic building block for query batching. I called this
rendezvous, since it's about multiple queries meeting up in the same place :)

There are a few (somewhat conflicting) goals this tries to satisfy, so let's go
over them:

1), we'd like to reduce the total number of queries made by batch jobs. For
example, group hg bonsai lookups made by the walker. Those jobs are
characterized by the fact that they have a lot of queries to make, all the
time. Here's an example: https://fburl.com/ods/zuiep7yh.

2), we'd like to reduce the overall number of connections held to MySQL by
our tasks. The main way we achieve this is by reducing the maximum number of
concurrent queries. Indeed, a high total number of queries doesn't necessarily
result in a lot of connections as long as they're not concurrent, because we
can reuse connections. On the other hand, if you dispatch 100 concurrent
queries, that _does_ use 100 connections. This is something that applies to
batch jobs due to their query volume, but also to "interactive" jobs like
Mononoke Server or SCS, just not all the time. Here's an example:
https://fburl.com/ods/o6gp07qp (you can see the query count is overall low, but
sometimes spikes substantially).

2.1) It's also worth noting that concurrent queries are often the result of
many clients wanting the same data, so deduplication is also useful here.

3), we also don't want to impact the latency of interactive jobs when they
need to a little query here or there (i.e. it's largely fine if our jobs all
hold a few connections to MySQL and use them somewhat consistently).

4), we'd like this to make it easier to do batching right. For example, if
you have 100 Bonsais to map to hg, you should be able to just map and call
`future::try_join_all` and have that do the right thing.

5), we don't want "bad" queries to affect other queries negatively. One
example would be the occasional queries we make to Bonsai <-> Hg mapping in
`known` for thousands (if not more) of rows.

6), we want this to be easy to incorporate into the codebase.

So, how do we try to address all of this? Here's how:

- We ... do batching, and we deduplicate requests in a batch. This is the
  easier bit and should address #1, #2 and #2.1, #4.
- However, batching is conditional. We notably don't batch very large requests
  with the rest (addresses #5). We also don't batch small queries all the time:
  we only batch if we are observing a throughput of queries that suggests we
  can find some benefit in batching (this targets #3).
- Finally, we have some utilities for common cases like having to group by repo
  id (this is `MultiRendezVous`), and this is all configurable via tunables
  (and the default is to not do anything).

Reviewed By: StanislavGlebik

Differential Revision: D27010317

fbshipit-source-id: 4a2397255f9785c6722c02e4d419438fd0aafa07
  • Loading branch information
krallin authored and facebook-github-bot committed Mar 19, 2021
1 parent 6741565 commit a3a0347
Show file tree
Hide file tree
Showing 9 changed files with 728 additions and 0 deletions.
1 change: 1 addition & 0 deletions eden/mononoke/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ members = [
"common/dedupmap",
"common/futures_watchdog",
"common/iterhelpers",
"common/rendezvous",
"common/rust/caching_ext",
"common/rust/slog_ext",
"common/rust/sql_ext",
Expand Down
23 changes: 23 additions & 0 deletions eden/mononoke/common/rendezvous/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "rendezvous"
version = "0.1.0"
authors = ["Facebook"]
edition = "2018"
license = "GPLv2+"

[dependencies]
anyhow = "1.0"
async-trait = "0.1.45"
dashmap = { version = "4.0.2", features = ["serde"] }
fbinit = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures = { version = "0.3.13", features = ["async-await", "compat"] }
futures_stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
shared_error = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
time_ext = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
tokio = { version = "0.2.25", features = ["full", "test-util"] }
tunables = { version = "0.1.0", path = "../../tunables" }

[dev-dependencies]
fbinit-tokio-02 = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
maplit = "1.0"
19 changes: 19 additions & 0 deletions eden/mononoke/common/rendezvous/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/

mod multi_rendez_vous;
mod rendez_vous;
mod rendez_vous_stats;
mod tunables;

#[cfg(test)]
mod test;

pub use crate::tunables::{TunablesMultiRendezVousController, TunablesRendezVousController};
pub use multi_rendez_vous::{MultiRendezVous, MultiRendezVousController};
pub use rendez_vous::{RendezVous, RendezVousController};
pub use rendez_vous_stats::RendezVousStats;
76 changes: 76 additions & 0 deletions eden/mononoke/common/rendezvous/src/multi_rendez_vous.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/

use dashmap::DashMap;
use std::hash::Hash;
use std::sync::Arc;

use crate::{RendezVous, RendezVousController, RendezVousStats, TunablesMultiRendezVousController};

pub trait MultiRendezVousController: Send + Sync + 'static {
type Controller: RendezVousController;

fn new_controller(&self) -> Self::Controller;
}

/// A wrapper around RendezVous that can be keyed by a grouping key (G). This is useful when you
/// want multiple RendezVous instances for a set of groups but you don't know the groups ahead of
/// time (e.g. the groups might be repository ids).
pub struct MultiRendezVous<
G,
K,
V,
C: MultiRendezVousController = TunablesMultiRendezVousController,
> {
inner: Arc<DashMap<G, RendezVous<K, V, <C as MultiRendezVousController>::Controller>>>,
multi_controller: C,
stats: Arc<RendezVousStats>,
}

impl<G, K, V, C> Clone for MultiRendezVous<G, K, V, C>
where
G: Hash + Eq,
C: MultiRendezVousController + Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
multi_controller: self.multi_controller.clone(),
stats: self.stats.clone(),
}
}
}

impl<G, K, V, C> MultiRendezVous<G, K, V, C>
where
G: Hash + Eq,
C: MultiRendezVousController,
{
pub fn new(multi_controller: C, stats: RendezVousStats) -> Self {
Self {
inner: Arc::new(DashMap::new()),
multi_controller,
stats: Arc::new(stats),
}
}

pub fn get(&self, group: G) -> RendezVous<K, V, <C as MultiRendezVousController>::Controller> {
use dashmap::mapref::entry::Entry;

let ret = match self.inner.entry(group) {
Entry::Occupied(e) => e.get().clone(),
Entry::Vacant(e) => e
.insert(RendezVous::new(
self.multi_controller.new_controller(),
self.stats.clone(),
))
.clone(),
};

ret
}
}
243 changes: 243 additions & 0 deletions eden/mononoke/common/rendezvous/src/rendez_vous.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/

use anyhow::Error;
use fbinit::FacebookInit;
use futures::future::{BoxFuture, Future, FutureExt, Shared};
use futures_stats::TimedFutureExt;
use shared_error::anyhow::{IntoSharedError, SharedError};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::{Arc, Mutex};
use time_ext::DurationExt;
use tokio::sync::Notify;

use crate::{RendezVousStats, TunablesRendezVousController};

/// The RendezVousController controls the behavior of a RendezVous instance. It notably decides
/// when to wait for a batch to build up and when to kick off queries.
#[async_trait::async_trait]
pub trait RendezVousController: Send + Sync + 'static {
/// Decide whether we should be starting a new batch. This will be called once per request that
/// arrives to the RendezVous instance. It is expected that the RendezVousController might
/// store some internal state to make this decision.
fn should_batch(&self) -> bool;

/// Delay sending a batch to give ourselves a chance to accumulate some data. The batch will be
/// kicked off once this future returns;
async fn wait_for_dispatch(&self);

/// If our number of queued keys exceeds this threshold, then we'll dispatch the query even if
/// wait_for_dispatch hasn't returned yet.
fn early_dispatch_threshold(&self) -> usize;
}

struct RendezVousInner<K, V, C> {
staging: Mutex<
Option<(
HashSet<K>,
Shared<BoxFuture<'static, Result<Arc<HashMap<K, V>>, SharedError>>>,
Arc<Notify>,
)>,
>,
controller: C,
stats: Arc<RendezVousStats>,
}

pub struct RendezVous<K, V, C = TunablesRendezVousController> {
inner: Arc<RendezVousInner<K, V, C>>,
}

impl<K, V, C> Clone for RendezVous<K, V, C> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}

impl<K, V, C> RendezVous<K, V, C> {
pub fn new(controller: C, stats: Arc<RendezVousStats>) -> Self {
Self {
inner: Arc::new(RendezVousInner {
staging: Mutex::new(None),
controller,
stats,
}),
}
}
}

impl<K, V, C> RendezVous<K, V, C>
where
K: Clone + Eq + Hash + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
C: RendezVousController,
{
pub fn dispatch<F0, F1, Fut>(
&self,
fb: FacebookInit,
keys: HashSet<K>,
f0: F0,
) -> impl Future<Output = Result<HashMap<K, Option<V>>, Error>>
where
F0: FnOnce() -> F1, // Can construct a F1 if we are the first caller here
F1: FnOnce(HashSet<K>) -> Fut + Send + 'static, // Actually makes the call
Fut: Future<Output = Result<HashMap<K, V>, Error>> + Send,
{
if self.inner.controller.should_batch()
&& keys.len() < self.inner.controller.early_dispatch_threshold()
{
self.dispatch_batched(fb, keys, f0).left_future()
} else {
self.dispatch_not_batched(fb, keys, f0).right_future()
}
}

fn dispatch_batched<F0, F1, Fut>(
&self,
fb: FacebookInit,
keys: HashSet<K>,
f0: F0,
) -> impl Future<Output = Result<HashMap<K, Option<V>>, Error>>
where
F0: FnOnce() -> F1,
F1: FnOnce(HashSet<K>) -> Fut + Send + 'static,
Fut: Future<Output = Result<HashMap<K, V>, Error>> + Send,
{
let mut deduplicated = 0;

let mut guard = self.inner.staging.lock().expect("Poisoned lock");

let fut = match &mut *guard {
guard @ None => {
let inner = self.inner.clone();
let f1 = f0();

let notify = Arc::new(Notify::new());

let fut = {
let notify = notify.clone();

async move {
let is_early = futures::select! {
_ = inner.controller.wait_for_dispatch().fuse() => false,
_ = notify.notified().fuse() => true,
};

if is_early {
inner.stats.dispatch_batch_early.add_value(1);
} else {
inner.stats.dispatch_batch_scheduled.add_value(1);
}

let (keys, _, _) = inner
.staging
.lock()
.expect("Poisoned lock")
.take()
.expect("Staging cannot be empty if a task was dispatched");

let ret = dispatch_with_stats(fb, f1, keys, &inner.stats)
.await
.shared_error()?;

Result::<_, SharedError>::Ok(Arc::new(ret))
}
}
.boxed()
.shared();

*guard = Some((keys.clone(), fut.clone(), notify));

fut
}
Some((ref mut staged_keys, ref fut, ref notify)) => {
for k in keys.iter().cloned() {
if !staged_keys.insert(k) {
deduplicated += 1;
}
}

if staged_keys.len() >= self.inner.controller.early_dispatch_threshold() {
notify.notify();
}

fut.clone()
}
};

std::mem::drop(guard);

self.inner.stats.keys_deduplicated.add_value(deduplicated);

async move {
let shared_ret = fut.await?;
let ret = keys
.into_iter()
.map(|k| {
let v = shared_ret.get(&k).cloned();
(k, v)
})
.collect();
Ok(ret)
}
}

fn dispatch_not_batched<F0, F1, Fut>(
&self,
fb: FacebookInit,
keys: HashSet<K>,
f0: F0,
) -> impl Future<Output = Result<HashMap<K, Option<V>>, Error>>
where
F0: FnOnce() -> F1,
F1: FnOnce(HashSet<K>) -> Fut + Send + 'static,
Fut: Future<Output = Result<HashMap<K, V>, Error>> + Send,
{
let inner = self.inner.clone();

async move {
inner.stats.dispatch_no_batch.add_value(1);

let mut ret = dispatch_with_stats(fb, f0(), keys.clone(), &inner.stats).await?;

let ret = keys
.into_iter()
.map(|k| {
let v = ret.remove(&k);
(k, v)
})
.collect();

Ok(ret)
}
}
}

async fn dispatch_with_stats<K, V, F1, Fut>(
fb: FacebookInit,
f1: F1,
keys: HashSet<K>,
rdv_stats: &RendezVousStats,
) -> Result<HashMap<K, V>, Error>
where
F1: FnOnce(HashSet<K>) -> Fut + Send + 'static,
Fut: Future<Output = Result<HashMap<K, V>, Error>>,
{
rdv_stats.keys_dispatched.add_value(keys.len() as i64);

rdv_stats.inflight.increment_value(fb, 1);
let (stats, ret) = f1(keys).timed().await;
rdv_stats.inflight.increment_value(fb, -1);

rdv_stats
.fetch_completion_time_ms
.add_value(stats.completion_time.as_millis_unchecked() as i64);

ret
}
23 changes: 23 additions & 0 deletions eden/mononoke/common/rendezvous/src/rendez_vous_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/

use stats::prelude::*;

define_stats_struct! {
RendezVousStats("mononoke.rdv.{}", label: String),

dispatch_no_batch: timeseries("dispatch_no_batch"; Sum),
dispatch_batch_early: timeseries("dispatch_batch_early"; Sum),
dispatch_batch_scheduled: timeseries("dispatch_batch_scheduled"; Sum),

keys_dispatched: timeseries("keys_dispatched"; Average, Sum),
keys_deduplicated: timeseries("keys_deduplicated"; Sum),

fetch_completion_time_ms: histogram(1, 0, 10000, Average; P 50; P 95; P 99),

inflight: singleton_counter("inflight"),
}
Loading

0 comments on commit a3a0347

Please sign in to comment.