Skip to content

Commit

Permalink
Disallow some endpoints whilst syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Sep 28, 2020
1 parent 642f4fc commit 5308fdd
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 12 deletions.
17 changes: 7 additions & 10 deletions beacon_node/http_api/src/beacon_proposer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,13 @@ impl BeaconProposerCache {
chain: &BeaconChain<T>,
epoch: Epoch,
) -> Result<Vec<ProposerData>, warp::Rejection> {
let is_prior_to_genesis = chain.slot_clock.is_prior_to_genesis().ok_or_else(|| {
warp_utils::reject::custom_server_error("unable to read slot clock".to_string())
})?;
let current_epoch = if is_prior_to_genesis {
chain.spec.genesis_slot.epoch(T::EthSpec::slots_per_epoch())
} else {
chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?
};
let current_epoch = chain
.slot_clock
.now_or_genesis()
.ok_or_else(|| {
warp_utils::reject::custom_server_error("unable to read slot clock".to_string())
})?
.epoch(T::EthSpec::slots_per_epoch());

// Disallow requests that are outside the current epoch. This ensures the cache doesn't get
// washed-out with old values.
Expand Down
53 changes: 52 additions & 1 deletion beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use eth2::{
types::{self as api_types, ValidatorId},
StatusCode,
};
use eth2_libp2p::{NetworkGlobals, PubsubMessage};
use eth2_libp2p::{types::SyncState, NetworkGlobals, PubsubMessage};
use lighthouse_version::version_with_platform;
use network::NetworkMessage;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use slog::{crit, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use state_id::StateId;
use state_processing::per_slot_processing;
use std::borrow::Cow;
Expand All @@ -45,6 +46,13 @@ use warp::Filter;
const API_PREFIX: &str = "eth";
const API_VERSION: &str = "v1";

/// If the node is within this many epochs from the head, we declare it to be synced regardless of
/// the network sync state.
///
/// This helps prevent attacks where nodes can convince us that we're syncing some non-existent
/// finalized head.
const SYNC_TOLERANCE_EPOCHS: u64 = 8;

/// A wrapper around all the items required to spawn the HTTP server.
///
/// The server will gracefully handle the case where any fields are `None`.
Expand Down Expand Up @@ -274,6 +282,43 @@ pub fn serve<T: BeaconChainTypes>(
}
});

let not_while_syncing_filter = warp::any()
.and(network_globals.clone())
.and(chain_filter.clone())
.and_then(
|network_globals: Arc<NetworkGlobals<T::EthSpec>>, chain: Arc<BeaconChain<T>>| async move {
match *network_globals.sync_state.read() {
SyncState::SyncingFinalized { head_slot, .. } => {
let current_slot = chain
.slot_clock
.now_or_genesis()
.ok_or_else(|| {
warp_utils::reject::custom_server_error(
"unable to read slot clock".to_string(),
)
})?;

let tolerance = SYNC_TOLERANCE_EPOCHS * T::EthSpec::slots_per_epoch();

if head_slot + tolerance >= current_slot {
Ok(())
} else {
Err(warp_utils::reject::not_synced(format!(
"head slot is {}, current slot is {}",
head_slot, current_slot
)))
}
}
SyncState::SyncingHead { .. } => Ok(()),
SyncState::Synced => Ok(()),
SyncState::Stalled => Err(warp_utils::reject::not_synced(
"sync is stalled".to_string(),
)),
}
},
)
.untuple_one();

// Create a `warp` filter that provides access to the logger.
let log_filter = warp::any().map(move || ctx.log.clone());

Expand Down Expand Up @@ -1121,6 +1166,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("attester"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::query::<api_types::ValidatorDutiesQuery>())
.and(chain_filter.clone())
.and_then(
Expand Down Expand Up @@ -1286,6 +1332,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("proposer"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(beacon_proposer_cache())
.and_then(
Expand All @@ -1307,6 +1354,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::path::param::<Slot>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::query::<api_types::ValidatorBlocksQuery>())
.and(chain_filter.clone())
.and_then(
Expand Down Expand Up @@ -1334,6 +1382,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("attestation_data"))
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAttestationDataQuery>())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAttestationDataQuery, chain: Arc<BeaconChain<T>>| {
Expand All @@ -1353,6 +1402,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("aggregate_attestation"))
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAggregateAttestationQuery>())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAggregateAttestationQuery, chain: Arc<BeaconChain<T>>| {
Expand All @@ -1377,6 +1427,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator"))
.and(warp::path("aggregate_and_proofs"))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(warp::body::json())
.and(network_tx_filter.clone())
Expand Down
8 changes: 7 additions & 1 deletion beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use beacon_chain::{
use discv5::enr::{CombinedKey, EnrBuilder};
use environment::null_logger;
use eth2::{types::*, BeaconNodeHttpClient, Url};
use eth2_libp2p::{rpc::methods::MetaData, types::EnrBitfield, NetworkGlobals};
use eth2_libp2p::{
rpc::methods::MetaData,
types::{EnrBitfield, SyncState},
NetworkGlobals,
};
use http_api::{Config, Context};
use network::NetworkMessage;
use state_processing::per_slot_processing;
Expand Down Expand Up @@ -145,6 +149,8 @@ impl ApiTester {
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let network_globals = NetworkGlobals::new(enr, 42, 42, meta_data, vec![], &log);

*network_globals.sync_state.write() = SyncState::Synced;

let context = Arc::new(Context {
config: Config {
enabled: true,
Expand Down
10 changes: 10 additions & 0 deletions common/slot_clock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ pub trait SlotClock: Send + Sync + Sized {
/// Returns the slot at this present time.
fn now(&self) -> Option<Slot>;

/// Returns the slot at this present time if genesis has happened. Otherwise, returns the
/// genesis slot.
fn now_or_genesis(&self) -> Option<Slot> {
if self.is_prior_to_genesis()? {
Some(self.genesis_slot())
} else {
self.now()
}
}

/// Indicates if the current time is prior to genesis time.
///
/// Returns `None` if the system clock cannot be read.
Expand Down
12 changes: 12 additions & 0 deletions common/warp_utils/src/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ pub fn object_invalid(msg: String) -> warp::reject::Rejection {
warp::reject::custom(ObjectInvalid(msg))
}

#[derive(Debug)]
pub struct NotSynced(pub String);

impl Reject for NotSynced {}

pub fn not_synced(msg: String) -> warp::reject::Rejection {
warp::reject::custom(NotSynced(msg))
}

/// This function receives a `Rejection` and tries to return a custom
/// value, otherwise simply passes the rejection along.
pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
Expand Down Expand Up @@ -138,6 +147,9 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
} else if let Some(e) = err.find::<crate::reject::ObjectInvalid>() {
code = StatusCode::BAD_REQUEST;
message = format!("BAD_REQUEST: Invalid object: {}", e.0);
} else if let Some(e) = err.find::<crate::reject::NotSynced>() {
code = StatusCode::SERVICE_UNAVAILABLE;
message = format!("SERVICE_UNAVAILABLE: beacon node is syncing: {}", e.0);
} else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
code = StatusCode::METHOD_NOT_ALLOWED;
message = "METHOD_NOT_ALLOWED".to_string();
Expand Down

0 comments on commit 5308fdd

Please sign in to comment.