Skip to content

Commit

Permalink
[api] add kv indexer db
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed May 17, 2024
1 parent e2f261d commit 13555fa
Show file tree
Hide file tree
Showing 34 changed files with 1,049 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/openapi-spec-generator/src/fake_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ pub fn get_fake_context() -> Context {
mempool.ac_client,
NodeConfig::default(),
None, /* table info reader */
None,
)
}
7 changes: 6 additions & 1 deletion api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use aptos_api_types::{
};
use aptos_config::config::{NodeConfig, RoleType};
use aptos_crypto::HashValue;
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_db_indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
};
use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule};
use aptos_logger::{error, info, Schema};
use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus};
Expand Down Expand Up @@ -78,6 +80,7 @@ pub struct Context {
simulate_txn_stats: Arc<FunctionStats>,
pub table_info_reader: Option<Arc<dyn TableInfoReader>>,
pub wait_for_hash_active_connections: Arc<AtomicUsize>,
pub txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>>,
}

impl std::fmt::Debug for Context {
Expand All @@ -93,6 +96,7 @@ impl Context {
mp_sender: MempoolClientSender,
node_config: NodeConfig,
table_info_reader: Option<Arc<dyn TableInfoReader>>,
txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>>,
) -> Self {
let (view_function_stats, simulate_txn_stats) = {
let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some();
Expand Down Expand Up @@ -131,6 +135,7 @@ impl Context {
simulate_txn_stats,
table_info_reader,
wait_for_hash_active_connections: Arc::new(AtomicUsize::new(0)),
txn_event_reader,
}
}

Expand Down
15 changes: 13 additions & 2 deletions api/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use crate::{
};
use anyhow::Context as AnyhowContext;
use aptos_config::config::{ApiConfig, NodeConfig};
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_db_indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
};
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReader;
Expand All @@ -36,11 +38,19 @@ pub fn bootstrap(
db: Arc<dyn DbReader>,
mp_sender: MempoolClientSender,
table_info_reader: Option<Arc<dyn TableInfoReader>>,
txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>>,
) -> anyhow::Result<Runtime> {
let max_runtime_workers = get_max_runtime_workers(&config.api);
let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers));

let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader);
let context = Context::new(
chain_id,
db,
mp_sender,
config.clone(),
table_info_reader,
txn_event_reader,
);

attach_poem_to_runtime(runtime.handle(), context.clone(), config, false)
.context("Failed to attach poem to runtime")?;
Expand Down Expand Up @@ -342,6 +352,7 @@ mod tests {
context.db.clone(),
context.mempool.ac_client.clone(),
None,
None,
);
assert!(ret.is_ok());

Expand Down
1 change: 1 addition & 0 deletions api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub fn new_test_context(
mempool.ac_client.clone(),
node_config.clone(),
None, /* table info reader */
None,
);

// Configure the testing depending on which API version we're testing.
Expand Down
3 changes: 3 additions & 0 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ pub struct AptosHandle {
_peer_monitoring_service_runtime: Runtime,
_state_sync_runtimes: StateSyncRuntimes,
_telemetry_runtime: Option<Runtime>,
_indexer_db_tailer_runtime: Option<Runtime>,
}

/// Start an Aptos node
Expand Down Expand Up @@ -694,6 +695,7 @@ pub fn setup_environment_and_start_node(
indexer_table_info_runtime,
indexer_runtime,
indexer_grpc_runtime,
db_tailer_runtime,
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id)?;

// Create mempool and get the consensus to mempool sender
Expand Down Expand Up @@ -835,6 +837,7 @@ pub fn setup_environment_and_start_node(
_peer_monitoring_service_runtime: peer_monitoring_service_runtime,
_state_sync_runtimes: state_sync_runtimes,
_telemetry_runtime: telemetry_runtime,
_indexer_db_tailer_runtime: db_tailer_runtime,
})
}

