From ac7dd4176483c14b27bad2dbc9ffb8aff9c7f1f0 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 4 May 2020 13:46:48 -0600 Subject: [PATCH] Switch from CSV to a pickledb database (#8) * Switch from CSV to a pickledb database * Allow PickleDb errors to bubble up * Dedup * Hoist db * Add finalized field to TransactionInfo * Don't allow RPC client to resign transactions * Remove dead code * Use transport::Result * Record unconfirmed transaction * Fix: separate stake account per allocation * Catch transport errors * Panic if we attempt to replay a transaction that hasn't been finalized * Attempt to fix CI PickleDb isn't calling flush() or close() after writing to files. No issue on MacOS, but looks racy in CI. * Revert "Attempt to fix CI" This reverts commit 1632394f636c54402b3578120e8817dd1660e19b. * Poll for signature before returning --- Cargo.lock | 31 +++++++++ Cargo.toml | 2 + src/arg_parser.rs | 12 ++-- src/args.rs | 8 +-- src/thin_client.rs | 97 ++++++++++++--------------- src/tokens.rs | 160 +++++++++++++++++++++++++-------------------- 6 files changed, 176 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eda743bfedb95c..ec63fa88fb4499 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1120,6 +1120,12 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "half" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f36b5f248235f45773d4944f555f83ea61fe07b18b561ccf99d7483d7381e54d" + [[package]] name = "hash32" version = "0.1.1" @@ -2054,6 +2060,19 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pickledb" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9161694d67f6c5163519d42be942ae36bbdb55f439460144f105bc4f9f7d1d61" +dependencies = [ + "bincode", + "serde", + "serde_cbor", + "serde_json", + "serde_yaml", +] + [[package]] name = "pin-project" version = "0.4.9" @@ -2645,6 +2664,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_cbor" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.106" @@ -3416,6 +3445,7 @@ dependencies = [ "console 0.10.3", "csv", "indexmap", + "pickledb", "serde", "solana-clap-utils", "solana-cli-config", @@ -3426,6 +3456,7 @@ dependencies = [ "solana-sdk", "solana-stake-program", "tempfile", + "thiserror", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 27afb716176ef9..9657c00623b66d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ csv = "1.1.3" clap = "2.33.0" console = "0.10.3" indexmap = "1.3.2" +pickledb = "0.4.1" serde = { version = "1.0", features = ["derive"] } solana-clap-utils = "1.1.8" solana-cli-config = "1.1.8" @@ -22,6 +23,7 @@ solana-runtime = "1.1.8" solana-sdk = "1.1.8" solana-stake-program = "1.1.8" tempfile = "3.1.0" +thiserror = "1.0" [dev-dependencies] solana-core = "1.1.8" diff --git a/src/arg_parser.rs b/src/arg_parser.rs index 0c83f57aa5cfc2..47808c8abe70db 100644 --- a/src/arg_parser.rs +++ b/src/arg_parser.rs @@ -34,12 +34,12 @@ where SubCommand::with_name("distribute-tokens") .about("Distribute tokens") .arg( - Arg::with_name("transactions_csv") + Arg::with_name("transactions_db") .required(true) .index(1) .takes_value(true) .value_name("FILE") - .help("Transactions CSV file"), + .help("Transactions database file"), ) .arg( Arg::with_name("bids_csv") @@ -83,12 +83,12 @@ where SubCommand::with_name("distribute-stake") .about("Distribute stake accounts") .arg( - Arg::with_name("transactions_csv") + Arg::with_name("transactions_db") .required(true) .index(1) .takes_value(true) .value_name("FILE") - .help("Transactions CSV file"), + .help("Transactions database file"), ) .arg( Arg::with_name("allocations_csv") @@ -163,7 +163,7 @@ where fn parse_distribute_tokens_args(matches: &ArgMatches<'_>) -> DistributeTokensArgs { DistributeTokensArgs { bids_csv: value_t_or_exit!(matches, "bids_csv", String), - transactions_csv: value_t_or_exit!(matches, "transactions_csv", String), + transactions_db: value_t_or_exit!(matches, "transactions_db", String), dollars_per_sol: value_t_or_exit!(matches, "dollars_per_sol", f64), dry_run: matches.is_present("dry_run"), sender_keypair: value_t!(matches, "sender_keypair", String).ok(), @@ -174,7 +174,7 @@ fn parse_distribute_tokens_args(matches: &ArgMatches<'_>) -> DistributeTokensArg fn parse_distribute_stake_args(matches: &ArgMatches<'_>) -> DistributeStakeArgs { DistributeStakeArgs { allocations_csv: value_t_or_exit!(matches, "allocations_csv", String), - transactions_csv: value_t_or_exit!(matches, "transactions_csv", String), + transactions_db: value_t_or_exit!(matches, "transactions_db", String), dry_run: matches.is_present("dry_run"), stake_account_address: value_t_or_exit!(matches, "stake_account_address", String), stake_authority: value_t!(matches, "stake_authority", String).ok(), diff --git a/src/args.rs b/src/args.rs index 106cbd8239b1be..3e1d21055fdd1e 100644 --- a/src/args.rs +++ b/src/args.rs @@ -6,7 +6,7 @@ use std::error::Error; pub struct DistributeTokensArgs { pub bids_csv: String, - pub transactions_csv: String, + pub transactions_db: String, pub dollars_per_sol: f64, pub dry_run: bool, pub sender_keypair: Option, @@ -15,7 +15,7 @@ pub struct DistributeTokensArgs { pub struct DistributeStakeArgs { pub allocations_csv: String, - pub transactions_csv: String, + pub transactions_db: String, pub dry_run: bool, pub stake_account_address: P, pub stake_authority: Option, @@ -49,7 +49,7 @@ pub fn resolve_command( let matches = ArgMatches::default(); let resolved_args = DistributeTokensArgs { bids_csv: args.bids_csv, - transactions_csv: args.transactions_csv, + transactions_db: args.transactions_db, dollars_per_sol: args.dollars_per_sol, dry_run: args.dry_run, sender_keypair: args.sender_keypair.as_ref().map(|key_url| { @@ -66,7 +66,7 @@ pub fn resolve_command( let matches = ArgMatches::default(); let resolved_args = DistributeStakeArgs { allocations_csv: args.allocations_csv, - transactions_csv: args.transactions_csv, + transactions_db: args.transactions_db, dry_run: args.dry_run, stake_account_address: pubkey_from_path( &matches, diff --git a/src/thin_client.rs b/src/thin_client.rs index a9c5edcea50002..fc19e3b0031035 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -1,105 +1,94 @@ use solana_client::rpc_client::RpcClient; use solana_runtime::bank_client::BankClient; use solana_sdk::{ - client::SyncClient, + client::{AsyncClient, SyncClient}, + fee_calculator::FeeCalculator, + hash::Hash, message::Message, pubkey::Pubkey, - signature::{Signature, Signer}, + signature::{Keypair, Signature, Signer}, signers::Signers, system_instruction, transaction::Transaction, - transport::TransportError, + transport::{Result, TransportError}, }; pub trait Client { - fn send_and_confirm_message( - &self, - message: Message, - signers: &S, - ) -> Result; - - fn get_balance1(&self, pubkey: &Pubkey) -> Result; + fn send_and_confirm_transaction1(&self, transaction: Transaction) -> Result; + fn get_balance1(&self, pubkey: &Pubkey) -> Result; + fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)>; } impl Client for RpcClient { - fn send_and_confirm_message( - &self, - message: Message, - signers: &S, - ) -> Result { - let mut transaction = Transaction::new_unsigned(message); - self.resign_transaction(&mut transaction, signers) - .map_err(|e| TransportError::Custom(e.to_string()))?; - let initial_signature = transaction.signatures[0]; - println!("Sending transaction with signature {}", initial_signature); - let signature = self - .send_and_confirm_transaction_with_spinner(&mut transaction, signers) - .map_err(|e| TransportError::Custom(e.to_string()))?; - Ok(signature) + fn send_and_confirm_transaction1(&self, mut transaction: Transaction) -> Result { + let signers: Vec<&Keypair> = vec![]; // Don't allow resigning + self.send_and_confirm_transaction_with_spinner(&mut transaction, &signers) + .map_err(|e| TransportError::Custom(e.to_string())) } - fn get_balance1(&self, pubkey: &Pubkey) -> Result { + fn get_balance1(&self, pubkey: &Pubkey) -> Result { let balance = self .get_balance(pubkey) .map_err(|e| TransportError::Custom(e.to_string()))?; Ok(balance) } + + fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)> { + let blockhash = self + .get_recent_blockhash() + .map_err(|e| TransportError::Custom(e.to_string()))?; + Ok(blockhash) + } } impl Client for BankClient { - fn send_and_confirm_message( - &self, - message: Message, - signers: &S, - ) -> Result { - self.send_message(signers, message) + fn send_and_confirm_transaction1(&self, transaction: Transaction) -> Result { + let signature = self.async_send_transaction(transaction)?; + self.poll_for_signature(&signature)?; + Ok(signature) } - fn get_balance1(&self, pubkey: &Pubkey) -> Result { + fn get_balance1(&self, pubkey: &Pubkey) -> Result { self.get_balance(pubkey) } -} -impl Client for () { - fn send_and_confirm_message( - &self, - _message: Message, - _signers: &S, - ) -> Result { - Ok(Signature::default()) - } - - fn get_balance1(&self, _pubkey: &Pubkey) -> Result { - Ok(0) + fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)> { + self.get_recent_blockhash() } } pub struct ThinClient(pub C); impl ThinClient { + pub fn send_transaction(&self, transaction: Transaction) -> Result { + self.0.send_and_confirm_transaction1(transaction) + } + + pub fn send_message(&self, message: Message, signers: &S) -> Result { + let (blockhash, _fee_caluclator) = self.0.get_recent_blockhash_and_fees()?; + let transaction = Transaction::new(signers, message, blockhash); + let signature = transaction.signatures[0]; + self.send_transaction(transaction)?; + Ok(signature) + } + pub fn transfer( &self, lamports: u64, sender_keypair: &S, to_pubkey: &Pubkey, - ) -> Result { + ) -> Result { let create_instruction = system_instruction::transfer(&sender_keypair.pubkey(), &to_pubkey, lamports); let message = Message::new(&[create_instruction]); self.send_message(message, &[sender_keypair]) } -} -impl ThinClient { - pub fn send_message( - &self, - message: Message, - signers: &S, - ) -> Result { - self.0.send_and_confirm_message(message, signers) + pub fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)> { + self.0.get_recent_blockhash_and_fees() } - pub fn get_balance(&self, pubkey: &Pubkey) -> Result { + pub fn get_balance(&self, pubkey: &Pubkey) -> Result { self.0.get_balance1(pubkey) } } diff --git a/src/tokens.rs b/src/tokens.rs index 091fc63c703859..6bccd3375c700d 100644 --- a/src/tokens.rs +++ b/src/tokens.rs @@ -3,18 +3,20 @@ use crate::thin_client::{Client, ThinClient}; use console::style; use csv::{ReaderBuilder, Trim}; use indexmap::IndexMap; +use pickledb::{PickleDb, PickleDbDumpPolicy}; use serde::{Deserialize, Serialize}; use solana_sdk::{ message::Message, native_token::{lamports_to_sol, sol_to_lamports}, signature::{Signature, Signer}, system_instruction, + transaction::Transaction, + transport::TransportError, }; use solana_stake_program::{ stake_instruction, stake_state::{Authorized, Lockup, StakeAuthorize}, }; -use std::fs; use std::path::Path; use std::process; @@ -34,8 +36,18 @@ struct Allocation { struct TransactionInfo { recipient: String, amount: f64, - signature: String, new_stake_account_address: String, + finalized: bool, +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("CSV error")] + CsvError(#[from] csv::Error), + #[error("PickleDb error")] + PickleDbError(#[from] pickledb::error::Error), + #[error("Transport error")] + TransportError(#[from] TransportError), } fn merge_allocations(allocations: &[Allocation]) -> Vec { @@ -57,6 +69,9 @@ fn apply_previous_transactions( transaction_infos: &[TransactionInfo], ) { for transaction_info in transaction_infos { + if !transaction_info.finalized { + panic!("Attempt to apply an unconfirmed transaction. Please confirm the trarnsaction has been finalized or that its blockhash has expired."); + } let mut amount = transaction_info.amount; for allocation in allocations.iter_mut() { if allocation.recipient != transaction_info.recipient { @@ -83,9 +98,10 @@ fn create_allocation(bid: &Bid, dollars_per_sol: f64) -> Allocation { fn distribute_tokens( client: &ThinClient, + db: &mut PickleDb, allocations: &[Allocation], args: &DistributeTokensArgs>, -) -> Result<(), csv::Error> { +) -> Result<(), Error> { let signers = if args.dry_run { vec![] } else { @@ -107,13 +123,17 @@ fn distribute_tokens( let lamports = sol_to_lamports(allocation.amount); let instruction = system_instruction::transfer(&from, &to, lamports); let message = Message::new_with_payer(&[instruction], Some(&fee_payer_pubkey)); - client.send_message(message, &signers) + let (blockhash, _fee_caluclator) = client.get_recent_blockhash_and_fees()?; + let transaction = Transaction::new(&signers, message, blockhash); + let signature = transaction.signatures[0]; + set_transaction_info(db, &allocation, &signature, None, false)?; + client.send_transaction(transaction) }; match result { Ok(signature) => { println!("Finalized transaction with signature {}", signature); if !args.dry_run { - append_transaction_info(&allocation, &signature, None, &args.transactions_csv)?; + set_transaction_info(db, &allocation, &signature, None, true)?; } } Err(e) => { @@ -126,23 +146,24 @@ fn distribute_tokens( fn distribute_stake( client: &ThinClient, + db: &mut PickleDb, allocations: &[Allocation], args: &DistributeStakeArgs>, -) -> Result<(), csv::Error> { - let new_stake_account_keypair = Keypair::new(); - let new_stake_account_address = new_stake_account_keypair.pubkey(); - let signers = if args.dry_run { - vec![] - } else { - vec![ - &**args.fee_payer.as_ref().unwrap(), - &**args.stake_authority.as_ref().unwrap(), - &**args.withdraw_authority.as_ref().unwrap(), - &new_stake_account_keypair, - ] - }; - +) -> Result<(), Error> { for allocation in allocations { + let new_stake_account_keypair = Keypair::new(); + let new_stake_account_address = new_stake_account_keypair.pubkey(); + let signers = if args.dry_run { + vec![] + } else { + vec![ + &**args.fee_payer.as_ref().unwrap(), + &**args.stake_authority.as_ref().unwrap(), + &**args.withdraw_authority.as_ref().unwrap(), + &new_stake_account_keypair, + ] + }; + println!("{:<44} {:>24.9}", allocation.recipient, allocation.amount); let result = if args.dry_run { Ok(Signature::default()) @@ -184,18 +205,28 @@ fn distribute_stake( )); let message = Message::new_with_payer(&instructions, Some(&fee_payer_pubkey)); - println!("Creating stake account {}", new_stake_account_address); - client.send_message(message, &signers) + let (blockhash, _fee_caluclator) = client.get_recent_blockhash_and_fees()?; + let transaction = Transaction::new(&signers, message, blockhash); + let signature = transaction.signatures[0]; + set_transaction_info( + db, + &allocation, + &signature, + Some(&new_stake_account_address), + false, + )?; + client.send_transaction(transaction) }; match result { Ok(signature) => { println!("Finalized transaction with signature {}", signature); if !args.dry_run { - append_transaction_info( + set_transaction_info( + db, &allocation, &signature, Some(&new_stake_account_address), - &args.transactions_csv, + true, )?; } } @@ -207,47 +238,44 @@ fn distribute_stake( Ok(()) } -fn read_transaction_infos(path: &str) -> Vec { - let mut rdr = ReaderBuilder::new() - .trim(Trim::All) - .from_path(&path) - .unwrap(); - rdr.deserialize().map(|x| x.unwrap()).collect() +fn open_db(path: &str) -> Result { + let policy = PickleDbDumpPolicy::AutoDump; + if Path::new(path).exists() { + PickleDb::load_yaml(path, policy) + } else { + Ok(PickleDb::new_yaml(path, policy)) + } +} + +fn read_transaction_infos(db: &PickleDb) -> Vec { + db.iter() + .map(|kv| kv.get_value::().unwrap()) + .collect() } -fn append_transaction_info( +fn set_transaction_info( + db: &mut PickleDb, allocation: &Allocation, signature: &Signature, new_stake_account_address: Option<&Pubkey>, - transactions_csv: &str, -) -> Result<(), csv::Error> { - let existed = Path::new(&transactions_csv).exists(); - let file = fs::OpenOptions::new() - .create_new(!existed) - .write(true) - .append(existed) - .open(&transactions_csv)?; - let mut wtr = csv::WriterBuilder::new() - .has_headers(!existed) - .from_writer(file); - + finalized: bool, +) -> Result<(), pickledb::error::Error> { let transaction_info = TransactionInfo { recipient: allocation.recipient.clone(), amount: allocation.amount, - signature: signature.to_string(), new_stake_account_address: new_stake_account_address .map(|pubkey| pubkey.to_string()) .unwrap_or("".to_string()), + finalized, }; - wtr.serialize(transaction_info)?; - wtr.flush()?; + db.set(&signature.to_string(), &transaction_info)?; Ok(()) } pub fn process_distribute_tokens( client: &ThinClient, args: &DistributeTokensArgs>, -) -> Result<(), csv::Error> { +) -> Result<(), Error> { let mut rdr = ReaderBuilder::new() .trim(Trim::All) .from_path(&args.bids_csv)?; @@ -265,12 +293,8 @@ pub fn process_distribute_tokens( starting_total_tokens * args.dollars_per_sol, ); - let transaction_infos = if Path::new(&args.transactions_csv).exists() { - read_transaction_infos(&args.transactions_csv) - } else { - vec![] - }; - + let mut db = open_db(&args.transactions_db)?; + let transaction_infos = read_transaction_infos(&db); apply_previous_transactions(&mut allocations, &transaction_infos); if allocations.is_empty() { @@ -329,7 +353,7 @@ pub fn process_distribute_tokens( (distributed_tokens + undistributed_tokens) * args.dollars_per_sol, ); - distribute_tokens(client, &allocations, args)?; + distribute_tokens(client, &mut db, &allocations, args)?; Ok(()) } @@ -337,7 +361,7 @@ pub fn process_distribute_tokens( pub fn process_distribute_stake( client: &ThinClient, args: &DistributeStakeArgs>, -) -> Result<(), csv::Error> { +) -> Result<(), Error> { let mut rdr = ReaderBuilder::new() .trim(Trim::All) .from_path(&args.allocations_csv)?; @@ -346,12 +370,8 @@ pub fn process_distribute_stake( .map(|allocation| allocation.unwrap()) .collect(); - let transaction_infos = if Path::new(&args.transactions_csv).exists() { - read_transaction_infos(&args.transactions_csv) - } else { - vec![] - }; - + let mut db = open_db(&args.transactions_db)?; + let transaction_infos = read_transaction_infos(&db); let mut allocations = merge_allocations(&allocations); apply_previous_transactions(&mut allocations, &transaction_infos); @@ -360,7 +380,7 @@ pub fn process_distribute_stake( return Ok(()); } - distribute_stake(client, &allocations, args)?; + distribute_stake(client, &mut db, &allocations, args)?; Ok(()) } @@ -425,7 +445,7 @@ pub fn test_process_distribute_with_client(client: C, sender_keypair: wtr.flush().unwrap(); let dir = tempdir().unwrap(); - let transactions_csv = dir + let transactions_db = dir .path() .join("transactions.csv") .to_str() @@ -437,11 +457,11 @@ pub fn test_process_distribute_with_client(client: C, sender_keypair: fee_payer: Some(Box::new(fee_payer)), dry_run: false, bids_csv, - transactions_csv: transactions_csv.clone(), + transactions_db: transactions_db.clone(), dollars_per_sol: 0.22, }; process_distribute_tokens(&thin_client, &args).unwrap(); - let transaction_infos = read_transaction_infos(&transactions_csv); + let transaction_infos = read_transaction_infos(&open_db(&transactions_db).unwrap()); assert_eq!(transaction_infos.len(), 1); assert_eq!(transaction_infos[0].recipient, alice_pubkey.to_string()); let expected_amount = bid.accepted_amount_dollars / args.dollars_per_sol; @@ -454,7 +474,7 @@ pub fn test_process_distribute_with_client(client: C, sender_keypair: // Now, run it again, and check there's no double-spend. process_distribute_tokens(&thin_client, &args).unwrap(); - let transaction_infos = read_transaction_infos(&transactions_csv); + let transaction_infos = read_transaction_infos(&open_db(&transactions_db).unwrap()); assert_eq!(transaction_infos.len(), 1); assert_eq!(transaction_infos[0].recipient, alice_pubkey.to_string()); let expected_amount = bid.accepted_amount_dollars / args.dollars_per_sol; @@ -507,7 +527,7 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke wtr.flush().unwrap(); let dir = tempdir().unwrap(); - let transactions_csv = dir + let transactions_db = dir .path() .join("transactions.csv") .to_str() @@ -521,10 +541,10 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke fee_payer: Some(Box::new(fee_payer)), dry_run: false, allocations_csv, - transactions_csv: transactions_csv.clone(), + transactions_db: transactions_db.clone(), }; process_distribute_stake(&thin_client, &args).unwrap(); - let transaction_infos = read_transaction_infos(&transactions_csv); + let transaction_infos = read_transaction_infos(&open_db(&transactions_db).unwrap()); assert_eq!(transaction_infos.len(), 1); assert_eq!(transaction_infos[0].recipient, alice_pubkey.to_string()); let expected_amount = allocation.amount; @@ -545,7 +565,7 @@ pub fn test_process_distribute_stake_with_client(client: C, sender_ke // Now, run it again, and check there's no double-spend. process_distribute_stake(&thin_client, &args).unwrap(); - let transaction_infos = read_transaction_infos(&transactions_csv); + let transaction_infos = read_transaction_infos(&open_db(&transactions_db).unwrap()); assert_eq!(transaction_infos.len(), 1); assert_eq!(transaction_infos[0].recipient, alice_pubkey.to_string()); let expected_amount = allocation.amount; @@ -598,8 +618,8 @@ mod tests { let transaction_infos = vec![TransactionInfo { recipient: "b".to_string(), amount: 1.0, - signature: "".to_string(), new_stake_account_address: "".to_string(), + finalized: true, }]; apply_previous_transactions(&mut allocations, &transaction_infos); assert_eq!(allocations.len(), 1);