From 0cd5233725134b3a707be56180bf3522ac2d2652 Mon Sep 17 00:00:00 2001 From: Andrew Hariri Date: Wed, 9 Oct 2024 12:06:35 -0700 Subject: [PATCH] Mempool API to Get All Addresses From Parking Lot (#14740) --- Cargo.lock | 2 + aptos-node/src/lib.rs | 6 +- aptos-node/src/services.rs | 8 ++- crates/aptos-admin-service/Cargo.toml | 2 + .../src/server/mempool/mod.rs | 55 +++++++++++++++++++ crates/aptos-admin-service/src/server/mod.rs | 27 +++++++++ mempool/src/core_mempool/index.rs | 7 +++ mempool/src/core_mempool/mempool.rs | 4 ++ mempool/src/core_mempool/transaction_store.rs | 4 ++ mempool/src/counters.rs | 5 +- mempool/src/shared_mempool/coordinator.rs | 5 ++ mempool/src/shared_mempool/tasks.rs | 20 +++++++ mempool/src/shared_mempool/types.rs | 8 ++- mempool/src/tests/integration_tests.rs | 23 ++++++++ mempool/src/tests/test_framework.rs | 9 +++ 15 files changed, 179 insertions(+), 6 deletions(-) create mode 100644 crates/aptos-admin-service/src/server/mempool/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 82bd6e543b615..a4522e83c5876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -389,11 +389,13 @@ dependencies = [ "aptos-crypto", "aptos-infallible", "aptos-logger", + "aptos-mempool", "aptos-runtimes", "aptos-storage-interface", "aptos-system-utils 0.1.0", "aptos-types", "bcs 0.1.4", + "futures-channel", "http 0.2.11", "hyper 0.14.28", "sha256", diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 3eadc3f0b2a26..abf6e340e6c4d 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -578,7 +578,7 @@ where let config = OnChainJWKConsensusConfig::default_enabled(); println!("Flag `INITIALIZE_JWK_CONSENSUS` detected, will enable JWK Consensus for all default OIDC providers in genesis: {:?}", config); Some(config) - }, + } _ => None, }; }))) @@ -704,6 +704,7 @@ pub fn setup_environment_and_start_node( indexer_runtime, indexer_grpc_runtime, internal_indexer_db_runtime, + mempool_client_sender, ) = services::bootstrap_api_and_indexer( &node_config, db_rw.clone(), @@ -714,6 +715,9 @@ pub fn setup_environment_and_start_node( indexer_grpc_port_tx, )?; + // Set mempool client sender in order to enable the Mempool API in the admin service + admin_service.set_mempool_client_sender(mempool_client_sender); + // Create mempool and get the consensus to mempool sender let (mempool_runtime, consensus_to_mempool_sender) = services::start_mempool_runtime_and_get_consensus_sender( diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index dce92c0088e79..9537f4ea3ef9f 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -19,7 +19,9 @@ use aptos_indexer_grpc_table_info::runtime::{ bootstrap as bootstrap_indexer_table_info, bootstrap_internal_indexer_db, }; use aptos_logger::{debug, telemetry_log_writer::TelemetryLog, LoggerFilterUpdater}; -use aptos_mempool::{network::MempoolSyncMsg, MempoolClientRequest, QuorumStoreRequest}; +use aptos_mempool::{ + network::MempoolSyncMsg, MempoolClientRequest, MempoolClientSender, QuorumStoreRequest, +}; use aptos_mempool_notifications::MempoolNotificationListener; use aptos_network::application::{interface::NetworkClientInterface, storage::PeersAndMetadata}; use aptos_network_benchmark::{run_netbench_service, NetbenchMessage}; @@ -59,6 +61,7 @@ pub fn bootstrap_api_and_indexer( Option, Option, Option, + MempoolClientSender, )> { // Create the mempool client and sender let (mempool_client_sender, mempool_client_receiver) = @@ -120,7 +123,7 @@ pub fn bootstrap_api_and_indexer( node_config, chain_id, db_rw.reader.clone(), - mempool_client_sender, + mempool_client_sender.clone(), )?; Ok(( @@ -130,6 +133,7 @@ pub fn bootstrap_api_and_indexer( indexer_runtime, indexer_grpc, db_indexer_runtime, + mempool_client_sender, )) } diff --git a/crates/aptos-admin-service/Cargo.toml b/crates/aptos-admin-service/Cargo.toml index af741b67511c7..a430be43bf996 100644 --- a/crates/aptos-admin-service/Cargo.toml +++ b/crates/aptos-admin-service/Cargo.toml @@ -19,11 +19,13 @@ aptos-consensus = { workspace = true } aptos-crypto = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } +aptos-mempool = { workspace = true } aptos-runtimes = { workspace = true } aptos-storage-interface = { workspace = true } aptos-system-utils = { workspace = true } aptos-types = { workspace = true } bcs = { workspace = true } +futures-channel = { workspace = true } http = { workspace = true } hyper = { workspace = true } sha256 = { workspace = true } diff --git a/crates/aptos-admin-service/src/server/mempool/mod.rs b/crates/aptos-admin-service/src/server/mempool/mod.rs new file mode 100644 index 0000000000000..187a21cf6cea3 --- /dev/null +++ b/crates/aptos-admin-service/src/server/mempool/mod.rs @@ -0,0 +1,55 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_logger::info; +use aptos_mempool::{MempoolClientRequest, MempoolClientSender}; +use aptos_system_utils::utils::{reply_with, reply_with_status}; +use aptos_types::account_address::AccountAddress; +use futures_channel::oneshot::Canceled; +use http::{Request, Response, StatusCode}; +use hyper::Body; + +pub async fn mempool_handle_parking_lot_address_request( + _req: Request, + mempool_client_sender: MempoolClientSender, +) -> hyper::Result> { + match get_parking_lot_addresses(mempool_client_sender).await { + Ok(addresses) => { + info!("Finished getting parking lot addresses from mempool."); + match bcs::to_bytes(&addresses) { + Ok(addresses) => Ok(reply_with(vec![], addresses)), + Err(e) => { + info!("Failed to bcs serialize parking lot addresses from mempool: {e:?}"); + Ok(reply_with_status( + StatusCode::INTERNAL_SERVER_ERROR, + e.to_string(), + )) + }, + } + }, + Err(e) => { + info!("Failed to get parking lot addresses from mempool: {e:?}"); + Ok(reply_with_status( + StatusCode::INTERNAL_SERVER_ERROR, + e.to_string(), + )) + }, + } +} + +async fn get_parking_lot_addresses( + mempool_client_sender: MempoolClientSender, +) -> Result, Canceled> { + let (sender, receiver) = futures_channel::oneshot::channel(); + + match mempool_client_sender + .clone() + .try_send(MempoolClientRequest::GetAddressesFromParkingLot(sender)) + { + Ok(_) => receiver.await, + Err(e) => { + info!("Failed to send request for GetAddressesFromParkingLot: {e:?}"); + Err(Canceled) + }, + } +} diff --git a/crates/aptos-admin-service/src/server/mod.rs b/crates/aptos-admin-service/src/server/mod.rs index 4387f8c6e9693..bc72d791ed850 100644 --- a/crates/aptos-admin-service/src/server/mod.rs +++ b/crates/aptos-admin-service/src/server/mod.rs @@ -7,6 +7,7 @@ use aptos_consensus::{ }; use aptos_infallible::RwLock; use aptos_logger::info; +use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; use aptos_system_utils::utils::reply_with_status; #[cfg(target_os = "linux")] @@ -26,6 +27,7 @@ use std::{ use tokio::runtime::Runtime; mod consensus; +mod mempool; #[derive(Default)] pub struct Context { @@ -34,6 +36,7 @@ pub struct Context { aptos_db: RwLock>>, consensus_db: RwLock>>, quorum_store_db: RwLock>>, + mempool_client_sender: RwLock>, } impl Context { @@ -49,6 +52,10 @@ impl Context { *self.consensus_db.write() = Some(consensus_db); *self.quorum_store_db.write() = Some(quorum_store_db); } + + fn set_mempool_client_sender(&self, mempool_client_sender: MempoolClientSender) { + *self.mempool_client_sender.write() = Some(mempool_client_sender); + } } pub struct AdminService { @@ -107,6 +114,11 @@ impl AdminService { .set_consensus_dbs(consensus_db, quorum_store_db) } + pub fn set_mempool_client_sender(&self, mempool_client_sender: MempoolClientSender) { + self.context + .set_mempool_client_sender(mempool_client_sender) + } + fn start(&self, address: SocketAddr, enabled: bool) { let context = self.context.clone(); self.runtime.spawn(async move { @@ -210,6 +222,21 @@ impl AdminService { )) } }, + (hyper::Method::GET, "/debug/mempool/parking-lot/addresses") => { + let mempool_client_sender = context.mempool_client_sender.read().clone(); + if mempool_client_sender.is_some() { + mempool::mempool_handle_parking_lot_address_request( + req, + mempool_client_sender.unwrap(), + ) + .await + } else { + Ok(reply_with_status( + StatusCode::NOT_FOUND, + "Mempool parking lot is not available.", + )) + } + }, _ => Ok(reply_with_status(StatusCode::NOT_FOUND, "Not found.")), } } diff --git a/mempool/src/core_mempool/index.rs b/mempool/src/core_mempool/index.rs index 959f3ca091120..3194a29ae86ba 100644 --- a/mempool/src/core_mempool/index.rs +++ b/mempool/src/core_mempool/index.rs @@ -513,6 +513,13 @@ impl ParkingLotIndex { pub(crate) fn size(&self) -> usize { self.size } + + pub(crate) fn get_addresses(&self) -> Vec<(AccountAddress, u64)> { + self.data + .iter() + .map(|(addr, txns)| (*addr, txns.len() as u64)) + .collect::>() + } } /// Logical pointer to `MempoolTransaction`. diff --git a/mempool/src/core_mempool/mempool.rs b/mempool/src/core_mempool/mempool.rs index bf6d52a462aa0..b0a0ff613e96a 100644 --- a/mempool/src/core_mempool/mempool.rs +++ b/mempool/src/core_mempool/mempool.rs @@ -597,4 +597,8 @@ impl Mempool { pub fn get_transaction_store(&self) -> &TransactionStore { &self.transactions } + + pub fn get_parking_lot_addresses(&self) -> Vec<(AccountAddress, u64)> { + self.transactions.get_parking_lot_addresses() + } } diff --git a/mempool/src/core_mempool/transaction_store.rs b/mempool/src/core_mempool/transaction_store.rs index c225a52725707..6bf63be986a64 100644 --- a/mempool/src/core_mempool/transaction_store.rs +++ b/mempool/src/core_mempool/transaction_store.rs @@ -916,4 +916,8 @@ impl TransactionStore { pub(crate) fn get_transactions(&self) -> &HashMap { &self.transactions } + + pub(crate) fn get_parking_lot_addresses(&self) -> Vec<(AccountAddress, u64)> { + self.parking_lot_index.get_addresses() + } } diff --git a/mempool/src/counters.rs b/mempool/src/counters.rs index 464523419e886..838506efd7fc2 100644 --- a/mempool/src/counters.rs +++ b/mempool/src/counters.rs @@ -67,6 +67,7 @@ pub const SUCCESS_LABEL: &str = "success"; // Bounded executor task labels pub const CLIENT_EVENT_LABEL: &str = "client_event"; pub const CLIENT_EVENT_GET_TXN_LABEL: &str = "client_event_get_txn"; +pub const CLIENT_EVENT_GET_PARKING_LOT_ADDRESSES: &str = "client_event_get_parking_lot_addresses"; pub const RECONFIG_EVENT_LABEL: &str = "reconfig"; pub const PEER_BROADCAST_EVENT_LABEL: &str = "peer_broadcast"; @@ -284,7 +285,7 @@ pub static CORE_MEMPOOL_GC_EVENT_COUNT: Lazy = Lazy::new(|| { "aptos_core_mempool_gc_event_count", "Number of times the periodic garbage-collection event occurs, regardless of how many txns were actually removed", &["type"]) - .unwrap() + .unwrap() }); /// Counter for number of periodic client garbage-collection (=GC) events that happen with eager @@ -362,7 +363,7 @@ static MEMPOOL_SERVICE_TXNS: Lazy = Lazy::new(|| { &["type"], TXN_COUNT_BUCKETS.clone() ) - .unwrap() + .unwrap() }); pub fn mempool_service_transactions(label: &'static str, num: usize) { diff --git a/mempool/src/shared_mempool/coordinator.rs b/mempool/src/shared_mempool/coordinator.rs index 76a36e699cd79..2ddfd088e1917 100644 --- a/mempool/src/shared_mempool/coordinator.rs +++ b/mempool/src/shared_mempool/coordinator.rs @@ -217,6 +217,11 @@ async fn handle_client_request( )) .await; }, + MempoolClientRequest::GetAddressesFromParkingLot(callback) => { + bounded_executor + .spawn(tasks::process_parking_lot_addresses(smp.clone(), callback)) + .await; + }, } } diff --git a/mempool/src/shared_mempool/tasks.rs b/mempool/src/shared_mempool/tasks.rs index d1acbe7ce1a2a..71a8c6f392a56 100644 --- a/mempool/src/shared_mempool/tasks.rs +++ b/mempool/src/shared_mempool/tasks.rs @@ -30,6 +30,7 @@ use aptos_metrics_core::HistogramTimer; use aptos_network::application::interface::NetworkClientInterface; use aptos_storage_interface::state_view::LatestDbStateCheckpointView; use aptos_types::{ + account_address::AccountAddress, mempool_status::{MempoolStatus, MempoolStatusCode}, on_chain_config::{OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig}, transaction::SignedTransaction, @@ -151,6 +152,25 @@ pub(crate) async fn process_client_transaction_submission( + smp: SharedMempool, + callback: oneshot::Sender>, +) where + NetworkClient: NetworkClientInterface, + TransactionValidator: TransactionValidation + 'static, +{ + let addresses = smp.mempool.lock().get_parking_lot_addresses(); + + if callback.send(addresses).is_err() { + warn!(LogSchema::event_log( + LogEntry::JsonRpc, + LogEvent::CallbackFail + )); + counters::CLIENT_CALLBACK_FAIL.inc(); + } +} + /// Processes get transaction by hash request by client. pub(crate) async fn process_client_get_transaction( smp: SharedMempool, diff --git a/mempool/src/shared_mempool/types.rs b/mempool/src/shared_mempool/types.rs index 8f92858ad1873..b2eb6572b89b5 100644 --- a/mempool/src/shared_mempool/types.rs +++ b/mempool/src/shared_mempool/types.rs @@ -21,7 +21,8 @@ use aptos_infallible::{Mutex, RwLock}; use aptos_network::application::interface::NetworkClientInterface; use aptos_storage_interface::DbReader; use aptos_types::{ - mempool_status::MempoolStatus, transaction::SignedTransaction, vm_status::DiscardedVMStatus, + account_address::AccountAddress, mempool_status::MempoolStatus, transaction::SignedTransaction, + vm_status::DiscardedVMStatus, }; use aptos_vm_validator::vm_validator::TransactionValidation; use futures::{ @@ -235,8 +236,13 @@ pub type SubmissionStatus = (MempoolStatus, Option); pub type SubmissionStatusBundle = (SignedTransaction, SubmissionStatus); pub enum MempoolClientRequest { + /// Submits a transaction to the mempool and returns its submission status SubmitTransaction(SignedTransaction, oneshot::Sender>), + /// Retrieves a signed transaction from the mempool using its hash GetTransactionByHash(HashValue, oneshot::Sender>), + /// Retrieves all addresses with transactions in the mempool's parking lot and + /// the number of transactions for each address + GetAddressesFromParkingLot(oneshot::Sender>), } pub type MempoolClientSender = mpsc::Sender; diff --git a/mempool/src/tests/integration_tests.rs b/mempool/src/tests/integration_tests.rs index 8c1661c23a7b7..c10facf9ae787 100644 --- a/mempool/src/tests/integration_tests.rs +++ b/mempool/src/tests/integration_tests.rs @@ -319,6 +319,29 @@ async fn test_rebroadcast_retry_is_empty() { .await; } +#[tokio::test] +async fn test_get_all_addresses_from_parking_lot() { + let mut node = MempoolTestFrameworkBuilder::single_validator(); + + // Add second txn. Using TXN_2 here because sequence number needs to be higher than expected + // to be put in parking lot + node.add_txns_via_client(&TXN_2).await; + + // Check to make sure transaction is in parking lot + let addresses = node.get_parking_lot_txns_via_client().await; + let address = addresses.first().unwrap().0; + let txn_size = addresses.first().unwrap().1; + + // Assert that the address matches + assert_eq!( + address.to_string(), + TXN_2.first().unwrap().address.to_string() + ); + + // Assert there is only one transaction for that address + assert_eq!(txn_size, 1); +} + // -- Multi node tests below here -- /// Tests if the node is a VFN, and it's getting forwarded messages from a PFN. It should forward diff --git a/mempool/src/tests/test_framework.rs b/mempool/src/tests/test_framework.rs index 30260e6b69557..efc1a64a12594 100644 --- a/mempool/src/tests/test_framework.rs +++ b/mempool/src/tests/test_framework.rs @@ -168,6 +168,15 @@ impl MempoolNode { } } + pub async fn get_parking_lot_txns_via_client(&mut self) -> Vec<(AccountAddress, u64)> { + let (sender, receiver) = oneshot::channel(); + self.mempool_client_sender + .send(MempoolClientRequest::GetAddressesFromParkingLot(sender)) + .await + .unwrap(); + receiver.await.unwrap() + } + /// Asynchronously waits for up to 1 second for txns to appear in mempool pub async fn wait_on_txns_in_mempool(&self, txns: &[TestTransaction]) { for _ in 0..10 {