Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger/blockstore: PerfSampleV2: num_non_vote_transactions #29404

Merged
merged 1 commit into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions core/src/sample_performance_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample},
solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSampleV2},
solana_runtime::bank_forks::BankForks,
std::{
sync::{
Expand All @@ -16,7 +16,8 @@ const SLEEP_INTERVAL: u64 = 500;

pub struct SamplePerformanceSnapshot {
pub num_transactions: u64,
pub num_slots: u64,
pub num_non_vote_transactions: u64,
pub highest_slot: u64,
}

pub struct SamplePerformanceService {
Expand Down Expand Up @@ -50,14 +51,15 @@ impl SamplePerformanceService {
blockstore: &Arc<Blockstore>,
exit: Arc<AtomicBool>,
) {
let forks = bank_forks.read().unwrap();
let bank = forks.root_bank();
let highest_slot = forks.highest_slot();
drop(forks);
let (bank, highest_slot) = {
let forks = bank_forks.read().unwrap();
(forks.root_bank(), forks.highest_slot())
};

let mut sample_snapshot = SamplePerformanceSnapshot {
let mut snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
num_non_vote_transactions: bank.non_vote_transaction_count_since_restart(),
highest_slot,
};

let mut now = Instant::now();
Expand All @@ -70,29 +72,34 @@ impl SamplePerformanceService {

if elapsed.as_secs() >= SAMPLE_INTERVAL {
now = Instant::now();
let bank_forks = bank_forks.read().unwrap();
let bank = bank_forks.root_bank().clone();
let highest_slot = bank_forks.highest_slot();
drop(bank_forks);

let perf_sample = PerfSample {
num_slots: highest_slot
.checked_sub(sample_snapshot.num_slots)
.unwrap_or_default(),
num_transactions: bank
.transaction_count()
.checked_sub(sample_snapshot.num_transactions)
.unwrap_or_default(),
let (bank, highest_slot) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.highest_slot())
};

let num_slots = highest_slot.saturating_sub(snapshot.highest_slot);
let num_transactions = bank
.transaction_count()
.saturating_sub(snapshot.num_transactions);
let num_non_vote_transactions = bank
.non_vote_transaction_count_since_restart()
.saturating_sub(snapshot.num_non_vote_transactions);

let perf_sample = PerfSampleV2 {
num_slots,
num_transactions,
num_non_vote_transactions,
sample_period_secs: elapsed.as_secs() as u16,
};

if let Err(e) = blockstore.write_perf_sample(highest_slot, &perf_sample) {
error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e);
}

sample_snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
snapshot = SamplePerformanceSnapshot {
num_transactions,
num_non_vote_transactions,
highest_slot,
};
}

Expand Down
175 changes: 155 additions & 20 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use {
slot_stats::{ShredSource, SlotsStats},
},
assert_matches::debug_assert_matches,
bincode::deserialize,
bincode::{deserialize, serialize},
crossbeam_channel::{bounded, Receiver, Sender, TrySendError},
dashmap::DashSet,
log::*,
Expand Down Expand Up @@ -2712,19 +2712,40 @@ impl Blockstore {
}

pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
Ok(self
// When reading `PerfSamples`, the database may contain samples with either `PerfSampleV1`
// or `PerfSampleV2` encoding. We expect `PerfSampleV1` to be a prefix of the
// `PerfSampleV2` encoding (see [`perf_sample_v1_is_prefix_of_perf_sample_v2`]), so we try
// them in order.
let samples = self
.db
.iter::<cf::PerfSamples>(IteratorMode::End)?
.take(num)
.map(|(slot, data)| {
let perf_sample = deserialize(&data).unwrap();
(slot, perf_sample)
})
.collect())
deserialize::<PerfSampleV2>(&data)
.map(|sample| (slot, sample.into()))
.or_else(|err| {
match &*err {
bincode::ErrorKind::Io(io_err)
if matches!(io_err.kind(), ErrorKind::UnexpectedEof) =>
{
// Not enough bytes to deserialize as `PerfSampleV2`.
}
_ => return Err(err),
}

deserialize::<PerfSampleV1>(&data).map(|sample| (slot, sample.into()))
})
.map_err(Into::into)
});

samples.collect()
}

pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> {
self.perf_samples_cf.put(index, perf_sample)
pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSampleV2) -> Result<()> {
// Always write as the current version.
let bytes =
serialize(&perf_sample).expect("`PerfSampleV2` can be serialized with `bincode`");
self.perf_samples_cf.put_bytes(index, &bytes)
}

