diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index 13901f0322a..c867c135d8c 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -181,7 +181,7 @@ pub use crate::{ peer_set::init, policies::RetryLimit, protocol::{ - external::{Version, VersionMessage}, + external::{Version, VersionMessage, MAX_TX_INV_IN_SENT_MESSAGE}, internal::{InventoryResponse, Request, Response}, }, }; diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 066116a2ff5..8c585ee2619 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -37,7 +37,7 @@ use crate::{ external::{types::Nonce, InventoryHash, Message}, internal::{InventoryResponse, Request, Response}, }, - BoxError, + BoxError, MAX_TX_INV_IN_SENT_MESSAGE, }; use InventoryResponse::*; @@ -992,9 +992,31 @@ where ) } (AwaitingRequest, AdvertiseTransactionIds(hashes)) => { + let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE + .try_into() + .expect("constant fits in usize"); + + // # Security + // + // In most cases, we try to split over-sized requests into multiple network-layer + // messages. But we are unlikely to reach this limit with the default mempool + // config, so a gossip like this could indicate a network amplification attack. + // + // This limit is particularly important here, because advertisements send the same + // message to half our available peers. + // + // If there are thousands of transactions in the mempool, letting peers know the + // exact transactions we have isn't that important, so it's ok to drop arbitrary + // transaction hashes from our response. + if hashes.len() > max_tx_inv_in_message { + debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID gossip"); + } + + let hashes = hashes.into_iter().take(max_tx_inv_in_message).map(Into::into).collect(); + self .peer_tx - .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect())) + .send(Message::Inv(hashes)) .await .map(|()| Handler::Finished(Ok(Response::Nil)) @@ -1351,11 +1373,30 @@ where } } Response::TransactionIds(hashes) => { - if let Err(e) = self - .peer_tx - .send(Message::Inv(hashes.into_iter().map(Into::into).collect())) - .await - { + let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE + .try_into() + .expect("constant fits in usize"); + + // # Security + // + // In most cases, we try to split over-sized responses into multiple network-layer + // messages. But we are unlikely to reach this limit with the default mempool + // config, so a response like this could indicate a network amplification attack. + // + // If there are thousands of transactions in the mempool, letting peers know the + // exact transactions we have isn't that important, so it's ok to drop arbitrary + // transaction hashes from our response. + if hashes.len() > max_tx_inv_in_message { + debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID response"); + } + + let hashes = hashes + .into_iter() + .take(max_tx_inv_in_message) + .map(Into::into) + .collect(); + + if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await { self.fail_with(e) } } diff --git a/zebra-network/src/protocol/external.rs b/zebra-network/src/protocol/external.rs index b060d646a6d..1901ff9bd4e 100644 --- a/zebra-network/src/protocol/external.rs +++ b/zebra-network/src/protocol/external.rs @@ -18,7 +18,7 @@ mod tests; pub use addr::{canonical_peer_addr, canonical_socket_addr, AddrInVersion}; pub use codec::Codec; -pub use inv::InventoryHash; +pub use inv::{InventoryHash, MAX_TX_INV_IN_SENT_MESSAGE}; pub use message::{Message, VersionMessage}; pub use types::{Nonce, Version}; diff --git a/zebra-network/src/protocol/external/inv.rs b/zebra-network/src/protocol/external/inv.rs index d112b4991c0..2ee18560a8b 100644 --- a/zebra-network/src/protocol/external/inv.rs +++ b/zebra-network/src/protocol/external/inv.rs @@ -179,12 +179,26 @@ impl ZcashDeserialize for InventoryHash { /// The minimum serialized size of an [`InventoryHash`]. pub(crate) const MIN_INV_HASH_SIZE: usize = 36; -/// The maximum number of transaction inventory items in a network message. -/// We also use this limit for block inventory, because it is typically much smaller. +/// The maximum number of inventory items in a network message received from a peer. +/// +/// After [ZIP-239](https://zips.z.cash/zip-0239#deployment), this would allow a message filled +/// with `MSG_WTX` entries to be around 3.4 MB, so we also need a separate constant to limit the +/// number of `inv` entries that we send. /// /// Same as `MAX_INV_SZ` in `zcashd`: /// -pub const MAX_TX_INV_IN_MESSAGE: u64 = 50_000; +pub const MAX_INV_IN_RECEIVED_MESSAGE: u64 = 50_000; + +/// The maximum number of transaction inventory items in a network message received from a peer. +/// +/// After [ZIP-239](https://zips.z.cash/zip-0239#deployment), this would allow a message filled +/// with `MSG_WTX` entries to be around 3.4 MB, so we also need a separate constant to limit the +/// number of `inv` entries that we send. +/// +/// This constant is not critical to compatibility: it just needs to be less than or equal to +/// `zcashd`'s `MAX_INV_SZ`: +/// +pub const MAX_TX_INV_IN_SENT_MESSAGE: u64 = 25_000; impl TrustedPreallocate for InventoryHash { fn max_allocation() -> u64 { @@ -193,6 +207,6 @@ impl TrustedPreallocate for InventoryHash { // a single message let message_size_limit = ((MAX_PROTOCOL_MESSAGE_LEN - 1) / MIN_INV_HASH_SIZE) as u64; - min(message_size_limit, MAX_TX_INV_IN_MESSAGE) + min(message_size_limit, MAX_INV_IN_RECEIVED_MESSAGE) } } diff --git a/zebra-network/src/protocol/external/message.rs b/zebra-network/src/protocol/external/message.rs index 6a6a1e295b0..009566dc24d 100644 --- a/zebra-network/src/protocol/external/message.rs +++ b/zebra-network/src/protocol/external/message.rs @@ -202,7 +202,11 @@ pub enum Message { /// A `tx` message. /// - /// This message is used to advertise unmined transactions for the mempool. + /// This message can be used to: + /// - send unmined transactions in response to `GetData` requests, and + /// - advertise unmined transactions for the mempool. + /// + /// Zebra chooses to advertise new transactions using `Inv(hash)` rather than `Tx(transaction)`. /// /// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#tx) Tx(UnminedTx), diff --git a/zebra-network/src/protocol/external/tests/preallocate.rs b/zebra-network/src/protocol/external/tests/preallocate.rs index 132fdb015a1..1ff30d1285c 100644 --- a/zebra-network/src/protocol/external/tests/preallocate.rs +++ b/zebra-network/src/protocol/external/tests/preallocate.rs @@ -16,7 +16,7 @@ use crate::{ meta_addr::MetaAddr, protocol::external::{ addr::{AddrV1, AddrV2, ADDR_V1_SIZE, ADDR_V2_MIN_SIZE}, - inv::{InventoryHash, MAX_TX_INV_IN_MESSAGE}, + inv::{InventoryHash, MAX_INV_IN_RECEIVED_MESSAGE}, }, }; @@ -66,7 +66,7 @@ proptest! { // // Special case: Zcash has a slightly smaller limit for transaction invs, // so we use it for all invs. - prop_assert!(smallest_disallowed_serialized.len() > min(MAX_PROTOCOL_MESSAGE_LEN, usize::try_from(MAX_TX_INV_IN_MESSAGE).expect("fits in usize"))); + prop_assert!(smallest_disallowed_serialized.len() > min(MAX_PROTOCOL_MESSAGE_LEN, usize::try_from(MAX_INV_IN_RECEIVED_MESSAGE).expect("fits in usize"))); // Create largest_allowed_vec by removing one element from smallest_disallowed_vec without copying (for efficiency) smallest_disallowed_vec.pop(); diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index 03390e09298..0316f9ee12a 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -148,14 +148,11 @@ pub enum Request { /// Advertise a set of unmined transactions to all peers. /// - /// This is intended to be used in Zebra with a single transaction at a time - /// (set of size 1), but multiple transactions are permitted because this is - /// how we interpret advertisements from zcashd, which sometimes advertises - /// multiple transactions at once. + /// Both Zebra and zcashd sometimes advertise multiple transactions at once. /// /// This is implemented by sending an `inv` message containing the unmined - /// transaction ID, allowing the remote peer to choose whether to download - /// it. Remote peers who choose to download the transaction will generate a + /// transaction IDs, allowing the remote peer to choose whether to download + /// them. Remote peers who choose to download the transaction will generate a /// [`Request::TransactionsById`] against the "inbound" service passed to /// [`init`](crate::init). /// diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 57a7b4f3b1d..24a5050c5c7 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -468,6 +468,7 @@ impl Service for Inbound { block_downloads.download_and_verify(hash); async { Ok(zn::Response::Nil) }.boxed() } + // The size of this response is limited by the `Connection` state machine in the network layer zn::Request::MempoolTransactionIds => { mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp { mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil, diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index 1a226390c82..55816fd4eb9 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -35,7 +35,7 @@ use crate::{ gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig, Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError, }, - sync::{self, BlockGossipError, SyncStatus}, + sync::{self, BlockGossipError, SyncStatus, TIPS_RESPONSE_TIMEOUT}, }, BoxError, }; @@ -420,7 +420,8 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { let mut hs = HashSet::new(); hs.insert(tx1_id); - // Transaction and Block IDs are gossipped, in any order + // Transaction and Block IDs are gossipped, in any order, after waiting for the gossip delay + tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await; let possible_requests = &mut [ Request::AdvertiseTransactionIds(hs), Request::AdvertiseBlock(block_two.hash()), @@ -488,7 +489,8 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { .await .unwrap(); - // Block is gossiped + // Test the block is gossiped, after waiting for the multi-gossip delay + tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await; peer_set .expect_request(Request::AdvertiseBlock(block_three.hash())) .await @@ -564,7 +566,9 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::Expired) ); - // Test transaction 2 is gossiped + // Test transaction 2 is gossiped, after waiting for the multi-gossip delay + tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await; + let mut hs = HashSet::new(); hs.insert(tx2_id); peer_set @@ -583,18 +587,6 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { zebra_test::vectors::BLOCK_MAINNET_6_BYTES .zcash_deserialize_into() .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_7_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_8_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_9_BYTES - .zcash_deserialize_into() - .unwrap(), - zebra_test::vectors::BLOCK_MAINNET_10_BYTES - .zcash_deserialize_into() - .unwrap(), ]; for block in more_blocks { state_service @@ -605,7 +597,8 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { .await .unwrap(); - // Block is gossiped + // Test the block is gossiped, after waiting for the multi-gossip delay + tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await; peer_set .expect_request(Request::AdvertiseBlock(block.hash())) .await diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index d771b967ee9..7415dffe03c 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -27,7 +27,7 @@ use std::{ }; use futures::{future::FutureExt, stream::Stream}; -use tokio::sync::watch; +use tokio::sync::broadcast; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use zebra_chain::{ @@ -233,7 +233,7 @@ pub struct Mempool { /// Sender part of a gossip transactions channel. /// Used to broadcast transaction ids to peers. - transaction_sender: watch::Sender>, + transaction_sender: broadcast::Sender>, // Diagnostics // @@ -267,9 +267,9 @@ impl Mempool { sync_status: SyncStatus, latest_chain_tip: zs::LatestChainTip, chain_tip_change: ChainTipChange, - ) -> (Self, watch::Receiver>) { + ) -> (Self, broadcast::Receiver>) { let (transaction_sender, transaction_receiver) = - tokio::sync::watch::channel(HashSet::new()); + tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2); let mut service = Mempool { config: config.clone(), @@ -659,9 +659,6 @@ impl Service for Mempool { if !send_to_peers_ids.is_empty() { tracing::trace!(?send_to_peers_ids, "sending new transactions to peers"); - // TODO: - // - if the transaction gossip task is slow, we can overwrite unsent IDs here - // - does this happen often enough to be worth a fix? self.transaction_sender.send(send_to_peers_ids)?; } } diff --git a/zebrad/src/components/mempool/gossip.rs b/zebrad/src/components/mempool/gossip.rs index cca47234026..eefa2d53ae4 100644 --- a/zebrad/src/components/mempool/gossip.rs +++ b/zebrad/src/components/mempool/gossip.rs @@ -5,31 +5,37 @@ use std::collections::HashSet; -use tokio::sync::watch; +use tokio::sync::broadcast::{ + self, + error::{RecvError, TryRecvError}, +}; use tower::{timeout::Timeout, Service, ServiceExt}; use zebra_chain::transaction::UnminedTxId; +use zebra_network::MAX_TX_INV_IN_SENT_MESSAGE; + use zebra_network as zn; use crate::{components::sync::TIPS_RESPONSE_TIMEOUT, BoxError}; -/// The maximum number of times we will delay sending because there is a new change. +/// The maximum number of channel messages we will combine into a single peer broadcast. pub const MAX_CHANGES_BEFORE_SEND: usize = 10; /// Runs continuously, gossiping new [`UnminedTxId`] to peers. /// -/// Broadcasts any [`UnminedTxId`] that gets stored in the mempool to all ready -/// peers. -/// -/// [`UnminedTxId`]: zebra_chain::transaction::UnminedTxId +/// Broadcasts any new [`UnminedTxId`]s that get stored in the mempool to multiple ready peers. pub async fn gossip_mempool_transaction_id( - mut receiver: watch::Receiver>, + mut receiver: broadcast::Receiver>, broadcast_network: ZN, ) -> Result<(), BoxError> where ZN: Service + Send + Clone + 'static, ZN::Future: Send, { + let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE + .try_into() + .expect("constant fits in usize"); + info!("initializing transaction gossip task"); // use the same timeout as tips requests, @@ -39,29 +45,41 @@ where loop { let mut combined_changes = 1; - // once we get new data in the channel, broadcast to peers, - // the mempool automatically combines some transactions that arrive close together - receiver.changed().await?; - let mut txs = receiver.borrow().clone(); - tokio::task::yield_now().await; - - // also combine transactions that arrived shortly after this one - while receiver.has_changed()? && combined_changes < MAX_CHANGES_BEFORE_SEND { - // Correctness - // - set the has_changed() flag to false using borrow_and_update() - // - clone() so we don't hold the watch channel lock while modifying txs - let extra_txs = receiver.borrow_and_update().clone(); - txs.extend(extra_txs.iter()); + // once we get new data in the channel, broadcast to peers + // + // the mempool automatically combines some transaction IDs that arrive close together, + // and this task also combines the changes that are in the channel before sending + let mut txs = loop { + match receiver.recv().await { + Ok(txs) => break txs, + Err(RecvError::Lagged(skip_count)) => info!( + ?skip_count, + "dropped transactions before gossiping due to heavy mempool or network load" + ), + Err(closed @ RecvError::Closed) => Err(closed)?, + } + }; + + // also combine transaction IDs that arrived shortly after this one, + // but limit the number of changes and the number of transaction IDs + // (the network layer handles the actual limits, this just makes sure the loop terminates) + while combined_changes <= MAX_CHANGES_BEFORE_SEND && txs.len() < max_tx_inv_in_message { + match receiver.try_recv() { + Ok(extra_txs) => txs.extend(extra_txs.iter()), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Lagged(skip_count)) => info!( + ?skip_count, + "dropped transactions before gossiping due to heavy mempool or network load" + ), + Err(closed @ TryRecvError::Closed) => Err(closed)?, + } combined_changes += 1; - - tokio::task::yield_now().await; } let txs_len = txs.len(); let request = zn::Request::AdvertiseTransactionIds(txs); - // TODO: rate-limit this info level log? info!(%request, changes = %combined_changes, "sending mempool transaction broadcast"); debug!( ?request, @@ -73,5 +91,11 @@ where let _ = broadcast_network.ready().await?.call(request).await; metrics::counter!("mempool.gossiped.transactions.total", txs_len as u64); + + // wait for at least the network timeout between gossips + // + // in practice, transactions arrive every 1-20 seconds, + // so waiting 6 seconds can delay transaction propagation, in order to reduce peer load + tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await; } } diff --git a/zebrad/src/components/sync/gossip.rs b/zebrad/src/components/sync/gossip.rs index 09edc630d0b..a6fcb3b49d5 100644 --- a/zebrad/src/components/sync/gossip.rs +++ b/zebrad/src/components/sync/gossip.rs @@ -85,5 +85,11 @@ where .map_err(PeerSetReadiness)? .call(request) .await; + + // wait for at least the network timeout between gossips + // + // in practice, we expect blocks to arrive approximately every 75 seconds, + // so waiting 6 seconds won't make much difference + tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await; } }