Skip to content

Commit

Permalink
Server sent events (#1920)
Browse files Browse the repository at this point in the history
## Issue Addressed

Resolves #1434 (this is the last major feature in the standard spec. There are only a couple of places we may be off-spec due to recent spec changes or ongoing discussion)
Partly addresses #1669
 
## Proposed Changes

- remove the websocket server
- remove the `TeeEventHandler` and `NullEventHandler` 
- add server sent events according to the eth2 API spec

## Additional Info

This is according to the currently unmerged PR here: ethereum/beacon-APIs#117


Co-authored-by: realbigsean <[email protected]>
  • Loading branch information
realbigsean and realbigsean committed Dec 4, 2020
1 parent 2b5c0df commit fdfb81a
Show file tree
Hide file tree
Showing 28 changed files with 960 additions and 757 deletions.
167 changes: 49 additions & 118 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ members = [
"beacon_node/network",
"beacon_node/store",
"beacon_node/timer",
"beacon_node/websocket_server",

"boot_node",

Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ tree_hash = "0.1.1"
types = { path = "../../consensus/types" }
tokio = "0.3.2"
eth1 = { path = "../eth1" }
websocket_server = { path = "../websocket_server" }
futures = "0.3.7"
genesis = { path = "../genesis" }
integer-sqrt = "0.1.5"
Expand All @@ -56,7 +55,6 @@ bls = { path = "../../crypto/bls" }
safe_arith = { path = "../../consensus/safe_arith" }
fork_choice = { path = "../../consensus/fork_choice" }
task_executor = { path = "../../common/task_executor" }
bus = "2.2.3"
derivative = "2.1.1"
itertools = "0.9.0"
regex = "1.3.9"
Expand Down
153 changes: 107 additions & 46 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::block_verification::{
use crate::chain_config::ChainConfig;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::events::{EventHandler, EventKind};
use crate::events::ServerSentEventHandler;
use crate::head_tracker::HeadTracker;
use crate::migrate::BackgroundMigrator;
use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool};
Expand All @@ -27,6 +27,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot;
use crate::{metrics, BeaconChainError};
use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
Expand Down Expand Up @@ -157,7 +158,6 @@ pub trait BeaconChainTypes: Send + Sync + 'static {
type SlotClock: slot_clock::SlotClock;
type Eth1Chain: Eth1ChainBackend<Self::EthSpec>;
type EthSpec: types::EthSpec;
type EventHandler: EventHandler<Self::EthSpec>;
}

/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
Expand Down Expand Up @@ -214,8 +214,9 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub fork_choice: RwLock<
ForkChoice<BeaconForkChoiceStore<T::EthSpec, T::HotStore, T::ColdStore>, T::EthSpec>,
>,
/// A handler for events generated by the beacon chain.
pub event_handler: T::EventHandler,
/// A handler for events generated by the beacon chain. This is only initialized when the
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// A cache dedicated to block processing.
Expand Down Expand Up @@ -955,17 +956,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// aggregation bit set.
pub fn verify_unaggregated_attestation_for_gossip(
&self,
attestation: Attestation<T::EthSpec>,
unaggregated_attestation: Attestation<T::EthSpec>,
subnet_id: Option<SubnetId>,
) -> Result<VerifiedUnaggregatedAttestation<T>, AttestationError> {
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS);
let _timer =
metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES);

VerifiedUnaggregatedAttestation::verify(attestation, subnet_id, self).map(|v| {
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES);
v
})
VerifiedUnaggregatedAttestation::verify(unaggregated_attestation, subnet_id, self).map(
|v| {
// This method is called for API and gossip attestations, so this covers all unaggregated attestation events
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_attestation_subscribers() {
event_handler.register(EventKind::Attestation(v.attestation().clone()));
}
}
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES);
v
},
)
}

/// Accepts some `SignedAggregateAndProof` from the network and attempts to verify it,
Expand All @@ -979,6 +988,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES);

VerifiedAggregatedAttestation::verify(signed_aggregate, self).map(|v| {
// This method is called for API and gossip attestations, so this covers all aggregated attestation events
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_attestation_subscribers() {
event_handler.register(EventKind::Attestation(v.attestation().clone()));
}
}
metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_SUCCESSES);
v
})
Expand Down Expand Up @@ -1222,11 +1237,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<ObservationOutcome<SignedVoluntaryExit>, Error> {
// NOTE: this could be more efficient if it avoided cloning the head state
let wall_clock_state = self.wall_clock_state()?;
Ok(self.observed_voluntary_exits.lock().verify_and_observe(
exit,
&wall_clock_state,
&self.spec,
)?)
Ok(self
.observed_voluntary_exits
.lock()
.verify_and_observe(exit, &wall_clock_state, &self.spec)
.map(|exit| {
// this method is called for both API and gossip exits, so this covers all exit events
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_exit_subscribers() {
if let ObservationOutcome::New(exit) = exit.clone() {
event_handler.register(EventKind::VoluntaryExit(exit.into_inner()));
}
}
}
exit
})?)
}

