From e710c9b1695445a8539c4fce762ef193717d126f Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Sun, 28 Jan 2024 10:27:24 -0800 Subject: [PATCH] feat: merging movement-v2 upstream. --- m1/Cargo.lock | 1 + m1/Cargo.toml | 2 + m1/rust-toolchain | 2 +- m1/subnet/Cargo.toml | 1 + m1/subnet/src/api/chain_handlers.rs | 121 +- m1/subnet/src/api/static_handlers.rs | 4 +- m1/subnet/src/block/mod.rs | 10 +- m1/subnet/src/lib.rs | 1 + m1/subnet/src/state/mod.rs | 8 +- m1/subnet/src/util/mod.rs | 13 + m1/subnet/src/vm/mod.rs | 1661 ++++++++++---------------- 11 files changed, 754 insertions(+), 1070 deletions(-) create mode 100644 m1/subnet/src/util/mod.rs diff --git a/m1/Cargo.lock b/m1/Cargo.lock index 05604bc6..a6ed8db4 100644 --- a/m1/Cargo.lock +++ b/m1/Cargo.lock @@ -9163,6 +9163,7 @@ dependencies = [ "jsonrpc-core-client", "jsonrpc-derive", "log", + "poem-openapi", "rand 0.7.3", "serde 1.0.193", "serde_json", diff --git a/m1/Cargo.toml b/m1/Cargo.toml index 19b7b068..6270940c 100644 --- a/m1/Cargo.toml +++ b/m1/Cargo.toml @@ -63,6 +63,8 @@ tokio = { version = "1.21.0", features = ["full"] } tokio-util = { version = "0.7.2", features = ["compat", "codec"] } toml = "0.5.9" walkdir = "2.3.2" +poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] } +poem-openapi-derive = "=2.0.11" jemallocator = { version = "0.3.2", features = [ "profiling", diff --git a/m1/rust-toolchain b/m1/rust-toolchain index 5e3a4256..68bc7ff2 100644 --- a/m1/rust-toolchain +++ b/m1/rust-toolchain @@ -1 +1 @@ -1.73.0 +1.71.1 diff --git a/m1/subnet/Cargo.toml b/m1/subnet/Cargo.toml index cb3cf071..e5aca851 100644 --- a/m1/subnet/Cargo.toml +++ b/m1/subnet/Cargo.toml @@ -52,6 +52,7 @@ bcs = { workspace = true } aptos-indexer = { workspace = true } aptos-indexer-grpc-fullnode = { workspace = true } aptos-protos = { workspace = true } +poem-openapi = { workspace = true } # todo: differs from workspace because of e2e tests crate I believe, need to check clap = { version = "4.4.8", features = ["cargo", "derive"] } \ No newline at end of file diff --git a/m1/subnet/src/api/chain_handlers.rs b/m1/subnet/src/api/chain_handlers.rs index cf86476d..0db79950 100644 --- a/m1/subnet/src/api/chain_handlers.rs +++ b/m1/subnet/src/api/chain_handlers.rs @@ -1,16 +1,17 @@ use std::io; use std::marker::PhantomData; +use aptos_api::accept_type::AcceptType; +use aptos_api_types::U64; use avalanche_types::proto::http::Element; use avalanche_types::subnet::rpc::http::handle::Handle; use bytes::Bytes; use jsonrpc_core::{BoxFuture, Error, ErrorCode, IoHandler, Result}; use jsonrpc_derive::rpc; use serde::{Deserialize, Serialize}; -use aptos_api::accept_type::AcceptType; -use aptos_api_types::U64; use crate::api::de_request; +use crate::util::HexParser; use crate::vm::Vm; #[rpc] @@ -22,16 +23,28 @@ pub trait Rpc { #[rpc(name = "submitTransaction", alias("aptosvm.submitTransaction"))] fn submit_transaction(&self, args: RpcReq) -> BoxFuture>; - #[rpc(name = "submitTransactionBatch", alias("aptosvm.submitTransactionBatch"))] + #[rpc( + name = "submitTransactionBatch", + alias("aptosvm.submitTransactionBatch") + )] fn submit_transaction_batch(&self, args: RpcReq) -> BoxFuture>; #[rpc(name = "getTransactionByHash", alias("aptosvm.getTransactionByHash"))] fn get_transaction_by_hash(&self, args: RpcReq) -> BoxFuture>; - #[rpc(name = "getTransactionByVersion", alias("aptosvm.getTransactionByVersion"))] - fn get_transaction_by_version(&self, args: GetTransactionByVersionArgs) -> BoxFuture>; + #[rpc( + name = "getTransactionByVersion", + alias("aptosvm.getTransactionByVersion") + )] + fn get_transaction_by_version( + &self, + args: GetTransactionByVersionArgs, + ) -> BoxFuture>; - #[rpc(name = "getAccountsTransactions", alias("aptosvm.getAccountsTransactions"))] + #[rpc( + name = "getAccountsTransactions", + alias("aptosvm.getAccountsTransactions") + )] fn get_accounts_transactions(&self, args: RpcReq) -> BoxFuture>; #[rpc(name = "simulateTransaction", alias("aptosvm.simulateTransaction"))] @@ -44,17 +57,18 @@ pub trait Rpc { fn estimate_gas_price(&self) -> BoxFuture>; /*******************************TRANSACTION END***************************************/ - /*******************************HELPER API START***************************************/ #[rpc(name = "faucet", alias("aptosvm.faucet"))] fn faucet_apt(&self, args: RpcReq) -> BoxFuture>; + #[rpc(name = "faucetWithCli")] + fn faucet_with_cli(&self, args: RpcReq) -> BoxFuture>; + #[rpc(name = "createAccount", alias("aptosvm.createAccount"))] fn create_account(&self, args: RpcReq) -> BoxFuture>; /*******************************HELPER API END***************************************/ - /******************************* ACCOUNT START ***************************************/ #[rpc(name = "getAccount", alias("aptosvm.getAccount"))] @@ -66,14 +80,19 @@ pub trait Rpc { #[rpc(name = "getAccountModules", alias("aptosvm.getAccountModules"))] fn get_account_modules(&self, args: RpcReq) -> BoxFuture>; - #[rpc(name = "getAccountResourcesState", alias("aptosvm.getAccountResourcesState"))] + #[rpc( + name = "getAccountResourcesState", + alias("aptosvm.getAccountResourcesState") + )] fn get_account_resources_state(&self, args: AccountStateArgs) -> BoxFuture>; - #[rpc(name = "getAccountModulesState", alias("aptosvm.getAccountModulesState"))] + #[rpc( + name = "getAccountModulesState", + alias("aptosvm.getAccountModulesState") + )] fn get_account_modules_state(&self, args: AccountStateArgs) -> BoxFuture>; /******************************* ACCOUNT END ***************************************/ - /*******************************BLOCK START***************************************/ #[rpc(name = "getBlockByHeight", alias("aptosvm.getBlockByHeight"))] fn get_block_by_height(&self, args: BlockArgs) -> BoxFuture>; @@ -91,24 +110,29 @@ pub trait Rpc { #[rpc(name = "getRawTableItem", alias("aptosvm.getRawTableItem"))] fn get_raw_table_item(&self, args: RpcTableReq) -> BoxFuture>; - #[rpc(name = "getEventsByCreationNumber", alias("aptosvm.getEventsByCreationNumber"))] + #[rpc( + name = "getEventsByCreationNumber", + alias("aptosvm.getEventsByCreationNumber") + )] fn get_events_by_creation_number(&self, args: RpcEventNumReq) -> BoxFuture>; - #[rpc(name = "getEventsByEventHandle", alias("aptosvm.getEventsByEventHandle"))] + #[rpc( + name = "getEventsByEventHandle", + alias("aptosvm.getEventsByEventHandle") + )] fn get_events_by_event_handle(&self, args: RpcEventHandleReq) -> BoxFuture>; #[rpc(name = "getLedgerInfo", alias("aptosvm.getLedgerInfo"))] fn get_ledger_info(&self) -> BoxFuture>; } - #[derive(Deserialize, Serialize, Debug, Clone)] pub struct GetTableItemArgs { pub table_handle: String, pub key_type: String, pub value_type: String, pub key: String, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -117,22 +141,22 @@ pub struct RpcReq { pub ledger_version: Option, pub start: Option, pub limit: Option, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] pub struct RpcRes { pub data: String, pub header: String, + pub error: Option, } - #[derive(Deserialize, Serialize, Debug, Clone)] pub struct RpcTableReq { pub query: String, pub body: String, pub ledger_version: Option, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -141,7 +165,7 @@ pub struct RpcEventNumReq { pub creation_number: U64, pub start: Option, pub limit: Option, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -151,20 +175,20 @@ pub struct RpcEventHandleReq { pub address: String, pub event_handle: String, pub field_name: String, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] pub struct BlockArgs { pub height_or_version: u64, pub with_transactions: Option, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] pub struct GetTransactionByVersionArgs { pub version: U64, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -172,17 +196,16 @@ pub struct AccountStateArgs { pub account: String, pub resource: String, pub ledger_version: Option, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] pub struct PageArgs { pub start: Option, pub limit: Option, - pub is_bsc_format: Option, + pub is_bcs_format: Option, } - #[derive(Clone)] pub struct ChainService { pub vm: Vm, @@ -194,7 +217,6 @@ impl ChainService { } } - impl Rpc for ChainService { fn get_transactions(&self, args: PageArgs) -> BoxFuture> { let vm = self.vm.clone(); @@ -208,12 +230,14 @@ impl Rpc for ChainService { log::debug!("submit_transaction called"); let vm = self.vm.clone(); Box::pin(async move { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let r = vm.submit_transaction(hex::decode(args.data).unwrap(), accept).await; + let r = vm + .submit_transaction(hex::decode(args.data).unwrap(), accept) + .await; Ok(r) }) } @@ -221,12 +245,14 @@ impl Rpc for ChainService { fn submit_transaction_batch(&self, args: RpcReq) -> BoxFuture> { let vm = self.vm.clone(); Box::pin(async move { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let r = vm.submit_transaction_batch(hex::decode(args.data).unwrap(), accept).await; + let r = vm + .submit_transaction_batch(hex::decode(args.data).unwrap(), accept) + .await; Ok(r) }) } @@ -239,7 +265,10 @@ impl Rpc for ChainService { }) } - fn get_transaction_by_version(&self, args: GetTransactionByVersionArgs) -> BoxFuture> { + fn get_transaction_by_version( + &self, + args: GetTransactionByVersionArgs, + ) -> BoxFuture> { let vm = self.vm.clone(); Box::pin(async move { let ret = vm.get_transaction_by_version(args).await; @@ -259,7 +288,7 @@ impl Rpc for ChainService { let vm = self.vm.clone(); Box::pin(async move { let data = hex::decode(args.data).unwrap(); - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json @@ -288,8 +317,9 @@ impl Rpc for ChainService { fn faucet_apt(&self, args: RpcReq) -> BoxFuture> { let vm = self.vm.clone(); Box::pin(async move { - let acc = hex::decode(args.data).unwrap(); - let accept = if args.is_bsc_format.unwrap_or(false) { + let s = args.data.as_str(); + let acc = HexParser::parse_hex_string(s).unwrap(); + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json @@ -302,12 +332,14 @@ impl Rpc for ChainService { fn create_account(&self, args: RpcReq) -> BoxFuture> { let vm = self.vm.clone(); Box::pin(async move { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let ret = vm.create_account(args.data.as_str(), accept).await; + let s = args.data.as_str(); + let acc = HexParser::parse_hex_string(s).unwrap(); + let ret = vm.create_account(acc, accept).await; Ok(ret) }) } @@ -371,7 +403,6 @@ impl Rpc for ChainService { fn view_function(&self, args: RpcReq) -> BoxFuture> { let vm = self.vm.clone(); Box::pin(async move { - log::info!("view_function called {}",args.data.clone()); let ret = vm.view_function(args).await; return Ok(ret); }) @@ -416,6 +447,16 @@ impl Rpc for ChainService { return Ok(ret); }) } + + fn faucet_with_cli(&self, args: RpcReq) -> BoxFuture> { + let vm = self.vm.clone(); + Box::pin(async move { + let s = args.data.as_str(); + let acc = HexParser::parse_hex_string(s).unwrap(); + let ret = vm.faucet_with_cli(acc).await; + Ok(ret) + }) + } } #[derive(Clone, Debug)] @@ -426,8 +467,8 @@ pub struct ChainHandler { #[tonic::async_trait] impl Handle for ChainHandler - where - T: Rpc + Send + Sync + Clone + 'static, +where + T: Rpc + Send + Sync + Clone + 'static, { async fn request( &self, @@ -455,7 +496,7 @@ impl ChainHandler { } } - +#[allow(dead_code)] fn create_jsonrpc_error(e: std::io::Error) -> Error { let mut error = Error::new(ErrorCode::InternalError); error.message = format!("{}", e); diff --git a/m1/subnet/src/api/static_handlers.rs b/m1/subnet/src/api/static_handlers.rs index 392e5b37..40b79597 100644 --- a/m1/subnet/src/api/static_handlers.rs +++ b/m1/subnet/src/api/static_handlers.rs @@ -14,7 +14,7 @@ use crate::api::de_request; /// Defines static handler RPCs for this VM. #[rpc] pub trait Rpc { - #[rpc(name = "ping", alias("timestampvm.ping"))] + #[rpc(name = "ping")] fn ping(&self) -> BoxFuture>; } @@ -29,8 +29,6 @@ impl StaticService { } } - - impl Rpc for StaticService { fn ping(&self) -> BoxFuture> { log::debug!("ping called"); diff --git a/m1/subnet/src/block/mod.rs b/m1/subnet/src/block/mod.rs index aee144d0..1d5e8c45 100644 --- a/m1/subnet/src/block/mod.rs +++ b/m1/subnet/src/block/mod.rs @@ -106,6 +106,7 @@ impl Block { } /// Returns the parent block Id. + #[allow(dead_code)] pub fn parent_id(&self) -> ids::Id { self.parent_id } @@ -116,11 +117,13 @@ impl Block { } /// Returns the timestamp of this block. + #[allow(dead_code)] pub fn timestamp(&self) -> u64 { self.timestamp } /// Returns the data of this block. + #[allow(dead_code)] pub fn data(&self) -> &[u8] { &self.data } @@ -136,6 +139,7 @@ impl Block { } /// Returns the byte representation of this block. + #[allow(dead_code)] pub fn bytes(&self) -> &[u8] { &self.bytes } @@ -210,8 +214,8 @@ impl Block { /// Mark this [`Block`](Block) accepted and updates [`State`](crate::state::State) accordingly. pub async fn accept(&mut self) -> io::Result<()> { + log::info!("accept block height {} ", self.height); self.inner_build().await?; - println!("-----accept----1---"); self.set_status(choices::status::Status::Accepted); // only decided blocks are persistent -- no reorg self.state.write_block(&self.clone()).await?; @@ -231,7 +235,7 @@ impl Block { /// Mark this [`Block`](Block) rejected and updates [`State`](crate::state::State) accordingly. pub async fn reject(&mut self) -> io::Result<()> { self.set_status(choices::status::Status::Rejected); - println!("-----reject----1---"); + log::info!(">>>>>>>>>> reject >>>>>>>>>>"); // only decided blocks are persistent -- no reorg self.state.write_block(&self.clone()).await?; @@ -250,7 +254,6 @@ impl fmt::Display for Block { } } - #[tonic::async_trait] impl subnet::rpc::consensus::snowman::Block for Block { async fn bytes(&self) -> &[u8] { @@ -293,4 +296,3 @@ impl subnet::rpc::consensus::snowman::Decidable for Block { self.reject().await } } - diff --git a/m1/subnet/src/lib.rs b/m1/subnet/src/lib.rs index 86da6966..a953ac4d 100644 --- a/m1/subnet/src/lib.rs +++ b/m1/subnet/src/lib.rs @@ -2,6 +2,7 @@ pub mod api; pub mod block; pub mod state; pub mod vm; +pub mod util; use std::io; diff --git a/m1/subnet/src/state/mod.rs b/m1/subnet/src/state/mod.rs index 1a21945a..42d596bf 100644 --- a/m1/subnet/src/state/mod.rs +++ b/m1/subnet/src/state/mod.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use crate::block::Block; -use crate::vm::{ Vm}; +use crate::vm::Vm; /// Manages block and chain states for this Vm, both in-memory and persistent. #[derive(Clone)] @@ -94,7 +94,7 @@ impl State { }) } - pub fn set_vm(&mut self, vm:Vm) { + pub fn set_vm(&mut self, vm: Vm) { self.vm = Some(Arc::new(RwLock::new(vm))); } @@ -120,7 +120,7 @@ impl State { return Ok(ids::Id::empty()); } Err(e) - } + }, } } @@ -138,6 +138,7 @@ impl State { } /// Returns "true" if the block Id has been already verified. + #[allow(dead_code)] pub async fn has_verified(&self, blk_id: &ids::Id) -> bool { let verified_blocks = self.verified_blocks.read().await; verified_blocks.contains_key(blk_id) @@ -179,4 +180,3 @@ impl State { Ok(blk) } } - diff --git a/m1/subnet/src/util/mod.rs b/m1/subnet/src/util/mod.rs new file mode 100644 index 00000000..873ab130 --- /dev/null +++ b/m1/subnet/src/util/mod.rs @@ -0,0 +1,13 @@ +use hex; + +pub struct HexParser; + +impl HexParser { + pub(crate) fn parse_hex_string(hex_string: &str) -> Result, hex::FromHexError> { + if hex_string.starts_with("0x") { + hex::decode(&hex_string[2..]) + } else { + hex::decode(hex_string) + } + } +} diff --git a/m1/subnet/src/vm/mod.rs b/m1/subnet/src/vm/mod.rs index e4d38893..f73d7f52 100644 --- a/m1/subnet/src/vm/mod.rs +++ b/m1/subnet/src/vm/mod.rs @@ -1,73 +1,80 @@ -use std::{collections::HashMap, fs, io::{self, Error, ErrorKind}, sync::Arc}; -use std::str::FromStr; -use std::time::{Duration}; -use avalanche_types::{ - choices, ids, - subnet::{self, rpc::snow}, -}; use avalanche_types::subnet::rpc::database::manager::{DatabaseManager, Manager}; use avalanche_types::subnet::rpc::health::Checkable; -use avalanche_types::subnet::rpc::snow::engine::common::appsender::AppSender; use avalanche_types::subnet::rpc::snow::engine::common::appsender::client::AppSenderClient; -use avalanche_types::subnet::rpc::snow::engine::common::engine::{AppHandler, CrossChainAppHandler, NetworkAppHandler}; +use avalanche_types::subnet::rpc::snow::engine::common::appsender::AppSender; +use avalanche_types::subnet::rpc::snow::engine::common::engine::{ + AppHandler, CrossChainAppHandler, NetworkAppHandler, +}; use avalanche_types::subnet::rpc::snow::engine::common::http_handler::{HttpHandler, LockOptions}; use avalanche_types::subnet::rpc::snow::engine::common::message::Message::PendingTxs; use avalanche_types::subnet::rpc::snow::engine::common::vm::{CommonVm, Connector}; use avalanche_types::subnet::rpc::snow::validators::client::ValidatorStateClient; use avalanche_types::subnet::rpc::snowman::block::{BatchedChainVm, ChainVm, Getter, Parser}; +use avalanche_types::{ + choices, ids, + subnet::{self, rpc::snow}, +}; use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{channel::mpsc as futures_mpsc, StreamExt}; use hex; use serde::{Deserialize, Serialize}; +use std::str::FromStr; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{ + collections::HashMap, + fs, + io::{self, Error, ErrorKind}, + sync::Arc, +}; use tokio::sync::{mpsc::Sender, RwLock}; -use aptos_api::{Context, get_raw_api_service, RawApi}; use aptos_api::accept_type::AcceptType; use aptos_api::response::{AptosResponseContent, BasicResponse}; -use aptos_api::transactions::{SubmitTransactionPost, SubmitTransactionResponse, SubmitTransactionsBatchPost, SubmitTransactionsBatchResponse}; -use aptos_api_types::{Address, EncodeSubmissionRequest, IdentifierWrapper, MoveStructTag, RawTableItemRequest, StateKeyWrapper, TableItemRequest, ViewRequest}; +use aptos_api::transactions::{ + SubmitTransactionPost, SubmitTransactionResponse, SubmitTransactionsBatchPost, + SubmitTransactionsBatchResponse, +}; +use aptos_api::{get_raw_api_service, Context, RawApi}; +use aptos_api_types::{ + Address, EncodeSubmissionRequest, IdentifierWrapper, MoveStructTag, RawTableItemRequest, + StateKeyWrapper, TableItemRequest, ViewRequest, U64, +}; use aptos_config::config::NodeConfig; -use aptos_crypto::{HashValue, ValidCryptoMaterialStringExt}; -use aptos_crypto::ed25519::Ed25519PublicKey; +use aptos_crypto::HashValue; use aptos_db::AptosDB; use aptos_executor::block_executor::BlockExecutor; use aptos_executor::db_bootstrapper::{generate_waypoint, maybe_bootstrap}; use aptos_executor_types::BlockExecutorTrait; -use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; use aptos_mempool::core_mempool::{CoreMempool, TimelineState}; +use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; use aptos_sdk::rest_client::aptos_api_types::MAX_RECURSIVE_TYPES_ALLOWED; use aptos_sdk::transaction_builder::TransactionFactory; use aptos_sdk::types::{AccountKey, LocalAccount}; use aptos_state_view::account_with_state_view::AsAccountWithStateView; -use aptos_storage_interface::DbReaderWriter; use aptos_storage_interface::state_view::DbStateViewAtVersion; +use aptos_storage_interface::DbReaderWriter; use aptos_types::account_address::AccountAddress; use aptos_types::account_config::aptos_test_root_address; use aptos_types::account_view::AccountView; +use aptos_types::block_executor::partitioner::{ExecutableBlock, ExecutableTransactions}; use aptos_types::block_info::BlockInfo; use aptos_types::block_metadata::BlockMetadata; use aptos_types::chain_id::ChainId; use aptos_types::ledger_info::{generate_ledger_info_with_sig, LedgerInfo}; use aptos_types::mempool_status::{MempoolStatus, MempoolStatusCode}; -use aptos_types::transaction::{SignedTransaction, Transaction, WriteSetPayload}; use aptos_types::transaction::Transaction::UserTransaction; +use aptos_types::transaction::{SignedTransaction, Transaction, WriteSetPayload}; use aptos_types::validator_signer::ValidatorSigner; use aptos_vm::AptosVM; -use aptos_vm_genesis::{GENESIS_KEYPAIR, test_genesis_change_set_and_validators}; -use aptos_node::indexer::bootstrap_indexer; -use aptos_indexer::runtime::run_forever; -// use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; -use aptos_indexer_grpc_fullnode::runtime::FullnodeDataService; -use aptos_protos::internal::fullnode::v1::fullnode_data_server::{FullnodeData, FullnodeDataServer}; -use std::net::ToSocketAddrs; +use aptos_vm_genesis::{test_genesis_change_set_and_validators, GENESIS_KEYPAIR}; -use crate::{block::Block, state}; -use crate::api::chain_handlers::{AccountStateArgs, BlockArgs, ChainHandler, ChainService, GetTransactionByVersionArgs, PageArgs, RpcEventHandleReq, RpcEventNumReq, RpcReq, RpcRes, RpcTableReq}; +use crate::api::chain_handlers::{ + AccountStateArgs, BlockArgs, ChainHandler, ChainService, GetTransactionByVersionArgs, PageArgs, + RpcEventHandleReq, RpcEventNumReq, RpcReq, RpcRes, RpcTableReq, +}; use crate::api::static_handlers::{StaticHandler, StaticService}; -use tonic::{transport::Server, Request, Response, Status}; -use uuid::Uuid; -use aptos_types::block_executor::partitioner::{ExecutableBlock, ExecutableTransactions}; +use crate::{block::Block, state}; const VERSION: &str = env!("CARGO_PKG_VERSION"); const MOVE_DB_DIR: &str = ".move-chain-data"; @@ -75,9 +82,10 @@ pub fn get_db_name(suffix : &str) -> String { format!("{}-{}", MOVE_DB_DIR, suffix) } + #[derive(Serialize, Deserialize, Clone)] pub struct AptosData( - pub Vec, // block info + pub Vec, // block info pub HashValue, // block id pub HashValue, pub u64, @@ -96,7 +104,6 @@ pub struct AptosHeader { cursor: Option, } - /// Represents VM-specific states. /// Defined in a separate struct, for interior mutability in [`Vm`](Vm). /// To be protected with `Arc` and `RwLock`. @@ -149,13 +156,9 @@ pub struct Vm { pub build_status: Arc>, // 0 done 1 building pub has_pending_tx: Arc>, - } - -impl Default for Vm - -{ +impl Default for Vm { fn default() -> Self { Self::new() } @@ -177,323 +180,213 @@ impl Vm { has_pending_tx: Arc::new(RwLock::new(false)), } } - + #[allow(dead_code)] pub async fn is_bootstrapped(&self) -> bool { let vm_state = self.state.read().await; vm_state.bootstrapped } + fn process_response< + T: poem_openapi::types::ToJSON + Send + Sync + serde::Serialize, + E: ToString + std::fmt::Debug, + >( + &self, + ret: Result, E>, + ) -> RpcRes { + let mut ret_str = "".to_string(); + let mut error = None; + let mut header_str = "".to_string(); + if ret.is_err() { + error = Some(ret.err().unwrap().to_string()); + } else { + let ret = ret.unwrap(); + let header; + ret_str = match ret { + BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { + header = AptosHeader { + chain_id: a, + ledger_version: b, + ledger_oldest_version: d, + ledger_timestamp_usec: e, + epoch: f, + block_height: g, + oldest_block_height: h, + cursor: k, + }; + match c { + AptosResponseContent::Json(json) => serde_json::to_string(&json.0).unwrap(), + AptosResponseContent::Bcs(bytes) => { + format!("{}", hex::encode(bytes.0)) + }, + } + }, + }; + header_str = serde_json::to_string(&header).unwrap(); + } + + RpcRes { + data: ret_str, + header: header_str, + error, + } + } + pub async fn get_transactions(&self, args: PageArgs) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let transactions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); - let ret = transactions_api.get_transactions_raw(accept, args.start, args.limit).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .transactions_api + .get_transactions_raw(accept, args.start, args.limit) + .await; + self.process_response(ret) } pub async fn get_block_by_height(&self, args: BlockArgs) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let blocks_api = self.api_service.as_ref().unwrap().blocks_api.clone(); - let ret = blocks_api.get_block_by_height_raw(accept, args.height_or_version, args.with_transactions).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .blocks_api + .get_block_by_height_raw(accept, args.height_or_version, args.with_transactions) + .await; + + self.process_response(ret) } pub async fn get_block_by_version(&self, args: BlockArgs) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let blocks_api = self.api_service.as_ref().unwrap().blocks_api.clone(); - let ret = blocks_api.get_block_by_version_raw(accept, args.height_or_version, args.with_transactions).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .blocks_api + .get_block_by_version_raw(accept, args.height_or_version, args.with_transactions) + .await; + self.process_response(ret) } pub async fn get_accounts_transactions(&self, args: RpcReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; let account = args.data.as_str(); - let accounts_api = self.api_service.as_ref().unwrap().accounts_api.clone(); - let start = match args.start { - None => None, - Some(_) => Some(StateKeyWrapper::from_str(args.start.unwrap().as_str()).unwrap()) - }; - let ret = accounts_api.get_account_resources_raw( - accept, - Address::from_str(account).unwrap(), - args.ledger_version, - start, - args.limit, - ).await.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let mut start = None; + if let Some(s_) = args.start { + let s: Result = s_.parse(); + start = Some(U64::from(s.unwrap())) + } + let ret = api + .transactions_api + .get_accounts_transactions_raw( + accept, + Address::from_str(account).unwrap(), + start, + args.limit, + ) + .await; + self.process_response(ret) } pub async fn get_account_resources(&self, args: RpcReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; let account = args.data.as_str(); - let accounts_api = self.api_service.as_ref().unwrap().accounts_api.clone(); + let api = self.api_service.as_ref().unwrap(); let start = match args.start { None => None, - Some(_) => Some(StateKeyWrapper::from_str(args.start.unwrap().as_str()).unwrap()) + Some(_) => Some(StateKeyWrapper::from_str(args.start.unwrap().as_str()).unwrap()), }; - let ret = accounts_api.get_account_resources_raw( - accept, - Address::from_str(account).unwrap(), - args.ledger_version, - start, - args.limit, - ).await.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let ret = api + .accounts_api + .get_account_resources_raw( + accept, + Address::from_str(account).unwrap(), + args.ledger_version, + start, + args.limit, + ) + .await; + self.process_response(ret) } pub async fn get_account(&self, args: RpcReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; let account = args.data.as_str(); - let accounts_api = self.api_service.as_ref().unwrap().accounts_api.clone(); - let ret = accounts_api.get_account_raw(accept, - Address::from_str(account).unwrap(), args.ledger_version).await.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .accounts_api + .get_account_raw( + accept, + Address::from_str(account).unwrap(), + args.ledger_version, + ) + .await; + self.process_response(ret) } pub async fn get_account_modules_state(&self, args: AccountStateArgs) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; let account = args.account.as_str(); let module_name = args.resource.as_str(); - let module_name = IdentifierWrapper::from_str(module_name).unwrap().clone(); - let state_api = self.api_service.as_ref().unwrap().state_api.clone(); - let ret = state_api.get_account_module_raw( - accept, - Address::from_str(account).unwrap(), - module_name, args.ledger_version).await.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let module_name = IdentifierWrapper::from_str(module_name).unwrap(); + let api = self.api_service.as_ref().unwrap(); + let ret = api + .state_api + .get_account_module_raw( + accept, + Address::from_str(account).unwrap(), + module_name, + args.ledger_version, + ) + .await; + self.process_response(ret) } pub async fn get_account_resources_state(&self, args: AccountStateArgs) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; let account = args.account.as_str(); let resource = args.resource.as_str(); - let state_api = self.api_service.as_ref().unwrap().state_api.clone(); - let ret = state_api.get_account_resource_raw(accept, - Address::from_str(account).unwrap(), - MoveStructTag::from_str(resource).unwrap(), - args.ledger_version).await.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .state_api + .get_account_resource_raw( + accept, + Address::from_str(account).unwrap(), + MoveStructTag::from_str(resource).unwrap(), + args.ledger_version, + ) + .await; + self.process_response(ret) } pub async fn get_account_modules(&self, args: RpcReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json @@ -501,330 +394,226 @@ impl Vm { let account = args.data.as_str(); let start = match args.start { None => None, - Some(_) => Some(StateKeyWrapper::from_str(args.start.unwrap().as_str()).unwrap()) + Some(_) => Some(StateKeyWrapper::from_str(args.start.unwrap().as_str()).unwrap()), }; - let accounts_api = self.api_service.as_ref().unwrap().accounts_api.clone(); + let api = self.api_service.as_ref().unwrap(); let address = Address::from_str(account).unwrap(); - let ret = accounts_api.get_account_modules_raw(accept, - address, - args.ledger_version, - start, - args.limit).await.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let ret = api + .accounts_api + .get_account_modules_raw(accept, address, args.ledger_version, start, args.limit) + .await; + + self.process_response(ret) } pub async fn get_ledger_info(&self) -> RpcRes { - let index_api = self.api_service.as_ref().unwrap().index_api.clone(); - let ret = index_api.get_ledger_info_raw(AcceptType::Json).await.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api.index_api.get_ledger_info_raw(AcceptType::Json).await; + self.process_response(ret) } pub async fn view_function(&self, args: RpcReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let view_function_api = self.api_service.as_ref().unwrap().view_function_api.clone(); + let api = self.api_service.as_ref().unwrap(); let req = serde_json::from_str::(args.data.as_str()).unwrap(); - let ret = view_function_api.view_function_raw( - accept, - req, - args.ledger_version, - ).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let ret = api + .view_function_api + .view_function_raw(accept, req, args.ledger_version) + .await; + + self.process_response(ret) } pub async fn get_transaction_by_hash(&self, args: RpcReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let h = args.data.as_str(); + let mut h = args.data.as_str(); + if h.starts_with("0x") { + h = &h[2..]; + } let h1 = HashValue::from_hex(h).unwrap(); let hash = aptos_api_types::hash::HashValue::from(h1); - let transactions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); - let ret = transactions_api.get_transaction_by_hash_raw(accept, - hash).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api.transactions_api.get_transaction_by_hash_raw(accept, hash).await; + self.process_response(ret) } pub async fn get_transaction_by_version(&self, args: GetTransactionByVersionArgs) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let transactions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); - let ret = transactions_api.get_transaction_by_version_raw(accept, - args.version).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .transactions_api + .get_transaction_by_version_raw(accept, args.version) + .await; + self.process_response(ret) } pub async fn encode_submission(&self, data: &str) -> RpcRes { - let transactions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); + let service = self.api_service.as_ref().unwrap(); let payload = serde_json::from_str::(data).unwrap(); - let ret = - transactions_api.encode_submission_raw(AcceptType::Json, payload).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let ret = service + .transactions_api + .encode_submission_raw(AcceptType::Json, payload) + .await; + self.process_response(ret) } pub async fn submit_transaction(&self, data: Vec, accept: AcceptType) -> RpcRes { - log::info!("submit_transaction length {}",{data.len()}); - let transacions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); + log::info!("submit_transaction length {}", { data.len() }); + let service = self.api_service.as_ref().unwrap(); let payload = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(data.clone())); - let ret = - transacions_api.submit_transaction_raw(accept, payload).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - SubmitTransactionResponse::Accepted(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) + let ret = service.transactions_api.submit_transaction_raw(accept, payload).await; + let mut ret_str = "".to_string(); + let mut error = None; + let mut header_str = "".to_string(); + if ret.is_err() { + error = Some(ret.err().unwrap().to_string()); + } else { + let ret = ret.unwrap(); + let header; + ret_str = match ret { + SubmitTransactionResponse::Accepted(c, a, b, d, e, f, g, h, k) => { + header = AptosHeader { + chain_id: a, + ledger_version: b, + ledger_oldest_version: d, + ledger_timestamp_usec: e, + epoch: f, + block_height: g, + oldest_block_height: h, + cursor: k, + }; + match c { + AptosResponseContent::Json(json) => serde_json::to_string(&json.0).unwrap(), + AptosResponseContent::Bcs(bytes) => { + format!("{}", hex::encode(bytes.0)) + }, } - } + }, + }; + header_str = serde_json::to_string(&header).unwrap(); + let signed_transaction: SignedTransaction = + bcs::from_bytes_with_limit(&data, MAX_RECURSIVE_TYPES_ALLOWED as usize).unwrap(); + let sender = self.app_sender.as_ref().unwrap(); + sender + .send_app_gossip(serde_json::to_vec(&signed_transaction.clone()).unwrap()) + .await + .unwrap(); + self.add_pool(signed_transaction).await; + if data.len() >= 50 * 1024 { + self.inner_build_block(self.build_block_data().await.unwrap()) + .await + .unwrap(); + } else { + self.notify_block_ready().await; } - }; - let signed_transaction: SignedTransaction = - bcs::from_bytes_with_limit(&data, - MAX_RECURSIVE_TYPES_ALLOWED as usize).unwrap(); - let sender = self.app_sender.as_ref().unwrap(); - sender.send_app_gossip(serde_json::to_vec(&signed_transaction.clone()).unwrap()).await.unwrap(); - self.add_pool(signed_transaction).await; - self.notify_block_ready().await; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + } + RpcRes { + data: ret_str, + header: header_str, + error, + } } pub async fn submit_transaction_batch(&self, data: Vec, accept: AcceptType) -> RpcRes { - log::info!("submit_transaction_batch length {}",{data.len()}); - let transactions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); + log::info!("submit_transaction_batch length {}", { data.len() }); + let service = self.api_service.as_ref().unwrap(); let payload = SubmitTransactionsBatchPost::Bcs(aptos_api::bcs_payload::Bcs(data.clone())); - let ret = transactions_api.submit_transactions_batch_raw(accept, - payload).await; - let ret = ret.unwrap(); - let mut failed_index = vec![]; - let header; - let ret = match ret { - SubmitTransactionsBatchResponse::Accepted(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() + let ret = service + .transactions_api + .submit_transactions_batch_raw(accept, payload) + .await; + let mut ret_str = "".to_string(); + let mut error = None; + let mut header_str = "".to_string(); + if ret.is_err() { + error = Some(ret.err().unwrap().to_string()); + } else { + let ret = ret.unwrap(); + let mut failed_index = vec![]; + let header; + ret_str = match ret { + SubmitTransactionsBatchResponse::Accepted(c, a, b, d, e, f, g, h, k) => { + header = AptosHeader { + chain_id: a, + ledger_version: b, + ledger_oldest_version: d, + ledger_timestamp_usec: e, + epoch: f, + block_height: g, + oldest_block_height: h, + cursor: k, + }; + match c { + AptosResponseContent::Json(json) => serde_json::to_string(&json.0).unwrap(), + AptosResponseContent::Bcs(bytes) => { + format!("{}", hex::encode(bytes.0)) + }, } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) + }, + SubmitTransactionsBatchResponse::AcceptedPartial(c, a, b, d, e, f, g, h, k) => { + header = AptosHeader { + chain_id: a, + ledger_version: b, + ledger_oldest_version: d, + ledger_timestamp_usec: e, + epoch: f, + block_height: g, + oldest_block_height: h, + cursor: k, + }; + match c { + AptosResponseContent::Json(json) => { + for x in &json.transaction_failures { + failed_index.push(x.transaction_index.clone()); + } + serde_json::to_string(&json.0).unwrap() + }, + AptosResponseContent::Bcs(bytes) => { + format!("{}", hex::encode(bytes.0)) + }, } + }, + }; + header_str = serde_json::to_string(&header).unwrap(); + let signed_transactions: Vec = bcs::from_bytes(&data).unwrap(); + let sender = self.app_sender.as_ref().unwrap(); + let mut exist_count = 0; + for (i, signed_transaction) in signed_transactions.iter().enumerate() { + if !failed_index.contains(&i) { + sender + .send_app_gossip(serde_json::to_vec(signed_transaction).unwrap()) + .await + .unwrap(); + self.add_pool(signed_transaction.clone()).await; + } else { + exist_count += 1; } } - SubmitTransactionsBatchResponse::AcceptedPartial(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - for x in &json.transaction_failures { - failed_index.push(x.transaction_index.clone()); - } - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - let signed_transactions: Vec = - bcs::from_bytes(&data).unwrap(); - let sender = self.app_sender.as_ref().unwrap(); - let mut exist_count = 0; - for (i, signed_transaction) in signed_transactions.iter().enumerate() { - if !failed_index.contains(&i) { - sender.send_app_gossip(serde_json::to_vec(signed_transaction).unwrap()).await.unwrap(); - self.add_pool(signed_transaction.clone()).await; - } else { - exist_count += 1; + if exist_count > 0 { + self.notify_block_ready().await; } } - if exist_count > 0 { - self.notify_block_ready().await; + RpcRes { + data: ret_str, + header: header_str, + error, } - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } } + pub async fn get_table_item(&self, args: RpcTableReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json @@ -832,41 +621,21 @@ impl Vm { let account = args.query; let body = args.body; let payload = serde_json::from_str::(body.as_str()).unwrap(); - let state_api = self.api_service.as_ref().unwrap().state_api.clone(); - let ret = state_api.get_table_item_raw( - accept, - Address::from_str(account.as_str()).unwrap(), - payload, - args.ledger_version).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .state_api + .get_table_item_raw( + accept, + Address::from_str(account.as_str()).unwrap(), + payload, + args.ledger_version, + ) + .await; + self.process_response(ret) } pub async fn get_raw_table_item(&self, args: RpcTableReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json @@ -874,138 +643,105 @@ impl Vm { let account = args.query; let body = args.body; let payload = serde_json::from_str::(body.as_str()).unwrap(); - let state_api = self.api_service.as_ref().unwrap().state_api.clone(); - let ret = state_api.get_raw_table_item_raw( - accept, - Address::from_str(account.as_str()).unwrap(), - payload, - args.ledger_version).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .state_api + .get_raw_table_item_raw( + accept, + Address::from_str(account.as_str()).unwrap(), + payload, + args.ledger_version, + ) + .await; + self.process_response(ret) } pub async fn get_events_by_creation_number(&self, args: RpcEventNumReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; - let events_api = self.api_service.as_ref().unwrap().events_api.clone(); - let ret = events_api.get_events_by_creation_number_raw( - accept, - Address::from_str(args.address.as_str()).unwrap(), - args.creation_number, - args.start, args.limit).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .events_api + .get_events_by_creation_number_raw( + accept, + Address::from_str(args.address.as_str()).unwrap(), + args.creation_number, + args.start, + args.limit, + ) + .await; + self.process_response(ret) } + pub async fn get_events_by_event_handle(&self, args: RpcEventHandleReq) -> RpcRes { - let accept = if args.is_bsc_format.unwrap_or(false) { + let accept = if args.is_bcs_format.unwrap_or(false) { AcceptType::Bcs } else { AcceptType::Json }; let event_handle = MoveStructTag::from_str(args.event_handle.as_str()).unwrap(); let field_name = IdentifierWrapper::from_str(args.field_name.as_str()).unwrap(); - let events_api = self.api_service.as_ref().unwrap().events_api.clone(); - let ret = events_api.get_events_by_event_handle_raw( - accept, - Address::from_str(args.address.as_str()).unwrap(), - event_handle, - field_name, args.start, args.limit).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } + let api = self.api_service.as_ref().unwrap(); + let ret = api + .events_api + .get_events_by_event_handle_raw( + accept, + Address::from_str(args.address.as_str()).unwrap(), + event_handle, + field_name, + args.start, + args.limit, + ) + .await; + self.process_response(ret) + } + + pub async fn simulate_transaction(&self, data: Vec, accept: AcceptType) -> RpcRes { + let service = self.api_service.as_ref().unwrap(); + let ret = service + .transactions_api + .simulate_transaction_raw( + accept, + Some(true), + Some(false), + Some(true), + SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(data)), + ) + .await; + self.process_response(ret) + } + + pub async fn estimate_gas_price(&self) -> RpcRes { + let service = self.api_service.as_ref().unwrap(); + let ret = service.transactions_api.estimate_gas_price_raw(AcceptType::Json).await; + self.process_response(ret) } async fn add_pool(&self, signed_transaction: SignedTransaction) { let mut core_pool = self.core_mempool.as_ref().unwrap().write().await; - core_pool.add_txn(signed_transaction.clone(), - 0, - signed_transaction.clone().sequence_number(), - TimelineState::NonQualified, false); + core_pool.add_txn( + signed_transaction.clone(), + 0, + signed_transaction.clone().sequence_number(), + TimelineState::NonQualified, + true, + ); drop(core_pool); } + async fn get_pending_tx(&self, count: u64) -> Vec { let core_pool = self.core_mempool.as_ref().unwrap().read().await; - core_pool.get_batch(count, - 1024 * 5 * 1000, - true, - true, vec![]) + core_pool.get_batch(count, 1024 * 5 * 1000, true, true, vec![]) } async fn check_pending_tx(&self) { let shared_self = Arc::new(self.clone()); let check_duration = Duration::from_millis(2000); tokio::spawn(async move { + let mut last_check_build_time = get_current_time_seconds(); loop { _ = tokio::time::sleep(check_duration).await; let status = shared_self.build_status.try_read(); @@ -1013,22 +749,34 @@ impl Vm { Ok(s_) => { let s = s_.clone(); drop(s_); - if let 0 = s { + if s == 0 { + // build finished + last_check_build_time = get_current_time_seconds(); let more = shared_self.has_pending_tx.try_read(); match more { Ok(t_) => { let t = t_.clone(); drop(t_); if t == true { + // build finished but the memory pool still has pending transactions shared_self.update_pending_tx_flag(false).await; shared_self.notify_block_ready2().await; } - } - _ => {} + }, + _ => {}, + } + } else { + // still pending + let now = get_current_time_seconds(); + if (now - last_check_build_time) > 120 { + // 120s + // timeout for build, we can send more pending tx to the engine + shared_self.update_build_block_status(0).await; + last_check_build_time = get_current_time_seconds(); } } - } - _ => {} + }, + _ => {}, } } }); @@ -1056,7 +804,7 @@ impl Vm { }; if send_result.is_ok() { self.update_build_block_status(1).await; - println!("----------notify_block_ready----success------------------"); + log::info!("notify_block_ready:success"); } else { log::info!("send tx to_engine error ") } @@ -1073,107 +821,62 @@ impl Vm { drop(tx_); drop(status_); match status { - 1 => { // building + 1 => { + // building if tx == false { self.update_pending_tx_flag(true).await; - } else {} - println!("----------notify_block_ready----ignore------------------"); - } - 0 => {// done + } else { + } + log::info!("notify_block_ready ignore"); + }, + 0 => { + // done self.notify_block_ready2().await; - } - _ => {} + }, + _ => {}, } } - pub async fn simulate_transaction(&self, data: Vec, accept: AcceptType) -> RpcRes { - let transactions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); - let ret = transactions_api.simulate_transaction_raw( - accept, - Some(true), - Some(false), - Some(true), - SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(data))).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } - } - - pub async fn estimate_gas_price(&self) -> RpcRes { - let transactions_api = self.api_service.as_ref().unwrap().transactions_api.clone(); - let ret = transactions_api.estimate_gas_price_raw( - AcceptType::Json).await; - let ret = ret.unwrap(); - let header; - let ret = match ret { - BasicResponse::Ok(c, a, b, d, e, f, g, h, k) => { - header = AptosHeader { - chain_id: a, - ledger_version: b, - ledger_oldest_version: d, - ledger_timestamp_usec: e, - epoch: f, - block_height: g, - oldest_block_height: h, - cursor: k, - }; - match c { - AptosResponseContent::Json(json) => { - serde_json::to_string(&json.0).unwrap() - } - AptosResponseContent::Bcs(bytes) => { - format!("{}", hex::encode(bytes.0)) - } - } - } - }; - RpcRes { data: ret, header: serde_json::to_string(&header).unwrap() } - } - pub async fn faucet_apt(&self, acc: Vec, accept: AcceptType) -> RpcRes { let to = AccountAddress::from_bytes(acc).unwrap(); let db = self.db.as_ref().unwrap().read().await; - let mut core_account = self.get_core_account(&db).await; + let core_account = self.get_core_account(&db).await; let tx_factory = TransactionFactory::new(ChainId::test()); - let tx_acc_mint = core_account - .sign_with_transaction_builder( - tx_factory.mint(to, 10 * 100_000_000) - ); - return self.submit_transaction(bcs::to_bytes(&tx_acc_mint).unwrap(), accept).await; + let tx_acc_mint = + core_account.sign_with_transaction_builder(tx_factory.mint(to, 10 * 100_000_000)); + return self + .submit_transaction(bcs::to_bytes(&tx_acc_mint).unwrap(), accept) + .await; } - pub async fn create_account(&self, key: &str, accept: AcceptType) -> RpcRes { - let to = Ed25519PublicKey::from_encoded_string(key).unwrap(); + pub async fn faucet_with_cli(&self, acc: Vec) -> RpcRes { + let to = AccountAddress::from_bytes(acc).unwrap(); let db = self.db.as_ref().unwrap().read().await; - let mut core_account = self.get_core_account(&db).await; + let core_account = self.get_core_account(&db).await; let tx_factory = TransactionFactory::new(ChainId::test()); - let tx_acc_create = core_account - .sign_with_transaction_builder( - tx_factory.create_user_account(&to) - ); - return self.submit_transaction(bcs::to_bytes(&tx_acc_create).unwrap(), accept).await; + let tx_acc_mint = + core_account.sign_with_transaction_builder(tx_factory.mint(to, 10 * 100_000_000)); + let mut res: RpcRes = self + .submit_transaction( + bcs::to_bytes(&tx_acc_mint.clone()).unwrap(), + AcceptType::Bcs, + ) + .await; + let txs = vec![tx_acc_mint]; + res.data = hex::encode(aptos_sdk::bcs::to_bytes(&txs).unwrap()); + res + } + + pub async fn create_account(&self, acc: Vec, accept: AcceptType) -> RpcRes { + let to = AccountAddress::from_bytes(acc).unwrap(); + let db = self.db.as_ref().unwrap().read().await; + let core_account = self.get_core_account(&db).await; + let tx_factory = TransactionFactory::new(ChainId::test()); + let tx_acc_create = + core_account.sign_with_transaction_builder(tx_factory.create_account(to)); + return self + .submit_transaction(bcs::to_bytes(&tx_acc_create).unwrap(), accept) + .await; } /// Sets the state of the Vm. @@ -1187,30 +890,29 @@ impl Vm { log::info!("set_state: initializing"); vm_state.bootstrapped = false; Ok(()) - } + }, snow::State::StateSyncing => { log::info!("set_state: state syncing"); Err(Error::new(ErrorKind::Other, "state sync is not supported")) - } + }, // called by the bootstrapper to signal bootstrapping has started. snow::State::Bootstrapping => { log::info!("set_state: bootstrapping"); vm_state.bootstrapped = false; Ok(()) - } + }, // called when consensus has started signalling bootstrap phase is complete. snow::State::NormalOp => { log::info!("set_state: normal op"); vm_state.bootstrapped = true; Ok(()) - } + }, } } - /// Sets the container preference of the Vm. pub async fn set_preference(&self, id: ids::Id) -> io::Result<()> { let mut vm_state = self.state.write().await; @@ -1233,7 +935,10 @@ impl Vm { let acc = aptos_test_root_address(); let state_proof = db.reader.get_latest_ledger_info().unwrap(); let current_version = state_proof.ledger_info().version(); - let db_state_view = db.reader.state_view_at_version(Some(current_version)).unwrap(); + let db_state_view = db + .reader + .state_view_at_version(Some(current_version)) + .unwrap(); let view = db_state_view.as_account_with_state_view(&acc); let av = view.get_account_resource().unwrap(); let sn = av.unwrap().sequence_number(); @@ -1244,64 +949,46 @@ impl Vm { ) } - pub async fn inner_build_block(&self, data: Vec) -> io::Result<()> { - - - // obtain the executor guard for reading let executor = self.executor.as_ref().unwrap().read().await; - - // build the aptos data from the slice - let AptosData(txs, block_id, parent_block_id, next_epoch, ts) - = serde_json::from_slice::(&data).unwrap(); - let block_tx = serde_json::from_slice::>(&txs).unwrap(); + let aptos_data = serde_json::from_slice::(&data).unwrap(); + let block_tx = serde_json::from_slice::>(&aptos_data.0).unwrap(); let block_meta = block_tx.get(0).unwrap().try_as_block_metadata().unwrap(); - let block_id_now = block_meta.id(); - - if block_id_now.ne(&block_id) { - return Err(Error::new( - ErrorKind::Interrupted, - "block format error", - )); - } - - let parent_block_id_now = executor.committed_block_id(); - if parent_block_id.ne(&parent_block_id_now) { - return Err(Error::new( - ErrorKind::Interrupted, - "block error,maybe not sync ", - )); - } - println!("------------inner_build_block {}----", block_id); - - // execute the block - let output = executor - .execute_block( - ExecutableBlock::new( - block_id, - ExecutableTransactions::Unsharded(block_tx.clone()), - ), - parent_block_id, - None - ).unwrap(); - - // sign for the the ledger - let ledger_info = LedgerInfo::new( - BlockInfo::new( - next_epoch, - 0, + let block_id = block_meta.id(); + let parent_block_id = executor.committed_block_id(); + let next_epoch = aptos_data.3; + let ts = aptos_data.4; + match executor.execute_block( + ExecutableBlock::new( block_id, - output.root_hash(), - output.version(), - ts, - output.epoch_state().clone(), + ExecutableTransactions::Unsharded(block_tx.clone()), ), - HashValue::zero(), - ); - let li = generate_ledger_info_with_sig(&[self.signer.as_ref().unwrap().clone()], ledger_info); - executor.commit_blocks(vec![block_id], li.clone()).unwrap(); - - // write the transactions to the mempool + parent_block_id, + None, + ) { + Ok(output) => { + let ledger_info = LedgerInfo::new( + BlockInfo::new( + next_epoch, + 0, + block_id, + output.root_hash(), + output.version(), + ts, + output.epoch_state().clone(), + ), + HashValue::zero(), + ); + let li = generate_ledger_info_with_sig( + &[self.signer.as_ref().unwrap().clone()], + ledger_info, + ); + executor.commit_blocks(vec![block_id], li.clone()).unwrap(); + }, + Err(err) => { + log::info!("inner build error {}", err); + }, + } let mut core_pool = self.core_mempool.as_ref().unwrap().write().await; for t in block_tx.iter() { match t { @@ -1309,8 +996,8 @@ impl Vm { let sender = t.sender(); let sequence_number = t.sequence_number(); core_pool.commit_transaction(&AccountAddress::from(sender), sequence_number); - } - _ => {} + }, + _ => {}, } } drop(core_pool); @@ -1318,7 +1005,7 @@ impl Vm { Ok(()) } - async fn init_aptos(&mut self, uuid : &str) { + async fn init_aptos(&mut self, uuid: &str) { let db_name = get_db_name(uuid); @@ -1339,15 +1026,17 @@ impl Vm { fs::create_dir_all(p.as_str()).unwrap(); } - // initialize aptos db - let db = DbReaderWriter::wrap( - AptosDB::new_for_test(p.as_str())); + + if !fs::metadata(p.clone().as_str()).is_ok() { + fs::create_dir_all(p.as_str()).unwrap(); + } + let db = DbReaderWriter::wrap(AptosDB::new_for_test(p.as_str())); let waypoint = generate_waypoint::(&db.1, &genesis_txn); match waypoint { Ok(w) => { maybe_bootstrap::(&db.1, &genesis_txn, w).unwrap(); - } - _ => {} + }, + _ => {}, } // BLOCK-STM // AptosVM::set_concurrency_level_once(2); @@ -1355,48 +1044,21 @@ impl Vm { let executor = BlockExecutor::new(db.1.clone()); self.executor = Some(Arc::new(RwLock::new(executor))); - // set up the mempool - let (mempool_client_sender, - mut mempool_client_receiver) = futures_mpsc::channel::(10); - let sender = MempoolClientSender::from(mempool_client_sender.clone()); - let mut node_config = NodeConfig::default(); - - - node_config.indexer.enabled = true; - // indexer config - node_config.indexer.processor = Some("default_processor".to_string()); - node_config.indexer.check_chain_id = Some(false); - node_config.indexer.skip_migrations = Some(false); - node_config.indexer.fetch_tasks = Some(4); - node_config.indexer.processor_tasks = Some(4); - node_config.indexer.emit_every = Some(4); - node_config.indexer.batch_size = Some(8); - node_config.indexer.gap_lookback_versions = Some(4); - - node_config.indexer_grpc.enabled = true; - - // indexer_grpc config - // let processor_task_count = node_config.indexer_grpc.processor_task_count.unwrap(); - // let processor_batch_size = node_config.indexer_grpc.processor_batch_size.unwrap(); - // let output_batch_size = node_config.indexer_grpc.output_batch_size.unwrap(); - // let address = node_config.indexer_grpc.address.clone().unwrap(); - node_config.indexer_grpc.processor_batch_size = Some(4); - node_config.indexer_grpc.processor_task_count = Some(4); - node_config.indexer_grpc.output_batch_size = Some(4); - node_config.indexer_grpc.address = Some(String::from("0.0.0.0")); - - let context = Context::new(ChainId::test(), - db.1.reader.clone(), - sender, node_config.clone()); - - // initialze the raw api + let (mempool_client_sender, mut mempool_client_receiver) = + futures_mpsc::channel::(10); + let sender = MempoolClientSender::from(mempool_client_sender); + let node_config = NodeConfig::default(); + let context = Context::new( + ChainId::test(), + db.1.reader.clone(), + sender, + node_config.clone(), + ); self.api_context = Some(context.clone()); - let service = get_raw_api_service(Arc::new(context.clone())); + let service = get_raw_api_service(Arc::new(context)); self.api_service = Some(service); self.core_mempool = Some(Arc::new(RwLock::new(CoreMempool::new(&node_config)))); self.check_pending_tx().await; - - // sart the mempool client tokio::task::spawn(async move { while let Some(request) = mempool_client_receiver.next().await { match request { @@ -1404,48 +1066,52 @@ impl Vm { // accept all the transaction let ms = MempoolStatus::new(MempoolStatusCode::Accepted); let status: SubmissionStatus = (ms, None); - callback.send( - Ok(status) - ).unwrap(); - } - MempoolClientRequest::GetTransactionByHash(_, _) => {} + callback.send(Ok(status)).unwrap(); + }, + MempoolClientRequest::GetTransactionByHash(_, _) => {}, } } }); + } - // start the indexer services (this is already sent to the background, so I think we're good) - // bootstrap_indexer_grpc(&node_config, ChainId::test(), db.1.reader.clone(), mempool_client_sender.clone()).unwrap(); - let indexer_context = Arc::new(context.clone()); - let indexer_config = node_config.indexer.clone(); - let server = FullnodeDataService { - context: indexer_context.clone(), - processor_task_count: 4, - processor_batch_size: 4, - output_batch_size: 4, - }; - - tokio::spawn(async move { - - println!("Starting indexer gRPC service."); - match Server::builder() - .add_service(FullnodeDataServer::new(server)) - .serve(String::from("0.0.0.0:8090").to_socket_addrs().unwrap().next().unwrap()) - .await - { - Ok(_) => { - println!("Indexer "); - }, - Err(e) => { - eprintln!("Failed to start server"); - } - } - - }); - tokio::spawn(async move { - println!("Starting indexer trailer."); - run_forever(indexer_config, indexer_context.clone()).await; - }); - + async fn build_block_data(&self) -> Result, Error> { + let unix_now_micro = Utc::now().timestamp_micros() as u64; + let tx_arr = self.get_pending_tx(500).await; + let len = tx_arr.clone().len(); + log::info!("build_block pool tx count {}", len); + let executor = self.executor.as_ref().unwrap().read().await; + let signer = self.signer.as_ref().unwrap(); + let db = self.db.as_ref().unwrap().read().await; + let latest_ledger_info = db.reader.get_latest_ledger_info().unwrap(); + let next_epoch = latest_ledger_info.ledger_info().next_block_epoch(); + let block_id = HashValue::random(); + let block_meta = Transaction::BlockMetadata(BlockMetadata::new( + block_id, + next_epoch, + 0, + signer.author(), + vec![], + vec![], + unix_now_micro, + )); + let mut txs = vec![]; + for tx in tx_arr.iter() { + txs.push(UserTransaction(tx.clone())); + } + let mut block_tx: Vec<_> = vec![]; + block_tx.push(block_meta); + block_tx.append(&mut txs); + block_tx.push(Transaction::StateCheckpoint(HashValue::random())); + let parent_block_id = executor.committed_block_id(); + let block_tx_bytes = serde_json::to_vec(&block_tx).unwrap(); + let data = AptosData( + block_tx_bytes, + block_id.clone(), + parent_block_id, + next_epoch, + unix_now_micro, + ); + Ok(serde_json::to_vec(&data).unwrap()) } } @@ -1475,78 +1141,45 @@ impl BatchedChainVm for Vm { } #[tonic::async_trait] -impl ChainVm for Vm -{ +impl ChainVm for Vm { type Block = Block; - async fn build_block( - &self, - ) -> io::Result<::Block> { + async fn build_block(&self) -> io::Result<::Block> { let vm_state = self.state.read().await; if let Some(state_b) = vm_state.state.as_ref() { let prnt_blk = state_b.get_block(&vm_state.preferred).await.unwrap(); let unix_now = Utc::now().timestamp() as u64; - let unix_micro = Utc::now().timestamp_micros() as u64; - let tx_arr = self.get_pending_tx(10000).await; - println!("----build_block pool tx count-------{}------", tx_arr.clone().len()); - let executor = self.executor.as_ref().unwrap().read().await; - let signer = self.signer.as_ref().unwrap(); - let db = self.db.as_ref().unwrap().read().await; - let latest_ledger_info = db.reader.get_latest_ledger_info().unwrap(); - let next_epoch = latest_ledger_info.ledger_info().next_block_epoch(); - let block_id = HashValue::random(); - let block_meta = Transaction::BlockMetadata(BlockMetadata::new( - block_id, - next_epoch, - 0, - signer.author(), - vec![], - vec![], - unix_micro, - )); - let mut txs = vec![]; - for tx in tx_arr.iter() { - txs.push(UserTransaction(tx.clone())); - } - let mut block_tx: Vec<_> = vec![]; - block_tx.push(block_meta); - block_tx.append(&mut txs); - block_tx.push(Transaction::StateCheckpoint(HashValue::random())); - let parent_block_id = executor.committed_block_id(); - let block_tx_bytes = serde_json::to_vec(&block_tx).unwrap(); - let data = AptosData(block_tx_bytes, - block_id.clone(), - parent_block_id, - next_epoch, - unix_micro); - + // now we allow to build empty block + // if len == 0 { + // self.update_build_block_status(0).await; + // self.update_pending_tx_flag(false).await; + // return Err(Error::new( + // ErrorKind::Other, + // "no pending transaction found", + // )); + // } + let data = self.build_block_data().await.unwrap(); let mut block_ = Block::new( prnt_blk.id(), prnt_blk.height() + 1, unix_now, - serde_json::to_vec(&data).unwrap(), + data, choices::status::Status::Processing, - ).unwrap(); + ) + .unwrap(); block_.set_state(state_b.clone()); - println!("--------vm_build_block------{}---", block_.id()); block_.verify().await.unwrap(); return Ok(block_); } - Err(Error::new( - ErrorKind::Other, - "not implement", - )) + Err(Error::new(ErrorKind::Other, "not implement")) } - async fn issue_tx( - &self, - ) -> io::Result<::Block> { + async fn issue_tx(&self) -> io::Result<::Block> { Err(Error::new( ErrorKind::Unsupported, "issue_tx not implemented", )) } - async fn set_preference(&self, id: ids::Id) -> io::Result<()> { self.set_preference(id).await } @@ -1567,12 +1200,10 @@ impl ChainVm for Vm async fn state_sync_enabled(&self) -> io::Result { Ok(false) } - } #[tonic::async_trait] -impl NetworkAppHandler for Vm -{ +impl NetworkAppHandler for Vm { /// Currently, no app-specific messages, so returning Ok. async fn app_request( &self, @@ -1607,16 +1238,15 @@ impl NetworkAppHandler for Vm match serde_json::from_slice::(msg) { Ok(s) => { self.add_pool(s).await; - } - Err(_) => {} + }, + Err(_) => {}, } Ok(()) } } #[tonic::async_trait] -impl CrossChainAppHandler for Vm -{ +impl CrossChainAppHandler for Vm { /// Currently, no cross chain specific messages, so returning Ok. async fn cross_chain_app_request( &self, @@ -1651,9 +1281,7 @@ impl CrossChainAppHandler for Vm impl AppHandler for Vm {} #[tonic::async_trait] -impl Connector for Vm - -{ +impl Connector for Vm { async fn connected(&self, _id: &ids::node::Id) -> io::Result<()> { // no-op Ok(()) @@ -1666,22 +1294,17 @@ impl Connector for Vm } #[tonic::async_trait] -impl Checkable for Vm -{ +impl Checkable for Vm { async fn health_check(&self) -> io::Result> { Ok("200".as_bytes().to_vec()) } } #[tonic::async_trait] -impl Getter for Vm -{ +impl Getter for Vm { type Block = Block; - async fn get_block( - &self, - blk_id: ids::Id, - ) -> io::Result<::Block> { + async fn get_block(&self, blk_id: ids::Id) -> io::Result<::Block> { let vm_state = self.state.read().await; if let Some(state) = &vm_state.state { let mut block = state.get_block(&blk_id).await?; @@ -1695,13 +1318,9 @@ impl Getter for Vm } #[tonic::async_trait] -impl Parser for Vm -{ +impl Parser for Vm { type Block = Block; - async fn parse_block( - &self, - bytes: &[u8], - ) -> io::Result<::Block> { + async fn parse_block(&self, bytes: &[u8]) -> io::Result<::Block> { let vm_state = self.state.read().await; if let Some(state) = vm_state.state.as_ref() { let mut new_block = Block::from_slice(bytes)?; @@ -1710,12 +1329,8 @@ impl Parser for Vm new_state.set_vm(self.clone()); new_block.set_state(new_state); return match state.get_block(&new_block.id()).await { - Ok(prev) => { - Ok(prev) - } - Err(_) => { - Ok(new_block) - } + Ok(prev) => Ok(prev), + Err(_) => Ok(new_block), }; } Err(Error::new(ErrorKind::NotFound, "state manager not found")) @@ -1723,27 +1338,28 @@ impl Parser for Vm } #[tonic::async_trait] -impl CommonVm for Vm -{ +impl CommonVm for Vm { type DatabaseManager = DatabaseManager; type AppSender = AppSenderClient; type ChainHandler = ChainHandler; type StaticHandler = StaticHandler; type ValidatorState = ValidatorStateClient; - async fn initialize( &mut self, ctx: Option>, db_manager: Self::DatabaseManager, - genesis_bytes: &[u8], + _genesis_bytes: &[u8], _upgrade_bytes: &[u8], _config_bytes: &[u8], to_engine: Sender, _fxs: &[snow::engine::common::vm::Fx], app_sender: Self::AppSender, ) -> io::Result<()> { - let uuid = Uuid::new_v4().to_string(); + + let uuid = std::env::var("M1_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string()); log::info!("initializing M1 Vm {}", uuid); + + let mut vm_state = self.state.write().await; vm_state.ctx = ctx.clone(); let current = db_manager.current().await?; @@ -1757,7 +1373,6 @@ impl CommonVm for Vm self.app_sender = Some(app_sender); drop(vm_state); - self.init_aptos(&uuid).await; let mut vm_state = self.state.write().await; let genesis = "hello world"; @@ -1767,18 +1382,21 @@ impl CommonVm for Vm vm_state.preferred = last_accepted_blk_id; } else { let genesis_bytes = genesis.as_bytes().to_vec(); - let data = AptosData(genesis_bytes.clone(), - HashValue::zero(), - HashValue::zero(), - 0, - 0); + let data = AptosData( + genesis_bytes.clone(), + HashValue::zero(), + HashValue::zero(), + 0, + 0, + ); let mut genesis_block = Block::new( ids::Id::empty(), 0, 0, serde_json::to_vec(&data).unwrap(), choices::status::Status::default(), - ).unwrap(); + ) + .unwrap(); genesis_block.set_state(state.clone()); genesis_block.accept().await?; @@ -1835,4 +1453,11 @@ impl CommonVm for Vm Ok(handlers) } -} \ No newline at end of file +} + +fn get_current_time_seconds() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Failed to get timestamp") + .as_secs() +}