From f8da151b0bec12a707a742260dfe635a6a1890f4 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 9 Nov 2020 23:13:56 +0000 Subject: [PATCH] Standard beacon api updates (#1831) ## Issue Addressed Resolves #1809 Resolves #1824 Resolves #1818 Resolves #1828 (hopefully) ## Proposed Changes - add `validator_index` to the proposer duties endpoint - add the ability to query for historical proposer duties - `StateId` deserialization now fails with a 400 warp rejection - add the `validator_balances` endpoint - update the `aggregate_and_proofs` endpoint to accept an array - updates the attester duties endpoint from a `GET` to a `POST` - reduces the number of times we query for proposer duties from once per slot per validator to only once per slot Co-authored-by: realbigsean Co-authored-by: Paul Hauner --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 +- .../http_api/src/beacon_proposer_cache.rs | 1 + beacon_node/http_api/src/lib.rs | 444 +++++++++++------- beacon_node/http_api/tests/tests.rs | 103 +++- common/eth2/src/lib.rs | 126 +++-- common/eth2/src/types.rs | 42 +- common/warp_utils/src/reject.rs | 27 +- validator_client/src/attestation_service.rs | 69 +-- validator_client/src/duties_service.rs | 77 ++- validator_client/src/validator_duty.rs | 151 ++++-- 10 files changed, 722 insertions(+), 322 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e387150fe80..1f3a18b3e3e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -984,9 +984,9 @@ impl BeaconChain { /// /// - `VerifiedUnaggregatedAttestation` /// - `VerifiedAggregatedAttestation` - pub fn apply_attestation_to_fork_choice<'a>( + pub fn apply_attestation_to_fork_choice( &self, - verified: &'a impl SignatureVerifiedAttestation, + verified: &impl SignatureVerifiedAttestation, ) -> Result<(), Error> { let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); diff --git a/beacon_node/http_api/src/beacon_proposer_cache.rs b/beacon_node/http_api/src/beacon_proposer_cache.rs index b062119e578..1dbcef1f50f 100644 --- a/beacon_node/http_api/src/beacon_proposer_cache.rs +++ b/beacon_node/http_api/src/beacon_proposer_cache.rs @@ -89,6 +89,7 @@ impl BeaconProposerCache { Ok(ProposerData { pubkey: PublicKeyBytes::from(pubkey), + validator_index: i as u64, slot, }) }) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index deecae879e6..ba37150110a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -39,7 +39,7 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use types::{ Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec, - Hash256, ProposerSlashing, PublicKey, RelativeEpoch, SignedAggregateAndProof, + Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig, }; use warp::{http::Response, Filter}; @@ -390,7 +390,13 @@ pub fn serve( let beacon_states_path = eth1_v1 .and(warp::path("beacon")) .and(warp::path("states")) - .and(warp::path::param::()) + .and(warp::path::param::().or_else(|_| { + blocking_task(|| { + Err(warp_utils::reject::custom_bad_request( + "Invalid state ID".to_string(), + )) + }) + })) .and(chain_filter.clone()); // GET beacon/states/{state_id}/root @@ -435,6 +441,50 @@ pub fn serve( }) }); + // GET beacon/states/{state_id}/validator_balances?id + let get_beacon_state_validator_balances = beacon_states_path + .clone() + .and(warp::path("validator_balances")) + .and(warp::path::end()) + .and(warp::query::()) + .and_then( + |state_id: StateId, + chain: Arc>, + query: api_types::ValidatorBalancesQuery| { + blocking_json_task(move || { + state_id + .map_state(&chain, |state| { + Ok(state + .validators + .iter() + .zip(state.balances.iter()) + .enumerate() + // filter by validator id(s) if provided + .filter(|(index, (validator, _))| { + query.id.as_ref().map_or(true, |ids| { + ids.0.iter().any(|id| match id { + ValidatorId::PublicKey(pubkey) => { + &validator.pubkey == pubkey + } + ValidatorId::Index(param_index) => { + *param_index == *index as u64 + } + }) + }) + }) + .map(|(index, (_, balance))| { + Some(api_types::ValidatorBalanceData { + index: index as u64, + balance: *balance, + }) + }) + .collect::>()) + }) + .map(api_types::GenericResponse::from) + }) + }, + ); + // GET beacon/states/{state_id}/validators?id,status let get_beacon_state_validators = beacon_states_path .clone() @@ -747,7 +797,7 @@ pub fn serve( * beacon/blocks */ - // POST beacon/blocks/{block_id} + // POST beacon/blocks let post_beacon_blocks = eth1_v1 .and(warp::path("beacon")) .and(warp::path("blocks")) @@ -1370,18 +1420,158 @@ pub fn serve( * validator */ - // GET validator/duties/attester/{epoch} - let get_validator_duties_attester = eth1_v1 + // GET validator/duties/proposer/{epoch} + let get_validator_duties_proposer = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("duties")) + .and(warp::path("proposer")) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(chain_filter.clone()) + .and(beacon_proposer_cache()) + .and_then( + |epoch: Epoch, + chain: Arc>, + beacon_proposer_cache: Arc>| { + blocking_json_task(move || { + let current_epoch = chain + .epoch() + .map_err(warp_utils::reject::beacon_chain_error)?; + + if epoch > current_epoch { + return Err(warp_utils::reject::custom_bad_request(format!( + "request epoch {} is ahead of the current epoch {}", + epoch, current_epoch + ))); + } + + if epoch == current_epoch { + beacon_proposer_cache + .lock() + .get_proposers(&chain, epoch) + .map(api_types::GenericResponse::from) + } else { + let state = + StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch())) + .state(&chain)?; + + epoch + .slot_iter(T::EthSpec::slots_per_epoch()) + .map(|slot| { + state + .get_beacon_proposer_index(slot, &chain.spec) + .map_err(warp_utils::reject::beacon_state_error) + .and_then(|i| { + let pubkey = + chain.validator_pubkey(i) + .map_err(warp_utils::reject::beacon_chain_error)? + .ok_or_else(|| + warp_utils::reject::beacon_chain_error( + BeaconChainError::ValidatorPubkeyCacheIncomplete(i) + ) + )?; + + Ok(api_types::ProposerData { + pubkey: PublicKeyBytes::from(pubkey), + validator_index: i as u64, + slot, + }) + }) + }) + .collect::, _>>() + .map(api_types::GenericResponse::from) + } + }) + }, + ); + + // GET validator/blocks/{slot} + let get_validator_blocks = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("blocks")) + .and(warp::path::param::()) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(warp::query::()) + .and(chain_filter.clone()) + .and_then( + |slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| { + blocking_json_task(move || { + let randao_reveal = (&query.randao_reveal).try_into().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "randao reveal is not valid BLS signature: {:?}", + e + )) + })?; + + chain + .produce_block(randao_reveal, slot, query.graffiti.map(Into::into)) + .map(|block_and_state| block_and_state.0) + .map(api_types::GenericResponse::from) + .map_err(warp_utils::reject::block_production_error) + }) + }, + ); + + // GET validator/attestation_data?slot,committee_index + let get_validator_attestation_data = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("attestation_data")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(chain_filter.clone()) + .and_then( + |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { + blocking_json_task(move || { + chain + .produce_unaggregated_attestation(query.slot, query.committee_index) + .map(|attestation| attestation.data) + .map(api_types::GenericResponse::from) + .map_err(warp_utils::reject::beacon_chain_error) + }) + }, + ); + + // GET validator/aggregate_attestation?attestation_data_root,slot + let get_validator_aggregate_attestation = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("aggregate_attestation")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(chain_filter.clone()) + .and_then( + |query: api_types::ValidatorAggregateAttestationQuery, chain: Arc>| { + blocking_json_task(move || { + chain + .get_aggregated_attestation_by_slot_and_root( + query.slot, + &query.attestation_data_root, + ) + .map(api_types::GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching aggregate found".to_string(), + ) + }) + }) + }, + ); + + // POST validator/duties/attester/{epoch} + let post_validator_duties_attester = eth1_v1 .and(warp::path("validator")) .and(warp::path("duties")) .and(warp::path("attester")) .and(warp::path::param::()) .and(warp::path::end()) .and(not_while_syncing_filter.clone()) - .and(warp::query::()) + .and(warp::body::json()) .and(chain_filter.clone()) .and_then( - |epoch: Epoch, query: api_types::ValidatorDutiesQuery, chain: Arc>| { + |epoch: Epoch, indices: api_types::ValidatorIndexData, chain: Arc>| { blocking_json_task(move || { let current_epoch = chain .epoch() @@ -1397,30 +1587,22 @@ pub fn serve( let validator_count = StateId::head() .map_state(&chain, |state| Ok(state.validators.len() as u64))?; - let indices = query - .index - .as_ref() - .map(|index| index.0.clone()) - .map(Result::Ok) - .unwrap_or_else(|| { - Ok::<_, warp::Rejection>((0..validator_count).collect()) - })?; - let pubkeys = indices - .into_iter() - .filter(|i| *i < validator_count as u64) + .0 + .iter() + .filter(|i| **i < validator_count as u64) .map(|i| { let pubkey = chain - .validator_pubkey(i as usize) + .validator_pubkey(*i as usize) .map_err(warp_utils::reject::beacon_chain_error)? .ok_or_else(|| { warp_utils::reject::custom_bad_request(format!( "unknown validator index {}", - i + *i )) })?; - Ok((i, pubkey)) + Ok((*i, pubkey)) }) .collect::, warp::Rejection>>()?; @@ -1536,103 +1718,6 @@ pub fn serve( }, ); - // GET validator/duties/proposer/{epoch} - let get_validator_duties_proposer = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("duties")) - .and(warp::path("proposer")) - .and(warp::path::param::()) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(chain_filter.clone()) - .and(beacon_proposer_cache()) - .and_then( - |epoch: Epoch, - chain: Arc>, - beacon_proposer_cache: Arc>| { - blocking_json_task(move || { - beacon_proposer_cache - .lock() - .get_proposers(&chain, epoch) - .map(api_types::GenericResponse::from) - }) - }, - ); - - // GET validator/blocks/{slot} - let get_validator_blocks = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("blocks")) - .and(warp::path::param::()) - .and(warp::path::end()) - .and(not_while_syncing_filter.clone()) - .and(warp::query::()) - .and(chain_filter.clone()) - .and_then( - |slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc>| { - blocking_json_task(move || { - let randao_reveal = (&query.randao_reveal).try_into().map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "randao reveal is not valid BLS signature: {:?}", - e - )) - })?; - - chain - .produce_block(randao_reveal, slot, query.graffiti.map(Into::into)) - .map(|block_and_state| block_and_state.0) - .map(api_types::GenericResponse::from) - .map_err(warp_utils::reject::block_production_error) - }) - }, - ); - - // GET validator/attestation_data?slot,committee_index - let get_validator_attestation_data = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("attestation_data")) - .and(warp::path::end()) - .and(warp::query::()) - .and(not_while_syncing_filter.clone()) - .and(chain_filter.clone()) - .and_then( - |query: api_types::ValidatorAttestationDataQuery, chain: Arc>| { - blocking_json_task(move || { - chain - .produce_unaggregated_attestation(query.slot, query.committee_index) - .map(|attestation| attestation.data) - .map(api_types::GenericResponse::from) - .map_err(warp_utils::reject::beacon_chain_error) - }) - }, - ); - - // GET validator/aggregate_attestation?attestation_data_root,slot - let get_validator_aggregate_attestation = eth1_v1 - .and(warp::path("validator")) - .and(warp::path("aggregate_attestation")) - .and(warp::path::end()) - .and(warp::query::()) - .and(not_while_syncing_filter.clone()) - .and(chain_filter.clone()) - .and_then( - |query: api_types::ValidatorAggregateAttestationQuery, chain: Arc>| { - blocking_json_task(move || { - chain - .get_aggregated_attestation_by_slot_and_root( - query.slot, - &query.attestation_data_root, - ) - .map(api_types::GenericResponse::from) - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "no matching aggregate found".to_string(), - ) - }) - }) - }, - ); - // POST validator/aggregate_and_proofs let post_validator_aggregate_and_proofs = eth1_v1 .and(warp::path("validator")) @@ -1642,53 +1727,81 @@ pub fn serve( .and(chain_filter.clone()) .and(warp::body::json()) .and(network_tx_filter.clone()) + .and(log_filter.clone()) .and_then( |chain: Arc>, - aggregate: SignedAggregateAndProof, - network_tx: UnboundedSender>| { + aggregates: Vec>, + network_tx: UnboundedSender>, log: Logger| { blocking_json_task(move || { - let aggregate = + let mut verified_aggregates = Vec::with_capacity(aggregates.len()); + let mut messages = Vec::with_capacity(aggregates.len()); + let mut failures = Vec::new(); + + // Verify that all messages in the post are valid before processing further + for (index, aggregate) in aggregates.as_slice().iter().enumerate() { match chain.verify_aggregated_attestation_for_gossip(aggregate.clone()) { - Ok(aggregate) => aggregate, + Ok(verified_aggregate) => { + messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new( + verified_aggregate.aggregate().clone(), + ))); + verified_aggregates.push((index, verified_aggregate)); + } // If we already know the attestation, don't broadcast it or attempt to // further verify it. Return success. // // It's reasonably likely that two different validators produce // identical aggregates, especially if they're using the same beacon // node. - Err(AttnError::AttestationAlreadyKnown(_)) => return Ok(()), + Err(AttnError::AttestationAlreadyKnown(_)) => continue, Err(e) => { - return Err(warp_utils::reject::object_invalid(format!( - "gossip verification failed: {:?}", - e - ))); - } - }; + error!(log, + "Failure verifying aggregate and proofs"; + "error" => format!("{:?}", e), + "request_index" => index, + "aggregator_index" => aggregate.message.aggregator_index, + "attestation_index" => aggregate.message.aggregate.data.index, + "attestation_slot" => aggregate.message.aggregate.data.slot, + ); + failures.push(api_types::Failure::new(index, format!("Verification: {:?}", e))); + }, + } + } - publish_pubsub_message( - &network_tx, - PubsubMessage::AggregateAndProofAttestation(Box::new( - aggregate.aggregate().clone(), - )), - )?; + // Publish aggregate attestations to the libp2p network + if !messages.is_empty() { + publish_network_message(&network_tx, NetworkMessage::Publish { messages })?; + } - chain - .apply_attestation_to_fork_choice(&aggregate) - .map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to fork choice: {:?}", - e - )) - })?; + // Import aggregate attestations + for (index, verified_aggregate) in verified_aggregates { + if let Err(e) = chain.apply_attestation_to_fork_choice(&verified_aggregate) { + error!(log, + "Failure applying verified aggregate attestation to fork choice"; + "error" => format!("{:?}", e), + "request_index" => index, + "aggregator_index" => verified_aggregate.aggregate().message.aggregator_index, + "attestation_index" => verified_aggregate.attestation().data.index, + "attestation_slot" => verified_aggregate.attestation().data.slot, + ); + failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e))); + } + if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) { + warn!(log, + "Could not add verified aggregate attestation to the inclusion pool"; + "error" => format!("{:?}", e), + "request_index" => index, + ); + failures.push(api_types::Failure::new(index, format!("Op pool: {:?}", e))); + } + } - chain.add_to_block_inclusion_pool(aggregate).map_err(|e| { - warp_utils::reject::broadcast_without_import(format!( - "not applied to block inclusion pool: {:?}", - e + if !failures.is_empty() { + Err(warp_utils::reject::indexed_bad_request("error processing aggregate and proofs".to_string(), + failures )) - })?; - - Ok(()) + } else { + Ok(()) + } }) }, ); @@ -1935,9 +2048,11 @@ pub fn serve( let routes = warp::get() .and( get_beacon_genesis + .boxed() .or(get_beacon_state_root.boxed()) .or(get_beacon_state_fork.boxed()) .or(get_beacon_state_finality_checkpoints.boxed()) + .or(get_beacon_state_validator_balances.boxed()) .or(get_beacon_state_validators.boxed()) .or(get_beacon_state_validators_id.boxed()) .or(get_beacon_state_committees.boxed()) @@ -1961,7 +2076,6 @@ pub fn serve( .or(get_node_health.boxed()) .or(get_node_peers_by_id.boxed()) .or(get_node_peers.boxed()) - .or(get_validator_duties_attester.boxed()) .or(get_validator_duties_proposer.boxed()) .or(get_validator_blocks.boxed()) .or(get_validator_attestation_data.boxed()) @@ -1976,23 +2090,19 @@ pub fn serve( .or(get_lighthouse_eth1_syncing.boxed()) .or(get_lighthouse_eth1_block_cache.boxed()) .or(get_lighthouse_eth1_deposit_cache.boxed()) - .or(get_lighthouse_beacon_states_ssz.boxed()) - .boxed(), + .or(get_lighthouse_beacon_states_ssz.boxed()), ) - .or(warp::post() - .and( - post_beacon_blocks - .or(post_beacon_pool_attestations.boxed()) - .or(post_beacon_pool_attester_slashings.boxed()) - .or(post_beacon_pool_proposer_slashings.boxed()) - .or(post_beacon_pool_voluntary_exits.boxed()) - .or(post_validator_aggregate_and_proofs.boxed()) - .or(post_validator_beacon_committee_subscriptions.boxed()) - .boxed(), - ) - .boxed()) - .boxed() - // Maps errors into HTTP responses. + .or(warp::post().and( + post_beacon_blocks + .boxed() + .or(post_beacon_pool_attestations.boxed()) + .or(post_beacon_pool_attester_slashings.boxed()) + .or(post_beacon_pool_proposer_slashings.boxed()) + .or(post_beacon_pool_voluntary_exits.boxed()) + .or(post_validator_duties_attester.boxed()) + .or(post_validator_aggregate_and_proofs.boxed()) + .or(post_validator_beacon_committee_subscriptions.boxed()), + )) .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) .with(prometheus_metrics()) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 295807ded77..f80ece9fece 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -411,6 +411,73 @@ impl ApiTester { self } + pub async fn test_beacon_states_validator_balances(self) -> Self { + for state_id in self.interesting_state_ids() { + for validator_indices in self.interesting_validator_indices() { + let state_opt = self.get_state(state_id); + let validators: Vec = match state_opt.as_ref() { + Some(state) => state.validators.clone().into(), + None => vec![], + }; + let validator_index_ids = validator_indices + .iter() + .cloned() + .map(|i| ValidatorId::Index(i)) + .collect::>(); + let validator_pubkey_ids = validator_indices + .iter() + .cloned() + .map(|i| { + ValidatorId::PublicKey( + validators + .get(i as usize) + .map_or(PublicKeyBytes::empty(), |val| val.pubkey.clone()), + ) + }) + .collect::>(); + + let result_index_ids = self + .client + .get_beacon_states_validator_balances( + state_id, + Some(validator_index_ids.as_slice()), + ) + .await + .unwrap() + .map(|res| res.data); + let result_pubkey_ids = self + .client + .get_beacon_states_validator_balances( + state_id, + Some(validator_pubkey_ids.as_slice()), + ) + .await + .unwrap() + .map(|res| res.data); + + let expected = state_opt.map(|state| { + let mut validators = Vec::with_capacity(validator_indices.len()); + + for i in validator_indices { + if i < state.balances.len() as u64 { + validators.push(ValidatorBalanceData { + index: i as u64, + balance: state.balances[i as usize], + }); + } + } + + validators + }); + + assert_eq!(result_index_ids, expected, "{:?}", state_id); + assert_eq!(result_pubkey_ids, expected, "{:?}", state_id); + } + } + + self + } + pub async fn test_beacon_states_validators(self) -> Self { for state_id in self.interesting_state_ids() { for statuses in self.interesting_validator_statuses() { @@ -1235,7 +1302,7 @@ impl ApiTester { if epoch > current_epoch + 1 { assert_eq!( self.client - .get_validator_duties_attester(epoch, Some(&indices)) + .post_validator_duties_attester(epoch, indices.as_slice()) .await .unwrap_err() .status() @@ -1247,7 +1314,7 @@ impl ApiTester { let results = self .client - .get_validator_duties_attester(epoch, Some(&indices)) + .post_validator_duties_attester(epoch, indices.as_slice()) .await .unwrap() .data; @@ -1336,7 +1403,11 @@ impl ApiTester { .unwrap(); let pubkey = state.validators[index].pubkey.clone().into(); - ProposerData { pubkey, slot } + ProposerData { + pubkey, + validator_index: index as u64, + slot, + } }) .collect::>(); @@ -1473,17 +1544,17 @@ impl ApiTester { let fork = head.beacon_state.fork; let genesis_validators_root = self.chain.genesis_validators_root; - let mut duties = vec![]; - for i in 0..self.validator_keypairs.len() { - duties.push( - self.client - .get_validator_duties_attester(epoch, Some(&[i as u64])) - .await - .unwrap() - .data[0] - .clone(), + let duties = self + .client + .post_validator_duties_attester( + epoch, + (0..self.validator_keypairs.len() as u64) + .collect::>() + .as_slice(), ) - } + .await + .unwrap() + .data; let (i, kp, duty, proof) = self .validator_keypairs @@ -1554,7 +1625,7 @@ impl ApiTester { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(&aggregate) + .post_validator_aggregate_and_proof::(&[aggregate]) .await .unwrap(); @@ -1569,7 +1640,7 @@ impl ApiTester { aggregate.message.aggregate.data.slot += 1; self.client - .post_validator_aggregate_and_proof::(&aggregate) + .post_validator_aggregate_and_proof::(&[aggregate]) .await .unwrap_err(); @@ -1700,6 +1771,8 @@ async fn beacon_get() { .await .test_beacon_states_validators() .await + .test_beacon_states_validator_balances() + .await .test_beacon_states_committees() .await .test_beacon_states_validator_id() diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 6582e63695f..38d86ddd74b 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -28,6 +28,8 @@ pub enum Error { Reqwest(reqwest::Error), /// The server returned an error message where the body was able to be parsed. ServerMessage(ErrorMessage), + /// The server returned an error message with an array of errors. + ServerIndexedMessage(IndexedErrorMessage), /// The server returned an error message where the body was unable to be parsed. StatusCode(StatusCode), /// The supplied URL is badly formatted. It should look something like `http://127.0.0.1:5052`. @@ -50,6 +52,7 @@ impl Error { match self { Error::Reqwest(error) => error.status(), Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(), + Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(), Error::StatusCode(status) => Some(*status), Error::InvalidUrl(_) => None, Error::InvalidSecret(_) => None, @@ -137,6 +140,26 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Perform a HTTP POST request, returning a JSON response. + async fn post_with_response( + &self, + url: U, + body: &V, + ) -> Result { + let response = self + .client + .post(url) + .json(body) + .send() + .await + .map_err(Error::Reqwest)?; + ok_or_error(response) + .await? + .json() + .await + .map_err(Error::Reqwest) + } + /// `GET beacon/genesis` /// /// ## Errors @@ -210,6 +233,35 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `GET beacon/states/{state_id}/validator_balances?id` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_states_validator_balances( + &self, + state_id: StateId, + ids: Option<&[ValidatorId]>, + ) -> Result>>, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("states") + .push(&state_id.to_string()) + .push("validator_balances"); + + if let Some(ids) = ids { + let id_string = ids + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","); + path.query_pairs_mut().append_pair("id", &id_string); + } + + self.get_opt(path).await + } + /// `GET beacon/states/{state_id}/validators?id,status` /// /// Returns `Ok(None)` on a 404 error. @@ -713,37 +765,6 @@ impl BeaconNodeHttpClient { self.get(path).await } - /// `GET validator/duties/attester/{epoch}?index` - /// - /// ## Note - /// - /// The `index` query parameter accepts a list of validator indices. - pub async fn get_validator_duties_attester( - &self, - epoch: Epoch, - index: Option<&[u64]>, - ) -> Result>, Error> { - let mut path = self.eth_path()?; - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("validator") - .push("duties") - .push("attester") - .push(&epoch.to_string()); - - if let Some(index) = index { - let string = index - .iter() - .map(|i| i.to_string()) - .collect::>() - .join(","); - path.query_pairs_mut().append_pair("index", &string); - } - - self.get(path).await - } - /// `GET validator/duties/proposer/{epoch}` pub async fn get_validator_duties_proposer( &self, @@ -830,10 +851,28 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `POST validator/duties/attester/{epoch}` + pub async fn post_validator_duties_attester( + &self, + epoch: Epoch, + indices: &[u64], + ) -> Result>, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("duties") + .push("attester") + .push(&epoch.to_string()); + + self.post_with_response(path, &indices).await + } + /// `POST validator/aggregate_and_proofs` pub async fn post_validator_aggregate_and_proof( &self, - aggregate: &SignedAggregateAndProof, + aggregates: &[SignedAggregateAndProof], ) -> Result<(), Error> { let mut path = self.eth_path()?; @@ -842,7 +881,14 @@ impl BeaconNodeHttpClient { .push("validator") .push("aggregate_and_proofs"); - self.post(path, aggregate).await?; + let response = self + .client + .post(path) + .json(aggregates) + .send() + .await + .map_err(Error::Reqwest)?; + ok_or_indexed_error(response).await?; Ok(()) } @@ -878,3 +924,17 @@ async fn ok_or_error(response: Response) -> Result { Err(Error::StatusCode(status)) } } + +/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an +/// appropriate indexed error message. +async fn ok_or_indexed_error(response: Response) -> Result { + let status = response.status(); + + if status == StatusCode::OK { + Ok(response) + } else if let Ok(message) = response.json().await { + Err(Error::ServerIndexedMessage(message)) + } else { + Err(Error::StatusCode(status)) + } +} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 6ba16b190d5..56a83cfd556 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -18,6 +18,30 @@ pub struct ErrorMessage { pub stacktraces: Vec, } +/// An indexed API error serializable to JSON. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct IndexedErrorMessage { + pub code: u16, + pub message: String, + pub failures: Vec, +} + +/// A single failure in an index of API errors, serializable to JSON. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Failure { + pub index: u64, + pub message: String, +} + +impl Failure { + pub fn new(index: usize, message: String) -> Self { + Self { + index: index as u64, + message, + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct GenesisData { #[serde(with = "serde_utils::quoted_u64")] @@ -206,6 +230,14 @@ pub struct ValidatorData { pub validator: Validator, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ValidatorBalanceData { + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub balance: u64, +} + // TODO: This does not currently match the spec, but I'm going to try and change the spec using // this proposal: // @@ -415,10 +447,14 @@ impl TryFrom for QueryVec { } #[derive(Clone, Deserialize)] -pub struct ValidatorDutiesQuery { - pub index: Option>, +pub struct ValidatorBalancesQuery { + pub id: Option>, } +#[derive(Clone, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ValidatorIndexData(#[serde(with = "serde_utils::quoted_u64_vec")] pub Vec); + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct AttesterData { pub pubkey: PublicKeyBytes, @@ -438,6 +474,8 @@ pub struct AttesterData { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ProposerData { pub pubkey: PublicKeyBytes, + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, pub slot: Slot, } diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index 020fa19d8b8..9a5a8ea5cfd 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -1,4 +1,4 @@ -use eth2::types::ErrorMessage; +use eth2::types::{ErrorMessage, Failure, IndexedErrorMessage}; use std::convert::Infallible; use warp::{http::StatusCode, reject::Reject}; @@ -110,12 +110,37 @@ pub fn invalid_auth(msg: String) -> warp::reject::Rejection { warp::reject::custom(InvalidAuthorization(msg)) } +#[derive(Debug)] +pub struct IndexedBadRequestErrors { + pub message: String, + pub failures: Vec, +} + +impl Reject for IndexedBadRequestErrors {} + +pub fn indexed_bad_request(message: String, failures: Vec) -> warp::reject::Rejection { + warp::reject::custom(IndexedBadRequestErrors { message, failures }) +} + /// This function receives a `Rejection` and tries to return a custom /// value, otherwise simply passes the rejection along. pub async fn handle_rejection(err: warp::Rejection) -> Result { let code; let message; + if let Some(e) = err.find::() { + message = format!("BAD_REQUEST: {}", e.message); + code = StatusCode::BAD_REQUEST; + + let json = warp::reply::json(&IndexedErrorMessage { + code: code.as_u16(), + message, + failures: e.failures.clone(), + }); + + return Ok(warp::reply::with_status(json, code)); + } + if err.is_not_found() { code = StatusCode::NOT_FOUND; message = "NOT_FOUND".to_string(); diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index d675ebda2e8..f4f4eb636ee 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -437,6 +437,8 @@ impl AttestationService { .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))? .data; + let mut signed_aggregate_and_proofs = Vec::new(); + for duty_and_proof in validator_duties { let selection_proof = if let Some(proof) = duty_and_proof.selection_proof.as_ref() { proof @@ -462,44 +464,53 @@ impl AttestationService { continue; } - let signed_aggregate_and_proof = if let Some(aggregate) = - self.validator_store.produce_signed_aggregate_and_proof( - pubkey, - validator_index, - aggregated_attestation.clone(), - selection_proof.clone(), - ) { - aggregate + if let Some(aggregate) = self.validator_store.produce_signed_aggregate_and_proof( + pubkey, + validator_index, + aggregated_attestation.clone(), + selection_proof.clone(), + ) { + signed_aggregate_and_proofs.push(aggregate); } else { crit!(log, "Failed to sign attestation"); continue; }; + } - let attestation = &signed_aggregate_and_proof.message.aggregate; - + if !signed_aggregate_and_proofs.is_empty() { match self .beacon_node - .post_validator_aggregate_and_proof(&signed_aggregate_and_proof) + .post_validator_aggregate_and_proof(signed_aggregate_and_proofs.as_slice()) .await { - Ok(()) => info!( - log, - "Successfully published attestation"; - "aggregator" => signed_aggregate_and_proof.message.aggregator_index, - "signatures" => attestation.aggregation_bits.num_set_bits(), - "head_block" => format!("{:?}", attestation.data.beacon_block_root), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - "type" => "aggregated", - ), - Err(e) => crit!( - log, - "Failed to publish attestation"; - "error" => e.to_string(), - "committee_index" => attestation.data.index, - "slot" => attestation.data.slot.as_u64(), - "type" => "aggregated", - ), + Ok(()) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = &signed_aggregate_and_proof.message.aggregate; + info!( + log, + "Successfully published attestations"; + "aggregator" => signed_aggregate_and_proof.message.aggregator_index, + "signatures" => attestation.aggregation_bits.num_set_bits(), + "head_block" => format!("{:?}", attestation.data.beacon_block_root), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + "type" => "aggregated", + ); + } + } + Err(e) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = &signed_aggregate_and_proof.message.aggregate; + crit!( + log, + "Failed to publish attestation"; + "error" => e.to_string(), + "committee_index" => attestation.data.index, + "slot" => attestation.data.slot.as_u64(), + "type" => "aggregated", + ); + } + } } } diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 7f6d33fe85b..8bba3930074 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -106,6 +106,10 @@ impl DutyAndProof { pub fn validator_pubkey(&self) -> &PublicKey { &self.duty.validator_pubkey } + + pub fn validator_index(&self) -> Option { + self.duty.validator_index + } } impl Into for ValidatorDuty { @@ -229,6 +233,14 @@ impl DutiesStore { .collect() } + fn get_index(&self, pubkey: &PublicKey, epoch: Epoch) -> Option { + self.store + .read() + .get(pubkey)? + .get(&epoch)? + .validator_index() + } + fn is_aggregator(&self, validator_pubkey: &PublicKey, epoch: Epoch) -> Option { Some( self.store @@ -588,29 +600,42 @@ impl DutiesService { let mut replaced = 0; let mut invalid = 0; + // Determine which pubkeys we already know the index of by checking the duties store for + // the current epoch. + let pubkeys: Vec<(PublicKey, Option)> = self + .validator_store + .voting_pubkeys() + .into_iter() + .map(|pubkey| { + let index = self.store.get_index(&pubkey, current_epoch); + (pubkey, index) + }) + .collect(); + let mut validator_subscriptions = vec![]; - for pubkey in self.validator_store.voting_pubkeys() { - let remote_duties = match ValidatorDuty::download( - &self.beacon_node, - current_epoch, - request_epoch, - pubkey, - ) - .await - { - Ok(duties) => duties, - Err(e) => { - error!( - log, - "Failed to download validator duties"; - "error" => e - ); - continue; - } - }; + let remote_duties: Vec = match ValidatorDuty::download( + &self.beacon_node, + current_epoch, + request_epoch, + pubkeys, + &log, + ) + .await + { + Ok(duties) => duties, + Err(e) => { + error!( + log, + "Failed to download validator duties"; + "error" => e + ); + vec![] + } + }; + remote_duties.iter().for_each(|remote_duty| { // Convert the remote duties into our local representation. - let duties: DutyAndProof = remote_duties.clone().into(); + let duties: DutyAndProof = remote_duty.clone().into(); let validator_pubkey = duties.duty.validator_pubkey.clone(); @@ -628,9 +653,9 @@ impl DutiesService { debug!( log, "First duty assignment for validator"; - "proposal_slots" => format!("{:?}", &remote_duties.block_proposal_slots), - "attestation_slot" => format!("{:?}", &remote_duties.attestation_slot), - "validator" => format!("{:?}", &remote_duties.validator_pubkey) + "proposal_slots" => format!("{:?}", &remote_duty.block_proposal_slots), + "attestation_slot" => format!("{:?}", &remote_duty.attestation_slot), + "validator" => format!("{:?}", &remote_duty.validator_pubkey) ); new_validator += 1; } @@ -642,10 +667,10 @@ impl DutiesService { } if let Some(is_aggregator) = - self.store.is_aggregator(&validator_pubkey, request_epoch) + self.store.is_aggregator(&validator_pubkey, request_epoch) { if outcome.is_subscription_candidate() { - if let Some(subscription) = remote_duties.subscription(is_aggregator) { + if let Some(subscription) = remote_duty.subscription(is_aggregator) { validator_subscriptions.push(subscription) } } @@ -657,7 +682,7 @@ impl DutiesService { "error" => e ), } - } + }); if invalid > 0 { error!( diff --git a/validator_client/src/validator_duty.rs b/validator_client/src/validator_duty.rs index e5f56c38555..b87a9dbc34f 100644 --- a/validator_client/src/validator_duty.rs +++ b/validator_client/src/validator_duty.rs @@ -3,6 +3,8 @@ use eth2::{ BeaconNodeHttpClient, }; use serde::{Deserialize, Serialize}; +use slog::{error, Logger}; +use std::collections::HashMap; use types::{CommitteeIndex, Epoch, PublicKey, PublicKeyBytes, Slot}; /// This struct is being used as a shim since we deprecated the `rest_api` in favour of `http_api`. @@ -33,10 +35,10 @@ pub struct ValidatorDuty { impl ValidatorDuty { /// Instantiate `Self` as if there are no known dutes for `validator_pubkey`. - fn no_duties(validator_pubkey: PublicKey) -> Self { + fn no_duties(validator_pubkey: PublicKey, validator_index: Option) -> Self { ValidatorDuty { validator_pubkey, - validator_index: None, + validator_index, attestation_slot: None, attestation_committee_index: None, attestation_committee_position: None, @@ -53,58 +55,113 @@ impl ValidatorDuty { beacon_node: &BeaconNodeHttpClient, current_epoch: Epoch, request_epoch: Epoch, - pubkey: PublicKey, - ) -> Result { - let pubkey_bytes = PublicKeyBytes::from(&pubkey); + mut pubkeys: Vec<(PublicKey, Option)>, + log: &Logger, + ) -> Result, String> { + for (pubkey, index_opt) in &mut pubkeys { + if index_opt.is_none() { + *index_opt = beacon_node + .get_beacon_states_validator_id( + StateId::Head, + &ValidatorId::PublicKey(PublicKeyBytes::from(&*pubkey)), + ) + .await + .map_err(|e| { + error!( + log, + "Failed to obtain validator index"; + "pubkey" => ?pubkey, + "error" => ?e + ) + }) + // Supress the error since we've already logged an error and we don't want to + // stop the rest of the code. + .ok() + .and_then(|body_opt| body_opt.map(|body| body.data.index)); + } + } - let validator_index = if let Some(index) = beacon_node - .get_beacon_states_validator_id( - StateId::Head, - &ValidatorId::PublicKey(pubkey_bytes.clone()), - ) - .await - .map_err(|e| format!("Failed to get validator index: {}", e))? - .map(|body| body.data.index) - { - index + // Query for all block proposer duties in the current epoch and map the response by index. + let proposal_slots_by_index: HashMap> = if current_epoch == request_epoch { + beacon_node + .get_validator_duties_proposer(current_epoch) + .await + .map(|resp| resp.data) + // Exit early if there's an error. + .map_err(|e| format!("Failed to get proposer indices: {:?}", e))? + .into_iter() + .fold( + HashMap::with_capacity(pubkeys.len()), + |mut map, proposer_data| { + map.entry(proposer_data.validator_index) + .or_insert_with(Vec::new) + .push(proposer_data.slot); + map + }, + ) } else { - return Ok(Self::no_duties(pubkey)); + HashMap::new() }; - if let Some(attester) = beacon_node - .get_validator_duties_attester(request_epoch, Some(&[validator_index])) + let query_indices = pubkeys + .iter() + .filter_map(|(_, index_opt)| *index_opt) + .collect::>(); + let attester_data_map = beacon_node + .post_validator_duties_attester(request_epoch, query_indices.as_slice()) .await - .map_err(|e| format!("Failed to get attester duties: {}", e))? - .data - .first() - { - let block_proposal_slots = if current_epoch == request_epoch { - beacon_node - .get_validator_duties_proposer(current_epoch) - .await - .map_err(|e| format!("Failed to get proposer indices: {}", e))? - .data - .into_iter() - .filter(|data| data.pubkey == pubkey_bytes) - .map(|data| data.slot) - .collect() - } else { - vec![] - }; + .map(|resp| resp.data) + // Exit early if there's an error. + .map_err(|e| format!("Failed to get attester duties: {:?}", e))? + .into_iter() + .fold( + HashMap::with_capacity(pubkeys.len()), + |mut map, attester_data| { + map.insert(attester_data.validator_index, attester_data); + map + }, + ); - Ok(ValidatorDuty { - validator_pubkey: pubkey, - validator_index: Some(attester.validator_index), - attestation_slot: Some(attester.slot), - attestation_committee_index: Some(attester.committee_index), - attestation_committee_position: Some(attester.validator_committee_index as usize), - committee_count_at_slot: Some(attester.committees_at_slot), - committee_length: Some(attester.committee_length), - block_proposal_slots: Some(block_proposal_slots), + let duties = pubkeys + .into_iter() + .map(|(pubkey, index_opt)| { + if let Some(index) = index_opt { + if let Some(attester_data) = attester_data_map.get(&index) { + match attester_data.pubkey.decompress() { + Ok(pubkey) => ValidatorDuty { + validator_pubkey: pubkey, + validator_index: Some(attester_data.validator_index), + attestation_slot: Some(attester_data.slot), + attestation_committee_index: Some(attester_data.committee_index), + attestation_committee_position: Some( + attester_data.validator_committee_index as usize, + ), + committee_count_at_slot: Some(attester_data.committees_at_slot), + committee_length: Some(attester_data.committee_length), + block_proposal_slots: proposal_slots_by_index + .get(&attester_data.validator_index) + .cloned(), + }, + Err(e) => { + error!( + log, + "Could not deserialize validator public key"; + "error" => format!("{:?}", e), + "validator_index" => attester_data.validator_index + ); + Self::no_duties(pubkey, Some(index)) + } + } + } else { + Self::no_duties(pubkey, Some(index)) + } + } else { + Self::no_duties(pubkey, None) + } }) - } else { - Ok(Self::no_duties(pubkey)) - } + .collect(); + + Ok(duties) } /// Return `true` if these validator duties are equal, ignoring their `block_proposal_slots`.