Skip to content

Commit

Permalink
[Transaction Service Status] Batch status and memo writes to DB. (anz…
Browse files Browse the repository at this point in the history
…a-xyz#3026)

* Batch tx status writes to db.

* Batch tx status writes across multiple transactions to db.

* Add db batch writes to tx memos.

* Default derivation no longer needed.

* Fix blockstore unit tests.

* minor fix

* Add tss unit test. Split batching into new functions.

* Fix CI issue.

* Add cargo benches and more unit tests.

* Fix clippy error.

* Better loop indexer names. Better batching functions abstraction.

* Review feedback. Refactor common code.

Co-authored-by: Lijun Wang <[email protected]>

---------

Co-authored-by: Lijun Wang <[email protected]>
  • Loading branch information
fkouteib and lijunwangs authored Oct 17, 2024
1 parent 38404a6 commit 17d94c4
Show file tree
Hide file tree
Showing 4 changed files with 485 additions and 11 deletions.
121 changes: 120 additions & 1 deletion ledger/benches/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use {
blockstore::{entries_to_test_shreds, Blockstore},
get_tmp_ledger_path_auto_delete,
},
solana_sdk::{clock::Slot, hash::Hash},
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature},
solana_transaction_status::TransactionStatusMeta,
std::path::Path,
test::Bencher,
};
Expand Down Expand Up @@ -154,3 +155,121 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) {
blockstore.insert_shreds(shreds, None, false).unwrap();
});
}

#[bench]
#[ignore]
fn bench_write_transaction_memos(b: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
let signatures: Vec<Signature> = (0..64).map(|_| Signature::new_unique()).collect();
b.iter(|| {
for (slot, signature) in signatures.iter().enumerate() {
blockstore
.write_transaction_memos(
signature,
slot as u64,
"bench_write_transaction_memos".to_string(),
)
.unwrap();
}
});
}

#[bench]
#[ignore]
fn bench_add_transaction_memos_to_batch(b: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
let signatures: Vec<Signature> = (0..64).map(|_| Signature::new_unique()).collect();
b.iter(|| {
let mut memos_batch = blockstore.get_write_batch().unwrap();

for (slot, signature) in signatures.iter().enumerate() {
blockstore
.add_transaction_memos_to_batch(
signature,
slot as u64,
"bench_write_transaction_memos".to_string(),
&mut memos_batch,
)
.unwrap();
}

blockstore.write_batch(memos_batch).unwrap();
});
}

#[bench]
#[ignore]
fn bench_write_transaction_status(b: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
let signatures: Vec<Signature> = (0..64).map(|_| Signature::new_unique()).collect();
let keys_with_writable: Vec<Vec<(Pubkey, bool)>> = (0..64)
.map(|_| {
vec![
(Pubkey::new_unique(), true),
(Pubkey::new_unique(), false),
(Pubkey::new_unique(), true),
(Pubkey::new_unique(), false),
]
})
.collect();
let slot = 5;

b.iter(|| {
for (tx_idx, signature) in signatures.iter().enumerate() {
blockstore
.write_transaction_status(
slot,
*signature,
keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)),
TransactionStatusMeta::default(),
tx_idx,
)
.unwrap();
}
});
}

#[bench]
#[ignore]
fn bench_add_transaction_status_to_batch(b: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
let signatures: Vec<Signature> = (0..64).map(|_| Signature::new_unique()).collect();
let keys_with_writable: Vec<Vec<(Pubkey, bool)>> = (0..64)
.map(|_| {
vec![
(Pubkey::new_unique(), true),
(Pubkey::new_unique(), false),
(Pubkey::new_unique(), true),
(Pubkey::new_unique(), false),
]
})
.collect();
let slot = 5;

b.iter(|| {
let mut status_batch = blockstore.get_write_batch().unwrap();

for (tx_idx, signature) in signatures.iter().enumerate() {
blockstore
.add_transaction_status_to_batch(
slot,
*signature,
keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)),
TransactionStatusMeta::default(),
tx_idx,
&mut status_batch,
)
.unwrap();
}

blockstore.write_batch(status_batch).unwrap();
});
}
220 changes: 214 additions & 6 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2892,30 +2892,79 @@ impl Blockstore {
}
}

pub fn write_transaction_status<'a>(
#[inline]
fn write_transaction_status_helper<'a, F>(
&self,
slot: Slot,
signature: Signature,
keys_with_writable: impl Iterator<Item = (&'a Pubkey, bool)>,
status: TransactionStatusMeta,
transaction_index: usize,
) -> Result<()> {
mut write_fn: F,
) -> Result<()>
where
F: FnMut(&Pubkey, Slot, u32, Signature, bool) -> Result<()>,
{
let status = status.into();
let transaction_index = u32::try_from(transaction_index)
.map_err(|_| BlockstoreError::TransactionIndexOverflow)?;
self.transaction_status_cf
.put_protobuf((signature, slot), &status)?;

for (address, writeable) in keys_with_writable {
self.address_signatures_cf.put(
(*address, slot, transaction_index, signature),
&AddressSignatureMeta { writeable },
)?;
write_fn(address, slot, transaction_index, signature, writeable)?;
}

Ok(())
}

pub fn write_transaction_status<'a>(
&self,
slot: Slot,
signature: Signature,
keys_with_writable: impl Iterator<Item = (&'a Pubkey, bool)>,
status: TransactionStatusMeta,
transaction_index: usize,
) -> Result<()> {
self.write_transaction_status_helper(
slot,
signature,
keys_with_writable,
status,
transaction_index,
|address, slot, tx_index, signature, writeable| {
self.address_signatures_cf.put(
(*address, slot, tx_index, signature),
&AddressSignatureMeta { writeable },
)
},
)
}

