From 89252b85ea7964838d7b8dac70378420c8e10928 Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 27 Sep 2019 13:12:44 +0800 Subject: [PATCH] feat: fee estimator --- Cargo.lock | 13 + Cargo.toml | 1 + benches/benches/benchmarks/overall.rs | 2 +- rpc/Cargo.toml | 2 + rpc/README.md | 36 +++ rpc/json/rpc.json | 11 + rpc/src/module/experiment.rs | 43 ++- rpc/src/module/pool.rs | 2 +- rpc/src/service_builder.rs | 2 +- rpc/src/test.rs | 33 +- sync/src/relayer/mod.rs | 2 +- sync/src/relayer/tests/helper.rs | 2 +- test/src/main.rs | 1 + test/src/rpc.rs | 14 +- test/src/specs/mod.rs | 2 +- .../relay/transaction_relay_low_fee_rate.rs | 2 +- test/src/specs/tx_pool/fee_estimate.rs | 140 ++++++++ test/src/specs/tx_pool/limit.rs | 2 +- test/src/specs/tx_pool/mod.rs | 2 + .../src/specs/tx_pool/send_low_fee_rate_tx.rs | 2 +- tx-pool/Cargo.toml | 1 + tx-pool/src/component/commit_txs_scanner.rs | 2 +- tx-pool/src/component/pending.rs | 2 +- tx-pool/src/config.rs | 2 +- tx-pool/src/lib.rs | 2 +- tx-pool/src/pool.rs | 18 +- tx-pool/src/process/chain_reorg.rs | 6 +- tx-pool/src/process/estimate_fee_rate.rs | 34 ++ .../src/process/estimator_process_block.rs | 42 +++ tx-pool/src/process/estimator_track_tx.rs | 46 +++ tx-pool/src/process/mod.rs | 6 + tx-pool/src/process/submit_txs.rs | 14 +- tx-pool/src/service.rs | 104 +++++- util/fee-estimator/Cargo.toml | 14 + util/fee-estimator/src/estimator.rs | 144 +++++++++ .../fee-estimator}/src/fee_rate.rs | 17 +- util/fee-estimator/src/lib.rs | 6 + util/fee-estimator/src/tx_confirm_stat.rs | 305 ++++++++++++++++++ util/jsonrpc-types/src/experiment.rs | 7 +- util/jsonrpc-types/src/lib.rs | 4 +- util/jsonrpc-types/src/primitive.rs | 1 + 41 files changed, 1059 insertions(+), 32 deletions(-) create mode 100644 test/src/specs/tx_pool/fee_estimate.rs create mode 100644 tx-pool/src/process/estimate_fee_rate.rs create mode 100644 tx-pool/src/process/estimator_process_block.rs create mode 100644 tx-pool/src/process/estimator_track_tx.rs create mode 100644 util/fee-estimator/Cargo.toml create mode 100644 util/fee-estimator/src/estimator.rs rename {tx-pool => util/fee-estimator}/src/fee_rate.rs (54%) create mode 100644 util/fee-estimator/src/lib.rs create mode 100644 util/fee-estimator/src/tx_confirm_stat.rs diff --git a/Cargo.lock b/Cargo.lock index 2924499cd9..724e1d571d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,6 +531,16 @@ dependencies = [ "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ckb-fee-estimator" +version = "0.24.0-pre" +dependencies = [ + "ckb-logger 0.24.0-pre", + "ckb-types 0.24.0-pre", + "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.93 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ckb-fixed-hash" version = "0.24.0-pre" @@ -830,6 +840,7 @@ dependencies = [ "ckb-dao 0.24.0-pre", "ckb-dao-utils 0.24.0-pre", "ckb-error 0.24.0-pre", + "ckb-fee-estimator 0.24.0-pre", "ckb-indexer 0.24.0-pre", "ckb-jsonrpc-types 0.24.0-pre", "ckb-logger 0.24.0-pre", @@ -853,6 +864,7 @@ dependencies = [ "jsonrpc-server-utils 10.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "pretty_assertions 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.19 (registry+https://github.com/rust-lang/crates.io-index)", "sentry 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1022,6 +1034,7 @@ dependencies = [ "ckb-chain-spec 0.24.0-pre", "ckb-dao 0.24.0-pre", "ckb-error 0.24.0-pre", + "ckb-fee-estimator 0.24.0-pre", "ckb-future-executor 0.24.0-pre", "ckb-jsonrpc-types 0.24.0-pre", "ckb-logger 0.24.0-pre", diff --git a/Cargo.toml b/Cargo.toml index b209d5edb4..fc17d91cca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ members = [ "util/types", "util/future-executor", "util/jsonrpc-types", + "util/fee-estimator", "script/data-loader", "db", "resource", diff --git a/benches/benches/benchmarks/overall.rs b/benches/benches/benchmarks/overall.rs index 720f069898..ce5a41c874 100644 --- a/benches/benches/benchmarks/overall.rs +++ b/benches/benches/benchmarks/overall.rs @@ -8,7 +8,7 @@ use ckb_shared::{ Snapshot, }; use ckb_store::ChainStore; -use ckb_tx_pool::{fee_rate::FeeRate, BlockAssemblerConfig, TxPoolConfig}; +use ckb_tx_pool::{BlockAssemblerConfig, FeeRate, TxPoolConfig}; use ckb_types::{ bytes::Bytes, core::{ diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 3243a021db..a4d56f66c5 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -15,6 +15,7 @@ ckb-sync = { path = "../sync" } ckb-chain = { path = "../chain" } ckb-logger = { path = "../util/logger"} ckb-network-alert = { path = "../util/network-alert" } +ckb-fee-estimator = { path = "../util/fee-estimator" } jsonrpc-core = "10.1" jsonrpc-derive = "10.1" jsonrpc-http-server = { git = "https://github.com/nervosnetwork/jsonrpc", rev = "7c101f83a8fe34369c1b7a0e9b6721fcb0f91ee0" } @@ -42,3 +43,4 @@ ckb-test-chain-utils = { path = "../util/test-chain-utils" } tempfile = "3.0" pretty_assertions = "0.6.1" ckb-dao-utils = { path = "../util/dao/utils" } +rand = "0.7" diff --git a/rpc/README.md b/rpc/README.md index 1014c52d91..2ec3509943 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -21,6 +21,7 @@ NOTE: This file is auto-generated. Please don't update this file directly; inste * [`dry_run_transaction`](#dry_run_transaction) * [`_compute_transaction_hash`](#_compute_transaction_hash) * [`calculate_dao_maximum_withdraw`](#calculate_dao_maximum_withdraw) + * [`estimate_fee_rate`](#estimate_fee_rate) * [`_compute_script_hash`](#_compute_script_hash) * [`Indexer`](#indexer) * [`index_lock_hash`](#index_lock_hash) @@ -898,6 +899,41 @@ http://localhost:8114 } ``` +### `estimate_fee_rate` + +Estimate a fee rate (capacity/KB) for a transaction that to be committed in expect blocks. + +This method estimate fee rate by sample transactions that collected from p2p network +expected_confirm_blocks must be between 3 and 1000 +an error will return if samples is not enough + + +#### Examples + +```bash +echo '{ + "id": 2, + "jsonrpc": "2.0", + "method": "estimate_fee_rate", + "params": [ + "0xa" + ] +}' \ +| tr -d '\n' \ +| curl -H 'content-type: application/json' -d @- \ +http://localhost:8114 +``` + +```json +{ + "id": 2, + "jsonrpc": "2.0", + "result": { + "fee_rate": "0x7d0" + } +} +``` + ### `_compute_script_hash` Returns script hash of given transaction script diff --git a/rpc/json/rpc.json b/rpc/json/rpc.json index 2fed776e35..c6b03de8a4 100644 --- a/rpc/json/rpc.json +++ b/rpc/json/rpc.json @@ -559,6 +559,17 @@ } ] }, + { + "description": "Estimate a fee rate (capacity/KB) for a transaction that to be committed in expect blocks.\n\nThis method estimate fee rate by sample transactions that collected from p2p network\nexpected_confirm_blocks must be between 3 and 1000\nan error will return if samples is not enough", + "method": "estimate_fee_rate", + "module": "experiment", + "params": [ + "0xa" + ], + "result": { + "fee_rate": "0x7d0" + } + }, { "description": "Send new transaction into transaction pool\n\nIf of is not specified, loads the corresponding input cell. If is specified, load the corresponding input cell only if the corresponding block exist and contain this cell as output.", "method": "send_transaction", diff --git a/rpc/src/module/experiment.rs b/rpc/src/module/experiment.rs index 4003c04e5c..28b0fc496f 100644 --- a/rpc/src/module/experiment.rs +++ b/rpc/src/module/experiment.rs @@ -1,6 +1,9 @@ use crate::error::RPCError; use ckb_dao::DaoCalculator; -use ckb_jsonrpc_types::{Capacity, DryRunResult, OutPoint, Script, Transaction}; +use ckb_fee_estimator::MAX_CONFIRM_BLOCKS; +use ckb_jsonrpc_types::{ + Capacity, DryRunResult, EstimateResult, OutPoint, Script, Transaction, Uint64, +}; use ckb_logger::error; use ckb_shared::{shared::Shared, Snapshot}; use ckb_store::ChainStore; @@ -31,6 +34,10 @@ pub trait ExperimentRpc { #[rpc(name = "calculate_dao_maximum_withdraw")] fn calculate_dao_maximum_withdraw(&self, _out_point: OutPoint, _hash: H256) -> Result; + + // Estimate fee + #[rpc(name = "estimate_fee_rate")] + fn estimate_fee_rate(&self, expect_confirm_blocks: Uint64) -> Result; } pub(crate) struct ExperimentRpcImpl { @@ -65,6 +72,40 @@ impl ExperimentRpc for ExperimentRpcImpl { } } } + + fn estimate_fee_rate(&self, expect_confirm_blocks: Uint64) -> Result { + let expect_confirm_blocks = expect_confirm_blocks.value() as usize; + // A tx need 1 block to propose, then 2 block to get confirmed + // so at least confirm blocks is 3 blocks. + if expect_confirm_blocks < 3 || expect_confirm_blocks > MAX_CONFIRM_BLOCKS { + return Err(RPCError::custom( + RPCError::Invalid, + format!( + "expect_confirm_blocks should between 3 and {}, got {}", + MAX_CONFIRM_BLOCKS, expect_confirm_blocks + ), + )); + } + + let tx_pool = self.shared.tx_pool_controller(); + let fee_rate = tx_pool.estimate_fee_rate(expect_confirm_blocks); + if let Err(e) = fee_rate { + error!("send estimate_fee_rate request error {}", e); + return Err(Error::internal_error()); + }; + let fee_rate = fee_rate.unwrap(); + + if fee_rate.as_u64() == 0 { + return Err(RPCError::custom( + RPCError::Invalid, + "collected samples is not enough, please make sure node has peers and try later" + .into(), + )); + } + Ok(EstimateResult { + fee_rate: fee_rate.as_u64().into(), + }) + } } // DryRunner dry run given transaction, and return the result, including execution cycles. diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index b9da48ccd3..12b188b90e 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -4,7 +4,7 @@ use ckb_logger::error; use ckb_network::PeerIndex; use ckb_shared::shared::Shared; use ckb_sync::SyncSharedState; -use ckb_tx_pool::{error::SubmitTxError, fee_rate::FeeRate}; +use ckb_tx_pool::{error::SubmitTxError, FeeRate}; use ckb_types::{ core::{self}, packed, diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index ae3c23f4e0..1a2956060a 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -11,7 +11,7 @@ use ckb_network_alert::{notifier::Notifier as AlertNotifier, verifier::Verifier use ckb_shared::shared::Shared; use ckb_sync::SyncSharedState; use ckb_sync::Synchronizer; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; use ckb_util::Mutex; use jsonrpc_core::IoHandler; use std::sync::Arc; diff --git a/rpc/src/test.rs b/rpc/src/test.rs index d8f3916daa..1e03acfc83 100644 --- a/rpc/src/test.rs +++ b/rpc/src/test.rs @@ -20,14 +20,16 @@ use ckb_shared::{ use ckb_store::ChainStore; use ckb_sync::{SyncSharedState, Synchronizer}; use ckb_test_chain_utils::{always_success_cell, always_success_cellbase}; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; use ckb_types::{ core::{ capacity_bytes, cell::resolve_transaction, BlockBuilder, BlockView, Capacity, EpochNumberWithFraction, HeaderView, TransactionBuilder, TransactionView, }, h256, - packed::{AlertBuilder, CellDep, CellInput, CellOutputBuilder, OutPoint, RawAlertBuilder}, + packed::{ + AlertBuilder, Byte32, CellDep, CellInput, CellOutputBuilder, OutPoint, RawAlertBuilder, + }, prelude::*, H256, }; @@ -36,6 +38,7 @@ use jsonrpc_http_server::ServerBuilder; use jsonrpc_server_utils::cors::AccessControlAllowOrigin; use jsonrpc_server_utils::hosts::DomainsValidation; use pretty_assertions::assert_eq as pretty_assert_eq; +use rand::{thread_rng, Rng}; use reqwest; use serde::ser::Serialize; use serde_derive::{Deserialize, Serialize}; @@ -146,17 +149,42 @@ fn setup_node(height: u64) -> (Shared, ChainController, RpcServer) { .build() .unwrap(); let chain_controller = ChainService::new(shared.clone(), table).start::<&str>(None); + let tx_pool_controller = shared.tx_pool_controller(); // Build chain, insert [1, height) blocks let mut parent = always_success_consensus().genesis_block; + // prepare fee estimator samples + let sample_txs: Vec = (0..30) + .map(|_| { + let mut buf = [0u8; 32]; + let mut rng = thread_rng(); + rng.fill(&mut buf); + buf.pack() + }) + .collect(); + let fee_rate = FeeRate::from_u64(2_000); + let send_height = height.saturating_sub(9); + for _ in 0..height { let block = next_block(&shared, &parent.header()); chain_controller .process_block(Arc::new(block.clone())) .expect("processing new block should be ok"); + // Fake fee estimator samples + if block.header().number() == send_height { + for tx_hash in sample_txs.clone() { + tx_pool_controller + .estimator_track_tx(tx_hash, fee_rate, send_height) + .expect("prepare estimator samples"); + } + } parent = block; } + // mark txs as confirmed + tx_pool_controller + .estimator_process_block(height + 1, sample_txs.into_iter()) + .expect("process estimator samples"); // Start network services let dir = tempfile::tempdir() @@ -402,6 +430,7 @@ fn params_of(shared: &Shared, method: &str) -> Value { let json_script: ckb_jsonrpc_types::Script = script.into(); vec![json!(json_script)] } + "estimate_fee_rate" => vec![json!("0xa")], "calculate_dao_maximum_withdraw" => vec![json!(always_success_out_point), json!(tip_hash)], "get_block_template" => vec![json!(null), json!(null), json!(null)], "submit_block" => { diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index dd8ad53a2c..a884f9a039 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -28,7 +28,7 @@ use crate::BAD_MESSAGE_BAN_TIME; use ckb_chain::chain::ChainController; use ckb_logger::{debug_target, info_target, trace_target}; use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, TargetSession}; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; use ckb_types::{ core::{self}, packed::{self, Byte32, ProposalShortId}, diff --git a/sync/src/relayer/tests/helper.rs b/sync/src/relayer/tests/helper.rs index 218ae4b56d..1f45bf55f5 100644 --- a/sync/src/relayer/tests/helper.rs +++ b/sync/src/relayer/tests/helper.rs @@ -7,7 +7,7 @@ use ckb_network::{ use ckb_shared::shared::{Shared, SharedBuilder}; use ckb_store::ChainStore; use ckb_test_chain_utils::always_success_cell; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; use ckb_types::prelude::*; use ckb_types::{ bytes::Bytes, diff --git a/test/src/main.rs b/test/src/main.rs index 8b9980b68a..f17844c467 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -250,6 +250,7 @@ fn all_specs() -> SpecMap { Box::new(ReferenceHeaderMaturity), Box::new(ValidSince), Box::new(SendLowFeeRateTx), + Box::new(FeeEstimate), Box::new(DifferentTxsWithSameInput), Box::new(CompactBlockEmpty), Box::new(CompactBlockEmptyParentUnknown), diff --git a/test/src/rpc.rs b/test/src/rpc.rs index 45d2958cbe..d72c2cf519 100644 --- a/test/src/rpc.rs +++ b/test/src/rpc.rs @@ -1,8 +1,9 @@ use ckb_jsonrpc_types::{ Alert, BannedAddr, Block, BlockNumber, BlockReward, BlockTemplate, BlockView, Capacity, CellOutputWithOutPoint, CellTransaction, CellWithStatus, ChainInfo, Cycle, DryRunResult, - EpochNumber, EpochView, HeaderView, LiveCell, LockHashIndexState, Node, OutPoint, PeerState, - Timestamp, Transaction, TransactionWithStatus, TxPoolInfo, Uint64, Version, + EpochNumber, EpochView, EstimateResult, HeaderView, LiveCell, LockHashIndexState, Node, + OutPoint, PeerState, Timestamp, Transaction, TransactionWithStatus, TxPoolInfo, Uint64, + Version, }; use ckb_types::core::{ BlockNumber as CoreBlockNumber, Capacity as CoreCapacity, EpochNumber as CoreEpochNumber, @@ -359,6 +360,14 @@ impl RpcClient { .expect("rpc call get_cellbase_output_capacity_details") .expect("get_cellbase_output_capacity_details return none") } + + pub fn estimate_fee_rate(&self, expect_confirm_blocks: Uint64) -> EstimateResult { + self.inner() + .lock() + .estimate_fee_rate(expect_confirm_blocks) + .call() + .expect("rpc call estimate_fee_rate") + } } jsonrpc_client!(pub struct Inner { @@ -420,4 +429,5 @@ jsonrpc_client!(pub struct Inner { pub fn calculate_dao_maximum_withdraw(&mut self, _out_point: OutPoint, _hash: H256) -> RpcRequest; pub fn get_cellbase_output_capacity_details(&mut self, _hash: H256) -> RpcRequest>; pub fn broadcast_transaction(&mut self, tx: Transaction, cycles: Cycle) -> RpcRequest; + pub fn estimate_fee_rate(&mut self, expect_confirm_blocks: Uint64) -> RpcRequest; }); diff --git a/test/src/specs/mod.rs b/test/src/specs/mod.rs index 1b135b5002..a0a8463d17 100644 --- a/test/src/specs/mod.rs +++ b/test/src/specs/mod.rs @@ -23,7 +23,7 @@ use ckb_app_config::CKBAppConfig; use ckb_chain_spec::ChainSpec; use ckb_network::{ProtocolId, ProtocolVersion}; use ckb_sync::NetworkProtocol; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; #[macro_export] macro_rules! name { diff --git a/test/src/specs/relay/transaction_relay_low_fee_rate.rs b/test/src/specs/relay/transaction_relay_low_fee_rate.rs index ae6affcbe5..af346104ae 100644 --- a/test/src/specs/relay/transaction_relay_low_fee_rate.rs +++ b/test/src/specs/relay/transaction_relay_low_fee_rate.rs @@ -2,7 +2,7 @@ use crate::utils::wait_until; use crate::{Net, Spec, DEFAULT_TX_PROPOSAL_WINDOW}; use ckb_app_config::CKBAppConfig; use ckb_jsonrpc_types::Status; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; use ckb_types::{core::TransactionView, packed, prelude::*}; use log::info; diff --git a/test/src/specs/tx_pool/fee_estimate.rs b/test/src/specs/tx_pool/fee_estimate.rs new file mode 100644 index 0000000000..2f7d32a6a1 --- /dev/null +++ b/test/src/specs/tx_pool/fee_estimate.rs @@ -0,0 +1,140 @@ +use crate::rpc::RpcClient; +use crate::utils::wait_until; +use crate::{Net, Node, Spec, DEFAULT_TX_PROPOSAL_WINDOW}; +use ckb_app_config::CKBAppConfig; +use ckb_jsonrpc_types::Status; +use ckb_tx_pool::FeeRate; +use ckb_types::{ + core::{Capacity, TransactionView}, + prelude::*, +}; +use log::info; +use rand::{seq::SliceRandom, thread_rng}; + +const MIN_FEE_RATE: FeeRate = FeeRate::from_u64(1_000); + +pub struct FeeEstimate; + +fn gen_txs_with_fee( + node: &Node, + count: usize, + min_fee_rate: FeeRate, + increment_fee_rate: FeeRate, +) -> Vec<(TransactionView, FeeRate)> { + let mut fee_rate = min_fee_rate; + let mut txs = Vec::new(); + for _ in 0..count { + node.generate_block(); + let block = node.get_tip_block(); + let cellbase = &block.transactions()[0]; + let capacity: u64 = cellbase.outputs().get(0).unwrap().capacity().unpack(); + let tx = node.new_transaction_with_since_capacity( + cellbase.hash(), + 0, + Capacity::shannons(capacity), + ); + let fee = fee_rate.fee(tx.data().serialized_size_in_block()); + let output = tx + .outputs() + .get(0) + .unwrap() + .as_builder() + .capacity((capacity - fee.as_u64()).pack()) + .build(); + let tx = tx.as_advanced_builder().set_outputs(vec![output]).build(); + txs.push((tx, fee_rate)); + fee_rate = FeeRate::from_u64(fee_rate.as_u64() + increment_fee_rate.as_u64()); + } + txs +} + +fn check_fee_esimate(client: &RpcClient, tx_fee_rates: Vec) { + // test wait more blocks should use less fee + let f1: u64 = client.estimate_fee_rate(3.into()).fee_rate.into(); + let f2: u64 = client.estimate_fee_rate(4.into()).fee_rate.into(); + let f3: u64 = client.estimate_fee_rate(5.into()).fee_rate.into(); + assert!(f1 > f2); + assert!(f2 > f3); + // test estimate fee should in a bound + let min_rate = tx_fee_rates.iter().min().unwrap(); + let max_rate = tx_fee_rates.iter().max().unwrap(); + let mut previous_fee_rate = f1; + for i in 3..42 { + let fee_rate: u64 = client.estimate_fee_rate((i as u64).into()).fee_rate.into(); + assert!( + fee_rate > 0, + "estimate fee should return a resonable result" + ); + assert!( + fee_rate <= previous_fee_rate, + "should not greater than lesser confirm blocks estimated fee" + ); + assert!( + fee_rate > min_rate.as_u64(), + "estimate fee should greater than min fee rate" + ); + assert!( + fee_rate < max_rate.as_u64(), + "estimate fee should less than max fee rate" + ); + previous_fee_rate = fee_rate; + } +} + +impl Spec for FeeEstimate { + crate::name!("fee_estimate"); + + fn run(&self, net: &mut Net) { + // 1. prepare some txs with fee + // 2. send them then commit them + // 3. check fee estiamte is between lower bound and upper bound + + let node0 = &net.nodes[0]; + node0.generate_blocks((DEFAULT_TX_PROPOSAL_WINDOW.1 + 2) as usize); + + let min_fee_rate = MIN_FEE_RATE; + let count = 100; + + let mut txs = gen_txs_with_fee(&node0, count, min_fee_rate, min_fee_rate); + + let mut rng = thread_rng(); + txs.shuffle(&mut rng); + + info!("Send transactions"); + for (tx, _) in &txs { + node0.rpc_client().send_transaction(tx.data().into()); + } + + // confirm all txs + let tx_size = txs[0].0.data().serialized_size_in_block(); + for _ in 0..10 { + // submit 30 txs in each block + node0.submit_block(&node0.new_block(Some((tx_size * 30) as u64), None, None)); + } + + let ret = wait_until(10, || { + txs.iter().all(|(tx, _)| { + node0 + .rpc_client() + .get_transaction(tx.hash().clone()) + .map(|r| r.tx_status.status == Status::Committed) + .unwrap_or(false) + }) + }); + + assert!(ret, "send txs should success"); + + info!("Check fee estimate"); + + check_fee_esimate( + node0.rpc_client(), + txs.iter().map(|(_, fee_rate)| *fee_rate).collect(), + ); + } + + fn modify_ckb_config(&self) -> Box ()> { + Box::new(|config| { + config.tx_pool.min_fee_rate = MIN_FEE_RATE; + }) + } +} diff --git a/test/src/specs/tx_pool/limit.rs b/test/src/specs/tx_pool/limit.rs index 277ca18cc5..7ce738e57a 100644 --- a/test/src/specs/tx_pool/limit.rs +++ b/test/src/specs/tx_pool/limit.rs @@ -1,7 +1,7 @@ use crate::utils::assert_send_transaction_fail; use crate::{Net, Spec, DEFAULT_TX_PROPOSAL_WINDOW}; use ckb_app_config::CKBAppConfig; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; use log::info; pub struct SizeLimit; diff --git a/test/src/specs/tx_pool/mod.rs b/test/src/specs/tx_pool/mod.rs index c296aa6126..fa9f6ba35c 100644 --- a/test/src/specs/tx_pool/mod.rs +++ b/test/src/specs/tx_pool/mod.rs @@ -3,6 +3,7 @@ mod collision; mod depend_tx_in_same_block; mod descendant; mod different_txs_with_same_input; +mod fee_estimate; mod limit; mod pool_reconcile; mod pool_resurrect; @@ -19,6 +20,7 @@ pub use collision::*; pub use depend_tx_in_same_block::*; pub use descendant::*; pub use different_txs_with_same_input::*; +pub use fee_estimate::*; pub use limit::*; pub use pool_reconcile::*; pub use pool_resurrect::*; diff --git a/test/src/specs/tx_pool/send_low_fee_rate_tx.rs b/test/src/specs/tx_pool/send_low_fee_rate_tx.rs index a7e6b28fc8..51c2c31c32 100644 --- a/test/src/specs/tx_pool/send_low_fee_rate_tx.rs +++ b/test/src/specs/tx_pool/send_low_fee_rate_tx.rs @@ -1,7 +1,7 @@ use crate::utils::wait_until; use crate::{Net, Spec, DEFAULT_TX_PROPOSAL_WINDOW}; use ckb_app_config::CKBAppConfig; -use ckb_tx_pool::fee_rate::FeeRate; +use ckb_tx_pool::FeeRate; use ckb_types::{core::TransactionView, packed, prelude::*}; use log::info; diff --git a/tx-pool/Cargo.toml b/tx-pool/Cargo.toml index 1f369d55e7..9fcd4af7a8 100644 --- a/tx-pool/Cargo.toml +++ b/tx-pool/Cargo.toml @@ -29,3 +29,4 @@ tokio = "0.1" crossbeam-channel = "0.3" ckb-future-executor = { path = "../util/future-executor" } ckb-stop-handler = { path = "../util/stop-handler" } +ckb-fee-estimator = { path = "../util/fee-estimator" } diff --git a/tx-pool/src/component/commit_txs_scanner.rs b/tx-pool/src/component/commit_txs_scanner.rs index 2d8151fb69..e6b3256e40 100644 --- a/tx-pool/src/component/commit_txs_scanner.rs +++ b/tx-pool/src/component/commit_txs_scanner.rs @@ -3,7 +3,7 @@ use crate::{ entry::{TxEntry, TxModifiedEntries}, proposed::ProposedPool, }, - fee_rate::FeeRate, + FeeRate, }; use ckb_types::{core::Cycle, packed::ProposalShortId}; use std::cmp::max; diff --git a/tx-pool/src/component/pending.rs b/tx-pool/src/component/pending.rs index ffba89d28d..15476712bd 100644 --- a/tx-pool/src/component/pending.rs +++ b/tx-pool/src/component/pending.rs @@ -1,6 +1,6 @@ use crate::component::container::{AncestorsScoreSortKey, SortedTxMap}; use crate::component::entry::TxEntry; -use crate::fee_rate::FeeRate; +use crate::FeeRate; use ckb_types::{ core::{ cell::{CellMetaBuilder, CellProvider, CellStatus}, diff --git a/tx-pool/src/config.rs b/tx-pool/src/config.rs index 3b6a9790fd..d300b6bca9 100644 --- a/tx-pool/src/config.rs +++ b/tx-pool/src/config.rs @@ -1,4 +1,4 @@ -use crate::fee_rate::FeeRate; +use crate::FeeRate; use ckb_jsonrpc_types::{JsonBytes, ScriptHashType}; use ckb_types::core::Cycle; use ckb_types::H256; diff --git a/tx-pool/src/lib.rs b/tx-pool/src/lib.rs index 2da2b3692b..35e0031abe 100644 --- a/tx-pool/src/lib.rs +++ b/tx-pool/src/lib.rs @@ -2,13 +2,13 @@ mod block_assembler; mod component; mod config; pub mod error; -pub mod fee_rate; pub mod pool; mod process; pub mod service; pub(crate) const LOG_TARGET_TX_POOL: &str = "ckb-tx-pool"; +pub use ckb_fee_estimator::FeeRate; pub use component::entry::TxEntry; pub use config::{BlockAssemblerConfig, TxPoolConfig}; pub use process::PlugTarget; diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index b20cfcc2e7..e46acac35d 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -6,6 +6,7 @@ use crate::component::proposed::ProposedPool; use crate::config::TxPoolConfig; use ckb_dao::DaoCalculator; use ckb_error::{Error, ErrorKind, InternalErrorKind}; +use ckb_fee_estimator::Estimator as FeeEstimator; use ckb_logger::{debug_target, error_target, trace_target}; use ckb_snapshot::Snapshot; use ckb_store::ChainStore; @@ -49,6 +50,8 @@ pub struct TxPool { pub(crate) total_tx_size: usize, // sum of all tx_pool tx's cycles. pub(crate) total_tx_cycles: Cycle, + // tx fee estimator + pub(crate) fee_estimator: FeeEstimator, pub snapshot: Arc, } @@ -83,6 +86,7 @@ impl TxPool { total_tx_size: 0, total_tx_cycles: 0, snapshot, + fee_estimator: FeeEstimator::default(), } } @@ -666,6 +670,7 @@ impl TxPool { ) -> Option<(Byte32, CacheEntry)> { let mut ret = None; let tx_hash = tx.hash(); + let mut readd_tx = false; let cache_entry = txs_verify_cache.get(&tx_hash).cloned(); let tx_short_id = tx.proposal_short_id(); let tx_size = tx.data().serialized_size_in_block(); @@ -673,22 +678,29 @@ impl TxPool { if let Ok(new_cache_entry) = self.proposed_tx_and_descendants(cache_entry, tx_size, tx) { if cache_entry.is_none() { - ret = Some((tx_hash, new_cache_entry)); + ret = Some((tx_hash.clone(), new_cache_entry)); } self.update_statics_for_add_tx(tx_size, new_cache_entry.cycles); + readd_tx = true; } } else if snapshot.proposals().contains_gap(&tx_short_id) { if let Ok(new_cache_entry) = self.gap_tx(cache_entry, tx_size, tx) { if cache_entry.is_none() { - ret = Some((tx_hash, new_cache_entry)); + ret = Some((tx_hash.clone(), new_cache_entry)); } self.update_statics_for_add_tx(tx_size, cache_entry.map(|c| c.cycles).unwrap_or(0)); + readd_tx = true; } } else if let Ok(new_cache_entry) = self.pending_tx(cache_entry, tx_size, tx) { if cache_entry.is_none() { - ret = Some((tx_hash, new_cache_entry)); + ret = Some((tx_hash.clone(), new_cache_entry)); } self.update_statics_for_add_tx(tx_size, cache_entry.map(|c| c.cycles).unwrap_or(0)); + readd_tx = true; + } + + if !readd_tx { + self.fee_estimator.drop_tx(&tx_hash); } ret } diff --git a/tx-pool/src/process/chain_reorg.rs b/tx-pool/src/process/chain_reorg.rs index f877738564..dda2d508e3 100644 --- a/tx-pool/src/process/chain_reorg.rs +++ b/tx-pool/src/process/chain_reorg.rs @@ -87,7 +87,11 @@ pub fn update_tx_pool_for_reorg( } for blk in attached_blocks { - attached.extend(blk.transactions().iter().skip(1).cloned()) + attached.extend(blk.transactions().iter().skip(1).cloned()); + tx_pool.fee_estimator.process_block( + blk.header().number(), + blk.transactions().iter().skip(1).map(|tx| tx.hash()), + ); } let retain: Vec = detached.difference(&attached).cloned().collect(); diff --git a/tx-pool/src/process/estimate_fee_rate.rs b/tx-pool/src/process/estimate_fee_rate.rs new file mode 100644 index 0000000000..46acff68cd --- /dev/null +++ b/tx-pool/src/process/estimate_fee_rate.rs @@ -0,0 +1,34 @@ +use crate::pool::TxPool; +use crate::FeeRate; +use futures::future::Future; +use tokio::prelude::{Async, Poll}; +use tokio::sync::lock::Lock; + +pub struct EstimateFeeRateProcess { + pub tx_pool: Lock, + pub expect_confirm_blocks: usize, +} + +impl EstimateFeeRateProcess { + pub fn new(tx_pool: Lock, expect_confirm_blocks: usize) -> EstimateFeeRateProcess { + EstimateFeeRateProcess { + tx_pool, + expect_confirm_blocks, + } + } +} + +impl Future for EstimateFeeRateProcess { + type Item = FeeRate; + type Error = (); + + fn poll(&mut self) -> Poll { + match self.tx_pool.poll_lock() { + Async::Ready(guard) => { + let fee_rate = guard.fee_estimator.estimate(self.expect_confirm_blocks); + Ok(Async::Ready(fee_rate)) + } + Async::NotReady => Ok(Async::NotReady), + } + } +} diff --git a/tx-pool/src/process/estimator_process_block.rs b/tx-pool/src/process/estimator_process_block.rs new file mode 100644 index 0000000000..9c09ac2515 --- /dev/null +++ b/tx-pool/src/process/estimator_process_block.rs @@ -0,0 +1,42 @@ +use crate::pool::TxPool; +use ckb_types::packed::Byte32; +use futures::future::Future; +use tokio::prelude::{Async, Poll}; +use tokio::sync::lock::Lock; + +pub struct EstimatorProcessBlockProcess { + pub tx_pool: Lock, + pub height: u64, + pub txs: Vec, +} + +impl EstimatorProcessBlockProcess { + pub fn new( + tx_pool: Lock, + height: u64, + txs: Vec, + ) -> EstimatorProcessBlockProcess { + EstimatorProcessBlockProcess { + tx_pool, + height, + txs, + } + } +} + +impl Future for EstimatorProcessBlockProcess { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.tx_pool.poll_lock() { + Async::Ready(mut guard) => { + guard + .fee_estimator + .process_block(self.height, self.txs.iter().cloned()); + Ok(Async::Ready(())) + } + Async::NotReady => Ok(Async::NotReady), + } + } +} diff --git a/tx-pool/src/process/estimator_track_tx.rs b/tx-pool/src/process/estimator_track_tx.rs new file mode 100644 index 0000000000..66f33bcbdb --- /dev/null +++ b/tx-pool/src/process/estimator_track_tx.rs @@ -0,0 +1,46 @@ +use crate::pool::TxPool; +use crate::FeeRate; +use ckb_types::packed::Byte32; +use futures::future::Future; +use tokio::prelude::{Async, Poll}; +use tokio::sync::lock::Lock; + +pub struct EstimatorTrackTxProcess { + pub tx_pool: Lock, + pub tx_hash: Byte32, + pub fee_rate: FeeRate, + pub height: u64, +} + +impl EstimatorTrackTxProcess { + pub fn new( + tx_pool: Lock, + tx_hash: Byte32, + fee_rate: FeeRate, + height: u64, + ) -> EstimatorTrackTxProcess { + EstimatorTrackTxProcess { + tx_pool, + tx_hash, + fee_rate, + height, + } + } +} + +impl Future for EstimatorTrackTxProcess { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.tx_pool.poll_lock() { + Async::Ready(mut guard) => { + guard + .fee_estimator + .track_tx(self.tx_hash.clone(), self.fee_rate, self.height); + Ok(Async::Ready(())) + } + Async::NotReady => Ok(Async::NotReady), + } + } +} diff --git a/tx-pool/src/process/mod.rs b/tx-pool/src/process/mod.rs index c2efba25a3..829094749d 100644 --- a/tx-pool/src/process/mod.rs +++ b/tx-pool/src/process/mod.rs @@ -1,5 +1,8 @@ mod block_template; mod chain_reorg; +mod estimate_fee_rate; +mod estimator_process_block; +mod estimator_track_tx; mod fetch_tx_for_rpc; mod fetch_txs; mod fetch_txs_with_cycles; @@ -15,6 +18,9 @@ pub use block_template::{ }; pub use chain_reorg::ChainReorgProcess; pub use ckb_verification::txs_verify_cache::{FetchCache, UpdateCache}; +pub use estimate_fee_rate::EstimateFeeRateProcess; +pub use estimator_process_block::EstimatorProcessBlockProcess; +pub use estimator_track_tx::EstimatorTrackTxProcess; pub use fetch_tx_for_rpc::FetchTxRPCProcess; pub use fetch_txs::FetchTxsProcess; pub use fetch_txs_with_cycles::FetchTxsWithCyclesProcess; diff --git a/tx-pool/src/process/submit_txs.rs b/tx-pool/src/process/submit_txs.rs index 8a14f9d456..c458d34fb4 100644 --- a/tx-pool/src/process/submit_txs.rs +++ b/tx-pool/src/process/submit_txs.rs @@ -1,6 +1,7 @@ use crate::component::entry::TxEntry; use crate::error::SubmitTxError; use crate::pool::TxPool; +use crate::FeeRate; use ckb_error::{Error, InternalErrorKind}; use ckb_snapshot::Snapshot; use ckb_types::{ @@ -219,7 +220,18 @@ impl<'a> SubmitTxsExecutor<'a> { related_dep_out_points, ); if match status { - TxStatus::Fresh => self.tx_pool.add_pending(entry), + TxStatus::Fresh => { + let tx_hash = entry.transaction.hash(); + let inserted = self.tx_pool.add_pending(entry); + if inserted { + let height = self.tx_pool.snapshot().tip_number(); + let fee_rate = FeeRate::calculate(fee, tx_size); + self.tx_pool + .fee_estimator + .track_tx(tx_hash, fee_rate, height); + } + inserted + } TxStatus::Gap => self.tx_pool.add_gap(entry), TxStatus::Proposed => self.tx_pool.add_proposed(entry), } { diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index be66f76877..673953354e 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -5,11 +5,13 @@ use crate::config::TxPoolConfig; use crate::pool::{TxPool, TxPoolInfo}; use crate::process::{ BlockTemplateBuilder, BlockTemplateCacheProcess, BuildCellbaseProcess, ChainReorgProcess, - FetchCache, FetchTxRPCProcess, FetchTxsProcess, FetchTxsWithCyclesProcess, - FreshProposalsFilterProcess, NewUncleProcess, PackageTxsProcess, PlugEntryProcess, PlugTarget, - PreResolveTxsProcess, PrepareUnclesProcess, SubmitTxsProcess, TxPoolInfoProcess, - UpdateBlockTemplateCache, UpdateCache, VerifyTxsProcess, + EstimateFeeRateProcess, EstimatorProcessBlockProcess, EstimatorTrackTxProcess, FetchCache, + FetchTxRPCProcess, FetchTxsProcess, FetchTxsWithCyclesProcess, FreshProposalsFilterProcess, + NewUncleProcess, PackageTxsProcess, PlugEntryProcess, PlugTarget, PreResolveTxsProcess, + PrepareUnclesProcess, SubmitTxsProcess, TxPoolInfoProcess, UpdateBlockTemplateCache, + UpdateCache, VerifyTxsProcess, }; +use crate::FeeRate; use ckb_error::{Error, InternalErrorKind}; use ckb_future_executor::{new_executor, Executor}; use ckb_jsonrpc_types::BlockTemplate; @@ -87,6 +89,9 @@ pub enum Message { FetchTxRPC(Request>), NewUncle(Notify), PlugEntry(Request<(Vec, PlugTarget), ()>), + EstimateFeeRate(Request), + EstimatorTrackTx(Request<(Byte32, FeeRate, u64), ()>), + EstimatorProcessBlock(Request<(u64, Vec), ()>), } #[derive(Clone)] @@ -227,6 +232,39 @@ impl TxPoolController { sender.try_send(Message::FetchTxsWithCycles(request))?; response.recv().map_err(Into::into) } + + pub fn estimate_fee_rate(&self, expect_confirm_blocks: usize) -> Result { + let mut sender = self.sender.clone(); + let (responder, response) = crossbeam_channel::bounded(1); + let request = Request::call(expect_confirm_blocks, responder); + sender.try_send(Message::EstimateFeeRate(request))?; + response.recv().map_err(Into::into) + } + + pub fn estimator_track_tx( + &self, + tx_hash: Byte32, + fee_rate: FeeRate, + height: u64, + ) -> Result<(), FailureError> { + let mut sender = self.sender.clone(); + let (responder, response) = crossbeam_channel::bounded(1); + let request = Request::call((tx_hash, fee_rate, height), responder); + sender.try_send(Message::EstimatorTrackTx(request))?; + response.recv().map_err(Into::into) + } + + pub fn estimator_process_block( + &self, + height: u64, + txs: impl Iterator, + ) -> Result<(), FailureError> { + let mut sender = self.sender.clone(); + let (responder, response) = crossbeam_channel::bounded(1); + let request = Request::call((height, txs.collect::>()), responder); + sender.try_send(Message::EstimatorProcessBlock(request))?; + response.recv().map_err(Into::into) + } } pub struct TxPoolServiceBuilder { @@ -410,6 +448,40 @@ impl TxPoolService { }; future::ok(()) })), + Message::EstimateFeeRate(Request { + responder, + arguments: expect_confirm_blocks, + }) => Box::new(self.estimate_fee_rate(expect_confirm_blocks).and_then( + move |fee_rate| { + if let Err(e) = responder.send(fee_rate) { + error!("responder send estimate_fee_rate failed {:?}", e) + }; + future::ok(()) + }, + )), + Message::EstimatorTrackTx(Request { + responder, + arguments: (tx_hash, fee_rate, height), + }) => Box::new(self.estimator_track_tx(tx_hash, fee_rate, height).and_then( + move |_| { + if let Err(e) = responder.send(()) { + error!("responder send estimator_track_tx failed {:?}", e) + }; + future::ok(()) + }, + )), + Message::EstimatorProcessBlock(Request { + responder, + arguments: (height, txs), + }) => Box::new( + self.estimator_process_block(height, txs) + .and_then(move |_| { + if let Err(e) = responder.send(()) { + error!("responder send estimator_process_block failed {:?}", e) + }; + future::ok(()) + }), + ), } } @@ -621,4 +693,28 @@ impl TxPoolService { )) } } + + pub fn estimate_fee_rate( + &self, + expect_confirm_blocks: usize, + ) -> impl Future { + EstimateFeeRateProcess::new(self.tx_pool.clone(), expect_confirm_blocks) + } + + pub fn estimator_track_tx( + &self, + tx_hash: Byte32, + fee_rate: FeeRate, + height: u64, + ) -> impl Future { + EstimatorTrackTxProcess::new(self.tx_pool.clone(), tx_hash, fee_rate, height) + } + + pub fn estimator_process_block( + &self, + height: u64, + txs: Vec, + ) -> impl Future { + EstimatorProcessBlockProcess::new(self.tx_pool.clone(), height, txs) + } } diff --git a/util/fee-estimator/Cargo.toml b/util/fee-estimator/Cargo.toml new file mode 100644 index 0000000000..dd7eb65fff --- /dev/null +++ b/util/fee-estimator/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ckb-fee-estimator" +version = "0.24.0-pre" +authors = ["Nervos Core Dev "] +edition = "2018" +license = "MIT" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ckb-types = { path = "../types" } +ckb-logger = { path = "../logger" } +serde = "1.0" +serde_derive = "1.0" diff --git a/util/fee-estimator/src/estimator.rs b/util/fee-estimator/src/estimator.rs new file mode 100644 index 0000000000..c76286e78e --- /dev/null +++ b/util/fee-estimator/src/estimator.rs @@ -0,0 +1,144 @@ +use crate::tx_confirm_stat::TxConfirmStat; +use crate::FeeRate; +use ckb_logger::debug; +use ckb_types::packed::Byte32; +use std::collections::HashMap; + +pub const MAX_CONFIRM_BLOCKS: usize = 1000; +const MIN_BUCKET_FEERATE: f64 = 1000f64; +const MAX_BUCKET_FEERATE: f64 = 1e7; +const FEE_SPACING: f64 = 1.05f64; +const MIN_ESTIMATE_SAMPLES: usize = 20; +const MIN_ESTIMATE_CONFIRM_RATE: f64 = 0.85f64; +/// half life each 100 blocks, math.exp(math.log(0.5) / 100) +const DEFAULT_DECAY_FACTOR: f64 = 0.993; + +#[derive(Clone)] +struct TxRecord { + height: u64, + bucket_index: usize, + fee_rate: FeeRate, +} + +/// Fee Estimator +/// Estimator track new block and tx_pool to collect data +/// we track every new tx enter txpool and record the tip height and fee_rate, +/// when tx is packed into a new block or dropped by txpool, +/// we get a sample about how long a tx with X fee_rate can get confirmed or get dropped. +/// +/// In inner, we group samples by predefined fee_rate buckets. +/// To estimator fee_rate for a confirm target(how many blocks that a tx can get committed), +/// we travel through fee_rate buckets, try to find a fee_rate X to let a tx get committed +/// with high problities within confirm target blocks. +/// +#[derive(Clone)] +pub struct Estimator { + best_height: u64, + start_height: u64, + /// a data struct to track tx confirm status + tx_confirm_stat: TxConfirmStat, + tracked_txs: HashMap, +} + +impl Default for Estimator { + fn default() -> Self { + Self::new() + } +} + +impl Estimator { + pub fn new() -> Self { + let mut buckets = Vec::new(); + let mut bucket_fee_boundary = MIN_BUCKET_FEERATE; + // initialize fee_rate buckets + while bucket_fee_boundary <= MAX_BUCKET_FEERATE { + buckets.push(FeeRate::from_u64(bucket_fee_boundary as u64)); + bucket_fee_boundary *= FEE_SPACING; + } + Estimator { + best_height: 0, + start_height: 0, + tx_confirm_stat: TxConfirmStat::new(&buckets, MAX_CONFIRM_BLOCKS, DEFAULT_DECAY_FACTOR), + tracked_txs: Default::default(), + } + } + + fn process_block_tx(&mut self, height: u64, tx_hash: &Byte32) -> bool { + if let Some(tx) = self.drop_tx_inner(tx_hash, false) { + let blocks_to_confirm = height.saturating_sub(tx.height) as usize; + self.tx_confirm_stat + .add_confirmed_tx(blocks_to_confirm, tx.fee_rate); + true + } else { + // tx is not tracked + false + } + } + + /// process new block + /// record confirm blocks for txs which we tracked before. + pub fn process_block(&mut self, height: u64, txs: impl Iterator) { + // For simpfy, we assume chain reorg will not effect tx fee. + if height <= self.best_height { + return; + } + self.best_height = height; + // update tx confirm stat + self.tx_confirm_stat.move_track_window(height); + self.tx_confirm_stat.decay(); + let processed_txs = txs.filter(|tx| self.process_block_tx(height, tx)).count(); + if self.start_height == 0 && processed_txs > 0 { + // start record + self.start_height = self.best_height; + debug!("Fee estimator start recording at {}", self.start_height); + } + } + + /// track a tx that entered txpool + pub fn track_tx(&mut self, tx_hash: Byte32, fee_rate: FeeRate, height: u64) { + if self.tracked_txs.contains_key(&tx_hash) { + // already in track + return; + } + if height != self.best_height { + // ignore wrong height txs + return; + } + if let Some(bucket_index) = self.tx_confirm_stat.add_unconfirmed_tx(height, fee_rate) { + self.tracked_txs.insert( + tx_hash, + TxRecord { + height, + bucket_index, + fee_rate, + }, + ); + } + } + + fn drop_tx_inner(&mut self, tx_hash: &Byte32, count_failure: bool) -> Option { + self.tracked_txs.remove(tx_hash).map(|tx_record| { + self.tx_confirm_stat.remove_unconfirmed_tx( + tx_record.height, + self.best_height, + tx_record.bucket_index, + count_failure, + ); + tx_record + }) + } + + /// tx removed from txpool + pub fn drop_tx(&mut self, tx_hash: &Byte32) -> bool { + self.drop_tx_inner(tx_hash, true).is_some() + } + + /// estimate a fee rate for confirm target + pub fn estimate(&self, expect_confirm_blocks: usize) -> FeeRate { + self.tx_confirm_stat.estimate_median( + expect_confirm_blocks, + MIN_ESTIMATE_SAMPLES, + MIN_ESTIMATE_CONFIRM_RATE, + ) + } +} diff --git a/tx-pool/src/fee_rate.rs b/util/fee-estimator/src/fee_rate.rs similarity index 54% rename from tx-pool/src/fee_rate.rs rename to util/fee-estimator/src/fee_rate.rs index 04ba44bdca..7f5dbd45c7 100644 --- a/tx-pool/src/fee_rate.rs +++ b/util/fee-estimator/src/fee_rate.rs @@ -2,20 +2,33 @@ use ckb_types::core::Capacity; use serde_derive::{Deserialize, Serialize}; /// shannons per kilobytes -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub struct FeeRate(u64); +const KB: u64 = 1000; + impl FeeRate { + pub fn calculate(fee: Capacity, vbytes: usize) -> Self { + if vbytes == 0 { + return FeeRate::zero(); + } + FeeRate::from_u64(fee.as_u64().saturating_mul(KB) / (vbytes as u64)) + } + pub const fn from_u64(fee_per_kb: u64) -> Self { FeeRate(fee_per_kb) } + pub const fn as_u64(self) -> u64 { + self.0 + } + pub const fn zero() -> Self { Self::from_u64(0) } pub fn fee(self, size: usize) -> Capacity { - let fee = self.0.saturating_mul(size as u64) / 1000; + let fee = self.0.saturating_mul(size as u64) / KB; Capacity::shannons(fee) } } diff --git a/util/fee-estimator/src/lib.rs b/util/fee-estimator/src/lib.rs new file mode 100644 index 0000000000..ae1f131ad7 --- /dev/null +++ b/util/fee-estimator/src/lib.rs @@ -0,0 +1,6 @@ +mod estimator; +mod fee_rate; +mod tx_confirm_stat; + +pub use estimator::{Estimator, MAX_CONFIRM_BLOCKS}; +pub use fee_rate::FeeRate; diff --git a/util/fee-estimator/src/tx_confirm_stat.rs b/util/fee-estimator/src/tx_confirm_stat.rs new file mode 100644 index 0000000000..f4a6a44831 --- /dev/null +++ b/util/fee-estimator/src/tx_confirm_stat.rs @@ -0,0 +1,305 @@ +use crate::FeeRate; +use std::collections::BTreeMap; + +#[derive(Default, Debug, Clone)] +struct BucketStat { + total_fee_rate: FeeRate, + txs_count: f64, + old_unconfirmed_txs: usize, +} + +impl BucketStat { + // add a new fee rate to this bucket + fn new_fee_rate_sample(&mut self, fee_rate: FeeRate) { + self.txs_count += 1f64; + let total_fee_rate = self + .total_fee_rate + .as_u64() + .saturating_add(fee_rate.as_u64()); + self.total_fee_rate = FeeRate::from_u64(total_fee_rate); + } + + // get average fee rate from a bucket + fn avg_fee_rate(&self) -> FeeRate { + if self.txs_count > 0f64 { + FeeRate::from_u64(((self.total_fee_rate.as_u64() as f64) / self.txs_count) as u64) + } else { + FeeRate::zero() + } + } +} + +/// TxConfirmStat is a struct to help to estimate txs fee rate, +/// This struct record txs fee_rate and blocks that txs to be committed. +/// +/// We start from track unconfirmed txs, +/// When tx added to txpool, we increase the count of unconfirmed tx, we do opposite tx removed. +/// When a tx get committed, put it into bucket by tx fee_rate and confirmed blocks, +/// then decrease the count of unconfirmed txs. +/// +/// So we get a group of samples which includes txs count, average fee rate and confirmed blocks, etc. +/// For estimate, we loop through each bucket, calculate the confirmed txs rate, until meet the required_confirm_rate. +#[derive(Clone)] +pub struct TxConfirmStat { + /// per bucket stat + bucket_stats: Vec, + /// bucket upper bound fee_rate => bucket index + fee_rate_to_bucket: BTreeMap, + /// confirm_blocks => bucket index => confirmed txs count + confirm_blocks_to_confirmed_txs: Vec>, + /// confirm_blocks => bucket index => failed txs count + confirm_blocks_to_failed_txs: Vec>, + /// Track recent N blocks unconfirmed txs + /// tracked block index => bucket index => TxTracker + block_unconfirmed_txs: Vec>, + decay_factor: f64, +} + +impl TxConfirmStat { + pub fn new(buckets: &[FeeRate], max_confirm_blocks: usize, decay_factor: f64) -> Self { + let bucket_stats = vec![BucketStat::default(); buckets.len()]; + let confirm_blocks_to_confirmed_txs = vec![vec![0f64; buckets.len()]; max_confirm_blocks]; + let confirm_blocks_to_failed_txs = vec![vec![0f64; buckets.len()]; max_confirm_blocks]; + let block_unconfirmed_txs = vec![vec![0; buckets.len()]; max_confirm_blocks]; + let fee_rate_to_bucket = buckets + .iter() + .enumerate() + .map(|(i, fee_rate)| (*fee_rate, i)) + .collect(); + TxConfirmStat { + bucket_stats, + fee_rate_to_bucket, + block_unconfirmed_txs, + confirm_blocks_to_confirmed_txs, + confirm_blocks_to_failed_txs, + decay_factor, + } + } + + /// Return upper bound fee_rate bucket + /// assume we have three buckets with fee_rate [1.0, 2.0, 3.0], we return index 1 for fee_rate 1.5 + fn bucket_index_by_fee_rate(&self, fee_rate: FeeRate) -> Option { + self.fee_rate_to_bucket + .range(fee_rate..) + .next() + .map(|(_fee_rate, index)| *index) + } + + fn max_confirms(&self) -> usize { + self.confirm_blocks_to_confirmed_txs.len() + } + + // add confirmed sample + pub fn add_confirmed_tx(&mut self, blocks_to_confirm: usize, fee_rate: FeeRate) { + if blocks_to_confirm < 1 { + return; + } + let bucket_index = match self.bucket_index_by_fee_rate(fee_rate) { + Some(index) => index, + None => return, + }; + // increase txs_count in buckets + for i in (blocks_to_confirm - 1)..self.max_confirms() { + self.confirm_blocks_to_confirmed_txs[i][bucket_index] += 1f64; + } + let stat = &mut self.bucket_stats[bucket_index]; + stat.new_fee_rate_sample(fee_rate); + } + + // track an unconfirmed tx + // entry_height - tip number when tx enter txpool + pub fn add_unconfirmed_tx(&mut self, entry_height: u64, fee_rate: FeeRate) -> Option { + let bucket_index = match self.bucket_index_by_fee_rate(fee_rate) { + Some(index) => index, + None => return None, + }; + let block_index = (entry_height % (self.block_unconfirmed_txs.len() as u64)) as usize; + self.block_unconfirmed_txs[block_index][bucket_index] += 1; + Some(bucket_index) + } + + pub fn remove_unconfirmed_tx( + &mut self, + entry_height: u64, + tip_height: u64, + bucket_index: usize, + count_failure: bool, + ) { + let tx_age = tip_height.saturating_sub(entry_height) as usize; + if tx_age < 1 { + return; + } + if tx_age >= self.block_unconfirmed_txs.len() { + self.bucket_stats[bucket_index].old_unconfirmed_txs -= 1; + } else { + let block_index = (entry_height % self.block_unconfirmed_txs.len() as u64) as usize; + self.block_unconfirmed_txs[block_index][bucket_index] -= 1; + } + if count_failure { + self.confirm_blocks_to_failed_txs[tx_age - 1][bucket_index] += 1f64; + } + } + + pub fn move_track_window(&mut self, height: u64) { + let block_index = (height % (self.block_unconfirmed_txs.len() as u64)) as usize; + for bucket_index in 0..self.bucket_stats.len() { + // mark unconfirmed txs as old_unconfirmed_txs + self.bucket_stats[bucket_index].old_unconfirmed_txs += + self.block_unconfirmed_txs[block_index][bucket_index]; + self.block_unconfirmed_txs[block_index][bucket_index] = 0; + } + } + + /// apply decay factor on stats, smoothly reduce the effects of old samples. + pub fn decay(&mut self) { + let decay_factor = self.decay_factor; + for (bucket_index, bucket) in self.bucket_stats.iter_mut().enumerate() { + self.confirm_blocks_to_confirmed_txs + .iter_mut() + .for_each(|buckets| { + buckets[bucket_index] *= decay_factor; + }); + + self.confirm_blocks_to_failed_txs + .iter_mut() + .for_each(|buckets| { + buckets[bucket_index] *= decay_factor; + }); + bucket.total_fee_rate = + FeeRate::from_u64((bucket.total_fee_rate.as_u64() as f64 * decay_factor) as u64); + bucket.txs_count *= decay_factor; + // TODO do we need decay the old unconfirmed? + } + } + + /// The naive estimate implementation + /// 1. find best range of buckets satisfy the given condition + /// 2. get median fee_rate from best range bucekts + pub fn estimate_median( + &self, + confirm_blocks: usize, + required_samples: usize, + required_confirm_rate: f64, + ) -> FeeRate { + // A tx need 1 block to propose, then 2 block to get confirmed + // so at least confirm blocks is 3 blocks. + if confirm_blocks < 3 || required_samples == 0 { + return FeeRate::zero(); + } + let mut confirmed_txs = 0f64; + let mut txs_count = 0f64; + let mut failure_count = 0f64; + let mut extra_count = 0; + let mut best_bucket_start = 0; + let mut best_bucket_end = 0; + let mut start_bucket_index = 0; + let mut find_best = false; + // try find enough sample data from buckets + for (bucket_index, stat) in self.bucket_stats.iter().enumerate() { + confirmed_txs += self.confirm_blocks_to_confirmed_txs[confirm_blocks - 1][bucket_index]; + failure_count += self.confirm_blocks_to_failed_txs[confirm_blocks - 1][bucket_index]; + extra_count += &self.block_unconfirmed_txs[confirm_blocks - 1][bucket_index]; + txs_count += stat.txs_count; + // we have enough data + while txs_count as usize >= required_samples { + let confirm_rate = confirmed_txs / (txs_count + failure_count + extra_count as f64); + // satisfied required_confirm_rate, find the best buckets range + if confirm_rate >= required_confirm_rate { + best_bucket_start = start_bucket_index; + best_bucket_end = bucket_index; + find_best = true; + break; + } else { + // remove sample data of the first bucket in the range, then retry + let stat = self.bucket_stats.get(start_bucket_index).expect("exists"); + confirmed_txs -= self.confirm_blocks_to_confirmed_txs[confirm_blocks - 1] + [start_bucket_index]; + failure_count -= + self.confirm_blocks_to_failed_txs[confirm_blocks - 1][start_bucket_index]; + extra_count -= + &self.block_unconfirmed_txs[confirm_blocks - 1][start_bucket_index]; + txs_count -= stat.txs_count; + start_bucket_index += 1; + continue; + } + } + + // end loop if we found the best buckets + if find_best { + break; + } + } + + if !find_best { + return FeeRate::zero(); + } + + let best_range_txs_count: f64 = self.bucket_stats[best_bucket_start..=best_bucket_end] + .iter() + .map(|b| b.txs_count) + .sum(); + + // find median bucket + if best_range_txs_count != 0f64 { + let mut half_count = best_range_txs_count / 2f64; + for bucket in &self.bucket_stats[best_bucket_start..=best_bucket_end] { + // find the median bucket + if bucket.txs_count >= half_count { + return bucket.avg_fee_rate(); + } else { + half_count -= bucket.txs_count; + } + } + } + FeeRate::zero() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_estimate_median() { + let mut bucket_fee_rate = 1000; + let bucket_end_fee_rate = 5000; + let rate = 1.1f64; + // decay = exp(ln(0.5) / 100), so decay.pow(100) =~ 0.5 + let decay = 0.993f64; + let max_confirm_blocks = 1000; + // prepare fee rate buckets + let mut buckets = vec![]; + while bucket_fee_rate < bucket_end_fee_rate { + buckets.push(FeeRate::from_u64(bucket_fee_rate)); + bucket_fee_rate = (rate * bucket_fee_rate as f64) as u64; + } + let mut stat = TxConfirmStat::new(&buckets, max_confirm_blocks, decay); + // txs data + let fee_rate_and_confirms = vec![ + (2500, 5), + (3000, 5), + (3500, 5), + (1500, 10), + (2000, 10), + (2100, 10), + (2200, 10), + (1200, 15), + (1000, 15), + ]; + for (fee_rate, blocks_to_confirm) in fee_rate_and_confirms { + stat.add_confirmed_tx(blocks_to_confirm, FeeRate::from_u64(fee_rate)); + } + // test basic median fee rate + assert_eq!(stat.estimate_median(5, 3, 1f64), FeeRate::from_u64(3000)); + // test different required samples + assert_eq!(stat.estimate_median(10, 1, 1f64), FeeRate::from_u64(1500)); + assert_eq!(stat.estimate_median(10, 3, 1f64), FeeRate::from_u64(2050)); + assert_eq!(stat.estimate_median(10, 4, 1f64), FeeRate::from_u64(2050)); + assert_eq!(stat.estimate_median(15, 2, 1f64), FeeRate::from_u64(1000)); + assert_eq!(stat.estimate_median(15, 3, 1f64), FeeRate::from_u64(1200)); + // test return zero if confirm_blocks or required_samples is zero + assert_eq!(stat.estimate_median(0, 4, 1f64), FeeRate::zero()); + assert_eq!(stat.estimate_median(15, 0, 1f64), FeeRate::zero()); + assert_eq!(stat.estimate_median(0, 3, 1f64), FeeRate::zero()); + } +} diff --git a/util/jsonrpc-types/src/experiment.rs b/util/jsonrpc-types/src/experiment.rs index 4125798a9e..5f27264d35 100644 --- a/util/jsonrpc-types/src/experiment.rs +++ b/util/jsonrpc-types/src/experiment.rs @@ -1,7 +1,12 @@ -use crate::Cycle; +use crate::{Cycle, FeeRate}; use serde_derive::{Deserialize, Serialize}; #[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] pub struct DryRunResult { pub cycles: Cycle, } + +#[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] +pub struct EstimateResult { + pub fee_rate: FeeRate, +} diff --git a/util/jsonrpc-types/src/lib.rs b/util/jsonrpc-types/src/lib.rs index fc22178b3f..42877909f3 100644 --- a/util/jsonrpc-types/src/lib.rs +++ b/util/jsonrpc-types/src/lib.rs @@ -28,7 +28,7 @@ pub use self::blockchain::{ pub use self::bytes::JsonBytes; pub use self::cell::{CellOutputWithOutPoint, CellWithStatus}; pub use self::chain_info::ChainInfo; -pub use self::experiment::DryRunResult; +pub use self::experiment::{DryRunResult, EstimateResult}; pub use self::fixed_bytes::Byte32; pub use self::indexer::{CellTransaction, LiveCell, LockHashIndexState, TransactionPoint}; pub use self::net::{BannedAddr, Node, NodeAddress}; @@ -40,6 +40,6 @@ pub use self::uint32::Uint32; pub use self::uint64::Uint64; pub use jsonrpc_core::types::{error, id, params, request, response, version}; pub use primitive::{ - BlockNumber, Capacity, Cycle, EpochNumber, EpochNumberWithFraction, Timestamp, Version, + BlockNumber, Capacity, Cycle, EpochNumber, EpochNumberWithFraction, FeeRate, Timestamp, Version, }; pub use serde_derive::{Deserialize, Serialize}; diff --git a/util/jsonrpc-types/src/primitive.rs b/util/jsonrpc-types/src/primitive.rs index 7c4ad341df..db5c46e8da 100644 --- a/util/jsonrpc-types/src/primitive.rs +++ b/util/jsonrpc-types/src/primitive.rs @@ -5,5 +5,6 @@ pub type EpochNumber = Uint64; pub type EpochNumberWithFraction = Uint64; pub type Capacity = Uint64; pub type Cycle = Uint64; +pub type FeeRate = Uint64; pub type Timestamp = Uint64; pub type Version = Uint32;