Skip to content

Commit

Permalink
Implement gradual blob publication.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Oct 29, 2024
1 parent 756230e commit 0f32b73
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 59 deletions.
14 changes: 8 additions & 6 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ pub struct ChainConfig {
pub malicious_withhold_count: usize,
/// Enable peer sampling on blocks.
pub enable_sampling: bool,
/// Number of batches that supernodes split data columns into during publishing by a non-proposer.
pub supernode_data_column_publication_batches: usize,
/// The delay applied by supernodes between the sending of each data column batch.
pub supernode_data_column_publication_batch_interval: Duration,
/// Number of batches that the node splits blobs or data columns into during publication.
/// This doesn't apply if the node is the block proposer. For PeerDAS only.
pub blob_publication_batches: usize,
/// The delay applied by the node between sending each blob or data column batch.
/// This doesn't apply if the node is the block proposer.
pub blob_publication_batch_interval: Duration,
}

impl Default for ChainConfig {
Expand Down Expand Up @@ -125,8 +127,8 @@ impl Default for ChainConfig {
enable_light_client_server: false,
malicious_withhold_count: 0,
enable_sampling: false,
supernode_data_column_publication_batches: 4,
supernode_data_column_publication_batch_interval: Duration::from_millis(200),
blob_publication_batches: 4,
blob_publication_batch_interval: Duration::from_millis(300),
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions beacon_node/beacon_chain/src/fetch_blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes
use execution_layer::json_structures::BlobAndProofV1;
use execution_layer::Error as ExecutionLayerError;
use itertools::Either;
use lighthouse_metrics::{inc_counter, inc_counter_by, TryExt};
use metrics::{inc_counter, inc_counter_by, TryExt};
use slog::{debug, error, o, Logger};
use ssz_types::FixedVector;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
Expand Down Expand Up @@ -108,7 +108,10 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
&kzg_commitments_proof,
)?;

let num_fetched_blobs = fixed_blob_sidecar_list.filter(|b| b.is_some()).count();
let num_fetched_blobs = fixed_blob_sidecar_list
.iter()
.filter(|b| b.is_some())
.count();

inc_counter_by(
&metrics::BLOBS_FROM_EL_EXPECTED_TOTAL,
Expand Down
104 changes: 84 additions & 20 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub struct NetworkBeaconProcessor<T: BeaconChainTypes> {
pub log: Logger,
}

// Publish blobs in batches of exponentially increasing size.
const BLOB_PUBLICATION_EXP_FACTOR: usize = 2;

impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
fn try_send(&self, event: BeaconWorkEvent<T::EthSpec>) -> Result<(), Error<T::EthSpec>> {
self.beacon_processor_send
Expand Down Expand Up @@ -887,17 +890,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) {
match blobs_or_data_column {
BlobsOrDataColumns::Blobs(blobs) => {
debug!(
self.log,
"Publishing blobs from EL";
"count" => blobs.len(),
"block_root" => ?block_root,
);
let messages = blobs
.into_iter()
.map(|blob| PubsubMessage::BlobSidecar(Box::new((blob.index, blob))))
.collect();
self.send_network_message(NetworkMessage::Publish { messages });
self.publish_blobs_gradually(blobs, block_root);
}
BlobsOrDataColumns::DataColumns(columns) => {
self.publish_data_columns_gradually(columns, block_root);
Expand Down Expand Up @@ -1016,6 +1009,81 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}

/// This function gradually publishes blobs to the network in randomised batches.
///
/// This is an optimisation to reduce outbound bandwidth and ensures each blob is published
/// by some nodes on the network as soon as possible. Our hope is that some blobs arrive from
/// other nodes in the meantime, obviating the need for us to publish them. If no other
/// publisher exists for a blob, it will eventually get published here.
fn publish_blobs_gradually(
    self: &Arc<Self>,
    mut blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>,
    block_root: Hash256,
) {
    let self_clone = self.clone();

    self.executor.spawn(
        async move {
            let chain = self_clone.chain.clone();
            let log = self_clone.chain.logger();
            // Publishes one batch of blob sidecars as individual gossip messages.
            let publish_fn = |blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>| {
                self_clone.send_network_message(NetworkMessage::Publish {
                    messages: blobs
                        .into_iter()
                        .map(|blob| PubsubMessage::BlobSidecar(Box::new((blob.index, blob))))
                        .collect(),
                });
            };

            // Permute the blobs and split them into batches.
            // The hope is that we won't need to publish some blobs because we will receive them
            // on gossip from other nodes.
            blobs.shuffle(&mut rand::thread_rng());

            let blob_publication_batch_interval = chain.config.blob_publication_batch_interval;
            let mut publish_count = 0usize;
            let mut blobs_iter = blobs.iter().peekable();
            // Batches grow exponentially (1, 2, 4, ...): the first small batches give other
            // publishers a chance to beat us to it, later batches drain the remainder quickly.
            let mut batch_size = 1usize;

            while blobs_iter.peek().is_some() {
                let batch = blobs_iter.by_ref().take(batch_size);
                // Re-check gossip state immediately before each batch: skip any blob that
                // has arrived from another node since the previous batch was sent.
                let already_seen = chain
                    .data_availability_checker
                    .cached_blob_indexes(&block_root)
                    .unwrap_or_default();
                let publishable = batch
                    .filter(|blob| !already_seen.contains(&blob.index))
                    .cloned()
                    .collect::<Vec<_>>();

                if !publishable.is_empty() {
                    debug!(
                        log,
                        "Publishing blob batch";
                        "publish_count" => publishable.len(),
                        "block_root" => ?block_root,
                    );
                    publish_count += publishable.len();
                    publish_fn(publishable);
                }

                batch_size *= BLOB_PUBLICATION_EXP_FACTOR;

                // Only pause between batches — sleeping after the final batch would
                // needlessly delay task completion by one interval.
                if blobs_iter.peek().is_some() {
                    tokio::time::sleep(blob_publication_batch_interval).await;
                }
            }

            debug!(
                log,
                "Batch blob publication complete";
                "batch_interval" => blob_publication_batch_interval.as_millis(),
                "blob_count" => blobs.len(),
                "published_count" => publish_count,
                "block_root" => ?block_root,
            )
        },
        "gradual_blob_publication",
    );
}

/// This function gradually publishes data columns to the network in randomised batches.
///
/// This is an optimisation to reduce outbound bandwidth and ensures each column is published
Expand Down Expand Up @@ -1053,13 +1121,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// on gossip from other supernodes.
data_columns_to_publish.shuffle(&mut rand::thread_rng());

let supernode_data_column_publication_batch_interval = chain
.config
.supernode_data_column_publication_batch_interval;
let supernode_data_column_publication_batches =
chain.config.supernode_data_column_publication_batches;
let batch_size =
data_columns_to_publish.len() / supernode_data_column_publication_batches;
let blob_publication_batch_interval = chain.config.blob_publication_batch_interval;
let blob_publication_batches = chain.config.blob_publication_batches;
let batch_size = chain.spec.number_of_columns / blob_publication_batches;
let mut publish_count = 0usize;

for batch in data_columns_to_publish.chunks(batch_size) {
Expand All @@ -1084,14 +1148,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
publish_fn(publishable);
}

tokio::time::sleep(supernode_data_column_publication_batch_interval).await;
tokio::time::sleep(blob_publication_batch_interval).await;
}

debug!(
log,
"Batch data column publishing complete";
"batch_size" => batch_size,
"batch_interval" => supernode_data_column_publication_batch_interval.as_millis(),
"batch_interval" => blob_publication_batch_interval.as_millis(),
"data_columns_to_publish_count" => data_columns_to_publish.len(),
"published_count" => publish_count,
"block_root" => ?block_root,
Expand Down
12 changes: 6 additions & 6 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ pub fn cli_app() -> Command {
.display_order(0)
)
.arg(
Arg::new("supernode-data-column-publication-batches")
.long("supernode-data-column-publication-batches")
Arg::new("blob-publication-batches")
.long("blob-publication-batches")
.action(ArgAction::Set)
.help_heading(FLAG_HEADER)
.help("Number of batches that supernodes split data columns into during publishing by a non-proposer. For PeerDAS only.")
.help("Number of batches that the node splits blobs or data columns into during publication. This doesn't apply if the node is the block proposer. Used in PeerDAS only.")
.display_order(0)
.hide(true)
)
.arg(
Arg::new("supernode-data-column-publication-batch-interval")
.long("supernode-data-column-publication-batch-interval")
Arg::new("blob-publication-batch-interval")
.long("blob-publication-batch-interval")
.action(ArgAction::Set)
.help_heading(FLAG_HEADER)
.help("The delay in milliseconds applied by supernodes between the sending of each data column batch. For PeerDAS only.")
.help("The delay in milliseconds applied by the node between sending each blob or data column batch. This doesn't apply if the node is the block proposer.")
.display_order(0)
.hide(true)
)
Expand Down
15 changes: 4 additions & 11 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,20 +192,13 @@ pub fn get_config<E: EthSpec>(
client_config.chain.enable_sampling = true;
}

if let Some(batches) =
clap_utils::parse_optional(cli_args, "supernode-data-column-publication-batches")?
{
client_config
.chain
.supernode_data_column_publication_batches = batches;
if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {
client_config.chain.blob_publication_batches = batches;
}

if let Some(interval) =
clap_utils::parse_optional(cli_args, "supernode-data-column-publication-batch-interval")?
if let Some(interval) = clap_utils::parse_optional(cli_args, "blob-publication-batch-interval")?
{
client_config
.chain
.supernode_data_column_publication_batch_interval = Duration::from_millis(interval);
client_config.chain.blob_publication_batch_interval = Duration::from_millis(interval);
}

/*
Expand Down
21 changes: 7 additions & 14 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,29 +801,22 @@ fn network_enable_sampling_flag() {
.with_config(|config| assert!(config.chain.enable_sampling));
}
#[test]
fn supernode_data_column_publication_batches() {
fn blob_publication_batches() {
CommandLineTest::new()
.flag("supernode-data-column-publication-batches", Some("3"))
.flag("blob-publication-batches", Some("3"))
.run_with_zero_port()
.with_config(|config| {
assert_eq!(config.chain.supernode_data_column_publication_batches, 3)
});
.with_config(|config| assert_eq!(config.chain.blob_publication_batches, 3));
}

#[test]
fn supernode_data_column_publication_batch_interval() {
fn blob_publication_batch_interval() {
CommandLineTest::new()
.flag(
"supernode-data-column-publication-batch-interval",
Some("300"),
)
.flag("blob-publication-batch-interval", Some("400"))
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config
.chain
.supernode_data_column_publication_batch_interval,
Duration::from_millis(300)
config.chain.blob_publication_batch_interval,
Duration::from_millis(400)
)
});
}
Expand Down

0 comments on commit 0f32b73

Please sign in to comment.