Expand Down
24 changes: 22 additions & 2 deletions aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ use aptos_consensus::{
};
use aptos_consensus_notifications::ConsensusNotifier;
use aptos_data_client::client::AptosDataClient;
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_db_indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
};
use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener};
use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc;
use aptos_indexer_grpc_table_info::runtime::bootstrap as bootstrap_indexer_table_info;
use aptos_indexer_grpc_table_info::runtime::{
bootstrap as bootstrap_indexer_table_info, bootstrap_db_tailer,
};
use aptos_logger::{debug, telemetry_log_writer::TelemetryLog, LoggerFilterUpdater};
use aptos_mempool::{network::MempoolSyncMsg, MempoolClientRequest, QuorumStoreRequest};
use aptos_mempool_notifications::MempoolNotificationListener;
Expand Down Expand Up @@ -51,6 +55,7 @@ pub fn bootstrap_api_and_indexer(
Option<Runtime>,
Option<Runtime>,
Option<Runtime>,
Option<Runtime>,
)> {
// Create the mempool client and sender
let (mempool_client_sender, mempool_client_receiver) =
Expand All @@ -66,18 +71,32 @@ pub fn bootstrap_api_and_indexer(
None => (None, None),
};

let (db_tailer_runtime, txn_event_reader) =
match bootstrap_db_tailer(node_config, db_rw.clone()) {
Some((runtime, db_tailer)) => (Some(runtime), Some(db_tailer)),
None => (None, None),
};

// Create the API runtime
let table_info_reader: Option<Arc<dyn TableInfoReader>> = indexer_async_v2.map(|arc| {
let trait_object: Arc<dyn TableInfoReader> = arc;
trait_object
});

let txn_event_reader: Option<Arc<dyn IndexerTransactionEventReader>> =
txn_event_reader.map(|arc| {
let trait_object: Arc<dyn IndexerTransactionEventReader> = arc;
trait_object
});

let api_runtime = if node_config.api.enabled {
Some(bootstrap_api(
node_config,
chain_id,
db_rw.reader.clone(),
mempool_client_sender.clone(),
table_info_reader.clone(),
txn_event_reader.clone(),
)?)
} else {
None
Expand Down Expand Up @@ -106,6 +125,7 @@ pub fn bootstrap_api_and_indexer(
indexer_table_info_runtime,
indexer_runtime,
indexer_grpc,
db_tailer_runtime,
))
}

Expand Down
1 change: 0 additions & 1 deletion aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ pub(crate) fn bootstrap_db(

let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);

(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Expand Down
19 changes: 19 additions & 0 deletions config/src/config/index_db_tailer_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct IndexDBTailerConfig {
pub enable: bool,
pub batch_size: usize,
}

impl Default for IndexDBTailerConfig {
fn default() -> Self {
Self {
enable: false,
batch_size: 10_000,
}
}
}
1 change: 1 addition & 0 deletions config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod error;
mod execution_config;
mod gas_estimation_config;
mod identity_config;
pub mod index_db_tailer_config;
mod indexer_config;
mod indexer_grpc_config;
mod indexer_table_info_config;
Expand Down
17 changes: 10 additions & 7 deletions config/src/config/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
use super::{DagConsensusConfig, IndexerTableInfoConfig};
use crate::{
config::{
dkg_config::DKGConfig, jwk_consensus_config::JWKConsensusConfig,
netbench_config::NetbenchConfig, node_config_loader::NodeConfigLoader,
node_startup_config::NodeStartupConfig, observer_config::ObserverConfig,
persistable_config::PersistableConfig, utils::RootPath, AdminServiceConfig, ApiConfig,
BaseConfig, ConsensusConfig, Error, ExecutionConfig, IndexerConfig, IndexerGrpcConfig,
InspectionServiceConfig, LoggerConfig, MempoolConfig, NetworkConfig,
PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig, StorageConfig,
dkg_config::DKGConfig, index_db_tailer_config::IndexDBTailerConfig,
jwk_consensus_config::JWKConsensusConfig, netbench_config::NetbenchConfig,
node_config_loader::NodeConfigLoader, node_startup_config::NodeStartupConfig,
observer_config::ObserverConfig, persistable_config::PersistableConfig, utils::RootPath,
AdminServiceConfig, ApiConfig, BaseConfig, ConsensusConfig, Error, ExecutionConfig,
IndexerConfig, IndexerGrpcConfig, InspectionServiceConfig, LoggerConfig, MempoolConfig,
NetworkConfig, PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig,
StorageConfig,
},
network_id::NetworkId,
};
Expand Down Expand Up @@ -83,6 +84,8 @@ pub struct NodeConfig {
pub storage: StorageConfig,
#[serde(default)]
pub validator_network: Option<NetworkConfig>,
#[serde(default)]
pub index_db_tailer: IndexDBTailerConfig,
}

impl NodeConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/indexer/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn bootstrap(
mp_sender,
node_config,
None, /* table info reader */
None,
));
run_forever(indexer_config, context).await;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub fn bootstrap(
mp_sender,
node_config,
table_info_reader,
None,
));
let service_context = ServiceContext {
context: context.clone(),
Expand Down
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
pub mod backup_restore;
pub mod runtime;
pub mod table_info_service;
pub mod tailer_service;
25 changes: 23 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::table_info_service::TableInfoService;
use crate::{table_info_service::TableInfoService, tailer_service::TailerService};
use aptos_api::context::Context;
use aptos_config::config::NodeConfig;
use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2};
use aptos_db_indexer::{db_ops::open_db, db_tailer::DBTailer, db_v2::IndexerAsyncV2};
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReaderWriter;
use aptos_types::chain_id::ChainId;
Expand All @@ -13,6 +13,26 @@ use tokio::runtime::Runtime;

