Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject connections from outdated peers #2519

Merged
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
03104b3
Simplify state service initialization in test
jvff Jul 29, 2021
0c8f4ff
Create `BestTipHeight` helper type
jvff Jul 28, 2021
0aeef35
Add `best_tip_height` field to `StateService`
jvff Jul 28, 2021
9f9aa3d
Return receiver endpoint from service constructor
jvff Jul 28, 2021
00c52c4
Update finalized height after finalizing blocks
jvff Jul 28, 2021
076f7a1
Update best non-finalized height after validation
jvff Jul 28, 2021
4d9a2d5
Update finalized height after loading from disk
jvff Jul 29, 2021
8c38013
Update the finalized height on checkpoint commit
jvff Jul 29, 2021
167cb9f
Add `best_tip_height` to `Handshake` service
jvff Jul 28, 2021
824fc50
Require best tip height to init. `zebra_network`
jvff Jul 28, 2021
041d844
Pass `best_tip_height` to proto. ver. negotiation
jvff Jul 28, 2021
3461392
Handle an optional height in `Version`
jvff Jul 30, 2021
eae3d8b
Reject connections to peers on old proto. versions
jvff Jul 28, 2021
465f2bd
Document why peers on old versions are rejected
jvff Jul 23, 2021
d317335
Test if `BestTipHeight` starts with `None`
jvff Jul 30, 2021
8cdeba4
Test if best tip height is max. of latest values
jvff Jul 30, 2021
ca1435f
Add `queue_and_commit_finalized` method
jvff Aug 2, 2021
138c7c6
Add `assert_block_can_be_validated` helper
jvff Aug 3, 2021
49178a4
Remove redundant PoW block assertion
jvff Aug 6, 2021
94a65b2
Create a test strategy for test vector chain
jvff Aug 5, 2021
73a96e0
Test committing blocks update best tip height
jvff Jul 30, 2021
8f33738
Merge branch 'main' into reject-connections-from-outdated-peers
teor2345 Aug 8, 2021
f5fbda5
Merge branch 'main' into reject-connections-from-outdated-peers
teor2345 Aug 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ use futures::{
channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt,
};
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio::{
net::TcpStream,
sync::{broadcast, watch},
task::JoinError,
time::timeout,
};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
Expand Down Expand Up @@ -53,6 +58,7 @@ pub struct Handshake<S> {
our_services: PeerServices,
relay: bool,
parent_span: Span,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
}

/// The peer address that we are handshaking with.
Expand Down Expand Up @@ -302,6 +308,7 @@ pub struct Builder<S> {
user_agent: Option<String>,
relay: Option<bool>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
}

impl<S> Builder<S>
Expand Down Expand Up @@ -361,6 +368,18 @@ where
self
}

/// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
///
/// If this is unset, the minimum accepted protocol version for peer connections is kept
/// constant over network upgrade activations.
pub fn with_best_tip_height(
mut self,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> Self {
self.best_tip_height = best_tip_height;
self
}

/// Whether to request that peers relay transactions to our node. Optional.
///
/// If this is unset, the node will not request transactions.
Expand Down Expand Up @@ -402,6 +421,7 @@ where
our_services,
relay,
parent_span: Span::current(),
best_tip_height: self.best_tip_height,
})
}
}
Expand All @@ -424,6 +444,7 @@ where
our_services: None,
relay: None,
inv_collector: None,
best_tip_height: None,
}
}
}
Expand All @@ -433,6 +454,7 @@ where
///
/// We split `Handshake` into its components before calling this function,
/// to avoid infectious `Sync` bounds on the returned future.
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version(
peer_conn: &mut Framed<TcpStream, Codec>,
connected_addr: &ConnectedAddr,
Expand All @@ -441,6 +463,7 @@ pub async fn negotiate_version(
user_agent: String,
our_services: PeerServices,
relay: bool,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
// Create a random nonce for this connection
let local_nonce = Nonce::default();
Expand Down Expand Up @@ -552,17 +575,11 @@ pub async fn negotiate_version(
Err(HandshakeError::NonceReuse)?;
}

// TODO: Reject connections with nodes that don't know about the current network upgrade (#1334)
// Use the latest non-finalized block height, rather than the minimum
if remote_version
< Version::min_remote_for_height(
config.network,
// This code will be replaced in #1334
constants::INITIAL_MIN_NETWORK_PROTOCOL_VERSION
.activation_height(config.network)
.expect("minimum network protocol network upgrade has an activation height"),
)
{
// SECURITY: Reject connections to peers on old versions, because they might not know about all
// network upgrades and could lead to chain forks or slower block propagation.
let height = best_tip_height.and_then(|height| *height.borrow());
let min_version = Version::min_remote_for_height(config.network, height);
if remote_version < min_version {
// Disconnect if peer is using an obsolete version.
Err(HandshakeError::ObsoleteVersion(remote_version))?;
}
Expand Down Expand Up @@ -617,6 +634,7 @@ where
let user_agent = self.user_agent.clone();
let our_services = self.our_services;
let relay = self.relay;
let best_tip_height = self.best_tip_height.clone();

let fut = async move {
debug!(
Expand Down Expand Up @@ -647,6 +665,7 @@ where
user_agent,
our_services,
relay,
best_tip_height,
),
)
.await??;
Expand Down
10 changes: 8 additions & 2 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
TryFutureExt,
};
use tokio::{net::TcpListener, sync::broadcast, time::Instant};
use tokio::{
net::TcpListener,
sync::{broadcast, watch},
time::Instant,
};
use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
util::BoxService, Service, ServiceExt,
Expand All @@ -25,7 +29,7 @@ use crate::{
BoxError, Config, Request, Response,
};

use zebra_chain::parameters::Network;
use zebra_chain::{block, parameters::Network};

use super::CandidateSet;
use super::PeerSet;
Expand Down Expand Up @@ -59,6 +63,7 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
pub async fn init<S>(
config: Config,
inbound_service: S,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> (
Buffer<BoxService<Request, Response, BoxError>, Request>,
Arc<std::sync::Mutex<AddressBook>>,
Expand Down Expand Up @@ -87,6 +92,7 @@ where
.with_timestamp_collector(timestamp_collector)
.with_advertised_services(PeerServices::NODE_NETWORK)
.with_user_agent(crate::constants::USER_AGENT.to_string())
.with_best_tip_height(best_tip_height)
.want_transactions(true)
.finish()
.expect("configured all required parameters");
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer_set/initialize/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
let inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });

let (_peer_service, address_book) = init(config, inbound_service).await;
let (_peer_service, address_book) = init(config, inbound_service, None).await;
let local_listener = address_book.lock().unwrap().local_listener_meta_addr();

if listen_addr.port() == 0 {
Expand Down
6 changes: 5 additions & 1 deletion zebra-network/src/protocol/external/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ impl Version {
/// # Panics
///
/// If we are incompatible with our own minimum remote protocol version.
pub fn min_remote_for_height(network: Network, height: block::Height) -> Version {
pub fn min_remote_for_height(
network: Network,
height: impl Into<Option<block::Height>>,
) -> Version {
let height = height.into().unwrap_or(block::Height(0));
let min_spec = Version::min_specified_for_height(network, height);

// shut down if our own version is too old
Expand Down
95 changes: 64 additions & 31 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@ use std::{
time::{Duration, Instant},
};

use check::difficulty::POW_MEDIAN_BLOCK_SPAN;
use futures::future::FutureExt;
use non_finalized_state::{NonFinalizedState, QueuedBlocks};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};
#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;
use tower::{util::BoxService, Service};
use tracing::instrument;
use zebra_chain::{
block::{self, Block},
parameters::POW_AVERAGING_WINDOW,
parameters::{Network, NetworkUpgrade},
transaction,
transaction::Transaction,
transparent,
};

use self::best_tip_height::BestTipHeight;
use crate::{
constants, request::HashOrHeight, BoxError, CloneError, CommitBlockError, Config,
FinalizedBlock, PreparedBlock, Request, Response, ValidateContextError,
};

mod best_tip_height;
pub(crate) mod check;
mod finalized_state;
mod non_finalized_state;
Expand Down Expand Up @@ -63,14 +63,21 @@ pub(crate) struct StateService {
network: Network,
/// Instant tracking the last time `pending_utxos` was pruned
last_prune: Instant,
/// The current best chain tip height.
best_tip_height: BestTipHeight,
}

impl StateService {
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

pub fn new(config: Config, network: Network) -> Self {
pub fn new(config: Config, network: Network) -> (Self, watch::Receiver<Option<block::Height>>) {
let (mut best_tip_height, best_tip_height_receiver) = BestTipHeight::new();
let disk = FinalizedState::new(&config, network);

if let Some(finalized_height) = disk.finalized_tip_height() {
best_tip_height.set_finalized_height(finalized_height);
}

let mem = NonFinalizedState::new(network);
let queued_blocks = QueuedBlocks::default();
let pending_utxos = PendingUtxos::default();
Expand All @@ -82,6 +89,7 @@ impl StateService {
pending_utxos,
network,
last_prune: Instant::now(),
best_tip_height,
};

tracing::info!("starting legacy chain check");
Expand All @@ -108,7 +116,23 @@ impl StateService {
}
tracing::info!("no legacy chain found");

state
(state, best_tip_height_receiver)
}

/// Queue a finalized block for verification and storage in the finalized state.
fn queue_and_commit_finalized(
&mut self,
finalized: FinalizedBlock,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
let (rsp_tx, rsp_rx) = oneshot::channel();

self.disk.queue_and_commit_finalized((finalized, rsp_tx));

if let Some(finalized_height) = self.disk.finalized_tip_height() {
self.best_tip_height.set_finalized_height(finalized_height);
}

rsp_rx
}

/// Queue a non finalized block for verification and check if any queued
Expand Down Expand Up @@ -165,10 +189,17 @@ impl StateService {
);
}

self.queued_blocks
.prune_by_height(self.disk.finalized_tip_height().expect(
let finalized_tip_height = self.disk.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
));
);
let non_finalized_tip_height = self.mem.best_tip().map(|(height, _hash)| height);

self.queued_blocks.prune_by_height(finalized_tip_height);

self.best_tip_height
.set_finalized_height(finalized_tip_height);
self.best_tip_height
.set_best_non_finalized_height(non_finalized_tip_height);

tracing::trace!("finished processing queued block");
rsp_rx
Expand Down Expand Up @@ -204,23 +235,6 @@ impl StateService {
let queued_children = self.queued_blocks.dequeue_children(parent_hash);

for (child, rsp_tx) in queued_children {
// required by validate_and_commit, moved here to make testing easier
assert!(
child.height > self.network.mandatory_checkpoint_height(),
"invalid non-finalized block height: the canopy checkpoint is mandatory, \
pre-canopy blocks, and the canopy activation block, \
must be committed to the state as finalized blocks"
);

// required by check_contextual_validity, moved here to make testing easier
let relevant_chain =
self.any_ancestor_blocks(child.block.header.previous_block_hash);
assert!(
relevant_chain.len() >= POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN,
"contextual validation requires at least \
28 (POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN) blocks"
);

let child_hash = child.hash;
let result;

Expand Down Expand Up @@ -504,6 +518,17 @@ impl StateService {
let intersection = self.find_best_chain_intersection(known_blocks);
self.collect_best_chain_hashes(intersection, stop, max_len)
}

/// Assert some assumptions about the prepared `block` before it is validated.
fn assert_block_can_be_validated(&self, block: &PreparedBlock) {
// required by validate_and_commit, moved here to make testing easier
assert!(
block.height > self.network.mandatory_checkpoint_height(),
"invalid non-finalized block height: the canopy checkpoint is mandatory, pre-canopy \
blocks, and the canopy activation block, must be committed to the state as finalized \
blocks"
);
}
}

pub(crate) struct Iter<'a> {
Expand Down Expand Up @@ -640,6 +665,8 @@ impl Service<Request> for StateService {
Request::CommitBlock(prepared) => {
metrics::counter!("state.requests", 1, "type" => "commit_block");

self.assert_block_can_be_validated(&prepared);

self.pending_utxos
.check_against_ordered(&prepared.new_outputs);
let rsp_rx = self.queue_and_commit_non_finalized(prepared);
Expand All @@ -656,10 +683,8 @@ impl Service<Request> for StateService {
Request::CommitFinalizedBlock(finalized) => {
metrics::counter!("state.requests", 1, "type" => "commit_finalized_block");

let (rsp_tx, rsp_rx) = oneshot::channel();

self.pending_utxos.check_against(&finalized.new_outputs);
self.disk.queue_and_commit_finalized((finalized, rsp_tx));
let rsp_rx = self.queue_and_commit_finalized(finalized);

async move {
rsp_rx
Expand Down Expand Up @@ -748,16 +773,24 @@ impl Service<Request> for StateService {
/// possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
pub fn init(config: Config, network: Network) -> BoxService<Request, Response, BoxError> {
BoxService::new(StateService::new(config, network))
pub fn init(
config: Config,
network: Network,
) -> (
BoxService<Request, Response, BoxError>,
watch::Receiver<Option<block::Height>>,
) {
let (state_service, best_tip_height) = StateService::new(config, network);

(BoxService::new(state_service), best_tip_height)
}

/// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test(network: Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
let state_service = StateService::new(Config::ephemeral(), network);
let (state_service, _) = StateService::new(Config::ephemeral(), network);

Buffer::new(BoxService::new(state_service), 1)
}
Expand Down
Loading