Skip to content

Commit

Permalink
Update rpc getBlockTime to use new method, and refactor blockstore slightly
Browse files Browse the repository at this point in the history
  • Loading branch information
CriesofCarrots committed Aug 31, 2020
1 parent d177ccf commit a346ced
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 135 deletions.
2 changes: 1 addition & 1 deletion core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1875,7 +1875,7 @@ impl ReplayStage {
if let Some(cache_block_time_sender) = cache_block_time_sender {
for slot in rooted_slots {
if blockstore
.get_block_time2(*slot)
.get_block_time(*slot)
.unwrap_or_default()
.is_none()
{
Expand Down
22 changes: 8 additions & 14 deletions core/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use solana_sdk::{
signature::Signature,
stake_history::StakeHistory,
sysvar::{stake_history, Sysvar},
timing::slot_duration_from_slots_per_year,
transaction::{self, Transaction},
};
use solana_stake_program::stake_state::StakeState;
Expand Down Expand Up @@ -684,18 +683,7 @@ impl JsonRpcRequestProcessor {
.unwrap()
.highest_confirmed_root()
{
// This calculation currently assumes that bank.slots_per_year will remain unchanged after
// genesis (ie. that this bank's slot_per_year will be applicable to any rooted slot being
// queried). If these values will be variable in the future, those timing parameters will
// need to be stored persistently, and the slot_duration calculation will likely need to be
// moved upstream into blockstore. Also, an explicit commitment level will need to be set.
let bank = self.bank(None);
let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
let epoch = bank.epoch_schedule().get_epoch(slot);
let stakes = HashMap::new();
let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);

let result = self.blockstore.get_block_time(slot, slot_duration, stakes);
let result = self.blockstore.get_block_time(slot);
self.check_slot_cleaned_up(&result, slot)?;
Ok(result.ok().unwrap_or(None))
} else {
Expand Down Expand Up @@ -2480,6 +2468,7 @@ pub mod tests {
nonce, rpc_port,
signature::{Keypair, Signer},
system_instruction, system_program, system_transaction,
timing::slot_duration_from_slots_per_year,
transaction::{self, TransactionError},
};
use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage};
Expand All @@ -2491,7 +2480,7 @@ pub mod tests {
option::COption, solana_sdk::pubkey::Pubkey as SplTokenPubkey,
state::AccountState as TokenAccountState, state::Mint,
};
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

const TEST_MINT_LAMPORTS: u64 = 1_000_000;
const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1;
Expand Down Expand Up @@ -2592,6 +2581,11 @@ pub mod tests {

for root in roots.iter() {
bank_forks.write().unwrap().set_root(*root, &None, Some(0));
let mut stakes = HashMap::new();
stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default()));
blockstore
.cache_block_time(*root, Duration::from_millis(400), &stakes)
.unwrap();
}
}

Expand Down
202 changes: 82 additions & 120 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell<ThreadPool> = RefCell::ne
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
const TIMESTAMP_SLOT_INTERVAL: u64 = 4500;
const TIMESTAMP_SLOT_RANGE: usize = 16;

// An upper bound on maximum number of data shreds we can handle in a slot
Expand Down Expand Up @@ -1525,12 +1524,7 @@ impl Blockstore {
}
}

pub fn get_block_time(
&self,
slot: Slot,
slot_duration: Duration,
stakes: &HashMap<Pubkey, (u64, Account)>,
) -> Result<Option<UnixTimestamp>> {
pub fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_block_time".to_string(), String)
Expand All @@ -1541,96 +1535,30 @@ impl Blockstore {
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp);
}

let mut get_unique_timestamps = Measure::start("get_unique_timestamps");
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
.get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE)
.into_iter()
.flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
.collect();
get_unique_timestamps.stop();

let mut calculate_timestamp = Measure::start("calculate_timestamp");
let stake_weighted_timestamps =
calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration);
calculate_timestamp.stop();
datapoint_info!(
"blockstore-get-block-time",
("slot", slot as i64, i64),
(
"get_unique_timestamps_us",
get_unique_timestamps.as_us() as i64,
i64
),
(
"calculate_stake_weighted_timestamp_us",
calculate_timestamp.as_us() as i64,
i64
)
);

Ok(stake_weighted_timestamps)
self.blocktime_cf.get(slot)
}

