Skip to content

Commit

Permalink
dc: Prune peer map if we've pruned the id map (#2319)
Browse files Browse the repository at this point in the history
* Prune peer list if we've pruned the path secret

This ensures that when we prune entries from the map, we bound the size
of the peer set as well.

This also updates our test coverage to include a small-size map, which
causes us to need to tweak a few assertions to account for the now
semi-random removals. I'm not very happy with the result, but I think
it's OK for now. Mid-term it probably makes sense to figure out a better
way to make sure our logic is sound (e.g., probability of removing a
recently added peer should be near zero).

* Bound requested_handshakes too
  • Loading branch information
Mark-Simulacrum authored Sep 13, 2024
1 parent 5d73bf8 commit 132ba54
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 22 deletions.
69 changes: 51 additions & 18 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl Cleaner {
// handshake to happen. This handshake will happen on the next request for this
// particular peer.
if entry.rehandshake_time() <= now {
state.requested_handshakes.pin().insert(entry.peer);
state.request_handshake(entry.peer);
}

// Not retired.
Expand All @@ -208,24 +208,47 @@ impl Cleaner {
}
}

if state.ids.len() <= (state.max_capacity * 95 / 100) {
return;
}

let mut to_remove = std::cmp::max(state.ids.len() / 100, 1);
let guard = state.ids.guard();
for (id, entry) in state.ids.iter(&guard) {
if to_remove > 0 {
// Only remove with the minimum epoch. This hopefully means that we will remove
// fairly stale entries.
if entry.used_at.load(Ordering::Relaxed) == minimum {
state.ids.remove(id, &guard);
to_remove -= 1;
if state.ids.len() > (state.max_capacity * 95 / 100) {
let mut to_remove = std::cmp::max(state.ids.len() / 100, 1);
let guard = state.ids.guard();
for (id, entry) in state.ids.iter(&guard) {
if to_remove > 0 {
// Only remove with the minimum epoch. This hopefully means that we will remove
// fairly stale entries.
if entry.used_at.load(Ordering::Relaxed) == minimum {
state.ids.remove(id, &guard);
to_remove -= 1;
}
} else {
break;
}
} else {
break;
}
}

// Prune the peer list of any entries that no longer have a corresponding `id` entry.
//
// This ensures that the peer list is naturally bounded in size by the size of the `id`
// set, and relies on precisely the same mechanisms for eviction.
{
let ids = state.ids.pin();
state
.peers
.pin()
.retain(|_, entry| ids.contains_key(entry.secret.id()));
}

// Iteration order should be effectively random, so this effectively just prunes the list
// periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note
// that peers the application is actively interested in will typically bypass this list, so
// this is mostly a risk of delaying regular re-handshaking with very large cardinalities.
//
// FIXME: Long or mid-term it likely makes sense to replace this data structure with a
// fuzzy set of some kind and/or just moving to immediate background handshake attempts.
let mut count = 0;
state.requested_handshakes.pin().retain(|_| {
count += 1;
count < 5000
});
}

fn epoch(&self) -> u64 {
Expand All @@ -235,6 +258,16 @@ impl Cleaner {

const EVICTION_CYCLES: u64 = if cfg!(test) { 0 } else { 10 };

impl State {
fn request_handshake(&self, peer: SocketAddr) {
// The length is reset as part of cleanup to 5000.
let handshakes = self.requested_handshakes.pin();
if handshakes.len() <= 6000 {
handshakes.insert(peer);
}
}
}

impl Map {
pub fn new(signer: stateless_reset::Signer) -> Self {
// FIXME: Avoid unwrap and the whole socket.
Expand Down Expand Up @@ -385,7 +418,7 @@ impl Map {

// FIXME: More actively schedule a new handshake.
// See comment on requested_handshakes for details.
self.state.requested_handshakes.pin().insert(state.peer);
self.state.request_handshake(state.peer);
}

pub fn handle_control_packet(&self, packet: &control::Packet) {
Expand Down Expand Up @@ -433,7 +466,7 @@ impl Map {
//
// Handshaking will be rate limited per destination peer (and at least
// de-duplicated).
self.state.requested_handshakes.pin().insert(state.peer);
self.state.request_handshake(state.peer);
}
control::Packet::UnknownPathSecret(_) => unreachable!(),
}
Expand Down
58 changes: 54 additions & 4 deletions dc/s2n-quic-dc/src/path/secret/map/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ impl Model {
let ids = state.state.ids.guard();
self.invariants.retain(|invariant| {
if let Invariant::ContainsId(id) = invariant {
if state.state.ids.get(id, &ids).unwrap().retired.retired() {
if state
.state
.ids
.get(id, &ids)
.map_or(true, |v| v.retired.retired())
{
invalidated.push(*id);
return false;
}
Expand Down Expand Up @@ -191,12 +196,18 @@ impl Model {
let peers = state.peers.guard();
let ids = state.ids.guard();
for invariant in self.invariants.iter() {
// We avoid assertions for contains() if we're running the small capacity test, since
// they are likely broken -- we semi-randomly evict peers in that case.
match invariant {
Invariant::ContainsIp(ip) => {
assert!(state.peers.contains_key(ip, &peers), "{:?}", ip);
if state.max_capacity != 5 {
assert!(state.peers.contains_key(ip, &peers), "{:?}", ip);
}
}
Invariant::ContainsId(id) => {
assert!(state.ids.contains_key(id, &ids), "{:?}", id);
if state.max_capacity != 5 {
assert!(state.ids.contains_key(id, &ids), "{:?}", id);
}
}
Invariant::IdRemoved(id) => {
assert!(
Expand All @@ -207,6 +218,16 @@ impl Model {
}
}
}

// All entries in the peer set should also be in the `ids` set (which is actively garbage
// collected).
for (_, entry) in state.peers.iter(&peers) {
assert!(
state.ids.contains_key(entry.secret.id(), &ids),
"{:?} not present in IDs",
entry.secret.id()
);
}
}
}

Expand Down Expand Up @@ -236,7 +257,36 @@ fn has_duplicate_pids(ops: &[Operation]) -> bool {
fn check_invariants() {
bolero::check!()
.with_type::<Vec<Operation>>()
.with_iterations(100_000)
.with_iterations(10_000)
.for_each(|input: &Vec<Operation>| {
if has_duplicate_pids(input) {
// Ignore this attempt.
return;
}

let mut model = Model::default();
let signer = stateless_reset::Signer::new(b"secret");
let mut map = Map::new(signer);

// Avoid background work interfering with testing.
map.state.cleaner.stop();

Arc::get_mut(&mut map.state).unwrap().max_capacity = 5;

model.check_invariants(&map.state);

for op in input {
model.perform(*op, &map);
model.check_invariants(&map.state);
}
})
}

#[test]
fn check_invariants_no_overflow() {
bolero::check!()
.with_type::<Vec<Operation>>()
.with_iterations(10_000)
.for_each(|input: &Vec<Operation>| {
if has_duplicate_pids(input) {
// Ignore this attempt.
Expand Down

0 comments on commit 132ba54

Please sign in to comment.