Skip to content

Commit

Permalink
Add test for internal indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed May 22, 2024
1 parent fada677 commit a7ff4f9
Show file tree
Hide file tree
Showing 25 changed files with 311 additions and 57 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ use aptos_api_types::{
};
use aptos_config::config::{NodeConfig, RoleType};
use aptos_crypto::HashValue;
use aptos_db_indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
};
use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule};
use aptos_logger::{error, info, Schema};
use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus};
Expand All @@ -36,6 +33,9 @@ use aptos_types::{
chain_id::ChainId,
contract_event::EventWithVersion,
event::EventKey,
indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
},
ledger_info::LedgerInfoWithSignatures,
on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig, OnChainExecutionConfig},
state_store::{
Expand Down
10 changes: 6 additions & 4 deletions api/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ use crate::{
};
use anyhow::Context as AnyhowContext;
use aptos_config::config::{ApiConfig, NodeConfig};
use aptos_db_indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
};
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReader;
use aptos_types::chain_id::ChainId;
use aptos_types::{
chain_id::ChainId,
indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
},
};
use poem::{
handler,
http::Method,
Expand Down
1 change: 0 additions & 1 deletion api/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-config = { workspace = true }
aptos-crypto = { workspace = true }
aptos-db-indexer = { workspace = true }
aptos-framework = { workspace = true }
aptos-logger = { workspace = true }
aptos-openapi = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion api/types/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use crate::{
};
use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_logger::{sample, sample::SampleRate};
use aptos_resource_viewer::AptosValueAnnotator;
use aptos_storage_interface::DbReader;
use aptos_types::{
access_path::{AccessPath, Path},
chain_id::ChainId,
contract_event::{ContractEvent, EventWithVersion},
indexer::table_info_reader::TableInfoReader,
state_store::{
state_key::{inner::StateKeyInner, StateKey},
table::{TableHandle, TableInfo},
Expand Down
10 changes: 6 additions & 4 deletions aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ use aptos_consensus::{
};
use aptos_consensus_notifications::ConsensusNotifier;
use aptos_data_client::client::AptosDataClient;
use aptos_db_indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
};
use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener};
use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc;
use aptos_indexer_grpc_table_info::runtime::{
Expand All @@ -34,7 +31,12 @@ use aptos_peer_monitoring_service_server::{
use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage;
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_time_service::TimeService;
use aptos_types::chain_id::ChainId;
use aptos_types::{
chain_id::ChainId,
indexer::{
db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader,
},
};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender};
use std::{sync::Arc, time::Instant};
Expand Down
14 changes: 14 additions & 0 deletions config/src/config/index_db_tailer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ pub struct IndexDBTailerConfig {
pub batch_size: usize,
}

impl IndexDBTailerConfig {
pub fn new(enable: bool, batch_size: usize) -> Self {
Self { enable, batch_size }
}

pub fn enable(&self) -> bool {
self.enable
}

pub fn batch_size(&self) -> usize {
self.batch_size
}
}

impl Default for IndexDBTailerConfig {
fn default() -> Self {
Self {
Expand Down
3 changes: 1 addition & 2 deletions ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
};
use aptos_api::context::Context;
use aptos_config::config::NodeConfig;
use aptos_db_indexer::table_info_reader::TableInfoReader;
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_protos::{
Expand All @@ -19,7 +18,7 @@ use aptos_protos::{
util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
};
use aptos_storage_interface::DbReader;
use aptos_types::chain_id::ChainId;
use aptos_types::{chain_id::ChainId, indexer::table_info_reader::TableInfoReader};
use std::{net::ToSocketAddrs, sync::Arc};
use tokio::runtime::Runtime;
use tonic::{codec::CompressionEncoding, transport::Server};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ impl TailerService {
}

pub async fn run(&mut self) {
let mut start_version = self.db_tailer.get_last_version();
let mut start_version = self.db_tailer.get_persisted_version();
loop {
let start_time: std::time::Instant = std::time::Instant::now();
let cur_version = self
.db_tailer
.process_a_batch(Some(start_version))
.expect("Failed to run indexer db tailer");

if cur_version == start_version {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
start_version = cur_version;
log_grpc_step(
SERVICE_TYPE,
Expand Down
2 changes: 2 additions & 0 deletions execution/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aptos-drop-helper = { workspace = true }
aptos-executor-service = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-experimental-runtimes = { workspace = true }
aptos-sdk = { workspace = true }
aptos-infallible = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
Expand All @@ -42,6 +43,7 @@ serde = { workspace = true }
aptos-cached-packages = { workspace = true }
aptos-config = { workspace = true }
aptos-db = { workspace = true }
aptos-db-indexer = { workspace = true, features = ["test"] }
aptos-executor-test-helpers = { workspace = true }
aptos-genesis = { workspace = true }
aptos-storage-interface = { workspace = true }
Expand Down
161 changes: 161 additions & 0 deletions execution/executor/tests/internal_indexer_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_cached_packages::aptos_stdlib;
use aptos_config::config::{index_db_tailer_config::IndexDBTailerConfig, RocksdbConfig};
use aptos_db::AptosDB;
use aptos_db_indexer::{db_ops::open_tailer_db, db_tailer::DBTailer};
use aptos_executor_test_helpers::{
gen_block_id, gen_ledger_info_with_sigs, integration_test_impl::create_db_and_executor,
};
use aptos_executor_types::BlockExecutorTrait;
use aptos_sdk::{
transaction_builder::TransactionFactory,
types::{AccountKey, LocalAccount},
};
use aptos_storage_interface::DbReader;
use aptos_temppath::TempPath;
use aptos_types::{
account_config::aptos_test_root_address,
block_metadata::BlockMetadata,
chain_id::ChainId,
test_helpers::transaction_test_helpers::TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
transaction::{
signature_verified_transaction::into_signature_verified_block, Transaction,
Transaction::UserTransaction, WriteSetPayload,
},
};
use rand::SeedableRng;
use std::sync::Arc;

const B: u64 = 1_000_000_000;

#[cfg(test)]
pub fn create_test_db() -> (Arc<AptosDB>, LocalAccount) {
// create test db
let path = aptos_temppath::TempPath::new();
let (genesis, validators) = aptos_vm_genesis::test_genesis_change_set_and_validators(Some(1));
let genesis_txn = Transaction::GenesisTransaction(WriteSetPayload::Direct(genesis));
let core_resources_account: LocalAccount = LocalAccount::new(
aptos_test_root_address(),
AccountKey::from_private_key(aptos_vm_genesis::GENESIS_KEYPAIR.0.clone()),
0,
);
let (aptos_db, _db, executor, _waypoint) =
create_db_and_executor(path.path(), &genesis_txn, true);
let parent_block_id = executor.committed_block_id();

// This generates accounts that do not overlap with genesis
let seed = [3u8; 32];
let mut rng = ::rand::rngs::StdRng::from_seed(seed);
let signer = aptos_types::validator_signer::ValidatorSigner::new(
validators[0].data.owner_address,
validators[0].consensus_key.clone(),
);
let account1 = LocalAccount::generate(&mut rng);
let account2 = LocalAccount::generate(&mut rng);
let account3 = LocalAccount::generate(&mut rng);

let txn_factory = TransactionFactory::new(ChainId::test());

let block1_id = gen_block_id(1);
let block1_meta = Transaction::BlockMetadata(BlockMetadata::new(
block1_id,
1,
0,
signer.author(),
vec![0],
vec![],
1,
));
let tx1 = core_resources_account
.sign_with_transaction_builder(txn_factory.create_user_account(account1.public_key()));
let tx2 = core_resources_account
.sign_with_transaction_builder(txn_factory.create_user_account(account2.public_key()));
let tx3 = core_resources_account
.sign_with_transaction_builder(txn_factory.create_user_account(account3.public_key()));
// Create account1 with 2T coins.
let txn1 = core_resources_account
.sign_with_transaction_builder(txn_factory.mint(account1.address(), 2_000 * B));
// Create account2 with 1.2T coins.
let txn2 = core_resources_account
.sign_with_transaction_builder(txn_factory.mint(account2.address(), 1_200 * B));
// Create account3 with 1T coins.
let txn3 = core_resources_account
.sign_with_transaction_builder(txn_factory.mint(account3.address(), 1_000 * B));

// Transfer 20B coins from account1 to account2.
// balance: <1.98T, 1.22T, 1T
let txn4 =
account1.sign_with_transaction_builder(txn_factory.transfer(account2.address(), 20 * B));

// Transfer 10B coins from account2 to account3.
// balance: <1.98T, <1.21T, 1.01T
let txn5 =
account2.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 10 * B));

// Transfer 70B coins from account1 to account3.
// balance: <1.91T, <1.21T, 1.08T
let txn6 =
account1.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 70 * B));

let reconfig1 = core_resources_account.sign_with_transaction_builder(
txn_factory.payload(aptos_stdlib::aptos_governance_force_end_epoch_test_only()),
);

let block1: Vec<_> = into_signature_verified_block(vec![
block1_meta,
UserTransaction(tx1),
UserTransaction(tx2),
UserTransaction(tx3),
UserTransaction(txn1),
UserTransaction(txn2),
UserTransaction(txn3),
UserTransaction(txn4),
UserTransaction(txn5),
UserTransaction(txn6),
UserTransaction(reconfig1),
]);
let output1 = executor
.execute_block(
(block1_id, block1.clone()).into(),
parent_block_id,
TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
)
.unwrap();
let li1 = gen_ledger_info_with_sigs(1, &output1, block1_id, &[signer.clone()]);
executor.commit_blocks(vec![block1_id], li1).unwrap();
(aptos_db, core_resources_account)
}

#[test]
fn test_db_tailer_data() {
// create test db
let (aptos_db, core_account) = create_test_db();
let total_version = aptos_db.get_latest_version().unwrap();
// create db tailer
let rocksdb_config = RocksdbConfig::default();
let temp_path = TempPath::new();
let db = Arc::new(
open_tailer_db(temp_path.as_ref(), &rocksdb_config)
.expect("Failed to open up indexer db tailer initially"),
);
let tailer = DBTailer::new(db, aptos_db, &IndexDBTailerConfig::new(true, 2));
// assert the data matches the expected data
let mut version = tailer.get_persisted_version();
assert_eq!(version, 0);
while version < total_version {
version = tailer.process_a_batch(Some(version)).unwrap();
}
let txn_iter = tailer
.get_account_transaction_version_iter(core_account.address(), 0, 1000, 1000)
.unwrap();
let res: Vec<_> = txn_iter.collect();
// core account submitted 7 transactions, and the first transaction is version 2
assert!(res.len() == 7);
assert!(res[0].as_ref().unwrap().1 == 2);

let x = tailer.get_event_by_key_iter().unwrap();
let res: Vec<_> = x.collect();
assert!(!res.is_empty());
}
3 changes: 3 additions & 0 deletions storage/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ aptos-logger = { workspace = true }
aptos-resource-viewer = { workspace = true }
aptos-rocksdb-options = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-sdk = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-vm-genesis = { workspace = true }
aptos-types = { workspace = true }
bcs = { workspace = true }
bytes = { workspace = true }
Expand All @@ -40,4 +42,5 @@ rand = { workspace = true }

[features]
default = []
test = []
fuzzing = ["proptest", "proptest-derive", "aptos-types/fuzzing", "aptos-schemadb/fuzzing"]
11 changes: 10 additions & 1 deletion storage/indexer/src/db_ops.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::schema::column_families;
use crate::schema::{column_families, tailer_column_families};
use anyhow::Result;
use aptos_config::config::RocksdbConfig;
use aptos_rocksdb_options::gen_rocksdb_options;
Expand All @@ -17,6 +17,15 @@ pub fn open_db<P: AsRef<Path>>(db_path: P, rocksdb_config: &RocksdbConfig) -> Re
)?)
}

pub fn open_tailer_db<P: AsRef<Path>>(db_path: P, rocksdb_config: &RocksdbConfig) -> Result<DB> {
Ok(DB::open(
db_path,
"tailer_db",
tailer_column_families(),
&gen_rocksdb_options(rocksdb_config, false),
)?)
}

pub fn close_db(db: DB) {
mem::drop(db)
}
Loading

0 comments on commit a7ff4f9

Please sign in to comment.