fn get_timestamp_slots(
&self,
slot: Slot,
timestamp_interval: u64,
timestamp_sample_range: usize,
) -> Vec<Slot> {
let baseline_slot = slot - (slot % timestamp_interval);
let root_iterator = self.db.iter::<cf::Root>(IteratorMode::From(
baseline_slot,
IteratorDirection::Forward,
));
fn get_timestamp_slots(&self, slot: Slot, timestamp_sample_range: usize) -> Vec<Slot> {
let root_iterator = self
.db
.iter::<cf::Root>(IteratorMode::From(slot, IteratorDirection::Reverse));
if !self.is_root(slot) || root_iterator.is_err() {
return vec![];
}
let mut get_slots = Measure::start("get_slots");
let mut slots: Vec<Slot> = root_iterator
let mut timestamp_slots: Vec<Slot> = root_iterator
.unwrap()
.map(|(iter_slot, _)| iter_slot)
.take(timestamp_sample_range)
.filter(|&iter_slot| iter_slot <= slot)
.collect();

if slots.len() < timestamp_sample_range && baseline_slot >= timestamp_interval {
let earlier_baseline = baseline_slot - timestamp_interval;
let earlier_root_iterator = self.db.iter::<cf::Root>(IteratorMode::From(
earlier_baseline,
IteratorDirection::Forward,
));
if let Ok(iterator) = earlier_root_iterator {
slots = iterator
.map(|(iter_slot, _)| iter_slot)
.take(timestamp_sample_range)
.collect();
}
}
timestamp_slots.sort();
get_slots.stop();
datapoint_info!(
"blockstore-get-timestamp-slots",
("slot", slot as i64, i64),
("get_slots_us", get_slots.as_us() as i64, i64)
);
slots
}

pub fn get_block_time2(
&self,
slot: Slot,
) -> Result<Option<UnixTimestamp>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_block_time".to_string(), String)
);
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
// lowest_cleanup_slot is the last slot that was not cleaned up by
// LedgerCleanupService
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp);
}
self.blocktime_cf.get(slot)
timestamp_slots
}

