Skip to content

Commit

Permalink
[indexer-grpc] Overhaul indexer configuration, error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
banool committed Sep 19, 2023
1 parent 271e118 commit 9c893ac
Show file tree
Hide file tree
Showing 21 changed files with 587 additions and 407 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ docker/compose/indexer-grpc/data-service-grpc-server.key
.*\#
\#*\#

# Aptos CLI files
# Aptos CLI / local testnet files
.aptos
**/*.rdb

# VSCode settings
.vscode/
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

134 changes: 75 additions & 59 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use hex::{FromHex, FromHexError};
use rand::{rngs::StdRng, SeedableRng};
use std::{
fs,
io::{Read, Write},
path::PathBuf,
io::Write,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand Down Expand Up @@ -123,6 +123,7 @@ impl AptosNodeArgs {
setup_test_environment_and_start_node(
self.config,
self.test_config_override,
None,
self.test_dir,
self.random_ports,
self.lazy,
Expand Down Expand Up @@ -231,6 +232,7 @@ pub fn start(
pub fn setup_test_environment_and_start_node<R>(
config_path: Option<PathBuf>,
test_config_override_path: Option<PathBuf>,
config: Option<NodeConfig>,
test_dir: Option<PathBuf>,
random_ports: bool,
enable_lazy_mode: bool,
Expand All @@ -253,46 +255,22 @@ where
let aptos_root_key_path = test_dir.join("mint.key");

// If there's already a config, use it. Otherwise create a test one.
let config = if validator_config_path.exists() {
let config = if let Some(config) = config {
config
} else if validator_config_path.exists() {
NodeConfig::load_from_path(&validator_config_path)
.map_err(|error| anyhow!("Unable to load config: {:?}", error))?
} else {
// Create a test only config for a single validator node
let node_config = create_single_node_test_config(
config_path.clone(),
test_config_override_path.clone(),
// Create a test only config for a single validator node.
create_single_node_test_config(
&config_path,
&test_config_override_path,
&test_dir,
random_ports,
enable_lazy_mode,
)?;

// Build genesis and the validator node
let builder = aptos_genesis::builder::Builder::new(&test_dir, framework.clone())?
.with_init_config(Some(Arc::new(move |_, config, _| {
*config = node_config.clone();
})))
.with_init_genesis_config(Some(Arc::new(|genesis_config| {
genesis_config.allow_new_validators = true;
genesis_config.epoch_duration_secs = EPOCH_LENGTH_SECS;
genesis_config.recurring_lockup_duration_secs = 7200;
})))
.with_randomize_first_validator_ports(random_ports);
let (root_key, _genesis, genesis_waypoint, mut validators) = builder.build(rng)?;

// Write the mint key to disk
let serialized_keys = bcs::to_bytes(&root_key)?;
let mut key_file = fs::File::create(&aptos_root_key_path)?;
key_file.write_all(&serialized_keys)?;

// Build a waypoint file so that clients / docker can grab it easily
let waypoint_file_path = test_dir.join("waypoint.txt");
Write::write_all(
&mut fs::File::create(waypoint_file_path)?,
genesis_waypoint.to_string().as_bytes(),
)?;

aptos_config::config::sanitize_node_config(validators[0].config.override_config_mut())?;

// Return the validator config
validators[0].config.override_config().clone()
framework,
rng,
)?
};

// Prepare log file since we cannot automatically route logs to stderr
Expand All @@ -316,7 +294,7 @@ where
println!("\tTest dir: {:?}", test_dir);
println!("\tAptos root key path: {:?}", aptos_root_key_path);
println!("\tWaypoint: {}", config.base.waypoint.genesis_waypoint());
println!("\tChainId: {}", ChainId::test());
println!("\tChainId: {}", ChainId::test().id());
println!("\tREST API endpoint: http://{}", &config.api.address);
println!(
"\tMetrics endpoint: http://{}:{}/metrics",
Expand All @@ -327,9 +305,10 @@ where
&config.full_node_networks[0].listen_address
);
if config.indexer_grpc.enabled {
if let Some(ref indexer_grpc_address) = config.indexer_grpc.address {
println!("\tIndexer gRPC endpoint: {}", indexer_grpc_address);
}
println!(
"\tIndexer gRPC node stream endpoint: {}",
config.indexer_grpc.address
);
}
if enable_lazy_mode {
println!("\tLazy mode is enabled");
Expand All @@ -340,26 +319,31 @@ where
}

/// Creates a single node test config, with a few config tweaks to reduce
/// the overhead of running the node on a local machine.
fn create_single_node_test_config(
config_path: Option<PathBuf>,
test_config_override_path: Option<PathBuf>,
/// the overhead of running the node on a local machine. It writes necessary
/// configuration artifacts (e.g. the mint key) to disk.
pub fn create_single_node_test_config<R>(
config_path: &Option<PathBuf>,
test_config_override_path: &Option<PathBuf>,
test_dir: &Path,
random_ports: bool,
enable_lazy_mode: bool,
) -> anyhow::Result<NodeConfig> {
framework: &ReleaseBundle,
rng: R,
) -> anyhow::Result<NodeConfig>
where
R: rand::RngCore + rand::CryptoRng,
{
let mut node_config = match test_config_override_path {
// If a config override path was provided, merge it with the default config
Some(test_config_override_path) => {
let mut contents = String::new();
fs::File::open(&test_config_override_path)
.map_err(|e| {
anyhow!(
"Unable to open config override file {:?}. Error: {}",
test_config_override_path,
e
)
})?
.read_to_string(&mut contents)?;
let values = serde_yaml::from_str::<serde_yaml::Value>(&contents).map_err(|e| {
let reader = fs::File::open(test_config_override_path).map_err(|e| {
anyhow!(
"Unable to open config override file {:?}. Error: {}",
test_config_override_path,
e
)
})?;
let values: serde_yaml::Value = serde_yaml::from_reader(&reader).map_err(|e| {
anyhow!(
"Unable to read config override file as YAML {:?}. Error: {}",
test_config_override_path,
Expand Down Expand Up @@ -433,7 +417,7 @@ fn create_single_node_test_config(

// If a config path was provided, use that as the template
if let Some(config_path) = config_path {
node_config = NodeConfig::load_config(&config_path).map_err(|e| {
node_config = NodeConfig::load_config(config_path).map_err(|e| {
anyhow!(
"Unable to load config from path: {:?}. Error: {:?}",
config_path,
Expand All @@ -455,6 +439,38 @@ fn create_single_node_test_config(
node_config.consensus.quorum_store_poll_time_ms = 3_600_000;
}

// The validator builder puts the first node in the 0 directory
let aptos_root_key_path = test_dir.join("mint.key");

// Build genesis and the validator node
let builder = aptos_genesis::builder::Builder::new(test_dir, framework.clone())?
.with_init_config(Some(Arc::new(move |_, config, _| {
*config = node_config.clone();
})))
.with_init_genesis_config(Some(Arc::new(|genesis_config| {
genesis_config.allow_new_validators = true;
genesis_config.epoch_duration_secs = EPOCH_LENGTH_SECS;
genesis_config.recurring_lockup_duration_secs = 7200;
})))
.with_randomize_first_validator_ports(random_ports);
let (root_key, _genesis, genesis_waypoint, mut validators) = builder.build(rng)?;

// Write the mint key to disk
let serialized_keys = bcs::to_bytes(&root_key)?;
let mut key_file = fs::File::create(aptos_root_key_path)?;
key_file.write_all(&serialized_keys)?;

// Build a waypoint file so that clients / docker can grab it easily
let waypoint_file_path = test_dir.join("waypoint.txt");
Write::write_all(
&mut fs::File::create(waypoint_file_path)?,
genesis_waypoint.to_string().as_bytes(),
)?;

aptos_config::config::sanitize_node_config(&mut validators[0].config)?;

let node_config = validators[0].config.clone();

Ok(node_config)
}

Expand Down
16 changes: 13 additions & 3 deletions aptos-node/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use aptos_infallible::RwLock;
use aptos_storage_interface::{DbReader, DbReaderWriter, DbWriter};
use aptos_temppath::TempPath;
use aptos_types::{chain_id::ChainId, waypoint::Waypoint};
use rand::SeedableRng;
use std::{fs, sync::Arc};

/// A mock database implementing DbReader and DbWriter
Expand Down Expand Up @@ -48,10 +49,11 @@ fn test_aptos_vm_does_not_have_test_natives() {
aptos_vm::natives::assert_no_test_natives(crate::utils::ERROR_MSG_BAD_FEATURE_FLAGS)
}

// This test confirms that the overriding behavior works as intended.
#[test]
fn test_create_single_node_test_config() {
// create a test config override and merge it with the default config
// this will get cleaned up by the tempdir when it goes out of scope
// this will get cleaned up by the tempdir when it goes out of scope
let test_dir = aptos_temppath::TempPath::new().as_ref().to_path_buf();
fs::DirBuilder::new()
.recursive(true)
Expand Down Expand Up @@ -82,8 +84,16 @@ fn test_create_single_node_test_config() {

// merge it
let default_node_config = NodeConfig::get_default_validator_config();
let merged_config =
create_single_node_test_config(None, Some(config_override_path), false).unwrap();
let merged_config = create_single_node_test_config(
&None,
&Some(config_override_path),
&test_dir,
false,
false,
aptos_cached_packages::head_release_bundle(),
rand::rngs::StdRng::from_entropy(),
)
.unwrap();

// overriden configs
assert!(merged_config.storage.enable_indexer);
Expand Down
67 changes: 36 additions & 31 deletions config/src/config/indexer_grpc_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,48 @@ use crate::config::{
};
use aptos_types::chain_id::ChainId;
use serde::{Deserialize, Serialize};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

// Useful indexer defaults
const DEFAULT_ADDRESS: &str = "0.0.0.0:50051";
const DEFAULT_OUTPUT_BATCH_SIZE: u16 = 100;
const DEFAULT_PROCESSOR_BATCH_SIZE: u16 = 1000;
const DEFAULT_PROCESSOR_TASK_COUNT: u16 = 20;
const DEFAULT_PROCESSOR_BATCH_SIZE: u16 = 1000;
const DEFAULT_OUTPUT_BATCH_SIZE: u16 = 100;
pub const DEFAULT_GRPC_STREAM_PORT: u16 = 50051;

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct IndexerGrpcConfig {
pub enabled: bool,

/// The address that the grpc server will listen on
#[serde(default, skip_serializing_if = "Option::is_none")]
pub address: Option<String>,
/// The address that the grpc server will listen on.
pub address: SocketAddr,

/// Number of processor tasks to fan out
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processor_task_count: Option<u16>,
pub processor_task_count: u16,

/// Number of transactions each processor will process
#[serde(default, skip_serializing_if = "Option::is_none")]
pub processor_batch_size: Option<u16>,
pub processor_batch_size: u16,

/// Number of transactions returned in a single stream response
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_batch_size: Option<u16>,
pub output_batch_size: u16,
}

// Reminder, #[serde(default)] on IndexerGrpcConfig means that the default values for
// fields will come from this Default impl, unless the field has a specific
// #[serde(default)] on it (which none of the above do).
impl Default for IndexerGrpcConfig {
fn default() -> Self {
Self {
enabled: false,
address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(0, 0, 0, 0),
DEFAULT_GRPC_STREAM_PORT,
)),
processor_task_count: DEFAULT_PROCESSOR_TASK_COUNT,
processor_batch_size: DEFAULT_PROCESSOR_BATCH_SIZE,
output_batch_size: DEFAULT_OUTPUT_BATCH_SIZE,
}
}
}

impl ConfigSanitizer for IndexerGrpcConfig {
Expand All @@ -41,28 +56,18 @@ impl ConfigSanitizer for IndexerGrpcConfig {
_node_type: NodeType,
_chain_id: ChainId,
) -> Result<(), Error> {
let indexer_grpc_config = &mut node_config.indexer_grpc;
let sanitizer_name = Self::get_sanitizer_name();

// If the indexer is not enabled, we don't need to do anything
if !indexer_grpc_config.enabled {
if !node_config.indexer_grpc.enabled {
return Ok(());
}

// Set appropriate defaults
indexer_grpc_config.address = indexer_grpc_config
.address
.clone()
.or_else(|| Some(DEFAULT_ADDRESS.into()));
indexer_grpc_config.processor_task_count = indexer_grpc_config
.processor_task_count
.or(Some(DEFAULT_PROCESSOR_TASK_COUNT));
indexer_grpc_config.processor_batch_size = indexer_grpc_config
.processor_batch_size
.or(Some(DEFAULT_PROCESSOR_BATCH_SIZE));
indexer_grpc_config.output_batch_size = indexer_grpc_config
.output_batch_size
.or(Some(DEFAULT_OUTPUT_BATCH_SIZE));

if !node_config.storage.enable_indexer {
return Err(Error::ConfigSanitizerFailed(
sanitizer_name,
"storage.enable_indexer must be true if indexer_grpc.enabled is true".to_string(),
));
}
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/aptos/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,7 @@ impl CliCommand<()> for RunLocalTestnet {
let result = aptos_node::setup_test_environment_and_start_node(
config_path,
self.test_config_override,
None,
Some(test_dir_copy),
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ serde_yaml = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

[dev-dependencies]
aptos-config = { workspace = true }
Expand Down
Loading

0 comments on commit 9c893ac

Please sign in to comment.