From 0f32b73ebc1505b3495d3719e158ac4630d7020b Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 29 Oct 2024 16:36:33 +1100 Subject: [PATCH] Implement gradual blob publication. --- beacon_node/beacon_chain/src/chain_config.rs | 14 ++- beacon_node/beacon_chain/src/fetch_blobs.rs | 7 +- .../src/network_beacon_processor/mod.rs | 104 ++++++++++++++---- beacon_node/src/cli.rs | 12 +- beacon_node/src/config.rs | 15 +-- lighthouse/tests/beacon_node.rs | 21 ++-- 6 files changed, 114 insertions(+), 59 deletions(-) diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 5cae99484b..b8a607c886 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -88,10 +88,12 @@ pub struct ChainConfig { pub malicious_withhold_count: usize, /// Enable peer sampling on blocks. pub enable_sampling: bool, - /// Number of batches that supernodes split data columns into during publishing by a non-proposer. - pub supernode_data_column_publication_batches: usize, - /// The delay applied by supernodes between the sending of each data column batch. - pub supernode_data_column_publication_batch_interval: Duration, + /// Number of batches that the node splits blobs or data columns into during publication. + /// This doesn't apply if the node is the block proposer. For PeerDAS only. + pub blob_publication_batches: usize, + /// The delay in milliseconds applied by the node between sending each blob or data column batch. + /// This doesn't apply if the node is the block proposer. 
+ pub blob_publication_batch_interval: Duration, } impl Default for ChainConfig { @@ -125,8 +127,8 @@ impl Default for ChainConfig { enable_light_client_server: false, malicious_withhold_count: 0, enable_sampling: false, - supernode_data_column_publication_batches: 4, - supernode_data_column_publication_batch_interval: Duration::from_millis(200), + blob_publication_batches: 4, + blob_publication_batch_interval: Duration::from_millis(300), } } } diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs index 9118d63095..836d2b85cc 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs.rs @@ -13,7 +13,7 @@ use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes use execution_layer::json_structures::BlobAndProofV1; use execution_layer::Error as ExecutionLayerError; use itertools::Either; -use lighthouse_metrics::{inc_counter, inc_counter_by, TryExt}; +use metrics::{inc_counter, inc_counter_by, TryExt}; use slog::{debug, error, o, Logger}; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; @@ -108,7 +108,10 @@ pub async fn fetch_and_process_engine_blobs( &kzg_commitments_proof, )?; - let num_fetched_blobs = fixed_blob_sidecar_list.filter(|b| b.is_some()).count(); + let num_fetched_blobs = fixed_blob_sidecar_list + .iter() + .filter(|b| b.is_some()) + .count(); inc_counter_by( &metrics::BLOBS_FROM_EL_EXPECTED_TOTAL, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 30d834a4e8..6ccbca8ef7 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -69,6 +69,9 @@ pub struct NetworkBeaconProcessor { pub log: Logger, } +// Publish blobs in batches of exponentially increasing size. 
+const BLOB_PUBLICATION_EXP_FACTOR: usize = 2;
+
 impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     fn try_send(&self, event: BeaconWorkEvent<T::EthSpec>) -> Result<(), Error> {
         self.beacon_processor_send
@@ -887,17 +890,7 @@
     ) {
         match blobs_or_data_column {
             BlobsOrDataColumns::Blobs(blobs) => {
-                debug!(
-                    self.log,
-                    "Publishing blobs from EL";
-                    "count" => blobs.len(),
-                    "block_root" => ?block_root,
-                );
-                let messages = blobs
-                    .into_iter()
-                    .map(|blob| PubsubMessage::BlobSidecar(Box::new((blob.index, blob))))
-                    .collect();
-                self.send_network_message(NetworkMessage::Publish { messages });
+                self.publish_blobs_gradually(blobs, block_root);
             }
             BlobsOrDataColumns::DataColumns(columns) => {
                 self.publish_data_columns_gradually(columns, block_root);
@@ -1016,6 +1009,81 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     }
 
+    /// This function gradually publishes blobs to the network in randomised batches.
+    ///
+    /// This is an optimisation to reduce outbound bandwidth and ensures each blob is published
+    /// by some nodes on the network as soon as possible. Our hope is that some blobs arrive from
+    /// other nodes in the meantime, obviating the need for us to publish them. If no other
+    /// publisher exists for a blob, it will eventually get published here.
+    fn publish_blobs_gradually(
+        self: &Arc<Self>,
+        mut blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>,
+        block_root: Hash256,
+    ) {
+        let self_clone = self.clone();
+
+        self.executor.spawn(
+            async move {
+                let chain = self_clone.chain.clone();
+                let log = self_clone.chain.logger();
+                let publish_fn = |blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>| {
+                    self_clone.send_network_message(NetworkMessage::Publish {
+                        messages: blobs
+                            .into_iter()
+                            .map(|blob| PubsubMessage::BlobSidecar(Box::new((blob.index, blob))))
+                            .collect(),
+                    });
+                };
+
+                // Permute the blobs and split them into batches.
+                // The hope is that we won't need to publish some blobs because we will receive them
+                // on gossip from other nodes.
+                blobs.shuffle(&mut rand::thread_rng());
+
+                let blob_publication_batch_interval = chain.config.blob_publication_batch_interval;
+                let mut publish_count = 0usize;
+                let mut blobs_iter = blobs.iter().peekable();
+                let mut batch_size = 1usize;
+
+                while blobs_iter.peek().is_some() {
+                    let batch = blobs_iter.by_ref().take(batch_size);
+                    let already_seen = chain
+                        .data_availability_checker
+                        .cached_blob_indexes(&block_root)
+                        .unwrap_or_default();
+                    let publishable = batch
+                        .filter(|col| !already_seen.contains(&col.index))
+                        .cloned()
+                        .collect::<Vec<_>>();
+
+                    if !publishable.is_empty() {
+                        debug!(
+                            log,
+                            "Publishing blob batch";
+                            "publish_count" => publishable.len(),
+                            "block_root" => ?block_root,
+                        );
+                        publish_count += publishable.len();
+                        publish_fn(publishable);
+                    }
+
+                    tokio::time::sleep(blob_publication_batch_interval).await;
+                    batch_size *= BLOB_PUBLICATION_EXP_FACTOR;
+                }
+
+                debug!(
+                    log,
+                    "Batch blob publication complete";
+                    "batch_interval" => blob_publication_batch_interval.as_millis(),
+                    "blob_count" => blobs.len(),
+                    "published_count" => publish_count,
+                    "block_root" => ?block_root,
+                )
+            },
+            "gradual_blob_publication",
+        );
+    }
+
     /// This function gradually publishes data columns to the network in randomised batches.
     ///
     /// This is an optimisation to reduce outbound bandwidth and ensures each column is published
@@ -1053,13 +1121,9 @@
         // on gossip from other supernodes.
data_columns_to_publish.shuffle(&mut rand::thread_rng()); - let supernode_data_column_publication_batch_interval = chain - .config - .supernode_data_column_publication_batch_interval; - let supernode_data_column_publication_batches = - chain.config.supernode_data_column_publication_batches; - let batch_size = - data_columns_to_publish.len() / supernode_data_column_publication_batches; + let blob_publication_batch_interval = chain.config.blob_publication_batch_interval; + let blob_publication_batches = chain.config.blob_publication_batches; + let batch_size = chain.spec.number_of_columns / blob_publication_batches; let mut publish_count = 0usize; for batch in data_columns_to_publish.chunks(batch_size) { @@ -1084,14 +1148,14 @@ impl NetworkBeaconProcessor { publish_fn(publishable); } - tokio::time::sleep(supernode_data_column_publication_batch_interval).await; + tokio::time::sleep(blob_publication_batch_interval).await; } debug!( log, "Batch data column publishing complete"; "batch_size" => batch_size, - "batch_interval" => supernode_data_column_publication_batch_interval.as_millis(), + "batch_interval" => blob_publication_batch_interval.as_millis(), "data_columns_to_publish_count" => data_columns_to_publish.len(), "published_count" => publish_count, "block_root" => ?block_root, diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 4800d3e1b9..c5427e0ce0 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -87,20 +87,20 @@ pub fn cli_app() -> Command { .display_order(0) ) .arg( - Arg::new("supernode-data-column-publication-batches") - .long("supernode-data-column-publication-batches") + Arg::new("blob-publication-batches") + .long("blob-publication-batches") .action(ArgAction::Set) .help_heading(FLAG_HEADER) - .help("Number of batches that supernodes split data columns into during publishing by a non-proposer. For PeerDAS only.") + .help("Number of batches that the node splits blobs or data columns into during publication. 
This doesn't apply if the node is the block proposer. Used in PeerDAS only.") .display_order(0) .hide(true) ) .arg( - Arg::new("supernode-data-column-publication-batch-interval") - .long("supernode-data-column-publication-batch-interval") + Arg::new("blob-publication-batch-interval") + .long("blob-publication-batch-interval") .action(ArgAction::Set) .help_heading(FLAG_HEADER) - .help("The delay in milliseconds applied by supernodes between the sending of each data column batch. For PeerDAS only.") + .help("The delay in milliseconds applied by the node between sending each blob or data column batch. This doesn't apply if the node is the block proposer.") .display_order(0) .hide(true) ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 625e3a8ab2..0f17c36800 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -192,20 +192,13 @@ pub fn get_config( client_config.chain.enable_sampling = true; } - if let Some(batches) = - clap_utils::parse_optional(cli_args, "supernode-data-column-publication-batches")? - { - client_config - .chain - .supernode_data_column_publication_batches = batches; + if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? { + client_config.chain.blob_publication_batches = batches; } - if let Some(interval) = - clap_utils::parse_optional(cli_args, "supernode-data-column-publication-batch-interval")? + if let Some(interval) = clap_utils::parse_optional(cli_args, "blob-publication-batch-interval")? 
{ - client_config - .chain - .supernode_data_column_publication_batch_interval = Duration::from_millis(interval); + client_config.chain.blob_publication_batch_interval = Duration::from_millis(interval); } /* diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 1dc760857b..83a4367252 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -801,29 +801,22 @@ fn network_enable_sampling_flag() { .with_config(|config| assert!(config.chain.enable_sampling)); } #[test] -fn supernode_data_column_publication_batches() { +fn blob_publication_batches() { CommandLineTest::new() - .flag("supernode-data-column-publication-batches", Some("3")) + .flag("blob-publication-batches", Some("3")) .run_with_zero_port() - .with_config(|config| { - assert_eq!(config.chain.supernode_data_column_publication_batches, 3) - }); + .with_config(|config| assert_eq!(config.chain.blob_publication_batches, 3)); } #[test] -fn supernode_data_column_publication_batch_interval() { +fn blob_publication_batch_interval() { CommandLineTest::new() - .flag( - "supernode-data-column-publication-batch-interval", - Some("300"), - ) + .flag("blob-publication-batch-interval", Some("400")) .run_with_zero_port() .with_config(|config| { assert_eq!( - config - .chain - .supernode_data_column_publication_batch_interval, - Duration::from_millis(300) + config.chain.blob_publication_batch_interval, + Duration::from_millis(400) ) }); }