Skip to content

Commit

Permalink
Add pool POST endpoints, fix error issue, bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Sep 5, 2020
1 parent dba921f commit 0f3cdf3
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 32 deletions.
237 changes: 226 additions & 11 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,10 @@ pub fn serve<T: BeaconChainTypes>(
});

// GET beacon/states/{state_id}/finality_checkpoints
let get_beacon_state_finality_checkpoints = warp::any()
.and(warp::path(API_PREFIX))
.and(warp::path(API_VERSION))
.and(warp::path("beacon"))
.and(warp::path("states"))
.and(warp::path::param::<StateId>())
let get_beacon_state_finality_checkpoints = beacon_states_path
.clone()
.and(warp::path("finality_checkpoints"))
.and(warp::path::end())
.and(warp::get())
.and(chain_filter.clone())
.and_then(|state_id: StateId, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
state_id
Expand Down Expand Up @@ -453,7 +447,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::post())
.and(warp::body::json())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
Expand Down Expand Up @@ -556,7 +549,6 @@ pub fn serve<T: BeaconChainTypes>(
attestation: Attestation<T::EthSpec>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
blocking_json_task(move || {
dbg!("this");
let attestation = chain
.verify_unaggregated_attestation_for_gossip(attestation.clone(), None)
.map_err(|e| {
Expand Down Expand Up @@ -604,11 +596,51 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and_then(|chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let attestations = chain.op_pool.get_all_attestations();
let mut attestations = chain.op_pool.get_all_attestations();
attestations.extend(chain.naive_aggregation_pool.read().iter().cloned());
Ok(api_types::GenericResponse::from(attestations))
})
});

// POST beacon/pool/attester_slashings
let post_beacon_pool_attester_slashings = beacon_pool_path
.clone()
.and(warp::path("attester_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(network_tx_filter.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
slashing: AttesterSlashing<T::EthSpec>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
blocking_json_task(move || {
let outcome = chain
.verify_attester_slashing_for_gossip(slashing.clone())
.map_err(|e| {
crate::reject::object_invalid(format!(
"gossip verification failed: {:?}",
e
))
})?;

if let ObservationOutcome::New(slashing) = outcome {
publish_network_message(
&network_tx,
PubsubMessage::AttesterSlashing(Box::new(
slashing.clone().into_inner(),
)),
)?;

chain
.import_attester_slashing(slashing)
.map_err(crate::reject::beacon_chain_error)?;
}

Ok(())
})
},
);

// GET beacon/pool/attester_slashings
let get_beacon_pool_attester_slashings = beacon_pool_path
.clone()
Expand All @@ -621,6 +653,43 @@ pub fn serve<T: BeaconChainTypes>(
})
});

// POST beacon/pool/proposer_slashings
let post_beacon_pool_proposer_slashings = beacon_pool_path
.clone()
.and(warp::path("proposer_slashings"))
.and(warp::path::end())
.and(warp::body::json())
.and(network_tx_filter.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
slashing: ProposerSlashing,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
blocking_json_task(move || {
let outcome = chain
.verify_proposer_slashing_for_gossip(slashing.clone())
.map_err(|e| {
crate::reject::object_invalid(format!(
"gossip verification failed: {:?}",
e
))
})?;

if let ObservationOutcome::New(slashing) = outcome {
publish_network_message(
&network_tx,
PubsubMessage::ProposerSlashing(Box::new(
slashing.clone().into_inner(),
)),
)?;

chain.import_proposer_slashing(slashing);
}

Ok(())
})
},
);

// GET beacon/pool/proposer_slashings
let get_beacon_pool_proposer_slashings = beacon_pool_path
.clone()
Expand All @@ -633,6 +702,41 @@ pub fn serve<T: BeaconChainTypes>(
})
});

