Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Halborn 2023 02 20 #6297

Merged
merged 8 commits into from
Mar 13, 2023
5 changes: 3 additions & 2 deletions zebra-chain/src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ pub use error::SerializationError;
pub use read_zcash::ReadZcashExt;
pub use write_zcash::WriteZcashExt;
pub use zcash_deserialize::{
zcash_deserialize_bytes_external_count, zcash_deserialize_external_count, TrustedPreallocate,
ZcashDeserialize, ZcashDeserializeInto,
zcash_deserialize_bytes_external_count, zcash_deserialize_external_count,
zcash_deserialize_string_external_count, TrustedPreallocate, ZcashDeserialize,
ZcashDeserializeInto,
};
pub use zcash_serialize::{
zcash_serialize_bytes, zcash_serialize_bytes_external_count, zcash_serialize_empty_list,
Expand Down
23 changes: 20 additions & 3 deletions zebra-chain/src/serialization/zcash_deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,28 @@ pub fn zcash_deserialize_bytes_external_count<R: io::Read>(
Ok(vec)
}

/// Deserializes a [`String`] whose length has already been read by the caller.
///
/// This is `zcash_deserialize_external_count`, specialised for [`String`].
/// `external_byte_count` is the length of the encoded string in bytes.
/// (Not in UTF-8 characters.)
///
/// This allows us to optimize the inner loop into a single call to `read_exact()`.
///
/// This function has a `zcash_` prefix to alert the reader that the
/// serialization in use is consensus-critical serialization, rather than
/// some other kind of serialization.
///
/// # Errors
///
/// Returns any error from `zcash_deserialize_bytes_external_count`, or
/// `SerializationError::Parse("invalid utf-8")` if the bytes are not valid UTF-8.
pub fn zcash_deserialize_string_external_count<R: io::Read>(
    external_byte_count: usize,
    reader: R,
) -> Result<String, SerializationError> {
    let raw_bytes = zcash_deserialize_bytes_external_count(external_byte_count, reader)?;

    // The details of the UTF-8 error are deliberately discarded:
    // callers only get a fixed parse error message.
    match String::from_utf8(raw_bytes) {
        Ok(string) => Ok(string),
        Err(_) => Err(SerializationError::Parse("invalid utf-8")),
    }
}

/// Read a Bitcoin-encoded UTF-8 string.
impl ZcashDeserialize for String {
fn zcash_deserialize<R: io::Read>(reader: R) -> Result<Self, SerializationError> {
let bytes: Vec<_> = Vec::zcash_deserialize(reader)?;
String::from_utf8(bytes).map_err(|_| SerializationError::Parse("invalid utf-8"))
fn zcash_deserialize<R: io::Read>(mut reader: R) -> Result<Self, SerializationError> {
let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
zcash_deserialize_string_external_count(byte_count.into(), reader)
}
}

Expand Down
130 changes: 117 additions & 13 deletions zebra-network/src/peer_set/inventory_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html

use std::{
collections::HashMap,
convert::TryInto,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};

use futures::{FutureExt, Stream, StreamExt};
use indexmap::IndexMap;
use tokio::{
sync::broadcast,
time::{self, Instant},
Expand All @@ -35,6 +34,40 @@ pub mod update;
#[cfg(test)]
mod tests;

/// The maximum number of inventory hashes we will track in the registry's `current` map.
///
/// When registering a status makes the map larger than this limit, the oldest hash is evicted.
///
/// This is also the maximum number of hashes we keep from a single peer inventory change:
/// any extra hashes are truncated when the change is created.
///
/// # Security
///
/// This limits known memory denial of service attacks like <https://invdos.net/> to a total of:
/// ```text
/// 1000 inventory * 2 maps * 32-64 bytes per inventory = less than 1 MB
/// 1000 inventory * 70 peers * 2 maps * 6-18 bytes per address = up to 3 MB
/// ```
///
/// Since the inventory registry is an efficiency optimisation, which falls back to a
/// random peer, we only need to track a small number of hashes for available inventory.
///
/// But we want to be able to track a significant amount of missing inventory,
/// to limit queries for globally missing inventory.
//
// TODO: split this into available (25) and missing (1000 or more?)
pub const MAX_INV_PER_MAP: usize = 1000;

/// The maximum number of peers we will track for each inventory hash.
///
/// When registering a status gives a hash more peers than this limit,
/// the oldest peer entry for that hash is evicted.
///
/// # Security
///
/// This limits known memory denial of service attacks. See [`MAX_INV_PER_MAP`] for details.
///
/// Since the inventory registry is an efficiency optimisation, which falls back to a
/// random peer, we only need to track a small number of peers per inv for available inventory.
///
/// But we want to be able to track missing inventory for almost all our peers,
/// so we only query a few peers for inventory that is genuinely missing from the network.
//
// TODO: split this into available (25) and missing (70)
pub const MAX_PEERS_PER_INV: usize = 70;

/// A peer inventory status, which tracks a hash for both available and missing inventory.
pub type InventoryStatus<T> = InventoryResponse<T, T>;

Expand All @@ -59,10 +92,12 @@ type InventoryMarker = InventoryStatus<()>;
pub struct InventoryRegistry {
/// Map tracking the latest inventory status from the current interval
/// period.
current: HashMap<InventoryHash, HashMap<SocketAddr, InventoryMarker>>,
//
// TODO: split maps into available and missing, so we can limit them separately.
current: IndexMap<InventoryHash, IndexMap<SocketAddr, InventoryMarker>>,

/// Map tracking inventory statuses from the previous interval period.
prev: HashMap<InventoryHash, HashMap<SocketAddr, InventoryMarker>>,
prev: IndexMap<InventoryHash, IndexMap<SocketAddr, InventoryMarker>>,

/// Stream of incoming inventory statuses to register.
inv_stream: Pin<
Expand Down Expand Up @@ -99,7 +134,17 @@ impl InventoryChange {
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
) -> Option<Self> {
let hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();

// # Security
//
// Don't send more hashes than we're going to store.
// It doesn't matter which hashes we choose, because this is an efficiency optimisation.
//
// This limits known memory denial of service attacks to:
// `1000 hashes * 200 peers/channel capacity * 32-64 bytes = up to 12 MB`
hashes.truncate(MAX_INV_PER_MAP);

let hashes = hashes.try_into().ok();

hashes.map(|hashes| InventoryStatus::Available((hashes, peer)))
Expand All @@ -110,7 +155,14 @@ impl InventoryChange {
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
) -> Option<Self> {
let hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();

// # Security
//
// Don't send more hashes than we're going to store.
// It doesn't matter which hashes we choose, because this is an efficiency optimisation.
hashes.truncate(MAX_INV_PER_MAP);

let hashes = hashes.try_into().ok();

hashes.map(|hashes| InventoryStatus::Missing((hashes, peer)))
Expand Down Expand Up @@ -149,8 +201,15 @@ impl InventoryRegistry {

// Don't do an immediate rotation, current and prev are already empty.
let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
// SECURITY: if the rotation time is late, delay future rotations by the same amount
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
// # Security
//
// If the rotation time is late, execute as many ticks as needed to catch up.
// This is a tradeoff between memory usage and quickly accessing remote data
// under heavy load. Bursting prioritises lower memory usage.
//
// Skipping or delaying could keep peer inventory in memory for a longer time,
// further increasing memory load or delays due to virtual memory swapping.
interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);

Self {
current: Default::default(),
Expand Down Expand Up @@ -206,6 +265,17 @@ impl InventoryRegistry {
.is_some()
}

/// Returns an iterator over each tracked inventory hash and its per-peer statuses.
///
/// Entries from the `current` map are yielded first, followed by entries from `prev`.
/// A hash that is in both maps is yielded once for each map.
#[allow(dead_code)]
pub fn status_hashes(
    &self,
) -> impl Iterator<Item = (&InventoryHash, &IndexMap<SocketAddr, InventoryMarker>)> {
    let current_statuses = self.current.iter();
    let previous_statuses = self.prev.iter();

    current_statuses.chain(previous_statuses)
}

/// Returns a future that polls once for new registry updates.
#[allow(dead_code)]
pub fn update(&mut self) -> Update {
Expand All @@ -219,8 +289,19 @@ impl InventoryRegistry {
/// - rotates the `current` and `prev` IndexMaps based on interval events
/// - drains the inv_stream channel and registers all advertised inventory
pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> {
// Correctness: Registers the current task for wakeup when the timer next becomes ready.
while Pin::new(&mut self.interval).poll_next(cx).is_ready() {
// # Correctness
//
// Registers the current task for wakeup when the timer next becomes ready.
//
// # Security
//
// Only rotate one inventory per peer request, to give the next inventory
// time to gather some peer advertisements. This is a tradeoff between
// memory usage and quickly accessing remote data under heavy load.
//
// This prevents a burst edge case where all inventory is emptied after
// two interval ticks are delayed.
if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
self.rotate();
}

Expand Down Expand Up @@ -274,13 +355,13 @@ impl InventoryRegistry {
"unexpected inventory type: {inv:?} from peer: {addr:?}",
);

let current = self.current.entry(inv).or_default();
let hash_peers = self.current.entry(inv).or_default();

// # Security
//
// Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
// and funnel multiple failing requests to themselves.
if let Some(old_status) = current.get(&addr) {
if let Some(old_status) = hash_peers.get(&addr) {
if old_status.is_missing() && new_status.is_available() {
debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
continue;
Expand All @@ -295,14 +376,37 @@ impl InventoryRegistry {
);
}

let replaced_status = current.insert(addr, new_status);
let replaced_status = hash_peers.insert(addr, new_status);

debug!(
?new_status,
?replaced_status,
?addr,
?inv,
"inserted new status"
);

// # Security
//
// Limit the number of stored peers per hash, removing the oldest entries,
// because newer entries are likely to be more relevant.
//
// TODO: do random or weighted random eviction instead?
if hash_peers.len() > MAX_PEERS_PER_INV {
// Performance: `MAX_PEERS_PER_INV` is small, so O(n) performance is acceptable.
hash_peers.shift_remove_index(0);
}

// # Security
//
// Limit the number of stored inventory hashes, removing the oldest entries,
// because newer entries are likely to be more relevant.
//
// TODO: do random or weighted random eviction instead?
if self.current.len() > MAX_INV_PER_MAP {
// Performance: `MAX_INV_PER_MAP` is small, so O(n) performance is acceptable.
self.current.shift_remove_index(0);
}
}
}

Expand Down
102 changes: 100 additions & 2 deletions zebra-network/src/peer_set/inventory_registry/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
//! Fixed test vectors for the inventory registry.

use zebra_chain::block;
use std::{cmp::min, net::SocketAddr};

use zebra_chain::{block, serialization::AtLeastOne, transaction};

use crate::{
peer_set::inventory_registry::{tests::new_inv_registry, InventoryStatus},
peer_set::inventory_registry::{
tests::new_inv_registry, InventoryMarker, InventoryStatus, MAX_INV_PER_MAP,
MAX_PEERS_PER_INV,
},
protocol::external::InventoryHash,
};

Expand Down Expand Up @@ -182,3 +187,96 @@ async fn inv_registry_prefer_current_order(missing_current: bool) {
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
}

/// Check inventory registration limits, for available inventory and then missing inventory.
#[tokio::test]
async fn inv_registry_limit() {
    for status in [
        InventoryMarker::Available(()),
        InventoryMarker::Missing(()),
    ] {
        inv_registry_limit_for(status).await;
    }
}

/// Check the inventory registration limits for `status`.
///
/// First registers more than [`MAX_INV_PER_MAP`] distinct hashes from a single peer,
/// then registers a single hash from more than [`MAX_PEERS_PER_INV`] distinct peers.
/// After every registry update, checks that the stored entries never exceed the limit.
async fn inv_registry_limit_for(status: InventoryMarker) {
    let single_test_hash = InventoryHash::Block(block::Hash([0xbb; 32]));
    let single_test_peer: SocketAddr = "1.1.1.1:1"
        .parse()
        .expect("unexpected invalid peer address");

    let is_available = status.is_available();

    // Check hash limit
    let (mut inv_registry, inv_stream_tx) = new_inv_registry();

    // Each hash is registered by exactly one peer, with `status`.
    let (single_advertising, single_missing): (usize, usize) =
        if is_available { (1, 0) } else { (0, 1) };

    for hash_count in 0..(MAX_INV_PER_MAP + 10) {
        // Build a unique transaction hash from the loop counter, padded with zeroes.
        let counter_bytes = hash_count.to_ne_bytes();
        let mut hash_bytes = [0u8; 32];
        hash_bytes[..counter_bytes.len()].copy_from_slice(&counter_bytes);
        let test_hash = InventoryHash::Tx(transaction::Hash(hash_bytes));

        let test_change = status.map(|()| (AtLeastOne::from_one(test_hash), single_test_peer));

        let receiver_count = inv_stream_tx
            .send(test_change)
            .expect("unexpected failed inventory status send");

        assert_eq!(receiver_count, 1);

        inv_registry
            .update()
            .await
            .expect("unexpected dropped registry sender channel");

        // The most recently registered hash must always be tracked.
        assert_eq!(
            inv_registry.advertising_peers(test_hash).count(),
            single_advertising,
        );
        assert_eq!(
            inv_registry.missing_peers(test_hash).count(),
            single_missing,
        );

        // SECURITY: limit inventory memory usage
        let stored_hashes = inv_registry.status_hashes().count();
        assert_eq!(stored_hashes, min(hash_count + 1, MAX_INV_PER_MAP));
    }

    // Check peer address per hash limit
    let (mut inv_registry, inv_stream_tx) = new_inv_registry();

    for peer_count in 0..(MAX_PEERS_PER_INV + 10) {
        // Use the loop counter as the port, so each peer address is unique.
        let test_peer = SocketAddr::new(
            "2.2.2.2".parse().unwrap(),
            peer_count.try_into().expect("fits in u16"),
        );

        let test_change = status.map(|()| (AtLeastOne::from_one(single_test_hash), test_peer));

        let receiver_count = inv_stream_tx
            .send(test_change)
            .expect("unexpected failed inventory status send");

        assert_eq!(receiver_count, 1);

        inv_registry
            .update()
            .await
            .expect("unexpected dropped registry sender channel");

        assert_eq!(inv_registry.status_hashes().count(), 1);

        let limited_count = min(peer_count + 1, MAX_PEERS_PER_INV);

        // SECURITY: limit inventory memory usage
        let (expected_advertising, expected_missing) = if is_available {
            (limited_count, 0)
        } else {
            (0, limited_count)
        };

        assert_eq!(
            inv_registry.advertising_peers(single_test_hash).count(),
            expected_advertising,
        );
        assert_eq!(
            inv_registry.missing_peers(single_test_hash).count(),
            expected_missing,
        );
    }
}
Loading