[api] migrate event and transaction schemas
Add test for internal indexer
areshand committed May 28, 2024
1 parent 415d7e1 commit f253b5c
Showing 43 changed files with 1,319 additions and 33 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions api/openapi-spec-generator/src/fake_context.rs
@@ -17,5 +17,6 @@ pub fn get_fake_context() -> Context {
mempool.ac_client,
NodeConfig::default(),
None, /* table info reader */
None,
)
}
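For reference, `Context::new` now takes a second trailing optional reader, so existing call sites such as this one pass an extra `None`. A minimal sketch of the updated call shape, assuming `chain_id`, `db`, and `mp_sender` are already in scope (argument order taken from the diff in api/src/runtime.rs below):

    // Sketch only: wiring the two optional indexer readers into the API context.
    let context = Context::new(
        chain_id,
        db,
        mp_sender,
        NodeConfig::default(),
        None, /* table info reader */
        None, /* txn event reader */
    );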
7 changes: 6 additions & 1 deletion api/src/context.rs
@@ -18,7 +18,6 @@ use aptos_api_types::{
};
use aptos_config::config::{NodeConfig, RoleType};
use aptos_crypto::HashValue;
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule};
use aptos_logger::{error, info, Schema};
use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus};
@@ -34,6 +33,9 @@ use aptos_types::{
chain_id::ChainId,
contract_event::EventWithVersion,
event::EventKey,
indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
},
ledger_info::LedgerInfoWithSignatures,
on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig, OnChainExecutionConfig},
state_store::{
@@ -76,6 +78,7 @@ pub struct Context {
simulate_txn_stats: Arc<FunctionStats>,
pub table_info_reader: Option<Arc<dyn TableInfoReader>>,
pub wait_for_hash_active_connections: Arc<AtomicUsize>,
pub txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>>,
}

impl std::fmt::Debug for Context {
@@ -91,6 +94,7 @@ impl Context {
mp_sender: MempoolClientSender,
node_config: NodeConfig,
table_info_reader: Option<Arc<dyn TableInfoReader>>,
txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>>,
) -> Self {
let (view_function_stats, simulate_txn_stats) = {
let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some();
@@ -129,6 +133,7 @@
simulate_txn_stats,
table_info_reader,
wait_for_hash_active_connections: Arc::new(AtomicUsize::new(0)),
txn_event_reader,
}
}

19 changes: 16 additions & 3 deletions api/src/runtime.rs
@@ -10,11 +10,15 @@ use crate::{
};
use anyhow::Context as AnyhowContext;
use aptos_config::config::{ApiConfig, NodeConfig};
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReader;
use aptos_types::chain_id::ChainId;
use aptos_types::{
chain_id::ChainId,
indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
},
};
use poem::{
handler,
http::Method,
@@ -36,11 +40,19 @@ pub fn bootstrap(
db: Arc<dyn DbReader>,
mp_sender: MempoolClientSender,
table_info_reader: Option<Arc<dyn TableInfoReader>>,
txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>>,
) -> anyhow::Result<Runtime> {
let max_runtime_workers = get_max_runtime_workers(&config.api);
let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers));

let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader);
let context = Context::new(
chain_id,
db,
mp_sender,
config.clone(),
table_info_reader,
txn_event_reader,
);

attach_poem_to_runtime(runtime.handle(), context.clone(), config, false)
.context("Failed to attach poem to runtime")?;
@@ -342,6 +354,7 @@ mod tests {
context.db.clone(),
context.mempool.ac_client.clone(),
None,
None,
);
assert!(ret.is_ok());

1 change: 1 addition & 0 deletions api/test-context/src/test_context.rs
@@ -146,6 +146,7 @@ pub fn new_test_context(
mempool.ac_client.clone(),
node_config.clone(),
None, /* table info reader */
None,
);

// Configure the testing depending on which API version we're testing.
1 change: 0 additions & 1 deletion api/types/Cargo.toml
@@ -16,7 +16,6 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-config = { workspace = true }
aptos-crypto = { workspace = true }
aptos-db-indexer = { workspace = true }
aptos-framework = { workspace = true }
aptos-logger = { workspace = true }
aptos-openapi = { workspace = true }
2 changes: 1 addition & 1 deletion api/types/src/convert.rs
@@ -18,14 +18,14 @@
};
use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_logger::{sample, sample::SampleRate};
use aptos_resource_viewer::AptosValueAnnotator;
use aptos_storage_interface::DbReader;
use aptos_types::{
access_path::{AccessPath, Path},
chain_id::ChainId,
contract_event::{ContractEvent, EventWithVersion},
indexer::table_info_reader::TableInfoReader,
state_store::{
state_key::{inner::StateKeyInner, StateKey},
table::{TableHandle, TableInfo},
3 changes: 3 additions & 0 deletions aptos-node/src/lib.rs
@@ -211,6 +211,7 @@ pub struct AptosHandle {
_peer_monitoring_service_runtime: Runtime,
_state_sync_runtimes: StateSyncRuntimes,
_telemetry_runtime: Option<Runtime>,
_indexer_db_tailer_runtime: Option<Runtime>,
}

/// Start an Aptos node
@@ -694,6 +695,7 @@ pub fn setup_environment_and_start_node(
indexer_table_info_runtime,
indexer_runtime,
indexer_grpc_runtime,
db_tailer_runtime,
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id)?;

// Create mempool and get the consensus to mempool sender
@@ -835,6 +837,7 @@
_peer_monitoring_service_runtime: peer_monitoring_service_runtime,
_state_sync_runtimes: state_sync_runtimes,
_telemetry_runtime: telemetry_runtime,
_indexer_db_tailer_runtime: db_tailer_runtime,
})
}

28 changes: 25 additions & 3 deletions aptos-node/src/services.rs
@@ -11,10 +11,11 @@ use aptos_consensus::{
};
use aptos_consensus_notifications::ConsensusNotifier;
use aptos_data_client::client::AptosDataClient;
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener};
use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc;
use aptos_indexer_grpc_table_info::runtime::bootstrap as bootstrap_indexer_table_info;
use aptos_indexer_grpc_table_info::runtime::{
bootstrap as bootstrap_indexer_table_info, bootstrap_db_tailer,
};
use aptos_logger::{debug, telemetry_log_writer::TelemetryLog, LoggerFilterUpdater};
use aptos_mempool::{network::MempoolSyncMsg, MempoolClientRequest, QuorumStoreRequest};
use aptos_mempool_notifications::MempoolNotificationListener;
@@ -30,7 +31,12 @@ use aptos_peer_monitoring_service_server::{
use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage;
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_time_service::TimeService;
use aptos_types::chain_id::ChainId;
use aptos_types::{
chain_id::ChainId,
indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
},
};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender};
use std::{sync::Arc, time::Instant};
@@ -51,6 +57,7 @@ pub fn bootstrap_api_and_indexer(
Option<Runtime>,
Option<Runtime>,
Option<Runtime>,
Option<Runtime>,
)> {
// Create the mempool client and sender
let (mempool_client_sender, mempool_client_receiver) =
@@ -66,18 +73,32 @@
None => (None, None),
};

let (db_tailer_runtime, txn_event_reader) =
match bootstrap_db_tailer(node_config, db_rw.clone()) {
Some((runtime, db_tailer)) => (Some(runtime), Some(db_tailer)),
None => (None, None),
};

// Create the API runtime
let table_info_reader: Option<Arc<dyn TableInfoReader>> = indexer_async_v2.map(|arc| {
let trait_object: Arc<dyn TableInfoReader> = arc;
trait_object
});

let txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>> =
txn_event_reader.map(|arc| {
let trait_object: Arc<dyn IndexerTransactionEventReader> = arc;
trait_object
});

let api_runtime = if node_config.api.enabled {
Some(bootstrap_api(
node_config,
chain_id,
db_rw.reader.clone(),
mempool_client_sender.clone(),
table_info_reader.clone(),
txn_event_reader.clone(),
)?)
} else {
None
@@ -106,6 +127,7 @@
indexer_table_info_runtime,
indexer_runtime,
indexer_grpc,
db_tailer_runtime,
))
}

Expand Down
1 change: 0 additions & 1 deletion aptos-node/src/storage.rs
@@ -73,7 +73,6 @@ pub(crate) fn bootstrap_db(

let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);

(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
33 changes: 33 additions & 0 deletions config/src/config/index_db_tailer_config.rs
@@ -0,0 +1,33 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct IndexDBTailerConfig {
pub enable: bool,
pub batch_size: usize,
}

impl IndexDBTailerConfig {
pub fn new(enable: bool, batch_size: usize) -> Self {
Self { enable, batch_size }
}

pub fn enable(&self) -> bool {
self.enable
}

pub fn batch_size(&self) -> usize {
self.batch_size
}
}

impl Default for IndexDBTailerConfig {
fn default() -> Self {
Self {
enable: false,
batch_size: 10_000,
}
}
}
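A short usage sketch for the new config type (the values below are illustrative, not taken from any node configuration in the repository):

    // The Default impl keeps the tailer disabled with a 10,000-item batch size.
    let default_cfg = IndexDBTailerConfig::default();
    assert!(!default_cfg.enable());
    assert_eq!(default_cfg.batch_size(), 10_000);

    // Explicit construction for a node that opts in to the DB tailer.
    let cfg = IndexDBTailerConfig::new(true, 5_000);
    assert!(cfg.enable());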
1 change: 1 addition & 0 deletions config/src/config/mod.rs
@@ -15,6 +15,7 @@ mod error;
mod execution_config;
mod gas_estimation_config;
mod identity_config;
pub mod index_db_tailer_config;
mod indexer_config;
mod indexer_grpc_config;
mod indexer_table_info_config;
17 changes: 10 additions & 7 deletions config/src/config/node_config.rs
@@ -4,13 +4,14 @@
use super::{DagConsensusConfig, IndexerTableInfoConfig};
use crate::{
config::{
dkg_config::DKGConfig, jwk_consensus_config::JWKConsensusConfig,
netbench_config::NetbenchConfig, node_config_loader::NodeConfigLoader,
node_startup_config::NodeStartupConfig, observer_config::ObserverConfig,
persistable_config::PersistableConfig, utils::RootPath, AdminServiceConfig, ApiConfig,
BaseConfig, ConsensusConfig, Error, ExecutionConfig, IndexerConfig, IndexerGrpcConfig,
InspectionServiceConfig, LoggerConfig, MempoolConfig, NetworkConfig,
PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig, StorageConfig,
dkg_config::DKGConfig, index_db_tailer_config::IndexDBTailerConfig,
jwk_consensus_config::JWKConsensusConfig, netbench_config::NetbenchConfig,
node_config_loader::NodeConfigLoader, node_startup_config::NodeStartupConfig,
observer_config::ObserverConfig, persistable_config::PersistableConfig, utils::RootPath,
AdminServiceConfig, ApiConfig, BaseConfig, ConsensusConfig, Error, ExecutionConfig,
IndexerConfig, IndexerGrpcConfig, InspectionServiceConfig, LoggerConfig, MempoolConfig,
NetworkConfig, PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig,
StorageConfig,
},
network_id::NetworkId,
};
@@ -83,6 +84,8 @@ pub struct NodeConfig {
pub storage: StorageConfig,
#[serde(default)]
pub validator_network: Option<NetworkConfig>,
#[serde(default)]
pub index_db_tailer: IndexDBTailerConfig,
}

impl NodeConfig {
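Because the new field carries `#[serde(default)]`, node config files without an `index_db_tailer` section should continue to deserialize. A hedged sketch, assuming `NodeConfig`'s `Default` fills the field with `IndexDBTailerConfig::default()`:

    // Sketch: the DB tailer stays off unless explicitly enabled in the node config.
    let node_config = NodeConfig::default();
    assert_eq!(node_config.index_db_tailer, IndexDBTailerConfig::default());
    assert!(!node_config.index_db_tailer.enable());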
1 change: 1 addition & 0 deletions crates/indexer/src/runtime.rs
@@ -96,6 +96,7 @@ pub fn bootstrap(
mp_sender,
node_config,
None, /* table info reader */
None,
));
run_forever(indexer_config, context).await;
});
4 changes: 2 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs
@@ -7,7 +7,6 @@ use crate::{
};
use aptos_api::context::Context;
use aptos_config::config::NodeConfig;
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_protos::{
@@ -19,7 +18,7 @@
util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
};
use aptos_storage_interface::DbReader;
use aptos_types::chain_id::ChainId;
use aptos_types::{chain_id::ChainId, indexer::table_info_reader::TableInfoReader};
use std::{net::ToSocketAddrs, sync::Arc};
use tokio::runtime::Runtime;
use tonic::{codec::CompressionEncoding, transport::Server};
@@ -58,6 +57,7 @@
mp_sender,
node_config,
table_info_reader,
None,
));
let service_context = ServiceContext {
context: context.clone(),
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
@@ -4,3 +4,4 @@
pub mod backup_restore;
pub mod runtime;
pub mod table_info_service;
pub mod tailer_service;