pub fn add_transaction_status_to_batch<'a>(
&self,
slot: Slot,
signature: Signature,
keys_with_writable: impl Iterator<Item = (&'a Pubkey, bool)>,
status: TransactionStatusMeta,
transaction_index: usize,
db_write_batch: &mut WriteBatch<'_>,
) -> Result<()> {
self.write_transaction_status_helper(
slot,
signature,
keys_with_writable,
status,
transaction_index,
|address, slot, tx_index, signature, writeable| {
db_write_batch.put::<cf::AddressSignatures>(
(*address, slot, tx_index, signature),
&AddressSignatureMeta { writeable },
)
},
)
}

pub fn read_transaction_memos(
&self,
signature: Signature,
Expand Down Expand Up @@ -2943,6 +2992,16 @@ impl Blockstore {
self.transaction_memos_cf.put((*signature, slot), &memos)
}

pub fn add_transaction_memos_to_batch(
&self,
signature: &Signature,
slot: Slot,
memos: String,
db_write_batch: &mut WriteBatch<'_>,
) -> Result<()> {
db_write_batch.put::<cf::TransactionMemos>((*signature, slot), &memos)
}

/// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock
/// and lowest available slot.
///
Expand Down Expand Up @@ -4573,6 +4632,14 @@ impl Blockstore {
*index_meta_time_us += total_start.as_us();
res
}

pub fn get_write_batch(&self) -> std::result::Result<WriteBatch<'_>, BlockstoreError> {
self.db.batch()
}

pub fn write_batch(&self, write_batch: WriteBatch) -> Result<()> {
self.db.write(write_batch)
}
}

// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
Expand Down Expand Up @@ -12253,4 +12320,145 @@ pub mod tests {
);
}
}

#[test]
fn test_write_transaction_memos() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let signature: Signature = Signature::new_unique();

blockstore
.write_transaction_memos(&signature, 4, "test_write_transaction_memos".to_string())
.unwrap();

let memo = blockstore
.read_transaction_memos(signature, 4)
.expect("Expected to find memo");
assert_eq!(memo, Some("test_write_transaction_memos".to_string()));
}

#[test]
fn test_add_transaction_memos_to_batch() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let signatures: Vec<Signature> = (0..2).map(|_| Signature::new_unique()).collect();
let mut memos_batch = blockstore.get_write_batch().unwrap();

blockstore
.add_transaction_memos_to_batch(
&signatures[0],
4,
"test_write_transaction_memos1".to_string(),
&mut memos_batch,
)
.unwrap();

blockstore
.add_transaction_memos_to_batch(
&signatures[1],
5,
"test_write_transaction_memos2".to_string(),
&mut memos_batch,
)
.unwrap();

blockstore.write_batch(memos_batch).unwrap();

let memo1 = blockstore
.read_transaction_memos(signatures[0], 4)
.expect("Expected to find memo");
assert_eq!(memo1, Some("test_write_transaction_memos1".to_string()));

let memo2 = blockstore
.read_transaction_memos(signatures[1], 5)
.expect("Expected to find memo");
assert_eq!(memo2, Some("test_write_transaction_memos2".to_string()));
}

#[test]
fn test_write_transaction_status() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let signatures: Vec<Signature> = (0..2).map(|_| Signature::new_unique()).collect();
let keys_with_writable: Vec<(Pubkey, bool)> =
vec![(Pubkey::new_unique(), true), (Pubkey::new_unique(), false)];
let slot = 5;

blockstore
.write_transaction_status(
slot,
signatures[0],
keys_with_writable
.iter()
.map(|&(ref pubkey, writable)| (pubkey, writable)),
TransactionStatusMeta {
fee: 4200,
..TransactionStatusMeta::default()
},
0,
)
.unwrap();

let tx_status = blockstore
.read_transaction_status((signatures[0], slot))
.unwrap()
.unwrap();
assert_eq!(tx_status.fee, 4200);
}

#[test]
fn test_add_transaction_status_to_batch() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let signatures: Vec<Signature> = (0..2).map(|_| Signature::new_unique()).collect();
let keys_with_writable: Vec<Vec<(Pubkey, bool)>> = (0..2)
.map(|_| vec![(Pubkey::new_unique(), true), (Pubkey::new_unique(), false)])
.collect();
let slot = 5;
let mut status_batch = blockstore.get_write_batch().unwrap();

for (tx_idx, signature) in signatures.iter().enumerate() {
blockstore
.add_transaction_status_to_batch(
slot,
*signature,
keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)),
TransactionStatusMeta {
fee: 5700 + tx_idx as u64,
status: if tx_idx % 2 == 0 {
Ok(())
} else {
Err(TransactionError::InsufficientFundsForFee)
},
..TransactionStatusMeta::default()
},
tx_idx,
&mut status_batch,
)
.unwrap();
}

blockstore.write_batch(status_batch).unwrap();

let tx_status1 = blockstore
.read_transaction_status((signatures[0], slot))
.unwrap()
.unwrap();
assert_eq!(tx_status1.fee, 5700);
assert_eq!(tx_status1.status, Ok(()));

let tx_status2 = blockstore
.read_transaction_status((signatures[1], slot))
.unwrap()
.unwrap();
assert_eq!(tx_status2.fee, 5701);
assert_eq!(
tx_status2.status,
Err(TransactionError::InsufficientFundsForFee)
);
}
}
Loading

0 comments on commit 17d94c4

Please sign in to comment.