Skip to content

Commit

Permalink
Add LedgerColumn::multi_get() (#26354)
Browse files Browse the repository at this point in the history
#### Problem
Blockstore operations such as get_slots_since() issues multiple rocksdb::get()
at once which is not optimal for performance.

#### Summary of Changes
This PR adds LedgerColumn::multi_get() based on rocksdb::batched_multi_get(),
the optimized version of multi_get() where get requests are processed in batch
to minimize read I/O.
  • Loading branch information
yhchiang-sol authored Sep 12, 2022
1 parent 0c185d1 commit ba3d9cd
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 3 deletions.
32 changes: 32 additions & 0 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4628,6 +4628,38 @@ pub mod tests {
assert_eq!(result, data);
}

#[test]
fn test_multi_get() {
const TEST_PUT_ENTRY_COUNT: usize = 100;
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

// Test meta column family
for i in 0..TEST_PUT_ENTRY_COUNT {
let k = u64::try_from(i).unwrap();
let meta = SlotMeta::new(k, Some(k + 1));
blockstore.meta_cf.put(k, &meta).unwrap();
let result = blockstore
.meta_cf
.get(k)
.unwrap()
.expect("Expected meta object to exist");
assert_eq!(result, meta);
}
let mut keys: Vec<u64> = vec![0; TEST_PUT_ENTRY_COUNT];
for (i, key) in keys.iter_mut().enumerate().take(TEST_PUT_ENTRY_COUNT) {
*key = u64::try_from(i).unwrap();
}
let values = blockstore.meta_cf.multi_get(keys);
for (i, value) in values.iter().enumerate().take(TEST_PUT_ENTRY_COUNT) {
let k = u64::try_from(i).unwrap();
assert_eq!(
value.as_ref().unwrap().as_ref().unwrap(),
&SlotMeta::new(k, Some(k + 1))
);
}
}

#[test]
fn test_read_shred_bytes() {
let slot = 0;
Expand Down
56 changes: 54 additions & 2 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use {
blockstore_metrics::{
maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf,
BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET,
PERF_METRIC_OP_NAME_PUT, PERF_METRIC_OP_NAME_WRITE_BATCH,
PERF_METRIC_OP_NAME_MULTI_GET, PERF_METRIC_OP_NAME_PUT,
PERF_METRIC_OP_NAME_WRITE_BATCH,
},
blockstore_options::{
AccessType, BlockstoreOptions, LedgerColumnOptions, ShredStorageType,
Expand All @@ -20,7 +21,7 @@ use {
compaction_filter::CompactionFilter,
compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
properties as RocksProperties, ColumnFamily, ColumnFamilyDescriptor, CompactionDecision,
DBCompactionStyle, DBIterator, DBRawIterator, FifoCompactOptions,
DBCompactionStyle, DBIterator, DBPinnableSlice, DBRawIterator, FifoCompactOptions,
IteratorMode as RocksIteratorMode, LiveFile, Options, WriteBatch as RWriteBatch, DB,
},
serde::{de::DeserializeOwned, Serialize},
Expand Down Expand Up @@ -450,6 +451,23 @@ impl Rocks {
Ok(())
}

fn multi_get_cf(
&self,
cf: &ColumnFamily,
keys: Vec<&[u8]>,
) -> Vec<Result<Option<DBPinnableSlice>>> {
let values = self
.db
.batched_multi_get_cf(cf, keys, false)
.into_iter()
.map(|result| match result {
Ok(opt) => Ok(opt),
Err(e) => Err(BlockstoreError::RocksDb(e)),
})
.collect::<Vec<_>>();
values
}

fn delete_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<()> {
self.db.delete_cf(cf, key)?;
Ok(())
Expand Down Expand Up @@ -1289,6 +1307,40 @@ impl<C> LedgerColumn<C>
where
C: TypedColumn + ColumnName,
{
pub fn multi_get(&self, keys: Vec<C::Index>) -> Vec<Result<Option<C::Type>>> {
let rocks_keys: Vec<_> = keys.into_iter().map(|key| C::key(key)).collect();
{
let ref_rocks_keys: Vec<_> = rocks_keys.iter().map(|k| &k[..]).collect();
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);
let result = self
.backend
.multi_get_cf(self.handle(), ref_rocks_keys)
.into_iter()
.map(|r| match r {
Ok(opt) => match opt {
Some(pinnable_slice) => Ok(Some(deserialize(pinnable_slice.as_ref())?)),
None => Ok(None),
},
Err(e) => Err(e),
})
.collect::<Vec<Result<Option<_>>>>();
if let Some(op_start_instant) = is_perf_enabled {
// use multi-get instead
report_rocksdb_read_perf(
C::NAME,
PERF_METRIC_OP_NAME_MULTI_GET,
&op_start_instant.elapsed(),
&self.column_options,
);
}

result
}
}

pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
let mut result = Ok(None);
let is_perf_enabled = maybe_enable_rocksdb_perf(
Expand Down
2 changes: 1 addition & 1 deletion ledger/src/blockstore_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell<PerfContext> = RefC
// The minimum time duration between two RocksDB perf samples of the same operation.
const PERF_SAMPLING_MIN_DURATION: Duration = Duration::from_secs(1);
pub(crate) const PERF_METRIC_OP_NAME_GET: &str = "get";

pub(crate) const PERF_METRIC_OP_NAME_MULTI_GET: &str = "multi_get";
pub(crate) const PERF_METRIC_OP_NAME_PUT: &str = "put";
pub(crate) const PERF_METRIC_OP_NAME_WRITE_BATCH: &str = "write_batch";

Expand Down

0 comments on commit ba3d9cd

Please sign in to comment.