Skip to content

Commit

Permalink
Switch from CSV to a pickledb database (solana-labs#8)
Browse files Browse the repository at this point in the history
* Switch from CSV to a pickledb database

* Allow PickleDb errors to bubble up

* Dedup

* Hoist db

* Add finalized field to TransactionInfo

* Don't allow RPC client to resign transactions

* Remove dead code

* Use transport::Result

* Record unconfirmed transaction

* Fix: separate stake account per allocation

* Catch transport errors

* Panic if we attempt to replay a transaction that hasn't been finalized

* Attempt to fix CI

PickleDb isn't calling flush() or close() after writing to files.
No issue on MacOS, but looks racy in CI.

* Revert "Attempt to fix CI"

This reverts commit 1632394f636c54402b3578120e8817dd1660e19b.

* Poll for signature before returning
  • Loading branch information
garious authored May 4, 2020
1 parent 07d9617 commit ac7dd41
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 134 deletions.
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ csv = "1.1.3"
clap = "2.33.0"
console = "0.10.3"
indexmap = "1.3.2"
pickledb = "0.4.1"
serde = { version = "1.0", features = ["derive"] }
solana-clap-utils = "1.1.8"
solana-cli-config = "1.1.8"
Expand All @@ -22,6 +23,7 @@ solana-runtime = "1.1.8"
solana-sdk = "1.1.8"
solana-stake-program = "1.1.8"
tempfile = "3.1.0"
thiserror = "1.0"

[dev-dependencies]
solana-core = "1.1.8"
12 changes: 6 additions & 6 deletions src/arg_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ where
SubCommand::with_name("distribute-tokens")
.about("Distribute tokens")
.arg(
Arg::with_name("transactions_csv")
Arg::with_name("transactions_db")
.required(true)
.index(1)
.takes_value(true)
.value_name("FILE")
.help("Transactions CSV file"),
.help("Transactions database file"),
)
.arg(
Arg::with_name("bids_csv")
Expand Down Expand Up @@ -83,12 +83,12 @@ where
SubCommand::with_name("distribute-stake")
.about("Distribute stake accounts")
.arg(
Arg::with_name("transactions_csv")
Arg::with_name("transactions_db")
.required(true)
.index(1)
.takes_value(true)
.value_name("FILE")
.help("Transactions CSV file"),
.help("Transactions database file"),
)
.arg(
Arg::with_name("allocations_csv")
Expand Down Expand Up @@ -163,7 +163,7 @@ where
fn parse_distribute_tokens_args(matches: &ArgMatches<'_>) -> DistributeTokensArgs<String> {
DistributeTokensArgs {
bids_csv: value_t_or_exit!(matches, "bids_csv", String),
transactions_csv: value_t_or_exit!(matches, "transactions_csv", String),
transactions_db: value_t_or_exit!(matches, "transactions_db", String),
dollars_per_sol: value_t_or_exit!(matches, "dollars_per_sol", f64),
dry_run: matches.is_present("dry_run"),
sender_keypair: value_t!(matches, "sender_keypair", String).ok(),
Expand All @@ -174,7 +174,7 @@ fn parse_distribute_tokens_args(matches: &ArgMatches<'_>) -> DistributeTokensArg
fn parse_distribute_stake_args(matches: &ArgMatches<'_>) -> DistributeStakeArgs<String, String> {
DistributeStakeArgs {
allocations_csv: value_t_or_exit!(matches, "allocations_csv", String),
transactions_csv: value_t_or_exit!(matches, "transactions_csv", String),
transactions_db: value_t_or_exit!(matches, "transactions_db", String),
dry_run: matches.is_present("dry_run"),
stake_account_address: value_t_or_exit!(matches, "stake_account_address", String),
stake_authority: value_t!(matches, "stake_authority", String).ok(),
Expand Down
8 changes: 4 additions & 4 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::error::Error;

pub struct DistributeTokensArgs<K> {
pub bids_csv: String,
pub transactions_csv: String,
pub transactions_db: String,
pub dollars_per_sol: f64,
pub dry_run: bool,
pub sender_keypair: Option<K>,
Expand All @@ -15,7 +15,7 @@ pub struct DistributeTokensArgs<K> {

pub struct DistributeStakeArgs<P, K> {
pub allocations_csv: String,
pub transactions_csv: String,
pub transactions_db: String,
pub dry_run: bool,
pub stake_account_address: P,
pub stake_authority: Option<K>,
Expand Down Expand Up @@ -49,7 +49,7 @@ pub fn resolve_command(
let matches = ArgMatches::default();
let resolved_args = DistributeTokensArgs {
bids_csv: args.bids_csv,
transactions_csv: args.transactions_csv,
transactions_db: args.transactions_db,
dollars_per_sol: args.dollars_per_sol,
dry_run: args.dry_run,
sender_keypair: args.sender_keypair.as_ref().map(|key_url| {
Expand All @@ -66,7 +66,7 @@ pub fn resolve_command(
let matches = ArgMatches::default();
let resolved_args = DistributeStakeArgs {
allocations_csv: args.allocations_csv,
transactions_csv: args.transactions_csv,
transactions_db: args.transactions_db,
dry_run: args.dry_run,
stake_account_address: pubkey_from_path(
&matches,
Expand Down
97 changes: 43 additions & 54 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
@@ -1,105 +1,94 @@
use solana_client::rpc_client::RpcClient;
use solana_runtime::bank_client::BankClient;
use solana_sdk::{
client::SyncClient,
client::{AsyncClient, SyncClient},
fee_calculator::FeeCalculator,
hash::Hash,
message::Message,
pubkey::Pubkey,
signature::{Signature, Signer},
signature::{Keypair, Signature, Signer},
signers::Signers,
system_instruction,
transaction::Transaction,
transport::TransportError,
transport::{Result, TransportError},
};

pub trait Client {
fn send_and_confirm_message<S: Signers>(
&self,
message: Message,
signers: &S,
) -> Result<Signature, TransportError>;

fn get_balance1(&self, pubkey: &Pubkey) -> Result<u64, TransportError>;
fn send_and_confirm_transaction1(&self, transaction: Transaction) -> Result<Signature>;
fn get_balance1(&self, pubkey: &Pubkey) -> Result<u64>;
fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)>;
}

impl Client for RpcClient {
fn send_and_confirm_message<S: Signers>(
&self,
message: Message,
signers: &S,
) -> Result<Signature, TransportError> {
let mut transaction = Transaction::new_unsigned(message);
self.resign_transaction(&mut transaction, signers)
.map_err(|e| TransportError::Custom(e.to_string()))?;
let initial_signature = transaction.signatures[0];
println!("Sending transaction with signature {}", initial_signature);
let signature = self
.send_and_confirm_transaction_with_spinner(&mut transaction, signers)
.map_err(|e| TransportError::Custom(e.to_string()))?;
Ok(signature)
fn send_and_confirm_transaction1(&self, mut transaction: Transaction) -> Result<Signature> {
let signers: Vec<&Keypair> = vec![]; // Don't allow resigning
self.send_and_confirm_transaction_with_spinner(&mut transaction, &signers)
.map_err(|e| TransportError::Custom(e.to_string()))
}

fn get_balance1(&self, pubkey: &Pubkey) -> Result<u64, TransportError> {
fn get_balance1(&self, pubkey: &Pubkey) -> Result<u64> {
let balance = self
.get_balance(pubkey)
.map_err(|e| TransportError::Custom(e.to_string()))?;
Ok(balance)
}

fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)> {
let blockhash = self
.get_recent_blockhash()
.map_err(|e| TransportError::Custom(e.to_string()))?;
Ok(blockhash)
}
}

impl Client for BankClient {
fn send_and_confirm_message<S: Signers>(
&self,
message: Message,
signers: &S,
) -> Result<Signature, TransportError> {
self.send_message(signers, message)
fn send_and_confirm_transaction1(&self, transaction: Transaction) -> Result<Signature> {
let signature = self.async_send_transaction(transaction)?;
self.poll_for_signature(&signature)?;
Ok(signature)
}

fn get_balance1(&self, pubkey: &Pubkey) -> Result<u64, TransportError> {
fn get_balance1(&self, pubkey: &Pubkey) -> Result<u64> {
self.get_balance(pubkey)
}
}

impl Client for () {
fn send_and_confirm_message<S: Signers>(
&self,
_message: Message,
_signers: &S,
) -> Result<Signature, TransportError> {
Ok(Signature::default())
}

fn get_balance1(&self, _pubkey: &Pubkey) -> Result<u64, TransportError> {
Ok(0)
fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)> {
self.get_recent_blockhash()
}
}

pub struct ThinClient<C: Client>(pub C);

impl<C: Client> ThinClient<C> {
pub fn send_transaction(&self, transaction: Transaction) -> Result<Signature> {
self.0.send_and_confirm_transaction1(transaction)
}

pub fn send_message<S: Signers>(&self, message: Message, signers: &S) -> Result<Signature> {
let (blockhash, _fee_caluclator) = self.0.get_recent_blockhash_and_fees()?;
let transaction = Transaction::new(signers, message, blockhash);
let signature = transaction.signatures[0];
self.send_transaction(transaction)?;
Ok(signature)
}

pub fn transfer<S: Signer>(
&self,
lamports: u64,
sender_keypair: &S,
to_pubkey: &Pubkey,
) -> Result<Signature, TransportError> {
) -> Result<Signature> {
let create_instruction =
system_instruction::transfer(&sender_keypair.pubkey(), &to_pubkey, lamports);
let message = Message::new(&[create_instruction]);
self.send_message(message, &[sender_keypair])
}
}

impl<C: Client> ThinClient<C> {
pub fn send_message<S: Signers>(
&self,
message: Message,
signers: &S,
) -> Result<Signature, TransportError> {
self.0.send_and_confirm_message(message, signers)
pub fn get_recent_blockhash_and_fees(&self) -> Result<(Hash, FeeCalculator)> {
self.0.get_recent_blockhash_and_fees()
}

pub fn get_balance(&self, pubkey: &Pubkey) -> Result<u64, TransportError> {
pub fn get_balance(&self, pubkey: &Pubkey) -> Result<u64> {
self.0.get_balance1(pubkey)
}
}
Loading

0 comments on commit ac7dd41

Please sign in to comment.