const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";

pub fn bootstrap_db_tailer(
config: &NodeConfig,
db_rw: DbReaderWriter,
) -> Option<(Runtime, Arc<DBTailer>)> {
if !config.indexer_table_info.enabled {
return None;
}
let runtime = aptos_runtimes::spawn_named_runtime("index-db-tailer".to_string(), None);
// Set up db config and open up the db initially to read metadata
let node_config = config.clone();
let mut tailer_service = TailerService::new(db_rw.reader, &node_config);
let db_tailer = tailer_service.get_db_tailer();
// Spawn the runtime for db tailer
runtime.spawn(async move {
tailer_service.run().await;
});

Some((runtime, db_tailer))
}

/// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service
/// Returns corresponding Tokio runtime
pub fn bootstrap(
Expand Down Expand Up @@ -50,6 +70,7 @@ pub fn bootstrap(
mp_sender,
node_config.clone(),
None,
None,
));

let mut parser = TableInfoService::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl TableInfoService {
/// 6. retry all the txns in the loop sequentially to clean up the pending on items
pub async fn run(&mut self) {
loop {
let start_time = std::time::Instant::now();
let start_time: std::time::Instant = std::time::Instant::now();
let ledger_version = self.get_highest_known_version().await.unwrap_or_default();
let batches = self.get_batches(ledger_version).await;
let results = self
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_config::config::NodeConfig;
use aptos_db_indexer::{db_ops::open_db, db_tailer::DBTailer};
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
use aptos_storage_interface::DbReader;
use std::sync::Arc;

const SERVICE_TYPE: &str = "db_tailer_service";
const INDEX_ASYNC_DB_TAILER: &str = "index_async_db_tailer";

pub struct TailerService {
pub db_tailer: Arc<DBTailer>,
}

impl TailerService {
pub fn new(db_reader: Arc<dyn DbReader>, node_config: &NodeConfig) -> Self {
let db_path = node_config
.storage
.get_dir_paths()
.default_root_path()
.join(INDEX_ASYNC_DB_TAILER);
let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config;
let db = Arc::new(
open_db(db_path, &rocksdb_config)
.expect("Failed to open up indexer db tailer initially"),
);

let indexer_db_tailer =
Arc::new(DBTailer::new(db, db_reader, &node_config.index_db_tailer));
Self {
db_tailer: indexer_db_tailer,
}
}

pub fn get_db_tailer(&self) -> Arc<DBTailer> {
Arc::clone(&self.db_tailer)
}

pub async fn run(&mut self) {
let mut start_version = self.db_tailer.get_last_version();
loop {
let start_time: std::time::Instant = std::time::Instant::now();
let cur_version = self
.db_tailer
.process_a_batch(Some(start_version))
.expect("Failed to run indexer db tailer");
start_version = cur_version;
log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::DBTailerProcessed,
None,
None,
None,
None,
Some(start_time.elapsed().as_secs_f64()),
None,
Some(cur_version as i64),
None,
);
}
}
}
Loading

0 comments on commit 13555fa

Please sign in to comment.