diff --git a/Cargo.lock b/Cargo.lock index 18e9e8b4792..07b0de21cf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,7 +2,7 @@ # It is not intended for manual editing. [[package]] name = "account_manager" -version = "0.2.6" +version = "0.2.7" dependencies = [ "account_utils", "bls", @@ -357,6 +357,7 @@ dependencies = [ "serde_derive", "serde_json", "serde_yaml", + "slasher", "slog", "slog-term", "sloggers", @@ -373,7 +374,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "0.2.6" +version = "0.2.7" dependencies = [ "beacon_chain", "clap", @@ -395,6 +396,7 @@ dependencies = [ "node_test_rig", "rand 0.7.3", "serde", + "slasher", "slog", "slog-async", "slog-term", @@ -403,6 +405,16 @@ dependencies = [ "types", ] +[[package]] +name = "bincode" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30d3a39baa26f9651f17b375061f3233dde33424a8b72b0dbe93a68a0bc896d" +dependencies = [ + "byteorder", + "serde", +] + [[package]] name = "bitflags" version = "0.9.1" @@ -530,7 +542,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "0.2.6" +version = "0.2.7" dependencies = [ "beacon_node", "clap", @@ -747,6 +759,7 @@ dependencies = [ "serde", "serde_derive", "serde_yaml", + "slasher", "slog", "slog-async", "sloggers", @@ -2537,7 +2550,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "0.2.6" +version = "0.2.7" dependencies = [ "bls", "clap", @@ -2894,7 +2907,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "0.2.6" +version = "0.2.7" dependencies = [ "account_manager", "account_utils", @@ -2942,6 +2955,28 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" +[[package]] +name = "lmdb" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539" +dependencies = [ + "bitflags 1.2.1", + "libc", + "lmdb-sys", +] + +[[package]] +name = "lmdb-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "lock_api" version = "0.3.4" @@ -4788,6 +4823,31 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +[[package]] +name = "slasher" +version = "0.1.0" +dependencies = [ + "bincode", + "blake2b_simd", + "byte-slice-cast", + "criterion", + "environment", + "eth2_ssz", + "lmdb", + "parking_lot 0.11.0", + "rand 0.7.3", + "safe_arith", + "serde", + "serde_derive", + "slog", + "slog-term", + "slot_clock", + "tempdir", + "tokio 0.2.22", + "tree_hash", + "types", +] + [[package]] name = "slashing_protection" version = "0.1.0" @@ -6036,7 +6096,7 @@ dependencies = [ [[package]] name = "validator_client" -version = "0.2.6" +version = "0.2.7" dependencies = [ "account_utils", "bls", diff --git a/Cargo.toml b/Cargo.toml index 59bf507fa50..19a883e44fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,8 @@ members = [ "lighthouse", "lighthouse/environment", + "slasher", + "testing/simulator", "testing/ef_tests", "testing/eth1_test_rig", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 59c7cbb6e6d..26f9d0da454 100644 --- a/beacon_node/Cargo.toml +++ 
b/beacon_node/Cargo.toml @@ -40,3 +40,4 @@ serde = "1.0.110" clap_utils = { path = "../common/clap_utils" } hyper = "0.13.5" lighthouse_version = { path = "../common/lighthouse_version" } +slasher = { path = "../slasher" } diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index cc6bcb115f7..90b44ed976f 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -56,3 +56,4 @@ environment = { path = "../../lighthouse/environment" } bus = "2.2.3" derivative = "2.1.1" itertools = "0.9.0" +slasher = { path = "../../slasher" } diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index b81daa4bdb8..df6c0505ece 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -282,6 +282,61 @@ impl SignatureVerifiedAttestation for VerifiedUnaggregat } } +/// Information about invalid attestations which might still be slashable despite being invalid. +pub enum AttestationSlashInfo { + /// The attestation is invalid, but its signature wasn't checked. + SignatureNotChecked(Attestation, TErr), + /// As for `SignatureNotChecked`, but we know the `IndexedAttestation`. + SignatureNotCheckedIndexed(IndexedAttestation, TErr), + /// The attestation's signature is invalid, so it will never be slashable. + SignatureInvalid(TErr), + /// The signature is valid but the attestation is invalid in some other way. + SignatureValid(IndexedAttestation, TErr), +} + +fn process_slash_info( + slash_info: AttestationSlashInfo, + chain: &BeaconChain, +) -> Error { + use AttestationSlashInfo::*; + + if let Some(slasher) = chain.slasher.as_ref() { + let (indexed_attestation, err) = match slash_info { + // TODO(sproul): check signatures + // TODO: de-duplicate by attestation hash? + SignatureNotChecked(attestation, err) => { + match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { + Ok((indexed, _)) => (indexed, err), + Err(e) => { + debug!( + chain.log, + "Unable to obtain indexed form of attestation for slasher"; + "attestation_root" => format!("{:?}", attestation.tree_hash_root()), + "error" => format!("{:?}", e) + ); + return err; + } + } + } + SignatureNotCheckedIndexed(indexed, err) => (indexed, err), + SignatureInvalid(e) => return e, + SignatureValid(indexed, err) => (indexed, err), + }; + + // Supply to slasher. + slasher.accept_attestation(indexed_attestation); + + err + } else { + match slash_info { + SignatureNotChecked(_, e) + | SignatureNotCheckedIndexed(_, e) + | SignatureInvalid(e) + | SignatureValid(_, e) => e, + } + } +} + impl VerifiedAggregatedAttestation { /// Returns `Ok(Self)` if the `signed_aggregate` is valid to be (re)published on the gossip /// network. @@ -289,6 +344,14 @@ impl VerifiedAggregatedAttestation { signed_aggregate: SignedAggregateAndProof, chain: &BeaconChain, ) -> Result { + Self::verify_slashable(signed_aggregate, chain) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + fn verify_early_checks( + signed_aggregate: &SignedAggregateAndProof, + chain: &BeaconChain, + ) -> Result { let attestation = &signed_aggregate.message.aggregate; // Ensure attestation is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a @@ -338,37 +401,19 @@ impl VerifiedAggregatedAttestation { // Ensure that the attestation has participants. 
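The `AttestationSlashInfo`/`process_slash_info` pattern above preserves the original gossip-verification error while still handing whatever indexed form of the attestation is available to the slasher. A minimal sketch of that control flow, using simplified stand-in types (`IndexedAtt`, `MiniSlasher` and `VerifyError` are illustrative, not the real Lighthouse types, and the real enum has one more variant):

// Illustrative sketch only: simplified stand-in types, not part of this patch.
#[derive(Debug)]
enum VerifyError {
    TooOld,
    BadSignature,
}

struct IndexedAtt; // stand-in for IndexedAttestation<E>

enum SlashInfo {
    // Verification failed before the signature was checked.
    SignatureNotChecked(IndexedAtt, VerifyError),
    // The signature itself was invalid: never slashable.
    SignatureInvalid(VerifyError),
    // The signature was valid but a later check failed.
    SignatureValid(IndexedAtt, VerifyError),
}

struct MiniSlasher;
impl MiniSlasher {
    fn accept_attestation(&self, _att: IndexedAtt) {}
}

// Feed the slasher where possible, then surface the original error unchanged, so
// callers of `verify` see exactly the same error they would have seen before.
fn process_slash_info(info: SlashInfo, slasher: Option<&MiniSlasher>) -> VerifyError {
    match (info, slasher) {
        (SlashInfo::SignatureNotChecked(att, err), Some(s))
        | (SlashInfo::SignatureValid(att, err), Some(s)) => {
            s.accept_attestation(att);
            err
        }
        (SlashInfo::SignatureInvalid(err), _) => err,
        (SlashInfo::SignatureNotChecked(_, err), None)
        | (SlashInfo::SignatureValid(_, err), None) => err,
    }
}

fn main() {
    let slasher = MiniSlasher;
    let info = SlashInfo::SignatureValid(IndexedAtt, VerifyError::TooOld);
    println!("surfaced error: {:?}", process_slash_info(info, Some(&slasher)));
}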
if attestation.aggregation_bits.is_zero() { - return Err(Error::EmptyAggregationBitfield); + Err(Error::EmptyAggregationBitfield) + } else { + Ok(attestation_root) } + } - let indexed_attestation = - map_attestation_committee(chain, attestation, |(committee, _)| { - // Note: this clones the signature which is known to be a relatively slow operation. - // - // Future optimizations should remove this clone. - let selection_proof = - SelectionProof::from(signed_aggregate.message.selection_proof.clone()); - - if !selection_proof - .is_aggregator(committee.committee.len(), &chain.spec) - .map_err(|e| Error::BeaconChainError(e.into()))? - { - return Err(Error::InvalidSelectionProof { aggregator_index }); - } - - // Ensure the aggregator is a member of the committee for which it is aggregating. - if !committee.committee.contains(&(aggregator_index as usize)) { - return Err(Error::AggregatorNotInCommittee { aggregator_index }); - } - - get_indexed_attestation(committee.committee, &attestation) - .map_err(|e| BeaconChainError::from(e).into()) - })?; - - // Ensure that all signatures are valid. - if !verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation)? { - return Err(Error::InvalidSignature); - } + fn verify_late_checks( + signed_aggregate: &SignedAggregateAndProof, + attestation_root: Hash256, + chain: &BeaconChain, + ) -> Result<(), Error> { + let attestation = &signed_aggregate.message.aggregate; + let aggregator_index = signed_aggregate.message.aggregator_index; // Observe the valid attestation so we do not re-process it. // @@ -388,7 +433,7 @@ impl VerifiedAggregatedAttestation { // attestations processed at the same time could be published. if chain .observed_aggregators - .observe_validator(&attestation, aggregator_index as usize) + .observe_validator(attestation, aggregator_index as usize) .map_err(BeaconChainError::from)? { return Err(Error::PriorAttestationKnown { @@ -397,6 +442,68 @@ impl VerifiedAggregatedAttestation { }); } + Ok(()) + } + + // TODO(sproul): naming + pub fn verify_slashable( + signed_aggregate: SignedAggregateAndProof, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + let attestation = &signed_aggregate.message.aggregate; + let aggregator_index = signed_aggregate.message.aggregator_index; + let attestation_root = match Self::verify_early_checks(&signed_aggregate, chain) { + Ok(root) => root, + Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)), + }; + + let indexed_attestation = + match map_attestation_committee(chain, attestation, |(committee, _)| { + // Note: this clones the signature which is known to be a relatively slow operation. + // + // Future optimizations should remove this clone. + let selection_proof = + SelectionProof::from(signed_aggregate.message.selection_proof.clone()); + + if !selection_proof + .is_aggregator(committee.committee.len(), &chain.spec) + .map_err(|e| Error::BeaconChainError(e.into()))? + { + return Err(Error::InvalidSelectionProof { aggregator_index }); + } + + // Ensure the aggregator is a member of the committee for which it is aggregating. 
+ if !committee.committee.contains(&(aggregator_index as usize)) { + return Err(Error::AggregatorNotInCommittee { aggregator_index }); + } + + get_indexed_attestation(committee.committee, attestation) + .map_err(|e| BeaconChainError::from(e).into()) + }) { + Ok(indexed_attestation) => indexed_attestation, + Err(e) => return Err(SignatureNotChecked(signed_aggregate.message.aggregate, e)), + }; + + // Ensure that all signatures are valid. + if let Err(e) = + verify_signed_aggregate_signatures(chain, &signed_aggregate, &indexed_attestation) + .and_then(|is_valid| { + if !is_valid { + Err(Error::InvalidSignature) + } else { + Ok(()) + } + }) + { + return Err(SignatureInvalid(e)); + } + + if let Err(e) = Self::verify_late_checks(&signed_aggregate, attestation_root, chain) { + return Err(SignatureValid(indexed_attestation, e)); + } + Ok(VerifiedAggregatedAttestation { signed_aggregate, indexed_attestation, @@ -415,16 +522,10 @@ impl VerifiedAggregatedAttestation { } impl VerifiedUnaggregatedAttestation { - /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip - /// network. - /// - /// `subnet_id` is the subnet from which we received this attestation. This function will - /// verify that it was received on the correct subnet. - pub fn verify( - attestation: Attestation, - subnet_id: SubnetId, + pub fn verify_early_checks( + attestation: &Attestation, chain: &BeaconChain, - ) -> Result { + ) -> Result<(), Error> { // Ensure attestation is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (within a // MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance). // @@ -444,9 +545,16 @@ impl VerifiedUnaggregatedAttestation { // Enforce a maximum skip distance for unaggregated attestations. verify_head_block_is_known(chain, &attestation, chain.config.import_max_skip_slots)?; - let (indexed_attestation, committees_per_slot) = - obtain_indexed_attestation_and_committees_per_slot(chain, &attestation)?; + Ok(()) + } + pub fn verify_middle_checks( + attestation: &Attestation, + indexed_attestation: &IndexedAttestation, + committees_per_slot: u64, + subnet_id: SubnetId, + chain: &BeaconChain, + ) -> Result { let expected_subnet_id = SubnetId::compute_subnet_for_attestation_data::( &indexed_attestation.data, committees_per_slot, @@ -482,9 +590,14 @@ impl VerifiedUnaggregatedAttestation { }); } - // The aggregate signature of the attestation is valid. - verify_attestation_signature(chain, &indexed_attestation)?; + Ok(validator_index) + } + fn verify_late_checks( + attestation: &Attestation, + validator_index: u64, + chain: &BeaconChain, + ) -> Result<(), Error> { // Now that the attestation has been fully verified, store that we have received a valid // attestation from this validator. // @@ -501,6 +614,61 @@ impl VerifiedUnaggregatedAttestation { epoch: attestation.data.target.epoch, }); } + Ok(()) + } + + /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip + /// network. + /// + /// `subnet_id` is the subnet from which we received this attestation. This function will + /// verify that it was received on the correct subnet. 
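`verify_middle_checks` rejects attestations that arrive on the wrong gossip subnet by recomputing the expected subnet from the attestation data via `SubnetId::compute_subnet_for_attestation_data`. The underlying mapping comes from the consensus spec; a standalone sketch of that arithmetic with plain integers, assuming mainnet's 64 attestation subnets and 32 slots per epoch:

// Sketch of the spec-defined subnet mapping with plain integers; assumes mainnet's
// 64 attestation subnets and 32 slots per epoch, not the Lighthouse SubnetId type.
const ATTESTATION_SUBNET_COUNT: u64 = 64;
const SLOTS_PER_EPOCH: u64 = 32;

fn compute_subnet(slot: u64, committee_index: u64, committees_per_slot: u64) -> u64 {
    let slots_since_epoch_start = slot % SLOTS_PER_EPOCH;
    let committees_since_epoch_start = committees_per_slot * slots_since_epoch_start;
    (committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT
}

fn main() {
    // Slot 3, committee 5, 4 committees per slot: expected on subnet (4 * 3 + 5) % 64 = 17.
    assert_eq!(compute_subnet(3, 5, 4), 17);
}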
+ pub fn verify( + attestation: Attestation, + subnet_id: SubnetId, + chain: &BeaconChain, + ) -> Result { + Self::verify_slashable(attestation, subnet_id, chain) + .map_err(|slash_info| process_slash_info(slash_info, chain)) + } + + pub fn verify_slashable( + attestation: Attestation, + subnet_id: SubnetId, + chain: &BeaconChain, + ) -> Result> { + use AttestationSlashInfo::*; + + if let Err(e) = Self::verify_early_checks(&attestation, chain) { + return Err(SignatureNotChecked(attestation, e)); + } + + let (indexed_attestation, committees_per_slot) = + match obtain_indexed_attestation_and_committees_per_slot(chain, &attestation) { + Ok(x) => x, + Err(e) => { + return Err(SignatureNotChecked(attestation, e)); + } + }; + + let validator_index = match Self::verify_middle_checks( + &attestation, + &indexed_attestation, + committees_per_slot, + subnet_id, + chain, + ) { + Ok(idx) => idx, + Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)), + }; + + // The aggregate signature of the attestation is valid. + if let Err(e) = verify_attestation_signature(chain, &indexed_attestation) { + return Err(SignatureInvalid(e)); + } + + if let Err(e) = Self::verify_late_checks(&attestation, validator_index, chain) { + return Err(SignatureValid(indexed_attestation, e)); + } Ok(Self { attestation, diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ef94274fabb..aa508a27d9a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -31,12 +31,14 @@ use fork_choice::ForkChoice; use itertools::process_results; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; +use slasher::Slasher; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use state_processing::{ common::get_indexed_attestation, per_block_processing, - per_block_processing::errors::AttestationValidationError, per_slot_processing, - BlockSignatureStrategy, SigVerifiedOp, + per_block_processing::errors::AttestationValidationError, + per_block_processing::verify_attester_slashing, per_slot_processing, BlockSignatureStrategy, + SigVerifiedOp, VerifySignatures, }; use std::borrow::Cow; use std::cmp::Ordering; @@ -225,6 +227,8 @@ pub struct BeaconChain { pub(crate) log: Logger, /// Arbitrary bytes included in the blocks. pub(crate) graffiti: Graffiti, + /// Optional slasher. + pub(crate) slasher: Option>>, } type BeaconBlockAndState = (BeaconBlock, BeaconState); @@ -1010,6 +1014,38 @@ impl BeaconChain { Ok(signed_aggregate) } + fn ingest_slashings_to_op_pool(&self, state: &BeaconState) { + if let Some(slasher) = self.slasher.as_ref() { + let slashings = slasher.get_attester_slashings(); + debug!(self.log, "Ingesting {} slashings", slashings.len()); + for slashing in slashings { + if let Err(e) = + verify_attester_slashing(state, &slashing, VerifySignatures::True, &self.spec) + { + error!( + self.log, + "Slashing from slasher failed verification"; + "error" => format!("{:?}", e), + "slashing" => format!("{:?}", slashing), + ); + continue; + } + + // FIXME(sproul): remove `trust_me` + if let Err(e) = + self.import_attester_slashing(SigVerifiedOp::trust_me(slashing.clone())) + { + error!( + self.log, + "Slashing from slasher is invalid"; + "error" => format!("{:?}", e), + "slashing" => format!("{:?}", slashing), + ); + } + } + } + } + /// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`. 
/// /// The `target_epoch` argument determines which shuffling to check compatibility with, it @@ -1630,6 +1666,7 @@ impl BeaconChain { state.latest_block_header.canonical_root() }; + self.ingest_slashings_to_op_pool(&state); let (proposer_slashings, attester_slashings) = self.op_pool.get_slashings(&state); let eth1_data = eth1_chain.eth1_data_for_block_production(&state, &self.spec)?; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 6ff94cfaad9..7186c36e1ca 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -20,6 +20,7 @@ use eth1::Config as Eth1Config; use fork_choice::ForkChoice; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; +use slasher::Slasher; use slog::{info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; @@ -115,6 +116,7 @@ pub struct BeaconChainBuilder { disabled_forks: Vec, log: Option, graffiti: Graffiti, + slasher: Option>>, } impl @@ -162,6 +164,7 @@ where chain_config: ChainConfig::default(), log: None, graffiti: Graffiti::default(), + slasher: None, } } @@ -197,6 +200,12 @@ where self } + /// Sets the slasher. + pub fn slasher(mut self, slasher: Arc>) -> Self { + self.slasher = Some(slasher); + self + } + /// Sets the logger. /// /// Should generally be called early in the build chain. @@ -550,6 +559,7 @@ where disabled_forks: self.disabled_forks, log: log.clone(), graffiti: self.graffiti, + slasher: self.slasher.clone(), }; let head = beacon_chain diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index de6f7e59d76..773768b72d0 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -41,3 +41,4 @@ lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } time = "0.2.16" bus = "2.2.3" +slasher = { path = "../../slasher" } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index d9edbe1d2a6..35bdf0beda4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -18,6 +18,8 @@ use eth2_libp2p::NetworkGlobals; use genesis::{interop_genesis_state, Eth1GenesisService}; use network::{NetworkConfig, NetworkMessage, NetworkService}; use parking_lot::Mutex; +use slasher::Slasher; +use slasher::SlasherServer; use slog::info; use ssz::Decode; use std::net::SocketAddr; @@ -63,6 +65,7 @@ pub struct ClientBuilder { network_send: Option>>, http_listen_addr: Option, websocket_listen_addr: Option, + slasher: Option>>, eth_spec_instance: T::EthSpec, } @@ -105,6 +108,7 @@ where network_send: None, http_listen_addr: None, websocket_listen_addr: None, + slasher: None, eth_spec_instance, } } @@ -121,6 +125,11 @@ where self } + pub fn slasher(mut self, slasher: Arc>) -> Self { + self.slasher = Some(slasher); + self + } + /// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be /// called later in order to actually instantiate the `BeaconChain`. pub async fn beacon_chain_builder( @@ -158,6 +167,12 @@ where .disabled_forks(disabled_forks) .graffiti(graffiti); + let builder = if let Some(slasher) = self.slasher.clone() { + builder.slasher(slasher) + } else { + builder + }; + let chain_exists = builder .store_contains_beacon_chain() .unwrap_or_else(|_| false); @@ -331,6 +346,24 @@ where Ok(self) } + pub fn slasher_server(self) -> Result { + let context = self + .runtime_context + .as_ref() + .ok_or_else(|| "slasher requires a runtime_context")? 
+ .service_context("slasher_server_ctxt".into()); + let slasher = self + .slasher + .clone() + .ok_or_else(|| "slasher server requires a slasher")?; + let slot_clock = self + .slot_clock + .clone() + .ok_or_else(|| "slasher server requires a slot clock")?; + SlasherServer::new(slasher, slot_clock, &context.executor); + Ok(self) + } + /// Immediately starts the service that periodically logs information each slot. pub fn notifier(self) -> Result { let context = self diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 19088e785b5..dbe0d9af126 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -67,6 +67,7 @@ pub struct Config { pub chain: beacon_chain::ChainConfig, pub websocket_server: websocket_server::Config, pub eth1: eth1::Config, + pub slasher: Option, } impl Default for Config { @@ -88,6 +89,7 @@ impl Default for Config { eth1: <_>::default(), disabled_forks: Vec::new(), graffiti: Graffiti::default(), + slasher: None, } } } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 6caa8acd9b2..8cc814a84ab 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -261,5 +261,25 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .value_name("NUM_SLOTS") .takes_value(true) .default_value("700") + ) + /* + * Slasher. + */ + .arg( + Arg::with_name("slasher") + .long("slasher") + .help( + "Run a slasher alongside the beacon node [EXPERIMENTAL]." + ) + .takes_value(false) + ) + .arg( + Arg::with_name("slasher-dir") + .long("slasher-dir") + .help( + "Set the slasher's database directory." + ) + .value_name("DIR") + .takes_value(true) ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f9abfca6aeb..5c7925e34c8 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -261,6 +261,15 @@ pub fn get_config( }; } + if cli_args.is_present("slasher") { + let slasher_dir = if let Some(slasher_dir) = cli_args.value_of("slasher-dir") { + PathBuf::from(slasher_dir) + } else { + client_config.data_dir.join("slasher_db") + }; + client_config.slasher = Some(slasher::Config::new(slasher_dir)); + } + Ok(client_config) } diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index cc143c44241..124e67fdf2b 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -19,8 +19,10 @@ use beacon_chain::{ use clap::ArgMatches; use config::get_config; use environment::RuntimeContext; +use slasher::Slasher; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. @@ -90,6 +92,16 @@ impl ProductionBeaconNode { .disk_store(&db_path, &freezer_db_path_res?, store_config)? .background_migrator()?; + let builder = if let Some(slasher_config) = client_config.slasher.clone() { + let slasher = Arc::new( + Slasher::open(slasher_config, log.new(slog::o!("service" => "slasher"))) + .map_err(|e| format!("Slasher open error: {:?}", e))?, + ); + builder.slasher(slasher) + } else { + builder + }; + let builder = builder .beacon_chain_builder(client_genesis, client_config_1) .await?; @@ -136,6 +148,12 @@ impl ProductionBeaconNode { builder }; + let builder = if client_config.slasher.is_some() { + builder.slasher_server()? 
+ } else { + builder + }; + Ok(Self(builder.build())) } diff --git a/consensus/state_processing/src/verify_operation.rs b/consensus/state_processing/src/verify_operation.rs index 6cc66aa814b..499802241fa 100644 --- a/consensus/state_processing/src/verify_operation.rs +++ b/consensus/state_processing/src/verify_operation.rs @@ -17,6 +17,10 @@ use types::{ pub struct SigVerifiedOp(T); impl SigVerifiedOp { + pub fn trust_me(t: T) -> Self { + SigVerifiedOp(t) + } + pub fn into_inner(self) -> T { self.0 } diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 7ad191ea919..06e44161f3e 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -140,16 +140,10 @@ fn main() { Builder::from_env(Env::default()).init(); } - macro_rules! run_with_spec { - ($env_builder: expr) => { - run($env_builder, &matches) - }; - } - let result = match matches.value_of("spec") { - Some("minimal") => run_with_spec!(EnvironmentBuilder::minimal()), - Some("mainnet") => run_with_spec!(EnvironmentBuilder::mainnet()), - Some("interop") => run_with_spec!(EnvironmentBuilder::interop()), + Some("minimal") => run(EnvironmentBuilder::minimal(), &matches), + Some("mainnet") => run(EnvironmentBuilder::mainnet(), &matches), + Some("interop") => run(EnvironmentBuilder::interop(), &matches), spec => { // This path should be unreachable due to slog having a `default_value` unreachable!("Unknown spec configuration: {:?}", spec); diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml new file mode 100644 index 00000000000..a0e96e3ecd2 --- /dev/null +++ b/slasher/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "slasher" +version = "0.1.0" +authors = ["Michael Sproul "] +edition = "2018" + +[dependencies] +slot_clock = { path = "../common/slot_clock" } +bincode = "1.3.1" +blake2b_simd = "0.5.10" +byte-slice-cast = "0.3.5" +environment = { path = "../lighthouse/environment" } +eth2_ssz = { path = "../consensus/ssz" } +lmdb = "0.8" +parking_lot = "0.11.0" +rand = "0.7" +safe_arith = { path = "../consensus/safe_arith" } +serde = "1.0" +serde_derive = "1.0" +slog = "2.5.2" +tokio = { version = "0.2.21", features = ["full"] } +tree_hash = { path = "../consensus/tree_hash" } +types = { path = "../consensus/types" } + +[dev-dependencies] +criterion = "0.3" +tempdir = "0.3.7" +slog-term = "2.6.0" + +[[bench]] +name = "blake2b" +harness = false diff --git a/slasher/benches/blake2b.rs b/slasher/benches/blake2b.rs new file mode 100644 index 00000000000..8bdfdc6db3c --- /dev/null +++ b/slasher/benches/blake2b.rs @@ -0,0 +1,32 @@ +use blake2b_simd::{Hash, Params}; +use byte_slice_cast::AsByteSlice; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use rand::{thread_rng, Rng}; + +const CHUNK_SIZE: usize = 2048; +type Chunk = [u16; CHUNK_SIZE]; + +fn blake2b(data: &Chunk) -> Hash { + let mut params = Params::new(); + params.hash_length(16); + params.hash(data.as_byte_slice()) +} + +fn make_random_chunk() -> Chunk { + let mut chunk = [0; CHUNK_SIZE]; + thread_rng().fill(&mut chunk[..]); + chunk +} + +pub fn uniform_chunk(c: &mut Criterion) { + let chunk = [33; CHUNK_SIZE]; + c.bench_function("uniform_chunk", |b| b.iter(|| blake2b(&black_box(chunk)))); +} + +pub fn random_chunk(c: &mut Criterion) { + let chunk = make_random_chunk(); + c.bench_function("random_chunk", |b| b.iter(|| blake2b(&black_box(chunk)))); +} + +criterion_group!(benches, uniform_chunk, random_chunk); +criterion_main!(benches); diff --git a/slasher/src/array.rs b/slasher/src/array.rs new file mode 100644 index 00000000000..c2d1a03f966 --- 
/dev/null +++ b/slasher/src/array.rs @@ -0,0 +1,513 @@ +use crate::{Config, Error, SlasherDB, SlashingStatus}; +use lmdb::{RwTransaction, Transaction}; +use safe_arith::SafeArith; +use serde_derive::{Deserialize, Serialize}; +use std::collections::{btree_map::Entry, BTreeMap}; +use std::convert::TryFrom; +use std::sync::Arc; +use types::{AttesterSlashing, Epoch, EthSpec, IndexedAttestation}; + +pub const MAX_DISTANCE: u16 = u16::MAX; + +/// Terminology: +/// +/// Let +/// N = config.history_length +/// C = config.chunk_size +/// K = config.validator_chunk_size +/// +/// Then +/// +/// `chunk_index` in [0..N/C) is the column of a chunk in the 2D matrix +/// `validator_chunk_index` in [0..N/K) is the row of a chunk in the 2D matrix +/// `chunk_offset` in [0..C) is the horizontal (epoch) offset of a value within a 2D chunk +/// `validator_offset` in [0..K) is the vertical (validator) offset of a value within a 2D chunk +#[derive(Debug, Serialize, Deserialize)] +pub struct Chunk { + data: Vec, +} + +impl Chunk { + // TODO: write tests for epochs greater than length + pub fn get_target( + &self, + validator_index: u64, + epoch: Epoch, + config: &Config, + ) -> Result { + assert_eq!( + self.data.len(), + config.chunk_size * config.validator_chunk_size + ); + let validator_offset = config.validator_offset(validator_index); + let chunk_offset = config.chunk_offset(epoch); + let cell_index = config.cell_index(validator_offset, chunk_offset); + self.data + .get(cell_index) + .map(|distance| epoch + u64::from(*distance)) + .ok_or_else(|| Error::ChunkIndexOutOfBounds(cell_index)) + } + + pub fn set_target( + &mut self, + validator_index: u64, + epoch: Epoch, + target_epoch: Epoch, + config: &Config, + ) -> Result<(), Error> { + let validator_offset = config.validator_offset(validator_index); + let chunk_offset = config.chunk_offset(epoch); + let cell_index = config.cell_index(validator_offset, chunk_offset); + + let cell = self + .data + .get_mut(cell_index) + .ok_or_else(|| Error::ChunkIndexOutOfBounds(cell_index))?; + + *cell = Self::epoch_distance(target_epoch, epoch)?; + Ok(()) + } + + /// Compute the distance (difference) between two epochs. + /// + /// Error if the distance is greater than or equal to `MAX_DISTANCE`. 
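Each cell of a `Chunk` stores the attested target epoch as a `u16` distance from the cell's own epoch rather than as an absolute epoch, which is what keeps the 2D array compact, with `MAX_DISTANCE` reserved as a sentinel. A small worked sketch of the encode/decode round trip, using plain integers instead of `Epoch` and `Option` instead of the error variants:

// Sketch of the distance encoding used by `Chunk` (plain u64/u16 instead of Epoch,
// Option instead of the Error variants above).
const MAX_DISTANCE: u16 = u16::MAX;

// set_target: store the target as an offset from the cell's epoch.
fn encode(epoch: u64, target_epoch: u64) -> Option<u16> {
    let distance = target_epoch.checked_sub(epoch)?;
    // MAX_DISTANCE is reserved as the "no attestation yet" sentinel in min chunks.
    if distance < u64::from(MAX_DISTANCE) {
        Some(distance as u16)
    } else {
        None
    }
}

// get_target: recover the absolute target epoch from the stored distance.
fn decode(epoch: u64, distance: u16) -> u64 {
    epoch + u64::from(distance)
}

fn main() {
    let d = encode(100, 103).unwrap(); // target 103 stored relative to epoch 100
    assert_eq!(d, 3);
    assert_eq!(decode(100, d), 103);
    assert!(encode(100, 99).is_none()); // a target below the cell's epoch has no encoding
}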
+ pub fn epoch_distance(epoch: Epoch, base_epoch: Epoch) -> Result { + let distance_u64 = epoch + .as_u64() + .checked_sub(base_epoch.as_u64()) + .ok_or(Error::DistanceCalculationOverflow)?; + + let distance = u16::try_from(distance_u64).map_err(|_| Error::DistanceTooLarge)?; + if distance < MAX_DISTANCE { + Ok(distance) + } else { + Err(Error::DistanceTooLarge) + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct MinTargetChunk { + chunk: Chunk, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct MaxTargetChunk { + chunk: Chunk, +} + +pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwned { + fn empty(config: &Config) -> Self; + + fn check_slashable( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + config: &Config, + ) -> Result, Error>; + + fn update( + &mut self, + chunk_index: usize, + validator_index: u64, + start_epoch: Epoch, + new_target_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Result; + + fn first_start_epoch(source_epoch: Epoch, current_epoch: Epoch) -> Option; + + fn next_chunk_index_and_start_epoch( + chunk_index: usize, + start_epoch: Epoch, + config: &Config, + ) -> Result<(usize, Epoch), Error>; + + fn select_db(db: &SlasherDB) -> lmdb::Database; + + fn load( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + chunk_index: usize, + config: &Config, + ) -> Result, Error> { + let disk_key = config.disk_key(validator_chunk_index, chunk_index); + match txn.get(Self::select_db(db), &disk_key.to_be_bytes()) { + Ok(chunk_bytes) => Ok(Some(bincode::deserialize(chunk_bytes)?)), + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e.into()), + } + } + + fn store( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + chunk_index: usize, + config: &Config, + ) -> Result<(), Error> { + let disk_key = config.disk_key(validator_chunk_index, chunk_index); + let value = bincode::serialize(self)?; + txn.put( + Self::select_db(db), + &disk_key.to_be_bytes(), + &value, + SlasherDB::::write_flags(), + )?; + Ok(()) + } +} + +impl TargetArrayChunk for MinTargetChunk { + fn empty(config: &Config) -> Self { + MinTargetChunk { + chunk: Chunk { + data: vec![MAX_DISTANCE; config.chunk_size * config.validator_chunk_size], + }, + } + } + + fn check_slashable( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + config: &Config, + ) -> Result, Error> { + let min_target = + self.chunk + .get_target(validator_index, attestation.data.source.epoch, config)?; + if attestation.data.target.epoch > min_target { + let attestation = db + .get_attestation_for_validator(txn, validator_index, min_target)? + .ok_or_else(|| Error::MissingAttesterRecord { + validator_index, + target_epoch: min_target, + })?; + Ok(SlashingStatus::SurroundsExisting(Box::new(attestation))) + } else { + Ok(SlashingStatus::NotSlashable) + } + } + + fn update( + &mut self, + chunk_index: usize, + validator_index: u64, + start_epoch: Epoch, + new_target_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Result { + let min_epoch = Epoch::from( + current_epoch + .as_usize() + .saturating_sub(config.history_length - 1), + ); + let mut epoch = start_epoch; + while config.chunk_index(epoch) == chunk_index { + if new_target_epoch < self.chunk.get_target(validator_index, epoch, config)? 
{ + self.chunk + .set_target(validator_index, epoch, new_target_epoch, config)?; + } else { + // We can stop. + return Ok(false); + } + if epoch == min_epoch { + return Ok(false); + } + epoch -= 1; + } + // Continue to the next chunk. + assert_ne!(chunk_index, 0); + Ok(true) + } + + fn first_start_epoch(source_epoch: Epoch, _current_epoch: Epoch) -> Option { + if source_epoch > 0 { + Some(source_epoch - 1) + } else { + None + } + } + + fn next_chunk_index_and_start_epoch( + chunk_index: usize, + start_epoch: Epoch, + config: &Config, + ) -> Result<(usize, Epoch), Error> { + let chunk_size = config.chunk_size as u64; + Ok(( + chunk_index.safe_sub(1)?, + start_epoch / chunk_size * chunk_size - 1, + )) + } + + fn select_db(db: &SlasherDB) -> lmdb::Database { + db.min_targets_db + } +} + +impl TargetArrayChunk for MaxTargetChunk { + fn empty(config: &Config) -> Self { + MaxTargetChunk { + chunk: Chunk { + data: vec![0; config.chunk_size * config.validator_chunk_size], + }, + } + } + + fn check_slashable( + &self, + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + config: &Config, + ) -> Result, Error> { + let max_target = + self.chunk + .get_target(validator_index, attestation.data.source.epoch, config)?; + if attestation.data.target.epoch < max_target { + let attestation = db + .get_attestation_for_validator(txn, validator_index, max_target)? + .ok_or_else(|| Error::MissingAttesterRecord { + validator_index, + target_epoch: max_target, + })?; + Ok(SlashingStatus::SurroundedByExisting(Box::new(attestation))) + } else { + Ok(SlashingStatus::NotSlashable) + } + } + + fn update( + &mut self, + chunk_index: usize, + validator_index: u64, + start_epoch: Epoch, + new_target_epoch: Epoch, + current_epoch: Epoch, + config: &Config, + ) -> Result { + let mut epoch = start_epoch; + while config.chunk_index(epoch) == chunk_index { + if new_target_epoch > self.chunk.get_target(validator_index, epoch, config)? { + self.chunk + .set_target(validator_index, epoch, new_target_epoch, config)?; + } else { + // We can stop. + return Ok(false); + } + if epoch == current_epoch { + return Ok(false); + } + epoch += 1; + } + // Continue to the next chunk. + Ok(true) + } + + fn first_start_epoch(source_epoch: Epoch, current_epoch: Epoch) -> Option { + if source_epoch < current_epoch { + Some(source_epoch + 1) + } else { + None + } + } + + // Go to next chunk, and first epoch of that chunk + fn next_chunk_index_and_start_epoch( + chunk_index: usize, + start_epoch: Epoch, + config: &Config, + ) -> Result<(usize, Epoch), Error> { + let chunk_size = config.chunk_size as u64; + Ok(( + chunk_index.safe_add(1)?, + (start_epoch / chunk_size + 1) * chunk_size, + )) + } + + fn select_db(db: &SlasherDB) -> lmdb::Database { + db.max_targets_db + } +} + +pub fn get_chunk_for_update<'a, E: EthSpec, T: TargetArrayChunk>( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + updated_chunks: &'a mut BTreeMap, + validator_chunk_index: usize, + chunk_index: usize, + config: &Config, +) -> Result<&'a mut T, Error> { + Ok(match updated_chunks.entry(chunk_index) { + Entry::Occupied(occupied) => occupied.into_mut(), + Entry::Vacant(vacant) => { + let chunk = if let Some(disk_chunk) = + T::load(db, txn, validator_chunk_index, chunk_index, config)? 
+ { + disk_chunk + } else { + T::empty(config) + }; + vacant.insert(chunk) + } + }) +} + +pub fn apply_attestation_for_validator( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + updated_chunks: &mut BTreeMap, + validator_chunk_index: usize, + validator_index: u64, + attestation: &IndexedAttestation, + current_epoch: Epoch, + config: &Config, +) -> Result, Error> { + let mut chunk_index = config.chunk_index(attestation.data.source.epoch); + let mut current_chunk = get_chunk_for_update( + db, + txn, + updated_chunks, + validator_chunk_index, + chunk_index, + config, + )?; + + let slashing_status = + current_chunk.check_slashable(db, txn, validator_index, attestation, config)?; + + // TODO: consider removing this early return and updating the array + if slashing_status != SlashingStatus::NotSlashable { + return Ok(slashing_status); + } + + let mut start_epoch = if let Some(start_epoch) = + T::first_start_epoch(attestation.data.source.epoch, current_epoch) + { + start_epoch + } else { + return Ok(slashing_status); + }; + chunk_index = config.chunk_index(start_epoch); + + loop { + current_chunk = get_chunk_for_update( + db, + txn, + updated_chunks, + validator_chunk_index, + chunk_index, + config, + )?; + let keep_going = current_chunk.update( + chunk_index, + validator_index, + start_epoch, + attestation.data.target.epoch, + current_epoch, + config, + )?; + if !keep_going { + break; + } + + let (next_chunk_index, next_start_epoch) = + T::next_chunk_index_and_start_epoch(chunk_index, start_epoch, config)?; + chunk_index = next_chunk_index; + start_epoch = next_start_epoch; + } + + Ok(SlashingStatus::NotSlashable) +} + +pub fn update( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + batch: Vec>>, + current_epoch: Epoch, + config: &Config, +) -> Result>, Error> { + // Split the batch up into horizontal segments. + // Map chunk indexes in the range `0..self.config.chunk_size` to attestations + // for those chunks. + let mut chunk_attestations = BTreeMap::new(); + for attestation in batch { + chunk_attestations + .entry(config.chunk_index(attestation.data.source.epoch)) + .or_insert_with(Vec::new) + .push(attestation); + } + + let mut slashings = update_array::<_, MinTargetChunk>( + db, + txn, + validator_chunk_index, + &chunk_attestations, + current_epoch, + config, + )?; + slashings.extend(update_array::<_, MaxTargetChunk>( + db, + txn, + validator_chunk_index, + &chunk_attestations, + current_epoch, + config, + )?); + Ok(slashings) +} + +pub fn update_array( + db: &SlasherDB, + txn: &mut RwTransaction<'_>, + validator_chunk_index: usize, + chunk_attestations: &BTreeMap>>>, + current_epoch: Epoch, + config: &Config, +) -> Result>, Error> { + let mut slashings = vec![]; + // Map from chunk index to updated chunk at that index. + let mut updated_chunks = BTreeMap::new(); + + for attestations in chunk_attestations.values() { + for attestation in attestations { + for validator_index in + config.attesting_validators_for_chunk(attestation, validator_chunk_index) + { + let slashing_status = apply_attestation_for_validator::( + db, + txn, + &mut updated_chunks, + validator_chunk_index, + validator_index, + attestation, + current_epoch, + config, + )?; + if let Some(slashing) = slashing_status.into_slashing(attestation) { + slashings.push(slashing); + } + } + } + } + + // Store chunks on disk. 
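Taken together, `apply_attestation_for_validator` and the two chunk types implement min-max surround detection: `min_targets[e]` holds the smallest target of any recorded attestation whose source is greater than `e`, and `max_targets[e]` the largest target of any attestation whose source is less than `e`. A new attestation with source `s` and target `t` therefore surrounds an existing one iff `t > min_targets[s]`, and is surrounded iff `t < max_targets[s]`. The code above maintains these arrays lazily in fixed-size chunks and stops updating as soon as a chunk's values stop changing; a flat, single-validator sketch without the chunking, database, or `Epoch` types:

// Single-validator sketch of min-max surround detection (no chunking or database).
// min_targets[e] = smallest target of any recorded attestation with source > e.
// max_targets[e] = largest target of any recorded attestation with source < e.
const HISTORY: usize = 64; // toy history length

struct Spans {
    min_targets: [u64; HISTORY],
    max_targets: [u64; HISTORY],
}

impl Spans {
    fn new() -> Self {
        Spans {
            min_targets: [u64::MAX; HISTORY],
            max_targets: [0; HISTORY],
        }
    }

    // True if (source, target) surrounds or is surrounded by something already recorded.
    fn is_surround_slashable(&self, source: u64, target: u64) -> bool {
        let s = source as usize;
        target > self.min_targets[s] || target < self.max_targets[s]
    }

    fn record(&mut self, source: u64, target: u64) {
        // Epochs below the source see this attestation as a potential surrounder.
        for e in 0..source as usize {
            self.min_targets[e] = self.min_targets[e].min(target);
        }
        // Epochs above the source see it as potentially surrounded.
        for e in (source as usize + 1)..HISTORY {
            self.max_targets[e] = self.max_targets[e].max(target);
        }
    }
}

fn main() {
    let mut spans = Spans::new();
    spans.record(2, 3); // existing attestation: source 2, target 3
    assert!(spans.is_surround_slashable(1, 4)); // (1, 4) surrounds (2, 3)
    assert!(!spans.is_surround_slashable(3, 4)); // (3, 4) is disjoint: not slashable
    spans.record(1, 10);
    assert!(spans.is_surround_slashable(3, 4)); // ...but (1, 10) surrounds it
}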
+ for (chunk_index, chunk) in updated_chunks { + chunk.store(db, txn, validator_chunk_index, chunk_index, config)?; + } + + Ok(slashings) +} diff --git a/slasher/src/attestation_queue.rs b/slasher/src/attestation_queue.rs new file mode 100644 index 00000000000..5ed9adb3328 --- /dev/null +++ b/slasher/src/attestation_queue.rs @@ -0,0 +1,90 @@ +use parking_lot::{Mutex, RwLock}; +use std::collections::BTreeSet; +use std::sync::Arc; +use types::{EthSpec, IndexedAttestation}; + +/// Staging area for attestations received from the network. +/// +/// To be added to the database in batches, for efficiency and to prevent data races. +#[derive(Debug)] +pub struct AttestationQueue { + /// All attestations (unique) for storage on disk. + attestations_to_store: Mutex>>>, + /// Attestations group by validator index range. + pub(crate) subqueues: RwLock>>, + pub(crate) validators_per_chunk: usize, +} + +/// A queue of attestations for a range of validator indices. +#[derive(Debug)] +pub struct SubQueue { + pub(crate) attestations: Mutex>>>, +} + +impl SubQueue { + pub fn new() -> Self { + SubQueue { + attestations: Mutex::new(vec![]), + } + } + + /// Empty the queue. + pub fn take(&self) -> Vec>> { + std::mem::replace(&mut self.attestations.lock(), vec![]) + } + + pub fn len(&self) -> usize { + self.attestations.lock().len() + } +} + +impl AttestationQueue { + pub fn new(validators_per_chunk: usize) -> Self { + Self { + attestations_to_store: Mutex::new(vec![]), + subqueues: RwLock::new(vec![]), + validators_per_chunk, + } + } + + /// Add an attestation to all relevant queues, creating them if necessary. + pub fn queue(&self, attestation: IndexedAttestation) { + let attestation = Arc::new(attestation); + + self.attestations_to_store.lock().push(attestation.clone()); + + let subqueue_ids = attestation + .attesting_indices + .iter() + .map(|validator_index| *validator_index as usize / self.validators_per_chunk) + .collect::>(); + + if let Some(max_subqueue_id) = subqueue_ids.iter().max() { + if *max_subqueue_id >= self.subqueues.read().len() { + self.subqueues + .write() + .resize_with(max_subqueue_id + 1, SubQueue::new); + } + } + + for subqueue_id in subqueue_ids { + let subqueues_lock = self.subqueues.read(); + subqueues_lock[subqueue_id] + .attestations + .lock() + .push(attestation.clone()); + } + } + + pub fn get_attestations_to_store(&self) -> Vec>> { + std::mem::replace(&mut self.attestations_to_store.lock(), vec![]) + } + + /// Return `(num_queues, num_attestations)`. + pub fn stats(&self) -> (usize, usize) { + let subqueues = self.subqueues.read(); + let num_queues = subqueues.len(); + let num_attestations = subqueues.iter().map(SubQueue::len).sum(); + (num_queues, num_attestations) + } +} diff --git a/slasher/src/config.rs b/slasher/src/config.rs new file mode 100644 index 00000000000..d41c78c60b9 --- /dev/null +++ b/slasher/src/config.rs @@ -0,0 +1,79 @@ +use crate::Error; +use serde_derive::{Deserialize, Serialize}; +use std::path::PathBuf; +use types::{Epoch, EthSpec, IndexedAttestation}; + +pub const DEFAULT_CHUNK_SIZE: usize = 16; +pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256; +pub const DEFAULT_HISTORY_LENGTH: usize = 54_000; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub database_path: PathBuf, + pub chunk_size: usize, + pub validator_chunk_size: usize, + /// Number of epochs of history to keep. 
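`AttestationQueue::queue` fans each attestation out to one subqueue per distinct validator chunk touched by its attesting indices, so a later batch update only ever needs the target-array chunks for one validator range at a time. A std-only sketch of that fan-out, using a `HashMap` in place of the lock-protected `Vec<SubQueue>`:

// Sketch of the subqueue fan-out (std only; the real queue uses Arc'd attestations,
// locks and a Vec of SubQueues indexed by chunk id).
use std::collections::{BTreeSet, HashMap};

fn fan_out(
    attesting_indices: &[u64],
    validators_per_chunk: usize,
    subqueues: &mut HashMap<usize, Vec<Vec<u64>>>,
) {
    // One subqueue id per distinct validator chunk touched by this attestation.
    let subqueue_ids: BTreeSet<usize> = attesting_indices
        .iter()
        .map(|&v| v as usize / validators_per_chunk)
        .collect();
    for id in subqueue_ids {
        subqueues.entry(id).or_default().push(attesting_indices.to_vec());
    }
}

fn main() {
    let mut subqueues = HashMap::new();
    // With 256 validators per chunk, indices 100 and 300 span chunks 0 and 1, so the
    // attestation lands in both subqueues (and is processed against both chunk rows).
    fan_out(&[100, 300], 256, &mut subqueues);
    assert!(subqueues.contains_key(&0) && subqueues.contains_key(&1));
    assert_eq!(subqueues.len(), 2);
}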
+ pub history_length: usize, +} + +impl Config { + pub fn new(database_path: PathBuf) -> Self { + Self { + database_path, + chunk_size: DEFAULT_CHUNK_SIZE, + validator_chunk_size: DEFAULT_VALIDATOR_CHUNK_SIZE, + history_length: DEFAULT_HISTORY_LENGTH, + } + } + + pub fn validate(&self) -> Result<(), Error> { + if self.history_length % self.chunk_size != 0 { + Err(Error::ConfigInvalidChunkSize { + chunk_size: self.chunk_size, + history_length: self.history_length, + }) + } else { + Ok(()) + } + } + + pub fn chunk_index(&self, epoch: Epoch) -> usize { + (epoch.as_usize() % self.history_length) / self.chunk_size + } + + pub fn validator_chunk_index(&self, validator_index: u64) -> usize { + validator_index as usize / self.validator_chunk_size + } + + pub fn chunk_offset(&self, epoch: Epoch) -> usize { + epoch.as_usize() % self.chunk_size + } + + pub fn validator_offset(&self, validator_index: u64) -> usize { + validator_index as usize % self.validator_chunk_size + } + + /// Map the validator and epoch chunk indexes into a single value for use as a database key. + pub fn disk_key(&self, validator_chunk_index: usize, chunk_index: usize) -> usize { + let width = self.history_length / self.chunk_size; + validator_chunk_index * width + chunk_index + } + + /// Map the validator and epoch offsets into an index for `Chunk::data`. + pub fn cell_index(&self, validator_offset: usize, chunk_offset: usize) -> usize { + validator_offset * self.chunk_size + chunk_offset + } + + /// Iterate over the attesting indices which belong to the `validator_chunk_index` chunk. + pub fn attesting_validators_for_chunk<'a, E: EthSpec>( + &'a self, + attestation: &'a IndexedAttestation, + validator_chunk_index: usize, + ) -> impl Iterator + 'a { + attestation + .attesting_indices + .iter() + .filter(move |v| self.validator_chunk_index(**v) == validator_chunk_index) + .copied() + } +} diff --git a/slasher/src/database.rs b/slasher/src/database.rs new file mode 100644 index 00000000000..3d787b71e8b --- /dev/null +++ b/slasher/src/database.rs @@ -0,0 +1,196 @@ +use crate::{Config, Error, SlashingStatus}; +use lmdb::{Database, DatabaseFlags, Environment, RwTransaction, Transaction, WriteFlags}; +use ssz::{Decode, Encode}; +use std::marker::PhantomData; +use std::sync::Arc; +use tree_hash::TreeHash; +use types::{Epoch, EthSpec, Hash256, IndexedAttestation}; + +/// Map from `(validator_index, target_epoch)` to `indexed_attestation_hash`. +const ATTESTER_DB: &str = "attester"; +/// Map from `indexed_attestation_hash` to `IndexedAttestation`. +const INDEXED_ATTESTATION_DB: &str = "indexed_attestations"; +const MIN_TARGETS_DB: &str = "min_targets"; +const MAX_TARGETS_DB: &str = "max_targets"; + +/// The number of DBs for LMDB to use (equal to the number of DBs defined above). +const LMDB_MAX_DBS: u32 = 4; +/// The size of the in-memory map for LMDB (larger than the maximum size of the database). 
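With the defaults above (chunk_size = 16 epochs, validator_chunk_size = 256 validators, history_length = 54,000 epochs), the indexing helpers in `Config` map an (epoch, validator) pair onto a chunk plus an offset within it, and each chunk onto a database key. A worked example of that arithmetic using the default constants; the concrete epoch and validator index are chosen purely for illustration:

// Worked example of the Config indexing math, using the default constants from this
// file (chunk_size = 16, validator_chunk_size = 256, history_length = 54_000).
const CHUNK_SIZE: usize = 16;
const VALIDATOR_CHUNK_SIZE: usize = 256;
const HISTORY_LENGTH: usize = 54_000;

fn main() {
    let epoch: usize = 54_321;         // wraps modulo the history length
    let validator_index: usize = 999;

    let chunk_index = (epoch % HISTORY_LENGTH) / CHUNK_SIZE;            // 321 / 16 = 20
    let chunk_offset = epoch % CHUNK_SIZE;                              // 54_321 % 16 = 1
    let validator_chunk_index = validator_index / VALIDATOR_CHUNK_SIZE; // 999 / 256 = 3
    let validator_offset = validator_index % VALIDATOR_CHUNK_SIZE;      // 999 % 256 = 231

    // Position of this (validator, epoch) cell inside the chunk's flat Vec<u16>.
    let cell_index = validator_offset * CHUNK_SIZE + chunk_offset;      // 231 * 16 + 1 = 3697

    // Database key for the chunk: one row of width N / C per validator chunk.
    let width = HISTORY_LENGTH / CHUNK_SIZE;                            // 3375
    let disk_key = validator_chunk_index * width + chunk_index;         // 3 * 3375 + 20 = 10_145

    assert_eq!((chunk_index, chunk_offset), (20, 1));
    assert_eq!((validator_chunk_index, validator_offset), (3, 231));
    assert_eq!(cell_index, 3697);
    assert_eq!(disk_key, 10_145);
}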
+const LMDB_MAP_SIZE: usize = 256 * (1 << 30); // 256GiB + +const ATTESTER_KEY_SIZE: usize = 16; + +#[derive(Debug)] +pub struct SlasherDB { + pub(crate) env: Environment, + pub(crate) indexed_attestation_db: Database, + pub(crate) attester_db: Database, + pub(crate) min_targets_db: Database, + pub(crate) max_targets_db: Database, + config: Arc, + _phantom: PhantomData, +} + +#[derive(Debug)] +pub struct AttesterKey { + data: [u8; ATTESTER_KEY_SIZE], +} + +impl AttesterKey { + pub fn new(validator_index: u64, target_epoch: Epoch, config: &Config) -> Self { + let mut data = [0; ATTESTER_KEY_SIZE]; + let epoch_offset = target_epoch.as_usize() % config.history_length; + data[0..8].copy_from_slice(&validator_index.to_be_bytes()); + data[8..ATTESTER_KEY_SIZE].copy_from_slice(&epoch_offset.to_be_bytes()); + AttesterKey { data } + } +} + +impl AsRef<[u8]> for AttesterKey { + fn as_ref(&self) -> &[u8] { + &self.data + } +} + +impl SlasherDB { + pub fn open(config: Arc) -> Result { + // TODO: open_with_permissions + std::fs::create_dir_all(&config.database_path)?; + let env = Environment::new() + .set_max_dbs(LMDB_MAX_DBS) + .set_map_size(LMDB_MAP_SIZE) + .open(&config.database_path)?; + let indexed_attestation_db = + env.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; + let attester_db = env.create_db(Some(ATTESTER_DB), Self::db_flags())?; + let min_targets_db = env.create_db(Some(MIN_TARGETS_DB), Self::db_flags())?; + let max_targets_db = env.create_db(Some(MAX_TARGETS_DB), Self::db_flags())?; + Ok(Self { + env, + indexed_attestation_db, + attester_db, + min_targets_db, + max_targets_db, + config, + _phantom: PhantomData, + }) + } + + pub fn db_flags() -> DatabaseFlags { + DatabaseFlags::default() + } + + pub fn write_flags() -> WriteFlags { + WriteFlags::default() + } + + pub fn begin_rw_txn(&self) -> Result, Error> { + Ok(self.env.begin_rw_txn()?) + } + + pub fn store_indexed_attestation( + &self, + txn: &mut RwTransaction<'_>, + indexed_attestation: &IndexedAttestation, + ) -> Result<(), Error> { + let indexed_attestation_hash = indexed_attestation.tree_hash_root(); + let data = indexed_attestation.as_ssz_bytes(); + + txn.put( + self.indexed_attestation_db, + &indexed_attestation_hash.as_bytes(), + &data, + Self::write_flags(), + )?; + Ok(()) + } + + pub fn get_indexed_attestation( + &self, + txn: &mut RwTransaction<'_>, + indexed_attestation_hash: Hash256, + ) -> Result, Error> { + match txn.get(self.indexed_attestation_db, &indexed_attestation_hash) { + Ok(bytes) => Ok(IndexedAttestation::from_ssz_bytes(bytes)?), + Err(lmdb::Error::NotFound) => Err(Error::MissingIndexedAttestation { + root: indexed_attestation_hash, + }), + Err(e) => Err(e.into()), + } + } + + pub fn check_and_update_attester_record( + &self, + txn: &mut RwTransaction<'_>, + validator_index: u64, + attestation: &IndexedAttestation, + indexed_attestation_hash: Hash256, + ) -> Result, Error> { + // See if there's an existing indexed attestation for this attester. + if let Some(existing_hash) = self.get_attestation_hash_for_validator( + txn, + validator_index, + attestation.data.target.epoch, + )? { + // If the existing indexed attestation is identical, then this attestation is not + // slashable and no update is required. + if existing_hash == indexed_attestation_hash { + return Ok(SlashingStatus::NotSlashable); + } + + // Otherwise, load the indexed attestation so we can check if it's slashable against + // the new one. 
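The 16-byte `AttesterKey` above is the big-endian validator index followed by the big-endian epoch offset (the target epoch modulo `history_length`), so LMDB's lexicographic ordering groups records by validator and then by epoch, and the key space wraps after `history_length` epochs. A std-only sketch of the packing, with `u64` in place of `Epoch` and `Config`:

// Sketch of the AttesterKey layout: 8 bytes of validator index then 8 bytes of epoch
// offset, both big-endian, so keys sort by validator first and epoch second.
const HISTORY_LENGTH: u64 = 54_000;

fn attester_key(validator_index: u64, target_epoch: u64) -> [u8; 16] {
    let mut key = [0u8; 16];
    let epoch_offset = target_epoch % HISTORY_LENGTH; // wraps after HISTORY_LENGTH epochs
    key[0..8].copy_from_slice(&validator_index.to_be_bytes());
    key[8..16].copy_from_slice(&epoch_offset.to_be_bytes());
    key
}

fn main() {
    // Keys for the same validator at consecutive epochs are adjacent in sort order,
    // and all of one validator's records precede the next validator's.
    let a = attester_key(7, 100);
    let b = attester_key(7, 101);
    let c = attester_key(8, 0);
    assert!(a < b && b < c);
}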
+ let existing_attestation = self.get_indexed_attestation(txn, existing_hash)?; + if attestation.is_double_vote(&existing_attestation) { + Ok(SlashingStatus::DoubleVote(Box::new(existing_attestation))) + } else { + Ok(SlashingStatus::NotSlashable) + } + } + // If no indexed attestation exists, insert one for this attester. + else { + txn.put( + self.attester_db, + &AttesterKey::new(validator_index, attestation.data.target.epoch, &self.config), + &indexed_attestation_hash, + Self::write_flags(), + )?; + Ok(SlashingStatus::NotSlashable) + } + } + + pub fn get_attestation_for_validator( + &self, + txn: &mut RwTransaction<'_>, + validator_index: u64, + target: Epoch, + ) -> Result>, Error> { + if let Some(hash) = self.get_attestation_hash_for_validator(txn, validator_index, target)? { + Ok(Some(self.get_indexed_attestation(txn, hash)?)) + } else { + Ok(None) + } + } + + pub fn get_attestation_hash_for_validator( + &self, + txn: &mut RwTransaction<'_>, + validator_index: u64, + target: Epoch, + ) -> Result, Error> { + let attester_key = AttesterKey::new(validator_index, target, &self.config); + match txn.get(self.attester_db, &attester_key) { + Ok(hash_bytes) => Ok(Some(hash256_from_slice(hash_bytes)?)), + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e.into()), + } + } +} + +fn hash256_from_slice(data: &[u8]) -> Result { + if data.len() == 32 { + Ok(Hash256::from_slice(data)) + } else { + Err(Error::AttesterRecordCorrupt { length: data.len() }) + } +} diff --git a/slasher/src/error.rs b/slasher/src/error.rs new file mode 100644 index 00000000000..1a9e23ebf97 --- /dev/null +++ b/slasher/src/error.rs @@ -0,0 +1,62 @@ +use std::io; +use types::{Epoch, Hash256}; + +#[derive(Debug)] +pub enum Error { + DatabaseError(lmdb::Error), + DatabaseIOError(io::Error), + SszDecodeError(ssz::DecodeError), + BincodeError(bincode::Error), + ArithError(safe_arith::ArithError), + ChunkIndexOutOfBounds(usize), + ConfigInvalidChunkSize { + chunk_size: usize, + history_length: usize, + }, + DistanceTooLarge, + DistanceCalculationOverflow, + /// Missing an attester record that we expected to exist. 
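`check_and_update_attester_record` is essentially a map from `(validator_index, target_epoch)` to the root of the first attestation seen, where a second, different root for the same key signals a double vote. An in-memory `HashMap` sketch of that rule, with a `u64` standing in for the attestation root and no persistence:

// In-memory sketch of the double-vote rule (the real code keys an LMDB table with
// AttesterKey and stores the attestation's tree hash root).
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Status {
    NotSlashable,
    DoubleVote { existing_root: u64 },
}

fn check_and_update(
    records: &mut HashMap<(u64, u64), u64>, // (validator_index, target_epoch) -> root
    validator_index: u64,
    target_epoch: u64,
    root: u64,
) -> Status {
    match records.get(&(validator_index, target_epoch)).copied() {
        // Same attestation seen again: nothing to do.
        Some(existing) if existing == root => Status::NotSlashable,
        // A different attestation with the same target: double vote.
        Some(existing) => Status::DoubleVote { existing_root: existing },
        // First attestation for this (validator, target): record it.
        None => {
            records.insert((validator_index, target_epoch), root);
            Status::NotSlashable
        }
    }
}

fn main() {
    let mut records = HashMap::new();
    assert_eq!(check_and_update(&mut records, 99, 1, 0xaa), Status::NotSlashable);
    assert_eq!(check_and_update(&mut records, 99, 1, 0xaa), Status::NotSlashable);
    assert_eq!(
        check_and_update(&mut records, 99, 1, 0xbb),
        Status::DoubleVote { existing_root: 0xaa }
    );
}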
+ MissingAttesterRecord { + validator_index: u64, + target_epoch: Epoch, + }, + AttesterRecordCorrupt { + length: usize, + }, + MissingIndexedAttestation { + root: Hash256, + }, +} + +impl From for Error { + fn from(e: lmdb::Error) -> Self { + match e { + lmdb::Error::Other(os_error) => Error::from(io::Error::from_raw_os_error(os_error)), + _ => Error::DatabaseError(e), + } + } +} + +impl From for Error { + fn from(e: io::Error) -> Self { + Error::DatabaseIOError(e) + } +} + +impl From for Error { + fn from(e: ssz::DecodeError) -> Self { + Error::SszDecodeError(e) + } +} + +impl From for Error { + fn from(e: bincode::Error) -> Self { + Error::BincodeError(e) + } +} + +impl From for Error { + fn from(e: safe_arith::ArithError) -> Self { + Error::ArithError(e) + } +} diff --git a/slasher/src/lib.rs b/slasher/src/lib.rs new file mode 100644 index 00000000000..dfecf3a593e --- /dev/null +++ b/slasher/src/lib.rs @@ -0,0 +1,45 @@ +#![deny(missing_debug_implementations)] + +mod array; +mod attestation_queue; +pub mod config; +mod database; +mod error; +mod slasher; +mod slasher_server; + +pub use crate::slasher::Slasher; +pub use attestation_queue::AttestationQueue; +pub use config::Config; +pub use database::SlasherDB; +pub use error::Error; +pub use slasher_server::SlasherServer; + +use types::{AttesterSlashing, EthSpec, IndexedAttestation}; + +#[derive(Debug, PartialEq)] +pub enum SlashingStatus { + NotSlashable, + DoubleVote(Box>), + SurroundsExisting(Box>), + SurroundedByExisting(Box>), +} + +impl SlashingStatus { + pub fn into_slashing( + self, + new_attestation: &IndexedAttestation, + ) -> Option> { + use SlashingStatus::*; + + match self { + NotSlashable => None, + DoubleVote(existing) | SurroundsExisting(existing) | SurroundedByExisting(existing) => { + Some(AttesterSlashing { + attestation_1: *existing, + attestation_2: new_attestation.clone(), + }) + } + } + } +} diff --git a/slasher/src/slasher.rs b/slasher/src/slasher.rs new file mode 100644 index 00000000000..35ce44e4df4 --- /dev/null +++ b/slasher/src/slasher.rs @@ -0,0 +1,168 @@ +use crate::{array, AttestationQueue, Config, Error, SlasherDB}; +use lmdb::{RwTransaction, Transaction}; +use parking_lot::Mutex; +use slog::{debug, error, info, Logger}; +use std::sync::Arc; +use tree_hash::TreeHash; +use types::{AttesterSlashing, Epoch, EthSpec, IndexedAttestation}; + +#[derive(Debug)] +pub struct Slasher { + db: SlasherDB, + pub(crate) attestation_queue: AttestationQueue, + // TODO: consider using a set + attester_slashings: Mutex>>, + // TODO: consider removing Arc + config: Arc, + pub(crate) log: Logger, +} + +impl Slasher { + pub fn open(config: Config, log: Logger) -> Result { + config.validate()?; + let config = Arc::new(config); + let db = SlasherDB::open(config.clone())?; + let attester_slashings = Mutex::new(vec![]); + let attestation_queue = AttestationQueue::new(config.validator_chunk_size); + Ok(Self { + db, + attester_slashings, + attestation_queue, + config, + log, + }) + } + + pub fn get_attester_slashings(&self) -> Vec> { + std::mem::replace(&mut self.attester_slashings.lock(), vec![]) + } + + pub fn config(&self) -> &Config { + &self.config + } + + /// Accept an attestation from the network and queue it for processing. + pub fn accept_attestation(&self, attestation: IndexedAttestation) { + self.attestation_queue.queue(attestation); + } + + /// Apply queued attestations to the on-disk database. 
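Putting the pieces together, the crate's public surface is small: open a `Slasher`, feed it verified `IndexedAttestation`s, call `process_attestations` periodically (the server further down runs it on a 12-second interval), and drain any `AttesterSlashing`s found. A rough usage sketch that mirrors the `double_vote_single_val` test below; the attestation constructor copies the test helper, while the database path and discard logger are illustrative:

// Rough usage sketch of the crate's public API as introduced in this diff. The
// attestation constructor mirrors the `indexed_att` test helper; the database path
// and discard logger are illustrative only.
use slasher::{Config, Slasher};
use slog::{o, Discard, Logger};
use types::{
    AggregateSignature, AttestationData, Checkpoint, Epoch, Hash256, IndexedAttestation,
    MainnetEthSpec, Slot,
};

fn att(indices: &[u64], source: u64, target: u64, root: u64) -> IndexedAttestation<MainnetEthSpec> {
    IndexedAttestation {
        attesting_indices: indices.to_vec().into(),
        data: AttestationData {
            slot: Slot::new(0),
            index: 0,
            beacon_block_root: Hash256::zero(),
            source: Checkpoint { epoch: Epoch::new(source), root: Hash256::zero() },
            target: Checkpoint { epoch: Epoch::new(target), root: Hash256::from_low_u64_be(root) },
        },
        signature: AggregateSignature::empty(),
    }
}

fn main() -> Result<(), slasher::Error> {
    let config = Config::new("/tmp/slasher_db".into());
    let slasher = Slasher::<MainnetEthSpec>::open(config, Logger::root(Discard, o!()))?;

    // Queue two conflicting attestations for validator 99: same target epoch, different roots.
    slasher.accept_attestation(att(&[99], 0, 1, 0));
    slasher.accept_attestation(att(&[99], 0, 1, 1));

    // Batch-process the queue for the current epoch, then drain any slashings found.
    slasher.process_attestations(Epoch::new(1))?;
    assert_eq!(slasher.get_attester_slashings().len(), 1);
    Ok(())
}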
+ pub fn process_attestations(&self, current_epoch: Epoch) -> Result<(), Error> { + let mut txn = self.db.begin_rw_txn()?; + + // Insert attestations into database. + for attestation in self.attestation_queue.get_attestations_to_store() { + self.db.store_indexed_attestation(&mut txn, &attestation)?; + } + + // Dequeue attestations in batches and process them. + let subqueues_lock = self.attestation_queue.subqueues.read(); + for (subqueue_id, subqueue) in subqueues_lock.iter().enumerate() { + let batch = subqueue.take(); + self.process_batch(&mut txn, subqueue_id, batch, current_epoch); + } + txn.commit()?; + Ok(()) + } + + /// Process a batch of attestations for a range of validator indices. + fn process_batch( + &self, + txn: &mut RwTransaction<'_>, + subqueue_id: usize, + batch: Vec>>, + current_epoch: Epoch, + ) { + // First, check for double votes. + for attestation in &batch { + match self.check_double_votes(txn, subqueue_id, &attestation) { + Ok(slashings) => { + if !slashings.is_empty() { + info!( + self.log, + "Found {} new double-vote slashings!", + slashings.len() + ); + } + self.attester_slashings.lock().extend(slashings); + } + Err(e) => { + error!( + self.log, + "Error checking for double votes"; + "error" => format!("{:?}", e) + ); + } + } + } + + // Then check for surrounds using the min-max arrays. + match array::update( + &self.db, + txn, + subqueue_id, + batch, + current_epoch, + &self.config, + ) { + Ok(slashings) => { + if !slashings.is_empty() { + info!( + self.log, + "Found {} new surround slashings!", + slashings.len() + ); + } + self.attester_slashings.lock().extend(slashings); + } + Err(e) => { + error!( + self.log, + "Error processing array update"; + "error" => format!("{:?}", e), + ); + } + } + } + + /// Check for double votes from all validators on `attestation` who match the `subqueue_id`. + fn check_double_votes( + &self, + txn: &mut RwTransaction<'_>, + subqueue_id: usize, + attestation: &IndexedAttestation, + ) -> Result>, Error> { + let indexed_attestation_hash = attestation.tree_hash_root(); + + let mut slashings = vec![]; + + for validator_index in self + .config + .attesting_validators_for_chunk(attestation, subqueue_id) + { + let slashing_status = self.db.check_and_update_attester_record( + txn, + validator_index, + &attestation, + indexed_attestation_hash, + )?; + + if let Some(slashing) = slashing_status.into_slashing(attestation) { + debug!( + self.log, + "Found double-vote slashing"; + "validator_index" => validator_index, + "epoch" => slashing.attestation_1.data.target.epoch, + ); + + // Avoid creating duplicate slashings for the same attestation. + // PERF: this is O(n) instead of O(1), but n should be small. 
+                if !slashings.contains(&slashing) {
+                    slashings.push(slashing);
+                }
+            }
+        }
+
+        Ok(slashings)
+    }
+}
diff --git a/slasher/src/slasher_server.rs b/slasher/src/slasher_server.rs
new file mode 100644
index 00000000000..6029f3f071a
--- /dev/null
+++ b/slasher/src/slasher_server.rs
@@ -0,0 +1,64 @@
+use crate::Slasher;
+use environment::TaskExecutor;
+use slog::{debug, error, info, trace};
+use slot_clock::SlotClock;
+use std::sync::Arc;
+use tokio::stream::StreamExt;
+use tokio::time::{interval_at, Duration, Instant};
+use types::EthSpec;
+
+#[derive(Debug)]
+pub struct SlasherServer;
+
+impl SlasherServer {
+    pub fn new<E: EthSpec, C: SlotClock + Send + Sync + 'static>(
+        slasher: Arc<Slasher<E>>,
+        slot_clock: C,
+        executor: &TaskExecutor,
+    ) {
+        info!(slasher.log, "Starting slasher to detect misbehaviour");
+        let sub_executor = executor.clone();
+        executor.spawn(
+            async move {
+                // FIXME: read slot time from config, align to some fraction of each slot
+                let slot_clock = Arc::new(slot_clock);
+                let mut interval = interval_at(Instant::now(), Duration::from_secs(12));
+                while interval.next().await.is_some() {
+                    let slot_clock = slot_clock.clone();
+                    let slasher = slasher.clone();
+                    sub_executor.spawn_blocking(
+                        move || {
+                            if let Some(current_slot) = slot_clock.now() {
+                                let t = Instant::now();
+                                let current_epoch = current_slot.epoch(E::slots_per_epoch());
+                                let (num_validator_chunks, num_attestations) =
+                                    slasher.attestation_queue.stats();
+                                if let Err(e) = slasher.process_attestations(current_epoch) {
+                                    error!(
+                                        slasher.log,
+                                        "Error during scheduled slasher processing";
+                                        "error" => format!("{:?}", e)
+                                    );
+                                }
+                                debug!(
+                                    slasher.log,
+                                    "Completed slasher update";
+                                    "time_taken" => format!("{}ms", t.elapsed().as_millis()),
+                                    "num_attestations" => num_attestations,
+                                    "num_validator_chunks" => num_validator_chunks,
+                                );
+                            } else {
+                                trace!(
+                                    slasher.log,
+                                    "Slasher has nothing to do: we are pre-genesis"
+                                );
+                            }
+                        },
+                        "slasher_server_process_attestations",
+                    );
+                }
+            },
+            "slasher_server",
+        );
+    }
+}
diff --git a/slasher/tests/slasher_tests.rs b/slasher/tests/slasher_tests.rs
new file mode 100644
index 00000000000..882dcbe8c71
--- /dev/null
+++ b/slasher/tests/slasher_tests.rs
@@ -0,0 +1,236 @@
+use slasher::{config::DEFAULT_CHUNK_SIZE, Config, Slasher};
+use slog::{o, Drain, Logger};
+use tempdir::TempDir;
+use types::{
+    AggregateSignature, AttestationData, AttesterSlashing, Checkpoint, Epoch, Hash256,
+    IndexedAttestation, MainnetEthSpec, Slot,
+};
+
+type E = MainnetEthSpec;
+
+fn indexed_att(
+    attesting_indices: impl AsRef<[u64]>,
+    source_epoch: u64,
+    target_epoch: u64,
+    target_root: u64,
+) -> IndexedAttestation<E> {
+    IndexedAttestation {
+        attesting_indices: attesting_indices.as_ref().to_vec().into(),
+        data: AttestationData {
+            slot: Slot::new(0),
+            index: 0,
+            beacon_block_root: Hash256::zero(),
+            source: Checkpoint {
+                epoch: Epoch::new(source_epoch),
+                root: Hash256::from_low_u64_be(0),
+            },
+            target: Checkpoint {
+                epoch: Epoch::new(target_epoch),
+                root: Hash256::from_low_u64_be(target_root),
+            },
+        },
+        signature: AggregateSignature::empty(),
+    }
+}
+
+fn att_slashing(
+    attestation_1: &IndexedAttestation<E>,
+    attestation_2: &IndexedAttestation<E>,
+) -> AttesterSlashing<E> {
+    AttesterSlashing {
+        attestation_1: attestation_1.clone(),
+        attestation_2: attestation_2.clone(),
+    }
+}
+
+#[test]
+fn double_vote_single_val() {
+    let v = vec![99];
+    let att1 = indexed_att(&v, 0, 1, 0);
+    let att2 = indexed_att(&v, 0, 1, 1);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    let attestations = vec![att1, att2];
+    slasher_test_indiv(&attestations, &slashings, 1);
+    slasher_test_indiv(&attestations, &slashings, 1000);
+}
+
+#[test]
+fn double_vote_multi_vals() {
+    let v = vec![0, 1, 2];
+    let att1 = indexed_att(&v, 0, 1, 0);
+    let att2 = indexed_att(&v, 0, 1, 1);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    let attestations = vec![att1, att2];
+    slasher_test_indiv(&attestations, &slashings, 1);
+    slasher_test_indiv(&attestations, &slashings, 1000);
+}
+
+// A subset of validators double vote.
+#[test]
+fn double_vote_some_vals() {
+    let v1 = vec![0, 1, 2, 3, 4, 5, 6];
+    let v2 = vec![0, 2, 4, 6];
+    let att1 = indexed_att(&v1, 0, 1, 0);
+    let att2 = indexed_att(&v2, 0, 1, 1);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    let attestations = vec![att1, att2];
+    slasher_test_indiv(&attestations, &slashings, 1);
+    slasher_test_indiv(&attestations, &slashings, 1000);
+}
+
+// A subset of validators double vote, others vote twice for the same thing.
+#[test]
+fn double_vote_some_vals_repeat() {
+    let v1 = vec![0, 1, 2, 3, 4, 5, 6];
+    let v2 = vec![0, 2, 4, 6];
+    let v3 = vec![1, 3, 5];
+    let att1 = indexed_att(&v1, 0, 1, 0);
+    let att2 = indexed_att(&v2, 0, 1, 1);
+    let att3 = indexed_att(&v3, 0, 1, 0);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    let attestations = vec![att1, att2, att3];
+    slasher_test_indiv(&attestations, &slashings, 1);
+    slasher_test_indiv(&attestations, &slashings, 1000);
+}
+
+// Nobody double votes, nobody gets slashed.
+#[test]
+fn no_double_vote_same_target() {
+    let v1 = vec![0, 1, 2, 3, 4, 5, 6];
+    let v2 = vec![0, 1, 2, 3, 4, 5, 7, 8];
+    let att1 = indexed_att(&v1, 0, 1, 0);
+    let att2 = indexed_att(&v2, 0, 1, 0);
+    let attestations = vec![att1, att2];
+    slasher_test_indiv(&attestations, &[], 1);
+    slasher_test_indiv(&attestations, &[], 1000);
+}
+
+// Two groups vote for different things, no slashings.
+#[test]
+fn no_double_vote_distinct_vals() {
+    let v1 = vec![0, 1, 2, 3];
+    let v2 = vec![4, 5, 6, 7];
+    let att1 = indexed_att(&v1, 0, 1, 0);
+    let att2 = indexed_att(&v2, 0, 1, 1);
+    let attestations = vec![att1, att2];
+    slasher_test_indiv(&attestations, &[], 1);
+    slasher_test_indiv(&attestations, &[], 1000);
+}
+
+#[test]
+fn surrounds_existing_single_val_single_chunk() {
+    let v = vec![0];
+    let att1 = indexed_att(&v, 1, 2, 0);
+    let att2 = indexed_att(&v, 0, 3, 0);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    slasher_test_indiv(&[att1, att2], &slashings, 3);
+}
+
+/* FIXME: refactor these tests
+#[test]
+fn surrounds_existing_multi_vals_single_chunk() {
+    let v = vec![0];
+    let att1 = indexed_att(&v, 1, 2, 0);
+    let att2 = indexed_att(&v, 0, 3, 0);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    slasher_test_indiv(&[att1, att2], &slashings, 3);
+    let validators = vec![0, 16, 1024, 300_000, 300_001];
+    let att1 = indexed_att(validators.clone(), 1, 2, 0);
+    let att2 = indexed_att(validators.clone(), 0, 3, 0);
+
+    slasher.accept_attestation(att1);
+    slasher.process_attestations();
+    slasher.accept_attestation(att2);
+    slasher.process_attestations();
+}
+
+
+#[test]
+fn surrounds_existing_many_chunks() {
+    let v = vec![0];
+    let chunk_size = Config::default().chunk_size as u64;
+    let att1 = indexed_att(&v, 3 * chunk_size, 3 * chunk_size + 1, 0);
+    let att2 = indexed_att(&v, 0, 3 * chunk_size + 2, 0);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    let attestations = vec![att1, att2];
+    slasher_test(&attestations, &slashings, 4 * chunk_size, |_| true);
+}
+*/
+
+#[test]
+fn surrounded_by_single_val_single_chunk() {
+    let v = vec![0];
+    let att1 = indexed_att(&v, 0, 15, 0);
+    let att2 = indexed_att(&v, 1, 14, 0);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    let attestations = vec![att1, att2];
+    slasher_test_indiv(&attestations, &slashings, 15);
+}
+
+#[test]
+fn surrounded_by_single_val_multi_chunk() {
+    let v = vec![0];
+    let chunk_size = DEFAULT_CHUNK_SIZE as u64;
+    let att1 = indexed_att(&v, 0, 3 * chunk_size, 0);
+    let att2 = indexed_att(&v, chunk_size, chunk_size + 1, 0);
+    let slashings = vec![att_slashing(&att1, &att2)];
+    let attestations = vec![att1, att2];
+    slasher_test_indiv(&attestations, &slashings, 3 * chunk_size);
+    slasher_test_indiv(&attestations, &slashings, 4 * chunk_size);
+}
+
+/*
+fn slasher_tests(attestations: &[IndexedAttestation<E>], expected: &[AttesterSlashing<E>]) {
+    // Process after every attestation.
+    // slasher_test(attestations, expected, |_| true);
+    // Process only at the end.
+    slasher_test(attestations, expected, |_| false);
+    // Process every second attestation.
+    // slasher_test(attestations, expected, |i| i % 2 == 0);
+}
+*/
+
+// Process each attestation individually, and confirm that the slashings produced are as expected.
+fn slasher_test_indiv(
+    attestations: &[IndexedAttestation<E>],
+    expected: &[AttesterSlashing<E>],
+    current_epoch: u64,
+) {
+    slasher_test(attestations, expected, current_epoch, |_| true);
+}
+
+// FIXME(sproul): move this somewhere else
+fn logger() -> Logger {
+    let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter);
+    let drain = slog_term::FullFormat::new(decorator).build();
+    Logger::root(Box::new(std::sync::Mutex::new(drain)).fuse(), o!())
+}
+
+fn slasher_test(
+    attestations: &[IndexedAttestation<E>],
+    expected: &[AttesterSlashing<E>],
+    current_epoch: u64,
+    should_process_after: impl Fn(usize) -> bool,
+) {
+    let tempdir = TempDir::new("slasher").unwrap();
+    let config = Config::new(tempdir.path().into());
+    let slasher = Slasher::open(config, logger()).unwrap();
+    let current_epoch = Epoch::new(current_epoch);
+
+    for (i, attestation) in attestations.iter().enumerate() {
+        slasher.accept_attestation(attestation.clone());
+
+        if should_process_after(i) {
+            slasher.process_attestations(current_epoch).unwrap();
+        }
+    }
+    slasher.process_attestations(current_epoch).unwrap();
+
+    let slashings = slasher.get_attester_slashings();
+
+    for (i, slashing) in expected.iter().enumerate() {
+        assert_eq!(*slashing, slashings[i], "slashing {} should match", i);
+    }
+
+    assert_eq!(expected, &slashings[..]);
+}
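A minimal wiring sketch, not part of the diff above: it only restates the API introduced in this change (Slasher::open, SlasherServer::new, accept_attestation, process_attestations, get_attester_slashings). The function name start_slasher and the db_path, log, slot_clock and executor parameters are placeholders for whatever the beacon node actually supplies, not names defined in this PR.

// Sketch only, assuming the API shown in slasher/src/lib.rs and slasher_server.rs.
use std::path::PathBuf;
use std::sync::Arc;

use slasher::{Config, Slasher, SlasherServer};
use types::MainnetEthSpec;

fn start_slasher<C: slot_clock::SlotClock + Send + Sync + 'static>(
    db_path: PathBuf,
    log: slog::Logger,
    slot_clock: C,
    executor: &environment::TaskExecutor,
) -> Result<Arc<Slasher<MainnetEthSpec>>, slasher::Error> {
    // Open (or create) the LMDB-backed database under `db_path`.
    let slasher = Arc::new(Slasher::open(Config::new(db_path), log)?);

    // Spawn the background task that periodically calls `process_attestations`.
    SlasherServer::new(slasher.clone(), slot_clock, executor);

    // The caller then feeds gossip-verified attestations in with
    // `slasher.accept_attestation(indexed_attestation)` and collects any
    // detected slashings with `slasher.get_attester_slashings()`.
    Ok(slasher)
}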