// POST beacon/pool/voluntary_exits
let post_beacon_pool_voluntary_exits = beacon_pool_path
.clone()
.and(warp::path("voluntary_exits"))
.and(warp::path::end())
.and(warp::body::json())
.and(network_tx_filter.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
exit: SignedVoluntaryExit,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
blocking_json_task(move || {
let outcome = chain
.verify_voluntary_exit_for_gossip(exit.clone())
.map_err(|e| {
crate::reject::object_invalid(format!(
"gossip verification failed: {:?}",
e
))
})?;

if let ObservationOutcome::New(exit) = outcome {
publish_network_message(
&network_tx,
PubsubMessage::VoluntaryExit(Box::new(exit.clone().into_inner())),
)?;

chain.import_voluntary_exit(exit);
}

Ok(())
})
},
);

// GET beacon/pool/voluntary_exits
let get_beacon_pool_voluntary_exits = beacon_pool_path
.clone()
Expand All @@ -645,6 +749,112 @@ pub fn serve<T: BeaconChainTypes>(
})
});

/*
let routes = warp::get()
.and(
get_beacon_genesis
.or(get_beacon_state_root.boxed())
.or(get_beacon_state_fork.boxed())
.or(get_beacon_state_finality_checkpoints.boxed())
.or(get_beacon_state_validators.boxed())
.or(get_beacon_state_validators_id.boxed())
.or(get_beacon_state_committees.boxed())
.or(get_beacon_headers.boxed())
.or(get_beacon_headers_block_id.boxed())
.or(get_beacon_block.boxed())
.or(get_beacon_block_attestations.boxed())
.or(get_beacon_block_root.boxed())
.or(get_beacon_pool_attestations.boxed())
.or(get_beacon_pool_attester_slashings.boxed())
.or(get_beacon_pool_proposer_slashings.boxed())
.or(get_beacon_pool_voluntary_exits.boxed())
.boxed(),
)
.or(warp::post().and(
post_beacon_blocks, /*
.or(post_beacon_pool_attestations.boxed())
.or(post_beacon_pool_attester_slashings.boxed())
.or(post_beacon_pool_proposer_slashings.boxed())
.or(post_beacon_pool_voluntary_exits.boxed())
.boxed(),
*/
))
.recover(crate::reject::handle_rejection);
fn combine_methods(mut methods: Vec<warp::Rejection>) -> warp::Rejection {
let candidate = methods
.iter()
.position(|err| {
!err.is_not_found() && !err.find::<warp::reject::MethodNotAllowed>().is_some()
})
.or_else(|| {
methods
.iter()
.position(|err| !err.find::<warp::reject::MethodNotAllowed>().is_some())
});
candidate
.map(|i| methods.remove(i))
.unwrap_or_else(|| warp::reject::not_found())
}
fn combine_methods(mut methods: Vec<warp::Rejection>) -> warp::Rejection {
let candidate = methods
.iter()
.position(|err| {
!err.is_not_found() && !err.find::<warp::reject::MethodNotAllowed>().is_some()
})
.or_else(|| {
methods
.iter()
.position(|err| !err.find::<warp::reject::MethodNotAllowed>().is_some())
});
candidate
.map(|i| methods.remove(i))
.unwrap_or_else(|| warp::reject::not_found())
}
async fn ignore_bad_request(err: warp::Rejection) -> warp::Rejection {
if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
warp::reject::not_found()
} else {
err
}
}
*/

let routes = warp::get()
.and(
get_beacon_genesis
.or(get_beacon_state_root.boxed())
.or(get_beacon_state_fork.boxed())
.or(get_beacon_state_finality_checkpoints.boxed())
.or(get_beacon_state_validators.boxed())
.or(get_beacon_state_validators_id.boxed())
.or(get_beacon_state_committees.boxed())
.or(get_beacon_headers.boxed())
.or(get_beacon_headers_block_id.boxed())
.or(get_beacon_block.boxed())
.or(get_beacon_block_attestations.boxed())
.or(get_beacon_block_root.boxed())
.or(get_beacon_pool_attestations.boxed())
.or(get_beacon_pool_attester_slashings.boxed())
.or(get_beacon_pool_proposer_slashings.boxed())
.or(get_beacon_pool_voluntary_exits.boxed())
.boxed(),
)
.or(warp::post().and(
post_beacon_blocks
.or(post_beacon_pool_attestations.boxed())
.or(post_beacon_pool_attester_slashings.boxed())
.or(post_beacon_pool_proposer_slashings.boxed())
.or(post_beacon_pool_voluntary_exits.boxed())
.boxed(),
))
.recover(crate::reject::handle_rejection);