pub fn read_program_costs(&self) -> Result<Vec<(Pubkey, u64)>> {
Expand Down Expand Up @@ -8503,25 +8524,139 @@ pub mod tests {
}

#[test]
fn test_write_get_perf_samples() {
fn test_get_recent_perf_samples_v1_only() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let num_entries: usize = 10;

let slot_sample = |i: u64| PerfSampleV1 {
num_transactions: 1406 + i,
num_slots: 34 + i / 2,
sample_period_secs: (40 + i / 5) as u16,
};

let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for x in 1..num_entries + 1 {
perf_samples.push((
x as u64 * 50,
PerfSample {
num_transactions: 1000 + x as u64,
num_slots: 50,
sample_period_secs: 20,
},
));
for i in 0..num_entries {
let slot = (i + 1) as u64 * 50;
let sample = slot_sample(i as u64);

let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
}

for i in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(i + 1).unwrap(),
expected_samples
);
}
for (slot, sample) in perf_samples.iter() {
blockstore.write_perf_sample(*slot, sample).unwrap();
}

#[test]
fn test_get_recent_perf_samples_v2_only() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let num_entries: usize = 10;

let slot_sample = |i: u64| PerfSampleV2 {
num_transactions: 2495 + i,
num_slots: 167 + i / 2,
sample_period_secs: (37 + i / 5) as u16,
num_non_vote_transactions: 1672 + i,
};

let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for i in 0..num_entries {
let slot = (i + 1) as u64 * 50;
let sample = slot_sample(i as u64);

let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
}

for i in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(i + 1).unwrap(),
expected_samples
);
}
}

#[test]
fn test_get_recent_perf_samples_v1_and_v2() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let num_entries: usize = 10;

let slot_sample_v1 = |i: u64| PerfSampleV1 {
num_transactions: 1599 + i,
num_slots: 123 + i / 2,
sample_period_secs: (42 + i / 5) as u16,
};

let slot_sample_v2 = |i: u64| PerfSampleV2 {
num_transactions: 5809 + i,
num_slots: 81 + i / 2,
sample_period_secs: (35 + i / 5) as u16,
num_non_vote_transactions: 2209 + i,
};

let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for i in 0..num_entries {
let slot = (i + 1) as u64 * 50;

if i % 3 == 0 {
let sample = slot_sample_v1(i as u64);
let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
} else {
let sample = slot_sample_v2(i as u64);
let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
}
}

for i in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(i + 1).unwrap(),
expected_samples
);
}
}

#[test]
fn test_write_perf_samples() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

let num_entries: usize = 10;
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for x in 1..num_entries + 1 {
let slot = x as u64 * 50;
let sample = PerfSampleV2 {
num_transactions: 1000 + x as u64,
num_slots: 50,
sample_period_secs: 20,
num_non_vote_transactions: 300 + x as u64,
};

blockstore.write_perf_sample(slot, &sample).unwrap();
perf_samples.push((slot, PerfSample::V2(sample)));
}

for x in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
Expand Down
3 changes: 0 additions & 3 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,9 +832,6 @@ impl SlotColumn for columns::PerfSamples {}
impl ColumnName for columns::PerfSamples {
const NAME: &'static str = PERF_SAMPLES_CF;
}
impl TypedColumn for columns::PerfSamples {
type Type = blockstore_meta::PerfSample;
}

impl SlotColumn for columns::BlockHeight {}
impl ColumnName for columns::BlockHeight {
Expand Down
63 changes: 61 additions & 2 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,48 @@ pub struct AddressSignatureMeta {
pub writeable: bool,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct PerfSample {
/// Performance information about validator execution during a time slice.
///
/// Older versions should only arise as a result of deserialization of entries stored by a previous
/// version of the validator. Current version should only produce [`PerfSampleV2`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PerfSample {
V1(PerfSampleV1),
V2(PerfSampleV2),
}

impl From<PerfSampleV1> for PerfSample {
fn from(value: PerfSampleV1) -> PerfSample {
PerfSample::V1(value)
}
}

impl From<PerfSampleV2> for PerfSample {
fn from(value: PerfSampleV2) -> PerfSample {
PerfSample::V2(value)
}
}

/// Version of [`PerfSample`] used before 1.15.x.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct PerfSampleV1 {
pub num_transactions: u64,
pub num_slots: u64,
pub sample_period_secs: u16,
}

/// Version of the [`PerfSample`] introduced in 1.15.x.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct PerfSampleV2 {
// `PerfSampleV1` part
pub num_transactions: u64,
pub num_slots: u64,
pub sample_period_secs: u16,

// New fields.
pub num_non_vote_transactions: u64,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct ProgramCost {
pub cost: u64,
Expand Down Expand Up @@ -569,4 +604,28 @@ mod test {
expected.next_slots = vec![6, 7];
assert_eq!(slot_meta, expected);
}

// `PerfSampleV2` should contain `PerfSampleV1` as a prefix, in order for the column to be
// backward and forward compatible.
#[test]
fn perf_sample_v1_is_prefix_of_perf_sample_v2() {
let v2 = PerfSampleV2 {
num_transactions: 4190143848,
num_slots: 3607325588,
sample_period_secs: 31263,
num_non_vote_transactions: 4056116066,
};

let v2_bytes = bincode::serialize(&v2).expect("`PerfSampleV2` can be serialized");

let actual: PerfSampleV1 = bincode::deserialize(&v2_bytes)
.expect("Bytes encoded as `PerfSampleV2` can be decoded as `PerfSampleV1`");
let expected = PerfSampleV1 {
num_transactions: v2.num_transactions,
num_slots: v2.num_slots,
sample_period_secs: v2.sample_period_secs,
};

assert_eq!(actual, expected);
}
}
Loading