Skip to content

Commit

Permalink
[api] migrate event and transaction schemas
Browse files Browse the repository at this point in the history
Add test for internal indexer

merge two traits

address review
  • Loading branch information
areshand committed Jun 17, 2024
1 parent 712d1eb commit 331364e
Show file tree
Hide file tree
Showing 61 changed files with 1,304 additions and 241 deletions.
33 changes: 32 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ members = [
"storage/db-tool",
"storage/executable-store",
"storage/indexer",
"storage/indexer_schemas",
"storage/jellyfish-merkle",
"storage/rocksdb-options",
"storage/schemadb",
Expand Down Expand Up @@ -314,6 +315,7 @@ aptos-data-client = { path = "state-sync/aptos-data-client" }
aptos-data-streaming-service = { path = "state-sync/data-streaming-service" }
aptos-db = { path = "storage/aptosdb" }
aptos-db-indexer = { path = "storage/indexer" }
aptos-db-indexer-schemas = { path = "storage/indexer_schemas" }
aptos-db-tool = { path = "storage/db-tool" }
aptos-debugger = { path = "crates/aptos-debugger" }
aptos-dkg = { path = "crates/aptos-dkg" }
Expand Down
16 changes: 4 additions & 12 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,8 @@ impl Account {
let state_view = self
.context
.latest_state_view_poem(&self.latest_ledger_info)?;
let converter = state_view.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
);
let converter = state_view
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone());
let converted_resources = converter
.try_into_resources(resources.iter().map(|(k, v)| (k.clone(), v.as_slice())))
.context("Failed to build move resource response from data in DB")
Expand Down Expand Up @@ -522,10 +520,7 @@ impl Account {
self.context.state_view(Some(self.ledger_version))?;

let bytes = state_view
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.find_resource(&state_view, self.address, resource_type)
.context(format!(
"Failed to query DB to check for {} at {}",
Expand All @@ -543,10 +538,7 @@ impl Account {
})?;

state_view
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.move_struct_fields(resource_type, &bytes)
.context("Failed to convert move structs from storage")
.map_err(|err| {
Expand Down
14 changes: 7 additions & 7 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use aptos_api_types::{
};
use aptos_config::config::{NodeConfig, RoleType};
use aptos_crypto::HashValue;
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule};
use aptos_logger::{error, info, Schema};
use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus};
Expand All @@ -34,6 +33,7 @@ use aptos_types::{
chain_id::ChainId,
contract_event::EventWithVersion,
event::EventKey,
indexer::indexer_db_reader::IndexerReader,
ledger_info::LedgerInfoWithSignatures,
on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig, OnChainExecutionConfig},
state_store::{
Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct Context {
gas_limit_cache: Arc<RwLock<GasLimitCache>>,
view_function_stats: Arc<FunctionStats>,
simulate_txn_stats: Arc<FunctionStats>,
pub table_info_reader: Option<Arc<dyn TableInfoReader>>,
pub indexer_reader: Option<Arc<dyn IndexerReader>>,
pub wait_for_hash_active_connections: Arc<AtomicUsize>,
}

Expand All @@ -92,7 +92,7 @@ impl Context {
db: Arc<dyn DbReader>,
mp_sender: MempoolClientSender,
node_config: NodeConfig,
table_info_reader: Option<Arc<dyn TableInfoReader>>,
indexer_reader: Option<Arc<dyn IndexerReader>>,
) -> Self {
let (view_function_stats, simulate_txn_stats) = {
let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some();
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Context {
})),
view_function_stats,
simulate_txn_stats,
table_info_reader,
indexer_reader,
wait_for_hash_active_connections: Arc::new(AtomicUsize::new(0)),
}
}
Expand Down Expand Up @@ -418,7 +418,7 @@ impl Context {

// We should be able to do an unwrap here, otherwise the above db read would fail.
let state_view = self.state_view_at_version(version)?;
let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone());
let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone());

// Extract resources from resource groups and flatten into all resources
let kvs = kvs
Expand Down Expand Up @@ -618,7 +618,7 @@ impl Context {
}

