diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index 3c0f70fb570..106fed006c8 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -1034,6 +1034,8 @@ pub fn serve<T: BeaconChainTypes>(
      */
 
     // POST beacon/blocks
+
+    // TODO: THIS IS NOT THE RIGHT CODE
     let post_beacon_blocks = eth_v1
         .and(warp::path("beacon"))
         .and(warp::path("blocks"))
@@ -1047,12 +1049,11 @@ pub fn serve<T: BeaconChainTypes>(
              chain: Arc<BeaconChain<T>>,
              network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
              log: Logger| async move {
-                publish_blocks::publish_block(None, block, None, chain, &network_tx, log)
+                publish_blocks::publish_block(None, block, chain, &network_tx, log)
                     .await
                     .map(|()| warp::reply())
             },
         );
-
     /*
      * beacon/blocks
      */
diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs
index 0305213cc98..754d7d91f10 100644
--- a/beacon_node/http_api/src/publish_blocks.rs
+++ b/beacon_node/http_api/src/publish_blocks.rs
@@ -1,7 +1,7 @@
 use crate::metrics;
 use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now};
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized};
-use lighthouse_network::{PubsubMessage, SignedBeaconBlockAndBlobsSidecar};
+use lighthouse_network::PubsubMessage;
 use network::NetworkMessage;
 use slog::{crit, error, info, warn, Logger};
 use slot_clock::SlotClock;
@@ -31,6 +31,8 @@ pub async fn publish_block<T: BeaconChainTypes>(
     // Send the block, regardless of whether or not it is valid. The API
     // specification is very clear that this is the desired behaviour.
 
+    let message = todo!("");
+    /*
     let message = if matches!(block, SignedBeaconBlock::Eip4844(_)) {
         if let Some(sidecar) = chain.blob_cache.pop(&block_root) {
             PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new(SignedBeaconBlockAndBlobsSidecar {
@@ -44,6 +46,7 @@ pub async fn publish_block<T: BeaconChainTypes>(
     } else {
         PubsubMessage::BeaconBlock(block.clone())
     };
+    */
     crate::publish_pubsub_message(network_tx, message)?;
 
     // Determine the delay after the start of the slot, register it with metrics.
diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
index 048fa670521..da86fa7d051 100644
--- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
+++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
@@ -957,7 +957,7 @@ mod tests {
             OutboundRequest::BlobsByRange(blbrange) => {
                 assert_eq!(decoded, InboundRequest::BlobsByRange(blbrange))
             }
-            OutboundRequest::BlobsByRoot(blbroot) => {
+            OutboundRequest::BlobsByRoot(bbroot) => {
                 assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot))
             }
             OutboundRequest::Ping(ping) => {
diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs
index 7025711e597..3cda2c1a9a6 100644
--- a/beacon_node/network/src/beacon_processor/mod.rs
+++ b/beacon_node/network/src/beacon_processor/mod.rs
@@ -1334,6 +1334,11 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
                         Work::UnknownBlockAggregate { .. } => {
                             unknown_block_aggregate_queue.push(work)
                         }
+                        Work::BlobsByRootsRequest {
+                            peer_id,
+                            request_id,
+                            request,
+                        } => todo!(),
                     }
                 }
             }
diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
index 53a3d700bda..7bcf0dcf8d0 100644
--- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs
@@ -842,7 +842,8 @@ impl<T: BeaconChainTypes> Worker<T> {
                     "gossip_block_low",
                 );
                 return None;
-            }
+            }
+            Err(blob_errors) => unimplemented!("handle")
         };
 
         metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL);
diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
index 4a22266ab47..86feddec515 100644
--- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -499,6 +499,7 @@ impl<T: BeaconChainTypes> Worker<T> {
 
         //FIXME(sean) create the blobs iter
 
+        /*
         let forwards_blob_root_iter = match self
             .chain
             .forwards_iter_block_roots(Slot::from(req.start_slot))
@@ -511,12 +512,13 @@ impl<T: BeaconChainTypes> Worker<T> {
                 },
             )) => {
                 debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot);
-                return self.send_error_response(
-                    peer_id,
-                    RPCResponseErrorCode::ResourceUnavailable,
-                    "Backfilling".into(),
-                    request_id,
-                );
+                // return self.send_error_response(
+                //     peer_id,
+                //     RPCResponseErrorCode::ResourceUnavailable,
+                //     "Backfilling".into(),
+                //     request_id,
+                // );
+                todo!("stuff")
             }
             Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e),
         };
@@ -546,7 +548,9 @@ impl<T: BeaconChainTypes> Worker<T> {
         // remove all skip slots
         let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
+        */
 
         // Fetching blocks is async because it may have to hit the execution layer for payloads.
+        /*
         executor.spawn(
             async move {
                 let mut blocks_sent = 0;
@@ -623,5 +627,7 @@ impl<T: BeaconChainTypes> Worker<T> {
             },
             "load_blocks_by_range_blocks",
         );
+        */
+        unimplemented!("")
     }
 }
diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs
index f6cdb098260..176fbde96ca 100644
--- a/beacon_node/network/src/router/processor.rs
+++ b/beacon_node/network/src/router/processor.rs
@@ -210,6 +210,7 @@ impl<T: BeaconChainTypes> Processor<T> {
                     unreachable!("Block lookups do not request BBRange requests")
                 }
                 id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id,
+                SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
             },
             RequestId::Router => unreachable!("All BBRange requests belong to sync"),
         };
@@ -268,6 +269,8 @@ impl<T: BeaconChainTypes> Processor<T> {
                 SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
                     unreachable!("Batch syncing do not request BBRoot requests")
                 }
+
+                SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
             },
             RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
         };
@@ -298,6 +301,8 @@ impl<T: BeaconChainTypes> Processor<T> {
                 SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
} => { unreachable!("Batch syncing do not request BBRoot requests") } + + SyncId::RangeBlockBlob { id } => unimplemented!("do it"), }, RequestId::Router => unreachable!("All BBRoot requests belong to sync"), }; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3d3be5c0970..5f03d54ab5d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -39,7 +39,7 @@ use super::network_context::SyncNetworkContext; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; -use crate::service::{NetworkMessage, RequestId}; +use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; @@ -68,6 +68,17 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32; pub type Id = u32; +#[derive(Debug)] +pub struct SeansBlob {} + +#[derive(Debug)] +pub struct SeansBlock {} + +#[derive(Debug)] +pub struct SeansBlockBlob { + blob: SeansBlob, + block: SeansBlock, +} /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RequestId { @@ -312,8 +323,7 @@ impl SyncManager { } } RequestId::RangeBlockBlob { id } => { - if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(request_id) - { + if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(id) { self.range_sync.inject_error( &mut self.network, peer_id, @@ -617,6 +627,18 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, + SyncMessage::RpcBlob { + request_id, + peer_id, + blob_sidecar, + seen_timestamp, + } => todo!(), + SyncMessage::RpcBlockAndBlob { + request_id, + peer_id, + block_and_blobs, + seen_timestamp, + } => todo!(), } } @@ -736,7 +758,7 @@ impl SyncManager { } RequestId::RangeBlockBlob { id } => { // do stuff - self.network.block_blob_block_response(id, block); + // self.network.block_blob_block_response(id, block); } } } @@ -749,10 +771,9 @@ impl SyncManager { seen_timestamp: Duration, ) { let RequestId::RangeBlockBlob { id } = request_id else { - return error!("bad stuff"); + panic!("Wrong things going on "); }; // get the paired block blob from the network context and send it to range - self.network.block_blob_blob_response(request_id, blob) } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b8d4b81c9ce..15003caa1a4 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -178,10 +178,10 @@ impl SyncNetworkContext { request: blocks_request, request_id, }) - .and_then(|| { + .and_then(|_| { self.send_network_msg(NetworkMessage::SendRequest { peer_id, - request: blocks_request, + request: blobs_request, request_id, }) })?; @@ -247,55 +247,57 @@ impl SyncNetworkContext { request_id: Id, block: Option, ) -> Option<(ChainId, BatchId, Option)> { - let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?; - let response = match block { - Some(block) => match info.accumulated_blobs.pop_front() { - Some(blob) => Some(SeansBlockBlob { block, blob }), - None => { - // accumulate the block - info.accumulated_blocks.push_back(block); - None - } - }, - None => { - info.is_blocks_rpc_finished = true; - - if 
-                if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
-                    // this is the coupled stream termination
-                    Some((chain_id, batch_id, None))
-                } else {
-                    None
-                }
-            }
-        };
+        unimplemented!()
+        // let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?;
+        // match block {
+        //     Some(block) => match info.accumulated_blobs.pop_front() {
+        //         Some(blob) => Some(SeansBlockBlob { block, blob }),
+        //         None => {
+        //             // accumulate the block
+        //             info.accumulated_blocks.push_back(block);
+        //             None
+        //         }
+        //     },
+        //     None => {
+        //         info.is_blocks_rpc_finished = true;
+        //
+        //         if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
+        //             // this is the coupled stream termination
+        //             Some((chain_id, batch_id, None))
+        //         } else {
+        //             None
+        //         }
+        //     }
+        // }
     }
 
     pub fn block_blob_blob_response(
         &mut self,
         request_id: Id,
         blob: Option<SeansBlob>,
-    ) -> Option<(ChainId, Option<SeansBlockBlob>)> {
-        let (chain_id, info) = self.block_blob_requests.get_mut(&request_id)?;
-        let response = match blob {
-            Some(blob) => match info.accumulated_blocks.pop_front() {
-                Some(block) => Some(SeansBlockBlob { block, blob }),
-                None => {
-                    // accumulate the blob
-                    info.accumulated_blobs.push_back(blob);
-                    None
-                }
-            },
-            None => {
-                info.is_blobs_rpc_finished = true;
-
-                if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
-                    // this is the coupled stream termination
-                    Some((chain_id, batch_id, None))
-                } else {
-                    None
-                }
-            }
-        };
+    ) -> Option<(ChainId, BatchId, Option<SeansBlockBlob>)> {
+        // let (batch_id, chain_id, info) = self.block_blob_requests.get_mut(&request_id)?;
+        // match blob {
+        //     Some(blob) => match info.accumulated_blocks.pop_front() {
+        //         Some(block) => Some(SeansBlockBlob { block, blob }),
+        //         None => {
+        //             // accumulate the blob
+        //             info.accumulated_blobs.push_back(blob);
+        //             None
+        //         }
+        //     },
+        //     None => {
+        //         info.is_blobs_rpc_finished = true;
+        //
+        //         if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished {
+        //             // this is the coupled stream termination
+        //             Some((chain_id, batch_id, None))
+        //         } else {
+        //             None
+        //         }
+        //     }
+        // }
+        unimplemented!("do it")
     }
 
     /// Received a blocks by range response.
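
Note on the commented-out pairing logic in `sync/network_context.rs` above: blocks and blobs for a batch arrive on two independent by-range RPC streams, and the context pairs them FIFO while tracking each side's stream termination (`None`) so the coupled stream only completes once both sides have finished. Below is a minimal, self-contained sketch of that accumulator, reusing the placeholder `SeansBlock`/`SeansBlob`/`SeansBlockBlob` types added in `sync/manager.rs`; the `BlockBlobRequestInfo` struct, its fields, and the `CoupledResponse` enum are illustrative stand-ins, not the crate's actual API.

```rust
use std::collections::VecDeque;

// Placeholder types mirroring the WIP structs added in sync/manager.rs.
#[derive(Debug)]
pub struct SeansBlock {}
#[derive(Debug)]
pub struct SeansBlob {}
#[derive(Debug)]
pub struct SeansBlockBlob {
    pub block: SeansBlock,
    pub blob: SeansBlob,
}

/// Hypothetical per-request state for a coupled blocks-by-range /
/// blobs-by-range request pair (a stand-in for whatever sync ends up
/// storing in `block_blob_requests`).
#[derive(Default)]
pub struct BlockBlobRequestInfo {
    accumulated_blocks: VecDeque<SeansBlock>,
    accumulated_blobs: VecDeque<SeansBlob>,
    is_blocks_rpc_finished: bool,
    is_blobs_rpc_finished: bool,
}

/// What the caller should do after feeding in one RPC item.
pub enum CoupledResponse {
    /// A block and a blob were paired; forward the pair to range sync.
    Paired(SeansBlockBlob),
    /// Nothing to forward yet; keep accumulating.
    Pending,
    /// Both streams have terminated; the coupled stream is complete.
    Finished,
}

impl BlockBlobRequestInfo {
    /// Handle one item (or stream termination) from the blocks stream.
    pub fn on_block(&mut self, block: Option<SeansBlock>) -> CoupledResponse {
        match block {
            Some(block) => match self.accumulated_blobs.pop_front() {
                // A blob was already waiting: emit the coupled pair.
                Some(blob) => CoupledResponse::Paired(SeansBlockBlob { block, blob }),
                // No blob yet: park the block until its blob arrives.
                None => {
                    self.accumulated_blocks.push_back(block);
                    CoupledResponse::Pending
                }
            },
            None => {
                self.is_blocks_rpc_finished = true;
                if self.is_blobs_rpc_finished {
                    CoupledResponse::Finished
                } else {
                    CoupledResponse::Pending
                }
            }
        }
    }

    /// Handle one item (or stream termination) from the blobs stream.
    pub fn on_blob(&mut self, blob: Option<SeansBlob>) -> CoupledResponse {
        match blob {
            Some(blob) => match self.accumulated_blocks.pop_front() {
                Some(block) => CoupledResponse::Paired(SeansBlockBlob { block, blob }),
                None => {
                    self.accumulated_blobs.push_back(blob);
                    CoupledResponse::Pending
                }
            },
            None => {
                self.is_blobs_rpc_finished = true;
                if self.is_blocks_rpc_finished {
                    CoupledResponse::Finished
                } else {
                    CoupledResponse::Pending
                }
            }
        }
    }
}
```

As in the commented-out code, this FIFO pairing assumes both streams deliver their responses for the same slots in the same order.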
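Similarly, the `todo!("")` in `publish_blocks.rs` stands in for the gossip-message selection preserved in the comment block of that hunk: an EIP-4844 block is published together with its blobs sidecar when one is found in the chain's blob cache, otherwise the block is published alone. A hedged restatement of that branch using local stand-in types (the type shapes and field names here are assumptions for illustration, not the real Lighthouse definitions):

```rust
use std::sync::Arc;

// Local stand-ins for the Lighthouse types referenced by the commented-out code.
pub struct SignedBeaconBlock {
    // Stands in for `matches!(block, SignedBeaconBlock::Eip4844(_))`.
    pub is_eip4844: bool,
}
pub struct BlobsSidecar {}
pub struct SignedBeaconBlockAndBlobsSidecar {
    pub beacon_block: Arc<SignedBeaconBlock>,
    pub blobs_sidecar: Arc<BlobsSidecar>,
}
pub enum PubsubMessage {
    BeaconBlock(Arc<SignedBeaconBlock>),
    BeaconBlockAndBlobsSidecars(Arc<SignedBeaconBlockAndBlobsSidecar>),
}

/// Choose the gossip message for a block: EIP-4844 blocks are published with
/// their blobs sidecar when one is cached, otherwise the block goes out alone.
pub fn choose_pubsub_message(
    block: Arc<SignedBeaconBlock>,
    cached_sidecar: Option<Arc<BlobsSidecar>>,
) -> PubsubMessage {
    if block.is_eip4844 {
        if let Some(blobs_sidecar) = cached_sidecar {
            return PubsubMessage::BeaconBlockAndBlobsSidecars(Arc::new(
                SignedBeaconBlockAndBlobsSidecar {
                    beacon_block: block,
                    blobs_sidecar,
                },
            ));
        }
        // The hunk cuts off the original handling of a missing sidecar; this
        // sketch simply falls back to publishing the block on its own.
    }
    PubsubMessage::BeaconBlock(block)
}
```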