pub fn cache_block_time(
Expand All @@ -1643,20 +1571,15 @@ impl Blockstore {
return Err(BlockstoreError::SlotNotRooted);
}
let mut get_unique_timestamps = Measure::start("get_unique_timestamps");
let root_iterator = self
.db
.iter::<cf::Root>(IteratorMode::From(slot, IteratorDirection::Reverse));
let mut timestamp_slots: Vec<Slot> = root_iterator
.unwrap()
.map(|(iter_slot, _)| iter_slot)
.take(TIMESTAMP_SLOT_RANGE)
.collect();
timestamp_slots.sort();
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = timestamp_slots
let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
.get_timestamp_slots(slot, TIMESTAMP_SLOT_RANGE)
.into_iter()
.flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
.collect();
get_unique_timestamps.stop();
if unique_timestamps.is_empty() {
return Err(BlockstoreError::NoVoteTimestampsInRange);
}

let mut calculate_timestamp = Measure::start("calculate_timestamp");
let stake_weighted_timestamp =
Expand Down Expand Up @@ -5442,8 +5365,6 @@ pub mod tests {
fn test_get_timestamp_slots() {
let timestamp_sample_range = 5;
let ticks_per_slot = 5;
// Smaller interval than TIMESTAMP_SLOT_INTERVAL for convenience of building blockstore
let timestamp_interval = 7;
/*
Build a blockstore with < TIMESTAMP_SLOT_RANGE roots
*/
Expand All @@ -5470,26 +5391,25 @@ pub mod tests {
blockstore.set_roots(&[1, 2, 3]).unwrap();

assert_eq!(
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range),
blockstore.get_timestamp_slots(2, timestamp_sample_range),
vec![0, 1, 2]
);
assert_eq!(
blockstore.get_timestamp_slots(3, timestamp_interval, timestamp_sample_range),
blockstore.get_timestamp_slots(3, timestamp_sample_range),
vec![0, 1, 2, 3]
);

drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");

/*
Build a blockstore in the ledger with the following rooted slots:
[0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 14, 15, 16, 17]
Build a blockstore in the ledger with gaps in rooted slot sequence
*/
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.set_roots(&[0]).unwrap();
let desired_roots = vec![1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19];
let desired_roots = vec![1, 2, 3, 5, 6, 8, 11];
let mut last_entry_hash = Hash::default();
for (i, slot) in desired_roots.iter().enumerate() {
let parent = {
Expand All @@ -5510,28 +5430,20 @@ pub mod tests {
blockstore.set_roots(&desired_roots).unwrap();

assert_eq!(
blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range),
blockstore.get_timestamp_slots(2, timestamp_sample_range),
vec![0, 1, 2]
);
assert_eq!(
blockstore.get_timestamp_slots(6, timestamp_interval, timestamp_sample_range),
vec![0, 1, 2, 3, 4]
);
assert_eq!(
blockstore.get_timestamp_slots(8, timestamp_interval, timestamp_sample_range),
vec![0, 1, 2, 3, 4]
blockstore.get_timestamp_slots(6, timestamp_sample_range),
vec![1, 2, 3, 5, 6]
);
assert_eq!(
blockstore.get_timestamp_slots(13, timestamp_interval, timestamp_sample_range),
vec![8, 9, 10, 11, 12]
blockstore.get_timestamp_slots(8, timestamp_sample_range),
vec![2, 3, 5, 6, 8]
);
assert_eq!(
blockstore.get_timestamp_slots(18, timestamp_interval, timestamp_sample_range),
vec![8, 9, 10, 11, 12]
);
assert_eq!(
blockstore.get_timestamp_slots(19, timestamp_interval, timestamp_sample_range),
vec![14, 16, 17, 18, 19]
blockstore.get_timestamp_slots(11, timestamp_sample_range),
vec![3, 5, 6, 8, 11]
);
}

Expand Down Expand Up @@ -5702,14 +5614,25 @@ pub mod tests {
);
assert_eq!(blockstore.get_block_timestamps(2).unwrap(), vec![]);

// Build epoch vote_accounts HashMap to test stake-weighted block time
blockstore.set_roots(&[3, 8]).unwrap();
let mut stakes = HashMap::new();
let slot_duration = Duration::from_millis(400);
for slot in &[1, 2, 3, 8] {
assert!(blockstore
.cache_block_time(*slot, slot_duration, &stakes)
.is_err());
}

// Build epoch vote_accounts HashMap to test stake-weighted block time
for (i, keypair) in vote_keypairs.iter().enumerate() {
stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default()));
}
let slot_duration = Duration::from_millis(400);
let block_time_slot_3 = blockstore.get_block_time(3, slot_duration, &stakes);
for slot in &[1, 2, 3, 8] {
blockstore
.cache_block_time(*slot, slot_duration, &stakes)
.unwrap();
}
let block_time_slot_3 = blockstore.get_block_time(3);

let mut total_stake = 0;
let mut expected_time: u64 = (0..6)
Expand All @@ -5725,14 +5648,53 @@ pub mod tests {
expected_time /= total_stake;
assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time);
assert_eq!(
blockstore
.get_block_time(8, slot_duration, &stakes)
.unwrap()
.unwrap() as u64,
blockstore.get_block_time(8).unwrap().unwrap() as u64,
expected_time + 2 // At 400ms block duration, 5 slots == 2sec
);
}

// Verifies behavior when no vote timestamps are available: cache_block_time
// refuses to cache a block time, and get_block_time reports Ok(None) rather
// than an error for slots with nothing cached.
#[test]
fn test_get_block_time_no_timestamps() {
let vote_keypairs: Vec<Keypair> = (0..6).map(|_| Keypair::new()).collect();

// Populate slot 1 with vote transactions, none of which have timestamps
let mut vote_entries: Vec<Entry> = Vec::new();
for (i, keypair) in vote_keypairs.iter().enumerate() {
// `timestamp: None` is the point of this test — votes land, but carry
// no wallclock information to aggregate.
let vote = Vote {
slots: vec![1],
hash: Hash::default(),
timestamp: None,
};
let vote_ix = vote_instruction::vote(&keypair.pubkey(), &keypair.pubkey(), vote);
let vote_msg = Message::new(&[vote_ix], Some(&keypair.pubkey()));
let vote_tx = Transaction::new(&[keypair], vote_msg, Hash::default());

vote_entries.push(next_entry_mut(&mut Hash::default(), 0, vec![vote_tx]));
// Follow each vote with a tick whose hash is derived from the loop index
// so successive entries are distinct.
let mut tick = create_ticks(1, 0, hash(&serialize(&i).unwrap()));
vote_entries.append(&mut tick);
}
let shreds = entries_to_test_shreds(vote_entries, 1, 0, true, 0);
let ledger_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&ledger_path).unwrap();
blockstore.insert_shreds(shreds, None, false).unwrap();
// Populate slot 2 with ticks only
fill_blockstore_slot_with_ticks(&blockstore, 6, 2, 1, Hash::default());
blockstore.set_roots(&[0, 1, 2]).unwrap();

// Build epoch vote_accounts HashMap to test stake-weighted block time
let mut stakes = HashMap::new();
for (i, keypair) in vote_keypairs.iter().enumerate() {
stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default()));
}
let slot_duration = Duration::from_millis(400);
for slot in &[1, 2, 3, 8] {
// cache_block_time must fail for every slot here: 1 and 2 are rooted
// but their votes carry no timestamps (presumably
// NoVoteTimestampsInRange — confirm against cache_block_time); 3 and 8
// were never rooted (SlotNotRooted path).
assert!(blockstore
.cache_block_time(*slot, slot_duration, &stakes)
.is_err());
// Nothing was cached, so the lookup succeeds but yields no timestamp.
assert_eq!(blockstore.get_block_time(*slot).unwrap(), None);
}
}

#[test]
fn test_calculate_stake_weighted_timestamp() {
let recent_timestamp: UnixTimestamp = 1_578_909_061;
Expand Down
1 change: 1 addition & 0 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub enum BlockstoreError {
UnableToSetOpenFileDescriptorLimit,
TransactionStatusSlotMismatch,
EmptyEpochStakes,
NoVoteTimestampsInRange,
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;

Expand Down

0 comments on commit a346ced

Please sign in to comment.