Skip to content

Commit

Permalink
[Phase0] Attestation processing optimizations for calculating rewards
Browse files Browse the repository at this point in the history
  • Loading branch information
EchoAlice committed Nov 11, 2024
1 parent de0648b commit 9f95fa3
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 39 deletions.
3 changes: 2 additions & 1 deletion ethereum-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ license = "MIT OR Apache-2.0"
default = ["serde", "async"]
serde = ["hex", "serde_json", "serde_yaml"]
async = ["tokio", "tokio-stream"]
optimized = ["shuffling"]
optimized = ["shuffling", "attestation-processing"]
shuffling = [] # supports optimized shuffling routines
attestation-processing = [] # supports optimized attestation processing
secret-key-debug = [
] # enable if you want to be able to print `crypto::SecretKey`
spec-tests = [] # enable extra features for testing
Expand Down
46 changes: 26 additions & 20 deletions ethereum-consensus/examples/state_transition_mainnet_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,52 @@ use ethereum_consensus::{
types::{mainnet::SignedBeaconBlock, BeaconState},
};
use ssz_rs::prelude::*;
use std::{error::Error, fs};
use std::{error::Error, fs, time::Instant};

fn main() -> std::result::Result<(), Box<dyn Error>> {
println!("this example illustrates how the spec applies state transitions to mainnet data.");

// Read and deserialize prestate
let state_path = "./ethereum-consensus/examples/data/beacon_states/state_1999.ssz";
let f = fs::read(state_path).unwrap();
let prestate = spec::BeaconState::deserialize(&f)?;
let prestate = BeaconState::Phase0(prestate);

// Create executor
let context = Context::for_mainnet();
let mut executor = Executor::new(prestate, context);

// Read and process blocks 2000-2034
// Read and process blocks at slots 2000-2034
for slot in 2000..=2034 {
let block_path =
format!("./ethereum-consensus/examples/data/beacon_blocks/block_{}.ssz", slot);
let block_bytes = fs::read(&block_path)?;

// Error handling: skip missed slots
if block_bytes.len() < 100 {
match std::str::from_utf8(&block_bytes) {
Ok(text) if text.contains("NOT_FOUND") => {
println!("Slot {} was skipped (no block produced)", slot);
continue;
}
_ => {
println!("Unexpected small file for slot {}", slot);
continue;
}
}
if is_skipped_slot(&block_bytes, slot) {
continue;
}

// Process block
let signed_block = spec::SignedBeaconBlock::deserialize(&block_bytes)?;
let block = SignedBeaconBlock::Phase0(signed_block);

let start = Instant::now();
executor.apply_block(&block)?;
println!("Block at slot {} was processed.", slot)
println!("Block at slot {slot} took {:?} to process", start.elapsed());
}

Ok(())
}

fn is_skipped_slot(block_bytes: &[u8], slot: u64) -> bool {
if block_bytes.len() < 100 {
match std::str::from_utf8(block_bytes) {
Ok(text) if text.contains("NOT_FOUND") => {
println!("Slot {} was skipped (no block produced)", slot);
println!("\n");
true
}
_ => {
println!("Unexpected small file for slot {}", slot);
true
}
}
} else {
false
}
}
97 changes: 79 additions & 18 deletions ethereum-consensus/src/phase0/epoch_processing.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Instant;

use crate::{
phase0::{
beacon_state::{BeaconState, HistoricalSummary},
Expand Down Expand Up @@ -919,27 +921,82 @@ pub fn get_inclusion_delay_deltas<
>,
context: &Context,
) -> Result<(Vec<Gwei>, Vec<Gwei>)> {
// Return proposer and inclusion delay micro-rewards/penalties for each validator.
let previous_epoch = get_previous_epoch(state, context);
let validator_count = state.validators.len();
let mut rewards = vec![0; validator_count];
let matching_source_attestations =
get_matching_source_attestations(state, previous_epoch, context)?;
for i in get_unslashed_attesting_indices(state, matching_source_attestations.iter(), context)? {
let mut attestations = Vec::new();
for a in matching_source_attestations.iter() {
if get_attesting_indices(state, &a.data, &a.aggregation_bits, context)?.contains(&i) {
attestations.push(a)

#[cfg(feature = "attestation-processing")]
{
#[derive(Default, Clone)]
struct AttesterStatus {
min_inclusion_delay: u64,
proposer_index: usize,
}

let eligible_validators: Vec<bool> = state.validators.iter().map(|v| !v.slashed).collect();
let matching_source_attestations =
get_matching_source_attestations(state, previous_epoch, context)?;
let mut attester_statuses: Vec<AttesterStatus> =
vec![AttesterStatus::default(); validator_count];

// Process all attestations once and store minimum inclusion delays
for attestation in matching_source_attestations.iter() {
let attesting_indices = get_attesting_indices(
state,
&attestation.data,
&attestation.aggregation_bits,
context,
)?;
for &validator_index in &attesting_indices {
if !eligible_validators[validator_index] {
continue;
}
let current_status = &mut attester_statuses[validator_index];
if current_status.min_inclusion_delay == 0 ||
attestation.inclusion_delay < current_status.min_inclusion_delay
{
current_status.min_inclusion_delay = attestation.inclusion_delay;
current_status.proposer_index = attestation.proposer_index;
}
}
}
let attestation = attestations
.iter()
.min_by(|&a, &b| a.inclusion_delay.cmp(&b.inclusion_delay))
.expect("at least one attestation in collection");
rewards[attestation.proposer_index] += get_proposer_reward(state, i, context)?;
let max_attester_reward =
get_base_reward(state, i, context)? - get_proposer_reward(state, i, context)?;
rewards[i] += max_attester_reward / attestation.inclusion_delay;

// Calculate rewards based on pre-computed data
for (validator_index, status) in attester_statuses.iter().enumerate() {
if status.min_inclusion_delay == 0 || !eligible_validators[validator_index] {
continue;
}
let proposer_reward = get_proposer_reward(state, validator_index, context)?;
rewards[status.proposer_index] += proposer_reward;
let max_attester_reward =
get_base_reward(state, validator_index, context)? - proposer_reward;
rewards[validator_index] += max_attester_reward / status.min_inclusion_delay;
}
}

#[cfg(not(feature = "attestation-processing"))]
{
let matching_source_attestations =
get_matching_source_attestations(state, previous_epoch, context)?;
for i in
get_unslashed_attesting_indices(state, matching_source_attestations.iter(), context)?
{
let mut attestations = Vec::new();
for a in matching_source_attestations.iter() {
if get_attesting_indices(state, &a.data, &a.aggregation_bits, context)?.contains(&i)
{
attestations.push(a)
}
}
let attestation = attestations
.iter()
.min_by(|&a, &b| a.inclusion_delay.cmp(&b.inclusion_delay))
.expect("at least one attestation in collection");
rewards[attestation.proposer_index] += get_proposer_reward(state, i, context)?;
let max_attester_reward =
get_base_reward(state, i, context)? - get_proposer_reward(state, i, context)?;
rewards[i] += max_attester_reward / attestation.inclusion_delay;
}
}
Ok((rewards, vec![0; validator_count]))
}
Expand Down Expand Up @@ -1014,13 +1071,11 @@ pub fn get_attestation_deltas<
>,
context: &Context,
) -> Result<(Vec<Gwei>, Vec<Gwei>)> {
// Return attestation reward/penalty deltas for each validator.
let (source_rewards, source_penalties) = get_source_deltas(state, context)?;
let (target_rewards, target_penalties) = get_target_deltas(state, context)?;
let (head_rewards, head_penalties) = get_head_deltas(state, context)?;
let (inclusion_delay_rewards, _) = get_inclusion_delay_deltas(state, context)?;
let (_, inactivity_penalties) = get_inactivity_penalty_deltas(state, context)?;

let validator_count = state.validators.len();
let mut rewards = vec![0; validator_count];
for i in 0..validator_count {
Expand Down Expand Up @@ -1058,8 +1113,12 @@ pub fn process_epoch<
>,
context: &Context,
) -> Result<()> {
let epoch_processing_time = Instant::now();

process_justification_and_finalization(state, context)?;
let start = Instant::now();
process_rewards_and_penalties(state, context)?;
println!("Rewards and penalties processed in {:?}", start.elapsed());
process_registry_updates(state, context)?;
process_slashings(state, context)?;
process_eth1_data_reset(state, context);
Expand All @@ -1068,5 +1127,7 @@ pub fn process_epoch<
process_randao_mixes_reset(state, context);
process_historical_roots_update(state, context)?;
process_participation_record_updates(state);

println!("\nEpoch processing complete! Total time: {:?}", epoch_processing_time.elapsed());
Ok(())
}

0 comments on commit 9f95fa3

Please sign in to comment.