diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index e326c0bf98d..d0da2915bc1 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -145,16 +145,10 @@ pub fn serve( }); // GET beacon/states/{state_id}/finality_checkpoints - let get_beacon_state_finality_checkpoints = warp::any() - .and(warp::path(API_PREFIX)) - .and(warp::path(API_VERSION)) - .and(warp::path("beacon")) - .and(warp::path("states")) - .and(warp::path::param::()) + let get_beacon_state_finality_checkpoints = beacon_states_path + .clone() .and(warp::path("finality_checkpoints")) .and(warp::path::end()) - .and(warp::get()) - .and(chain_filter.clone()) .and_then(|state_id: StateId, chain: Arc>| { blocking_json_task(move || { state_id @@ -453,7 +447,6 @@ pub fn serve( .and(warp::path("beacon")) .and(warp::path("blocks")) .and(warp::path::end()) - .and(warp::post()) .and(warp::body::json()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) @@ -556,7 +549,6 @@ pub fn serve( attestation: Attestation, network_tx: UnboundedSender>| { blocking_json_task(move || { - dbg!("this"); let attestation = chain .verify_unaggregated_attestation_for_gossip(attestation.clone(), None) .map_err(|e| { @@ -604,11 +596,51 @@ pub fn serve( .and(warp::path::end()) .and_then(|chain: Arc>| { blocking_json_task(move || { - let attestations = chain.op_pool.get_all_attestations(); + let mut attestations = chain.op_pool.get_all_attestations(); + attestations.extend(chain.naive_aggregation_pool.read().iter().cloned()); Ok(api_types::GenericResponse::from(attestations)) }) }); + // POST beacon/pool/attester_slashings + let post_beacon_pool_attester_slashings = beacon_pool_path + .clone() + .and(warp::path("attester_slashings")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(network_tx_filter.clone()) + .and_then( + |chain: Arc>, + slashing: AttesterSlashing, + network_tx: UnboundedSender>| { + blocking_json_task(move || { + let outcome = chain + .verify_attester_slashing_for_gossip(slashing.clone()) + .map_err(|e| { + crate::reject::object_invalid(format!( + "gossip verification failed: {:?}", + e + )) + })?; + + if let ObservationOutcome::New(slashing) = outcome { + publish_network_message( + &network_tx, + PubsubMessage::AttesterSlashing(Box::new( + slashing.clone().into_inner(), + )), + )?; + + chain + .import_attester_slashing(slashing) + .map_err(crate::reject::beacon_chain_error)?; + } + + Ok(()) + }) + }, + ); + // GET beacon/pool/attester_slashings let get_beacon_pool_attester_slashings = beacon_pool_path .clone() @@ -621,6 +653,43 @@ pub fn serve( }) }); + // POST beacon/pool/proposer_slashings + let post_beacon_pool_proposer_slashings = beacon_pool_path + .clone() + .and(warp::path("proposer_slashings")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(network_tx_filter.clone()) + .and_then( + |chain: Arc>, + slashing: ProposerSlashing, + network_tx: UnboundedSender>| { + blocking_json_task(move || { + let outcome = chain + .verify_proposer_slashing_for_gossip(slashing.clone()) + .map_err(|e| { + crate::reject::object_invalid(format!( + "gossip verification failed: {:?}", + e + )) + })?; + + if let ObservationOutcome::New(slashing) = outcome { + publish_network_message( + &network_tx, + PubsubMessage::ProposerSlashing(Box::new( + slashing.clone().into_inner(), + )), + )?; + + chain.import_proposer_slashing(slashing); + } + + Ok(()) + }) + }, + ); + // GET beacon/pool/proposer_slashings let get_beacon_pool_proposer_slashings = beacon_pool_path .clone() @@ -633,6 +702,41 @@ pub fn serve( }) }); + // POST beacon/pool/voluntary_exits + let post_beacon_pool_voluntary_exits = beacon_pool_path + .clone() + .and(warp::path("voluntary_exits")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(network_tx_filter.clone()) + .and_then( + |chain: Arc>, + exit: SignedVoluntaryExit, + network_tx: UnboundedSender>| { + blocking_json_task(move || { + let outcome = chain + .verify_voluntary_exit_for_gossip(exit.clone()) + .map_err(|e| { + crate::reject::object_invalid(format!( + "gossip verification failed: {:?}", + e + )) + })?; + + if let ObservationOutcome::New(exit) = outcome { + publish_network_message( + &network_tx, + PubsubMessage::VoluntaryExit(Box::new(exit.clone().into_inner())), + )?; + + chain.import_voluntary_exit(exit); + } + + Ok(()) + }) + }, + ); + // GET beacon/pool/voluntary_exits let get_beacon_pool_voluntary_exits = beacon_pool_path .clone() @@ -645,6 +749,112 @@ pub fn serve( }) }); + /* + let routes = warp::get() + .and( + get_beacon_genesis + .or(get_beacon_state_root.boxed()) + .or(get_beacon_state_fork.boxed()) + .or(get_beacon_state_finality_checkpoints.boxed()) + .or(get_beacon_state_validators.boxed()) + .or(get_beacon_state_validators_id.boxed()) + .or(get_beacon_state_committees.boxed()) + .or(get_beacon_headers.boxed()) + .or(get_beacon_headers_block_id.boxed()) + .or(get_beacon_block.boxed()) + .or(get_beacon_block_attestations.boxed()) + .or(get_beacon_block_root.boxed()) + .or(get_beacon_pool_attestations.boxed()) + .or(get_beacon_pool_attester_slashings.boxed()) + .or(get_beacon_pool_proposer_slashings.boxed()) + .or(get_beacon_pool_voluntary_exits.boxed()) + .boxed(), + ) + .or(warp::post().and( + post_beacon_blocks, /* + .or(post_beacon_pool_attestations.boxed()) + .or(post_beacon_pool_attester_slashings.boxed()) + .or(post_beacon_pool_proposer_slashings.boxed()) + .or(post_beacon_pool_voluntary_exits.boxed()) + .boxed(), + */ + )) + .recover(crate::reject::handle_rejection); + + fn combine_methods(mut methods: Vec) -> warp::Rejection { + let candidate = methods + .iter() + .position(|err| { + !err.is_not_found() && !err.find::().is_some() + }) + .or_else(|| { + methods + .iter() + .position(|err| !err.find::().is_some()) + }); + + candidate + .map(|i| methods.remove(i)) + .unwrap_or_else(|| warp::reject::not_found()) + } + + fn combine_methods(mut methods: Vec) -> warp::Rejection { + let candidate = methods + .iter() + .position(|err| { + !err.is_not_found() && !err.find::().is_some() + }) + .or_else(|| { + methods + .iter() + .position(|err| !err.find::().is_some()) + }); + + candidate + .map(|i| methods.remove(i)) + .unwrap_or_else(|| warp::reject::not_found()) + } + + async fn ignore_bad_request(err: warp::Rejection) -> warp::Rejection { + if let Some(_) = err.find::() { + warp::reject::not_found() + } else { + err + } + } + */ + + let routes = warp::get() + .and( + get_beacon_genesis + .or(get_beacon_state_root.boxed()) + .or(get_beacon_state_fork.boxed()) + .or(get_beacon_state_finality_checkpoints.boxed()) + .or(get_beacon_state_validators.boxed()) + .or(get_beacon_state_validators_id.boxed()) + .or(get_beacon_state_committees.boxed()) + .or(get_beacon_headers.boxed()) + .or(get_beacon_headers_block_id.boxed()) + .or(get_beacon_block.boxed()) + .or(get_beacon_block_attestations.boxed()) + .or(get_beacon_block_root.boxed()) + .or(get_beacon_pool_attestations.boxed()) + .or(get_beacon_pool_attester_slashings.boxed()) + .or(get_beacon_pool_proposer_slashings.boxed()) + .or(get_beacon_pool_voluntary_exits.boxed()) + .boxed(), + ) + .or(warp::post().and( + post_beacon_blocks + .or(post_beacon_pool_attestations.boxed()) + .or(post_beacon_pool_attester_slashings.boxed()) + .or(post_beacon_pool_proposer_slashings.boxed()) + .or(post_beacon_pool_voluntary_exits.boxed()) + .boxed(), + )) + .recover(crate::reject::handle_rejection); + + /* let routes = get_beacon_genesis .or(get_beacon_state_root) .or(get_beacon_state_fork) @@ -658,11 +868,16 @@ pub fn serve( .or(get_beacon_block) .or(get_beacon_block_attestations) .or(get_beacon_block_root) + .or(post_beacon_pool_attestations) .or(get_beacon_pool_attestations) + .or(post_beacon_pool_attester_slashings) .or(get_beacon_pool_attester_slashings) + .or(post_beacon_pool_proposer_slashings) .or(get_beacon_pool_proposer_slashings) + .or(post_beacon_pool_voluntary_exits) .or(get_beacon_pool_voluntary_exits) .recover(crate::reject::handle_rejection); + */ let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( SocketAddrV4::new(config.listen_addr, config.listen_port), diff --git a/beacon_node/http_api/src/reject.rs b/beacon_node/http_api/src/reject.rs index 7af0c783ad5..c2bb5210a5c 100644 --- a/beacon_node/http_api/src/reject.rs +++ b/beacon_node/http_api/src/reject.rs @@ -1,6 +1,5 @@ use eth2::types::ErrorMessage; use std::convert::Infallible; -use std::error::Error; use warp::{http::StatusCode, reject::Reject}; #[derive(Debug)] @@ -67,23 +66,8 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result() { - // This error happens if the body could not be deserialized correctly - // We can use the cause to analyze the error and customize the error message - message = match e.source() { - Some(cause) => { - if cause.to_string().contains("denom") { - "FIELD_ERROR: denom" - } else { - "BAD_REQUEST" - } - } - None => "BAD_REQUEST", - } - .to_string(); + message = format!("BAD_REQUEST: body deserialize error: {}", e); code = StatusCode::BAD_REQUEST; - } else if let Some(_) = err.find::() { - code = StatusCode::METHOD_NOT_ALLOWED; - message = "METHOD_NOT_ALLOWED".to_string(); } else if let Some(e) = err.find::() { code = StatusCode::BAD_REQUEST; message = format!("BAD_REQUEST (invalid query): {}", e); @@ -109,9 +93,10 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result() { code = StatusCode::BAD_REQUEST; message = format!("BAD_REQUEST: Invalid object: {}", e.0); + } else if let Some(_) = err.find::() { + code = StatusCode::METHOD_NOT_ALLOWED; + message = "METHOD_NOT_ALLOWED".to_string(); } else { - // We should have expected this... Just log and say its a 500 - eprintln!("unhandled rejection: {:?}", err); code = StatusCode::INTERNAL_SERVER_ERROR; message = "UNHANDLED_REJECTION".to_string(); } diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 07eff7910b7..2978a5e4f3d 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -696,8 +696,6 @@ impl ApiTester { .await .unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); - assert!( self.network_rx.try_recv().is_ok(), "valid attestation should be sent to network" @@ -707,6 +705,26 @@ impl ApiTester { self } + pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self { + for attestation in &self.attestations { + let mut attestation = attestation.clone(); + attestation.data.slot += 1; + + assert!(self + .client + .post_beacon_pool_attestations(&attestation) + .await + .is_err()); + + assert!( + self.network_rx.try_recv().is_ok(), + "invalid attestation should not be sent to network" + ); + } + + self + } + pub async fn test_get_beacon_pool_attestations(self) -> Self { let result = self .client @@ -864,3 +882,10 @@ async fn beacon_pools_post_attestations_valid() { .test_post_beacon_pool_attestations_valid() .await; } + +#[tokio::test(core_threads = 2)] +async fn beacon_pools_post_attestations_invalid() { + ApiTester::new() + .test_post_beacon_pool_attestations_invalid() + .await; +}