/// Accept a pre-verified exit and queue it for inclusion in an appropriate block.
Expand Down Expand Up @@ -1510,11 +1535,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);

let _ = self.event_handler.register(EventKind::BeaconBlockImported {
block_root,
block: Box::new(block),
});

Ok(block_root)
}
// There was an error whilst attempting to verify and import the block. The block might
Expand All @@ -1525,12 +1545,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"Beacon block processing error";
"error" => format!("{:?}", e),
);

let _ = self.event_handler.register(EventKind::BeaconBlockRejected {
reason: format!("Internal error: {:?}", e),
block: Box::new(block),
});

Err(BlockError::BeaconChainError(e))
}
// The block failed verification.
Expand All @@ -1540,12 +1554,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"Beacon block rejected";
"reason" => other.to_string(),
);

let _ = self.event_handler.register(EventKind::BeaconBlockRejected {
reason: format!("Invalid block: {}", other),
block: Box::new(block),
});

Err(other)
}
}
Expand Down Expand Up @@ -1664,7 +1672,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
crit!(self.log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network.");
shutdown_sender.try_send("Weak subjectivity checkpoint verification failed. Provided block root is not a checkpoint.")
.map_err(|err|BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?;
.map_err(|err| BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?;
return Err(BlockError::WeakSubjectivityConflict);
}
}
Expand Down Expand Up @@ -1745,6 +1753,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.head_tracker
.register_block(block_root, parent_root, slot);

// send an event to the `events` endpoint after fully processing the block
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
slot,
block: block_root,
}));
}
}

metrics::stop_timer(db_write_timer);

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
Expand Down Expand Up @@ -1993,7 +2011,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"previous_slot" => current_head.slot,
"new_head_parent" => format!("{}", new_head.beacon_block.parent_root()),
"new_head" => format!("{}", beacon_block_root),
"new_slot" => new_head.beacon_block.slot()
"new_slot" => new_head.beacon_block.slot(),
);
} else {
debug!(
Expand All @@ -2018,13 +2036,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
}

if current_head.slot.epoch(T::EthSpec::slots_per_epoch())
let is_epoch_transition = current_head.slot.epoch(T::EthSpec::slots_per_epoch())
< new_head
.beacon_state
.slot
.epoch(T::EthSpec::slots_per_epoch())
|| is_reorg
{
.epoch(T::EthSpec::slots_per_epoch());

if is_epoch_transition || is_reorg {
self.persist_head_and_fork_choice()?;
self.op_pool.prune_attestations(self.epoch()?);
self.ingest_slashings_to_op_pool(&new_head.beacon_state);
Expand All @@ -2033,6 +2051,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);

// These fields are used for server-sent events
let state_root = new_head.beacon_state_root;
let head_slot = new_head.beacon_state.slot;
let target_epoch_start_slot = new_head
.beacon_state
.current_epoch()
.start_slot(T::EthSpec::slots_per_epoch());
let prev_target_epoch_start_slot = new_head
.beacon_state
.previous_epoch()
.start_slot(T::EthSpec::slots_per_epoch());

// Update the snapshot that stores the head of the chain at the time it received the
// block.
*self
Expand Down Expand Up @@ -2092,11 +2122,37 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.after_finalization(&head.beacon_state, new_finalized_state_root)?;
}

let _ = self.event_handler.register(EventKind::BeaconHeadChanged {
reorg: is_reorg,
previous_head_beacon_block_root: current_head.block_root,
current_head_beacon_block_root: beacon_block_root,
});
// Register a server-sent event if necessary
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_head_subscribers() {
if let Ok(Some(current_duty_dependent_root)) =
self.root_at_slot(target_epoch_start_slot - 1)
{
if let Ok(Some(previous_duty_dependent_root)) =
self.root_at_slot(prev_target_epoch_start_slot - 1)
{
event_handler.register(EventKind::Head(SseHead {
slot: head_slot,
block: beacon_block_root,
state: state_root,
current_duty_dependent_root,
previous_duty_dependent_root,
epoch_transition: is_epoch_transition,
}));
} else {
warn!(
self.log,
"Unable to find previous target root, cannot register head event"
);
}
} else {
warn!(
self.log,
"Unable to find current target root, cannot register head event"
);
}
}
}

Ok(())
}
Expand Down Expand Up @@ -2204,10 +2260,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.head_tracker.clone(),
)?;

let _ = self.event_handler.register(EventKind::BeaconFinalization {
epoch: new_finalized_checkpoint.epoch,
root: new_finalized_checkpoint.root,
});
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_finalized_subscribers() {
event_handler.register(EventKind::FinalizedCheckpoint(SseFinalizedCheckpoint {
epoch: new_finalized_checkpoint.epoch,
block: new_finalized_checkpoint.root,
state: new_finalized_state_root,
}));
}
}

Ok(())
}
Expand Down
Loading

0 comments on commit fdfb81a

Please sign in to comment.