diff --git a/rpc-client-api/src/custom_error.rs b/rpc-client-api/src/custom_error.rs index b6175a9230bdcc..62857b1ee55c16 100644 --- a/rpc-client-api/src/custom_error.rs +++ b/rpc-client-api/src/custom_error.rs @@ -24,6 +24,7 @@ pub const JSON_RPC_SERVER_ERROR_TRANSACTION_SIGNATURE_LEN_MISMATCH: i64 = -32013 pub const JSON_RPC_SERVER_ERROR_BLOCK_STATUS_NOT_AVAILABLE_YET: i64 = -32014; pub const JSON_RPC_SERVER_ERROR_UNSUPPORTED_TRANSACTION_VERSION: i64 = -32015; pub const JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED: i64 = -32016; +pub const JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE: i64 = -32017; #[derive(Error, Debug)] pub enum RpcCustomError { @@ -65,6 +66,12 @@ pub enum RpcCustomError { UnsupportedTransactionVersion(u8), #[error("MinContextSlotNotReached")] MinContextSlotNotReached { context_slot: Slot }, + #[error("EpochRewardsPeriodActive")] + EpochRewardsPeriodActive { + slot: Slot, + current_block_height: u64, + rewards_complete_block_height: u64, + }, } #[derive(Debug, Serialize, Deserialize)] @@ -79,6 +86,13 @@ pub struct MinContextSlotNotReachedErrorData { pub context_slot: Slot, } +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct EpochRewardsPeriodActiveErrorData { + pub current_block_height: u64, + pub rewards_complete_block_height: u64, +} + impl From for RpcCustomError { fn from(err: EncodeError) -> Self { match err { @@ -206,6 +220,14 @@ impl From for Error { context_slot, })), }, + RpcCustomError::EpochRewardsPeriodActive { slot, current_block_height, rewards_complete_block_height } => Self { + code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE), + message: format!("Epoch rewards period still active at slot {slot}"), + data: Some(serde_json::json!(EpochRewardsPeriodActiveErrorData { + current_block_height, + rewards_complete_block_height, + })), + }, } } } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index b81bb21d0eaa58..cb7d1c961aa2f7 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -66,6 +66,7 @@ use { clock::{Slot, UnixTimestamp, MAX_PROCESSING_AGE}, commitment_config::{CommitmentConfig, CommitmentLevel}, epoch_info::EpochInfo, + epoch_rewards_hasher::EpochRewardsHasher, epoch_schedule::EpochSchedule, exit::Exit, feature_set, @@ -93,8 +94,9 @@ use { solana_transaction_status::{ map_inner_instructions, BlockEncodingOptions, ConfirmedBlock, ConfirmedTransactionStatusWithSignature, ConfirmedTransactionWithStatusMeta, - EncodedConfirmedTransactionWithStatusMeta, Reward, RewardType, TransactionBinaryEncoding, - TransactionConfirmationStatus, TransactionStatus, UiConfirmedBlock, UiTransactionEncoding, + EncodedConfirmedTransactionWithStatusMeta, Reward, RewardType, Rewards, + TransactionBinaryEncoding, TransactionConfirmationStatus, TransactionStatus, + UiConfirmedBlock, UiTransactionEncoding, }, solana_vote_program::vote_state::{VoteState, MAX_LOCKOUT_HISTORY}, spl_token_2022::{ @@ -548,6 +550,34 @@ impl JsonRpcRequestProcessor { }) } + fn filter_map_rewards<'a, F>( + rewards: &'a Option, + slot: Slot, + addresses: &'a [String], + reward_type_filter: &'a F, + ) -> HashMap + where + F: Fn(RewardType) -> bool, + { + Self::filter_rewards(rewards, reward_type_filter) + .filter(|reward| addresses.contains(&reward.pubkey)) + .map(|reward| (reward.pubkey.clone(), (reward.clone(), slot))) + .collect() + } + + fn filter_rewards<'a, F>( + rewards: &'a Option, + reward_type_filter: &'a F, + ) -> impl Iterator + where + F: Fn(RewardType) -> bool, + { + rewards + .iter() + .flatten() + .filter(move |reward| reward.reward_type.is_some_and(reward_type_filter)) + } + pub async fn get_inflation_reward( &self, addresses: Vec, @@ -592,7 +622,22 @@ impl JsonRpcRequestProcessor { slot: first_slot_in_epoch, })?; - let Ok(Some(first_confirmed_block)) = self + // Determine if partitioned epoch rewards were enabled for the desired + // epoch + let bank = self.get_bank_with_config(context_config)?; + + // DO NOT CLEAN UP with feature_set::enable_partitioned_epoch_reward + // This logic needs to be retained indefinitely to support historical + // rewards before and after feature activation. + let partitioned_epoch_reward_enabled_slot = bank + .feature_set + .activated_slot(&feature_set::enable_partitioned_epoch_reward::id()); + let partitioned_epoch_reward_enabled = partitioned_epoch_reward_enabled_slot + .map(|slot| slot <= first_confirmed_block_in_epoch) + .unwrap_or(false); + + // Get first block in the epoch + let Ok(Some(epoch_boundary_block)) = self .get_block( first_confirmed_block_in_epoch, Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), @@ -605,30 +650,109 @@ impl JsonRpcRequestProcessor { .into()); }; - let addresses: Vec = addresses - .into_iter() - .map(|pubkey| pubkey.to_string()) - .collect(); + // Collect rewards from first block in the epoch if partitioned epoch + // rewards not enabled, or address is a vote account + let mut reward_map: HashMap = { + let addresses: Vec = + addresses.iter().map(|pubkey| pubkey.to_string()).collect(); + Self::filter_map_rewards( + &epoch_boundary_block.rewards, + first_confirmed_block_in_epoch, + &addresses, + &|reward_type| -> bool { + reward_type == RewardType::Voting + || (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking) + }, + ) + }; - let reward_hash: HashMap = first_confirmed_block - .rewards - .unwrap_or_default() - .into_iter() - .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => addresses - .contains(&reward.pubkey) - .then(|| (reward.clone().pubkey, reward)), - _ => None, - }) - .collect(); + // Append stake account rewards from partitions if partitions epoch + // rewards is enabled + if partitioned_epoch_reward_enabled { + let num_partitions = epoch_boundary_block.num_reward_partitions.expect( + "epoch-boundary block should have num_reward_partitions after partitioned epoch \ + rewards enabled", + ); + + let num_partitions = usize::try_from(num_partitions) + .expect("num_partitions should never exceed usize::MAX"); + let hasher = EpochRewardsHasher::new( + num_partitions, + &Hash::from_str(&epoch_boundary_block.previous_blockhash) + .expect("UiConfirmedBlock::previous_blockhash should be properly formed"), + ); + let mut partition_index_addresses: HashMap> = HashMap::new(); + for address in addresses.iter() { + let address_string = address.to_string(); + // Skip this address if (Voting) rewards were already found in + // the first block of the epoch + if !reward_map.contains_key(&address_string) { + let partition_index = hasher.clone().hash_address_to_partition(address); + partition_index_addresses + .entry(partition_index) + .and_modify(|list| list.push(address_string.clone())) + .or_insert(vec![address_string]); + } + } + + let block_list = self + .get_blocks_with_limit( + first_confirmed_block_in_epoch + 1, + num_partitions, + Some(context_config), + ) + .await?; + + for (partition_index, addresses) in partition_index_addresses.iter() { + let slot = *block_list.get(*partition_index).ok_or_else(|| { + // If block_list.len() too short to contain + // partition_index, the epoch rewards period must be + // currently active. + let rewards_complete_block_height = epoch_boundary_block + .block_height + .map(|block_height| { + block_height + .saturating_add(num_partitions as u64) + .saturating_add(1) + }) + .expect( + "every block after partitioned_epoch_reward_enabled should have a \ + populated block_height", + ); + RpcCustomError::EpochRewardsPeriodActive { + slot: bank.slot(), + current_block_height: bank.block_height(), + rewards_complete_block_height, + } + })?; + + let Ok(Some(block)) = self + .get_block( + slot, + Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), + ) + .await + else { + return Err(RpcCustomError::BlockNotAvailable { slot }.into()); + }; + + let index_reward_map = Self::filter_map_rewards( + &block.rewards, + slot, + addresses, + &|reward_type| -> bool { reward_type == RewardType::Staking }, + ); + reward_map.extend(index_reward_map); + } + } let rewards = addresses .iter() .map(|address| { - if let Some(reward) = reward_hash.get(address) { + if let Some((reward, slot)) = reward_map.get(&address.to_string()) { return Some(RpcInflationReward { epoch, - effective_slot: first_confirmed_block_in_epoch, + effective_slot: *slot, amount: reward.lamports.unsigned_abs(), post_balance: reward.post_balance, commission: reward.commission, diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 580a6a14027518..7f7865d8ac18e8 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -23,7 +23,6 @@ use { account::AccountSharedData, clock::Slot, epoch_schedule::EpochSchedule, - feature_set, native_token::sol_to_lamports, pubkey::Pubkey, rent::Rent, @@ -352,9 +351,7 @@ fn main() { exit(1); }); - let mut features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default(); - // Remove this when client support is ready for the enable_partitioned_epoch_reward feature - features_to_deactivate.push(feature_set::enable_partitioned_epoch_reward::id()); + let features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default(); if TestValidatorGenesis::ledger_exists(&ledger_path) { for (name, long) in &[