let state_view = self.latest_state_view_poem(ledger_info)?;
let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone());
let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone());
let txns: Vec<aptos_api_types::Transaction> = data
.into_iter()
.map(|t| {
Expand Down Expand Up @@ -650,7 +650,7 @@ impl Context {
}

let state_view = self.latest_state_view_poem(ledger_info)?;
let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone());
let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone());
let txns: Vec<aptos_api_types::Transaction> = data
.into_iter()
.map(|t| {
Expand Down
5 changes: 1 addition & 4 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,7 @@ impl EventsApi {
let events = self
.context
.latest_state_view_poem(&latest_ledger_info)?
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_versioned_events(&events)
.context("Failed to convert events from storage into response")
.map_err(|err| {
Expand Down
7 changes: 3 additions & 4 deletions api/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ use crate::{
};
use anyhow::Context as AnyhowContext;
use aptos_config::config::{ApiConfig, NodeConfig};
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReader;
use aptos_types::chain_id::ChainId;
use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use poem::{
handler,
http::Method,
Expand All @@ -35,12 +34,12 @@ pub fn bootstrap(
chain_id: ChainId,
db: Arc<dyn DbReader>,
mp_sender: MempoolClientSender,
table_info_reader: Option<Arc<dyn TableInfoReader>>,
indexer_reader: Option<Arc<dyn IndexerReader>>,
) -> anyhow::Result<Runtime> {
let max_runtime_workers = get_max_runtime_workers(&config.api);
let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers));

let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader);
let context = Context::new(chain_id, db, mp_sender, config.clone(), indexer_reader);

attach_poem_to_runtime(runtime.handle(), context.clone(), config, false)
.context("Failed to attach poem to runtime")?;
Expand Down
16 changes: 4 additions & 12 deletions api/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,7 @@ impl StateApi {

let (ledger_info, ledger_version, state_view) = self.context.state_view(ledger_version)?;
let bytes = state_view
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.find_resource(&state_view, address, &tag)
.context(format!(
"Failed to query DB to check for {} at {}",
Expand All @@ -308,10 +305,7 @@ impl StateApi {
match accept_type {
AcceptType::Json => {
let resource = state_view
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_resource(&tag, &bytes)
.context("Failed to deserialize resource data retrieved from DB")
.map_err(|err| {
Expand Down Expand Up @@ -412,10 +406,8 @@ impl StateApi {
.context
.state_view(ledger_version.map(|inner| inner.0))?;

let converter = state_view.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
);
let converter =
state_view.as_converter(self.context.db.clone(), self.context.indexer_reader.clone());

// Convert key to lookup version for DB
let vm_key = converter
Expand Down
21 changes: 6 additions & 15 deletions api/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ impl TransactionsApi {
state_view
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
self.context.indexer_reader.clone(),
)
.try_into_onchain_transaction(timestamp, txn)
.context("Failed to convert on chain transaction to Transaction")
Expand All @@ -911,10 +911,7 @@ impl TransactionsApi {
})?
},
TransactionData::Pending(txn) => state_view
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_pending_transaction(*txn)
.context("Failed to convert on pending transaction to Transaction")
.map_err(|err| {
Expand Down Expand Up @@ -1092,10 +1089,7 @@ impl TransactionsApi {
SubmitTransactionPost::Json(data) => self
.context
.latest_state_view_poem(ledger_info)?
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_signed_transaction_poem(data.0, self.context.chain_id())
.context("Failed to create SignedTransaction from SubmitTransactionRequest")
.map_err(|err| {
Expand Down Expand Up @@ -1173,7 +1167,7 @@ impl TransactionsApi {
.enumerate()
.map(|(index, txn)| {
self.context.latest_state_view_poem(ledger_info)?
.as_converter(self.context.db.clone(), self.context.table_info_reader.clone())
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_signed_transaction_poem(txn, self.context.chain_id())
.context(format!("Failed to create SignedTransaction from SubmitTransactionRequest at position {}", index))
.map_err(|err| {
Expand Down Expand Up @@ -1264,7 +1258,7 @@ impl TransactionsApi {

// We provide the pending transaction so that users have the hash associated
let pending_txn = state_view
.as_converter(self.context.db.clone(), self.context.table_info_reader.clone())
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_pending_transaction_poem(txn)
.context("Failed to build PendingTransaction from mempool response, even though it said the request was accepted")
.map_err(|err| SubmitTransactionError::internal_with_code(
Expand Down Expand Up @@ -1495,10 +1489,7 @@ impl TransactionsApi {
let ledger_info = self.context.get_latest_ledger_info()?;
let state_view = self.context.latest_state_view_poem(&ledger_info)?;
let raw_txn: RawTransaction = state_view
.as_converter(
self.context.db.clone(),
self.context.table_info_reader.clone(),
)
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_raw_transaction_poem(request.transaction, self.context.chain_id())
.context("The given transaction is invalid")
.map_err(|err| {
Expand Down
6 changes: 3 additions & 3 deletions api/src/view_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn view_request(

let view_function: ViewFunction = match request {
ViewFunctionRequest::Json(data) => state_view
.as_converter(context.db.clone(), context.table_info_reader.clone())
.as_converter(context.db.clone(), context.indexer_reader.clone())
.convert_view_function(data.0)
.map_err(|err| {
BasicErrorWith404::bad_request_with_code(
Expand Down Expand Up @@ -167,7 +167,7 @@ fn view_request(
},
AcceptType::Json => {
let return_types = state_view
.as_converter(context.db.clone(), context.table_info_reader.clone())
.as_converter(context.db.clone(), context.indexer_reader.clone())
.function_return_types(&view_function)
.and_then(|tys| {
tys.into_iter()
Expand All @@ -187,7 +187,7 @@ fn view_request(
.zip(return_types.into_iter())
.map(|(v, ty)| {
state_view
.as_converter(context.db.clone(), context.table_info_reader.clone())
.as_converter(context.db.clone(), context.indexer_reader.clone())
.try_into_move_value(&ty, &v)
})
.collect::<anyhow::Result<Vec<_>>>()
Expand Down
1 change: 0 additions & 1 deletion api/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-config = { workspace = true }
aptos-crypto = { workspace = true }
aptos-db-indexer = { workspace = true }
aptos-framework = { workspace = true }
aptos-logger = { workspace = true }
aptos-openapi = { workspace = true }
Expand Down
Loading

0 comments on commit 331364e

Please sign in to comment.