/*
let routes = get_beacon_genesis
.or(get_beacon_state_root)
.or(get_beacon_state_fork)
Expand All @@ -658,11 +868,16 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_beacon_block)
.or(get_beacon_block_attestations)
.or(get_beacon_block_root)
.or(post_beacon_pool_attestations)
.or(get_beacon_pool_attestations)
.or(post_beacon_pool_attester_slashings)
.or(get_beacon_pool_attester_slashings)
.or(post_beacon_pool_proposer_slashings)
.or(get_beacon_pool_proposer_slashings)
.or(post_beacon_pool_voluntary_exits)
.or(get_beacon_pool_voluntary_exits)
.recover(crate::reject::handle_rejection);
*/

let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
SocketAddrV4::new(config.listen_addr, config.listen_port),
Expand Down
23 changes: 4 additions & 19 deletions beacon_node/http_api/src/reject.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use eth2::types::ErrorMessage;
use std::convert::Infallible;
use std::error::Error;
use warp::{http::StatusCode, reject::Reject};

#[derive(Debug)]
Expand Down Expand Up @@ -67,23 +66,8 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
code = StatusCode::NOT_FOUND;
message = "NOT_FOUND".to_string();
} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
// This error happens if the body could not be deserialized correctly
// We can use the cause to analyze the error and customize the error message
message = match e.source() {
Some(cause) => {
if cause.to_string().contains("denom") {
"FIELD_ERROR: denom"
} else {
"BAD_REQUEST"
}
}
None => "BAD_REQUEST",
}
.to_string();
message = format!("BAD_REQUEST: body deserialize error: {}", e);
code = StatusCode::BAD_REQUEST;
} else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
code = StatusCode::METHOD_NOT_ALLOWED;
message = "METHOD_NOT_ALLOWED".to_string();
} else if let Some(e) = err.find::<warp::reject::InvalidQuery>() {
code = StatusCode::BAD_REQUEST;
message = format!("BAD_REQUEST (invalid query): {}", e);
Expand All @@ -109,9 +93,10 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
} else if let Some(e) = err.find::<crate::reject::ObjectInvalid>() {
code = StatusCode::BAD_REQUEST;
message = format!("BAD_REQUEST: Invalid object: {}", e.0);
} else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
code = StatusCode::METHOD_NOT_ALLOWED;
message = "METHOD_NOT_ALLOWED".to_string();
} else {
// We should have expected this... Just log and say its a 500
eprintln!("unhandled rejection: {:?}", err);
code = StatusCode::INTERNAL_SERVER_ERROR;
message = "UNHANDLED_REJECTION".to_string();
}
Expand Down
29 changes: 27 additions & 2 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,6 @@ impl ApiTester {
.await
.unwrap();

std::thread::sleep(std::time::Duration::from_secs(1));

assert!(
self.network_rx.try_recv().is_ok(),
"valid attestation should be sent to network"
Expand All @@ -707,6 +705,26 @@ impl ApiTester {
self
}

pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self {
for attestation in &self.attestations {
let mut attestation = attestation.clone();
attestation.data.slot += 1;

assert!(self
.client
.post_beacon_pool_attestations(&attestation)
.await
.is_err());

assert!(
self.network_rx.try_recv().is_ok(),
"invalid attestation should not be sent to network"
);
}

self
}

pub async fn test_get_beacon_pool_attestations(self) -> Self {
let result = self
.client
Expand Down Expand Up @@ -864,3 +882,10 @@ async fn beacon_pools_post_attestations_valid() {
.test_post_beacon_pool_attestations_valid()
.await;
}

#[tokio::test(core_threads = 2)]
async fn beacon_pools_post_attestations_invalid() {
ApiTester::new()
.test_post_beacon_pool_attestations_invalid()
.await;
}

0 comments on commit 0f3cdf3

Please sign in to comment.