diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
index b213fdeba..5151c5cc0 100644
--- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs
+++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -72,7 +72,8 @@ where
     pub last_upload_time: Instant,
     pub processor_name: String,
 }
-fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
+
+pub fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
     let props = WriterProperties::builder()
         .set_compression(parquet::basic::Compression::LZ4)
         .build();
diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs
index fd6bfcf24..e8882aed3 100644
--- a/rust/processor/src/processors/default_processor.rs
+++ b/rust/processor/src/processors/default_processor.rs
@@ -288,7 +288,7 @@ impl ProcessorTrait for DefaultProcessor {
     }
 }
 
-// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet)
+// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet)
 /// Processes a list of transactions and extracts relevant data into different models.
 ///
 /// This function iterates over a list of transactions, extracting block metadata transactions,
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index 57d1af2c8..d2d5a2d98 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -1,14 +1,13 @@
 use crate::{
-    parquet_processors::ParquetTypeEnum,
     processors::{
         ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
         stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
     },
+    utils::parquet_processor_table_mapping::VALID_TABLE_NAMES,
 };
 use ahash::AHashMap;
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
-use strum::IntoEnumIterator;
 
 /// This enum captures the configs for all the different processors that are defined.
 ///
@@ -72,29 +71,18 @@ impl ProcessorConfig {
         match self {
             ProcessorConfig::ParquetDefaultProcessor(config) => {
                 // Get the processor name as a prefix
-                let prefix = self.name();
+                let processor_name = self.name();
 
-                // Collect valid table names from `ParquetTypeEnum` into a set for quick lookup
-                let valid_table_names: HashSet<String> =
-                    ParquetTypeEnum::iter().map(|e| e.to_string()).collect();
+                let valid_table_names = VALID_TABLE_NAMES
+                    .get(processor_name)
+                    .ok_or_else(|| anyhow::anyhow!("Processor type not recognized"))?;
 
-                // Validate and map table names with prefix
-                let mut validated_table_names = Vec::new();
-                for table_name in &config.tables {
-                    // Ensure the table name is a valid `ParquetTypeEnum` variant
-                    if !valid_table_names.contains(table_name) {
-                        return Err(anyhow::anyhow!(
-                            "Invalid table name '{}'. Expected one of: {:?}",
-                            table_name,
-                            valid_table_names
-                        ));
-                    }
-
-                    // Append the prefix to the validated table name
-                    validated_table_names.push(Self::format_table_name(prefix, table_name));
-                }
-
-                Ok(validated_table_names)
+                // Use the helper function for validation and mapping
+                Self::validate_and_prefix_table_names(
+                    &config.backfill_table,
+                    valid_table_names,
+                    processor_name,
+                )
             },
             _ => Err(anyhow::anyhow!(
                 "Invalid parquet processor config: {:?}",
@@ -107,6 +95,26 @@ impl ProcessorConfig {
     fn format_table_name(prefix: &str, table_name: &str) -> String {
         format!("{}.{}", prefix, table_name)
     }
+
+    fn validate_and_prefix_table_names(
+        table_names: &HashSet<String>,
+        valid_table_names: &HashSet<String>,
+        prefix: &str,
+    ) -> anyhow::Result<Vec<String>> {
+        table_names
+            .iter()
+            .map(|table_name| {
+                if !valid_table_names.contains(table_name) {
+                    return Err(anyhow::anyhow!(
+                        "Invalid table name '{}'. Expected one of: {:?}",
+                        table_name,
+                        valid_table_names
+                    ));
+                }
+                Ok(Self::format_table_name(prefix, table_name))
+            })
+            .collect()
+    }
 }
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -155,9 +163,9 @@ pub struct ParquetDefaultProcessorConfig {
     pub max_buffer_size: usize,
     #[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
     pub parquet_upload_interval: u64,
-    // list of table names to backfill. Using HashSet for fast lookups, and for future extensibility.
+    // Set of table names to backfill. Using HashSet for fast lookups, and for future extensibility.
     #[serde(default)]
-    pub tables: HashSet<String>,
+    pub backfill_table: HashSet<String>,
 }
 
 impl ParquetDefaultProcessorConfig {
@@ -185,7 +193,10 @@ mod tests {
     #[test]
     fn test_valid_table_names() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]),
+            backfill_table: HashSet::from([
+                "move_resources".to_string(),
+                "transactions".to_string(),
+            ]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -200,7 +211,7 @@ mod tests {
         let table_names = result.unwrap();
         let table_names: HashSet<String> = table_names.into_iter().collect();
         let expected_names: HashSet<String> =
-            ["Transaction".to_string(), "MoveResource".to_string()]
+            ["transactions".to_string(), "move_resources".to_string()]
                 .iter()
                 .map(|e| format!("parquet_default_processor.{}", e))
                 .collect();
@@ -210,7 +221,7 @@ mod tests {
     #[test]
     fn test_invalid_table_name() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]),
+            backfill_table: HashSet::from(["InvalidTable".to_string(), "transactions".to_string()]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -230,7 +241,7 @@ mod tests {
     #[test]
     fn test_empty_tables() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: HashSet::new(),
+            backfill_table: HashSet::new(),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -248,7 +259,7 @@ mod tests {
     #[test]
     fn test_duplicate_table_names() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]),
+            backfill_table: HashSet::from(["transactions".to_string(), "transactions".to_string()]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -262,14 +273,20 @@ mod tests {
         let table_names = result.unwrap();
 
         assert_eq!(table_names, vec![
-            "parquet_default_processor.Transaction".to_string(),
+            "parquet_default_processor.transactions".to_string(),
         ]);
     }
 
     #[test]
-    fn test_all_enum_table_names() {
+    fn test_all_table_names() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(),
+            backfill_table: HashSet::from([
+                "move_resources".to_string(),
+                "transactions".to_string(),
+                "write_set_changes".to_string(),
+                "table_items".to_string(),
+                "move_modules".to_string(),
+            ]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -282,9 +299,16 @@ mod tests {
         assert!(result.is_ok());
 
         let table_names = result.unwrap();
-        let expected_names: HashSet<String> = ParquetTypeEnum::iter()
-            .map(|e| format!("parquet_default_processor.{}", e))
-            .collect();
+        let expected_names: HashSet<String> = [
+            "move_resources".to_string(),
+            "transactions".to_string(),
+            "write_set_changes".to_string(),
+            "table_items".to_string(),
+            "move_modules".to_string(),
+        ]
+        .iter()
+        .map(|e| format!("parquet_default_processor.{}", e))
+        .collect();
         let table_names: HashSet<String> = table_names.into_iter().collect();
         assert_eq!(table_names, expected_names);
     }
diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs
index 218c9d8ac..7f0febebd 100644
--- a/rust/sdk-processor/src/parquet_processors/mod.rs
+++ b/rust/sdk-processor/src/parquet_processors/mod.rs
@@ -1,16 +1,33 @@
+use crate::{
+    config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig},
+    steps::common::{
+        gcs_uploader::{create_new_writer, GCSUploader},
+        parquet_buffer_step::ParquetBufferStep,
+    },
+    utils::database::{new_db_pool, ArcDbPool},
+};
 use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
-use processor::db::parquet::models::default_models::{
-    parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
-    parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
-    parquet_write_set_changes::WriteSetChangeModel,
+use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
+use parquet::schema::types::Type;
+use processor::{
+    db::parquet::models::default_models::{
+        parquet_move_modules::MoveModule, parquet_move_resources::MoveResource,
+        parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
+        parquet_write_set_changes::WriteSetChangeModel,
+    },
+    worker::TableFlags,
 };
 use serde::{Deserialize, Serialize};
+use std::{collections::HashMap, sync::Arc, time::Duration};
 use strum::{Display, EnumIter};
 
 pub mod parquet_default_processor;
 
+const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
+
 /// Enum representing the different types of Parquet files that can be processed.
 #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)]
+#[strum(serialize_all = "snake_case")]
 #[cfg_attr(
     test,
     derive(strum::EnumDiscriminants),
@@ -123,6 +140,81 @@ impl ParquetTypeStructs {
     }
 }
 
+async fn initialize_gcs_client(credentials: Option<String>) -> Arc<GCSClient> {
+    if let Some(credentials) = credentials {
+        std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
+    }
+
+    let gcs_config = GcsClientConfig::default()
+        .with_auth()
+        .await
+        .expect("Failed to create GCS client config");
+
+    Arc::new(GCSClient::new(gcs_config))
+}
+
+async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result<ArcDbPool> {
+    match config {
+        DbConfig::PostgresConfig(ref postgres_config) => {
+            let conn_pool = new_db_pool(
+                &postgres_config.connection_string,
+                Some(postgres_config.db_pool_size),
+            )
+            .await
+            .map_err(|e| {
+                anyhow::anyhow!(
+                    "Failed to create connection pool for PostgresConfig: {:?}",
+                    e
+                )
+            })?;
+            Ok(conn_pool)
+        },
+    }
+}
+
+async fn initialize_parquet_buffer_step(
+    gcs_client: Arc<GCSClient>,
+    parquet_processor_config: ParquetDefaultProcessorConfig,
+    parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>>,
+    processor_name: String,
+) -> anyhow::Result<ParquetBufferStep<GCSUploader>> {
+    let parquet_type_to_writer = parquet_type_to_schemas
+        .iter()
+        .map(|(key, schema)| {
+            let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
+            (*key, writer)
+        })
+        .collect();
+
+    let buffer_uploader = GCSUploader::new(
+        gcs_client,
+        parquet_type_to_schemas,
+        parquet_type_to_writer,
+        parquet_processor_config.bucket_name.clone(),
+        parquet_processor_config.bucket_root.clone(),
+        processor_name,
+    )?;
+
+    let default_size_buffer_step = ParquetBufferStep::new(
+        Duration::from_secs(parquet_processor_config.parquet_upload_interval),
+        buffer_uploader,
+        parquet_processor_config.max_buffer_size,
+    );
+
+    Ok(default_size_buffer_step)
+}
+
+fn set_backfill_table_flag(table_names: Vec<String>) -> TableFlags {
+    let mut backfill_table = TableFlags::empty();
+
+    for table_name in table_names {
+        if let Some(flag) = TableFlags::from_name(&table_name) {
+            backfill_table |= flag;
+        }
+    }
+    backfill_table
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
index c6c0e5e62..d7ddf44ea 100644
--- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
+++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -3,28 +3,30 @@ use crate::{
         db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
         processor_config::ProcessorConfig,
     },
-    parquet_processors::ParquetTypeEnum,
+    parquet_processors::{
+        initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step,
+        set_backfill_table_flag, ParquetTypeEnum,
+    },
     steps::{
         common::{
-            gcs_uploader::{create_new_writer, GCSUploader},
-            parquet_buffer_step::ParquetBufferStep,
+            parquet_version_tracker_step::ParquetVersionTrackerStep,
+            processor_status_saver::get_processor_status_saver,
         },
         parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor,
     },
     utils::{
         chain_id::check_or_update_chain_id,
-        database::{new_db_pool, run_migrations, ArcDbPool},
-        starting_version::{get_min_last_success_version_parquet, get_starting_version},
+        database::{run_migrations, ArcDbPool},
+        starting_version::get_min_last_success_version_parquet,
     },
 };
 use anyhow::Context;
 use aptos_indexer_processor_sdk::{
     aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
     builder::ProcessorBuilder,
-    common_steps::TransactionStreamStep,
+    common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS},
     traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
 };
-use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
 use parquet::schema::types::Type;
 use processor::{
     bq_analytics::generic_parquet_processor::HasParquetSchema,
@@ -33,40 +35,19 @@ use processor::{
         parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction,
         parquet_write_set_changes::WriteSetChangeModel,
     },
-    worker::TableFlags,
 };
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{collections::HashMap, sync::Arc};
 use tracing::{debug, info};
 
-const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
-
 pub struct ParquetDefaultProcessor {
     pub config: IndexerProcessorConfig,
-    pub db_pool: ArcDbPool, // for processor status
+    pub db_pool: ArcDbPool,
 }
 
 impl ParquetDefaultProcessor {
     pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result<Self> {
-        match config.db_config {
-            DbConfig::PostgresConfig(ref postgres_config) => {
-                let conn_pool = new_db_pool(
-                    &postgres_config.connection_string,
-                    Some(postgres_config.db_pool_size),
-                )
-                .await
-                .map_err(|e| {
-                    anyhow::anyhow!(
-                        "Failed to create connection pool for PostgresConfig: {:?}",
-                        e
-                    )
-                })?;
-
-                Ok(Self {
-                    config,
-                    db_pool: conn_pool,
-                })
-            },
-        }
+        let db_pool = initialize_database_pool(&config.db_config).await?;
+        Ok(Self { config, db_pool })
     }
 }
 
@@ -88,25 +69,6 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             },
         }
 
-        // Determine the processing mode (backfill or regular)
-        let is_backfill = self.config.backfill_config.is_some();
-
-        // TODO: Revisit when parquet version tracker is available.
-        // Query the starting version
-        let starting_version = if is_backfill {
-            get_starting_version(&self.config, self.db_pool.clone()).await?
-        } else {
-            // Regular mode logic: Fetch the minimum last successful version across all relevant tables
-            let table_names = self
-                .config
-                .processor_config
-                .get_table_names()
-                .context("Failed to get table names for the processor")?;
-
-            get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
-                .await?
-        };
-
         // Check and update the ledger chain id to ensure we're indexing the correct chain
         let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
             .await?
@@ -126,6 +88,27 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             },
         };
 
+        // Query the starting version
+        let table_names = if self.config.backfill_config.is_some() {
+            parquet_processor_config
+                .backfill_table
+                .clone()
+                .into_iter()
+                .collect()
+        } else {
+            self.config
+                .processor_config
+                .get_table_names()
+                .context("Failed to get table names for the processor")?
+        };
+
+        let starting_version = get_min_last_success_version_parquet(
+            &self.config,
+            self.db_pool.clone(),
+            table_names.clone(),
+        )
+        .await?;
+
         // Define processor transaction stream config
         let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
             starting_version: Some(starting_version),
             ..self.config.transaction_stream_config.clone()
         })
         .await?;
 
+        let backfill_table = set_backfill_table_flag(table_names);
         let parquet_default_extractor = ParquetDefaultExtractor {
-            opt_in_tables: TableFlags::empty(),
+            opt_in_tables: backfill_table,
         };
 
-        let credentials = parquet_processor_config
-            .google_application_credentials
-            .clone();
-
-        if let Some(credentials) = credentials {
-            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
-        }
-
-        let gcs_config = GcsClientConfig::default()
-            .with_auth()
-            .await
-            .expect("Failed to create GCS client config");
-
-        let gcs_client = Arc::new(GCSClient::new(gcs_config));
+        let gcs_client = initialize_gcs_client(
+            parquet_processor_config
+                .google_application_credentials
+                .clone(),
+        )
+        .await;
 
         let parquet_type_to_schemas: HashMap<ParquetTypeEnum, Arc<Type>> = [
             (ParquetTypeEnum::MoveResource, MoveResource::schema()),
@@ -165,37 +141,34 @@ impl ProcessorTrait for ParquetDefaultProcessor {
         .into_iter()
         .collect();
 
-        let parquet_type_to_writer = parquet_type_to_schemas
-            .iter()
-            .map(|(key, schema)| {
-                let writer = create_new_writer(schema.clone()).expect("Failed to create writer");
-                (*key, writer)
-            })
-            .collect();
-
-        let buffer_uploader = GCSUploader::new(
+        let default_size_buffer_step = initialize_parquet_buffer_step(
             gcs_client.clone(),
+            parquet_processor_config.clone(),
             parquet_type_to_schemas,
-            parquet_type_to_writer,
-            parquet_processor_config.bucket_name.clone(),
-            parquet_processor_config.bucket_root.clone(),
             self.name().to_string(),
-        )?;
+        )
+        .await
+        .unwrap_or_else(|e| {
+            panic!("Failed to initialize parquet buffer step: {:?}", e);
+        });
+
+        let parquet_version_tracker_step = ParquetVersionTrackerStep::new(
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), true),
+            DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
+        );
 
         let channel_size = parquet_processor_config.channel_size;
 
-        let default_size_buffer_step = ParquetBufferStep::new(
-            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
-            buffer_uploader,
-            parquet_processor_config.max_buffer_size,
-        );
-
         // Connect processor steps together
         let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
             transaction_stream.into_runnable_step(),
         )
         .connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
         .connect_to(default_size_buffer_step.into_runnable_step(), channel_size)
+        .connect_to(
+            parquet_version_tracker_step.into_runnable_step(),
+            channel_size,
+        )
         .end_and_return_output_receiver(channel_size);
 
         loop {
diff --git a/rust/sdk-processor/src/processors/account_transactions_processor.rs b/rust/sdk-processor/src/processors/account_transactions_processor.rs
index 7260c3c02..e1c11e539 100644
--- a/rust/sdk-processor/src/processors/account_transactions_processor.rs
+++ b/rust/sdk-processor/src/processors/account_transactions_processor.rs
@@ -103,7 +103,7 @@ impl ProcessorTrait for AccountTransactionsProcessor {
         let acc_txns_storer =
             AccountTransactionsStorer::new(self.db_pool.clone(), processor_config);
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
diff --git a/rust/sdk-processor/src/processors/ans_processor.rs b/rust/sdk-processor/src/processors/ans_processor.rs
index fbc5605a8..7fde412a0 100644
--- a/rust/sdk-processor/src/processors/ans_processor.rs
+++ b/rust/sdk-processor/src/processors/ans_processor.rs
@@ -118,7 +118,7 @@ impl ProcessorTrait for AnsProcessor {
             AnsExtractor::new(deprecated_table_flags, self.config.processor_config.clone());
         let acc_txns_storer = AnsStorer::new(self.db_pool.clone(), processor_config);
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
diff --git a/rust/sdk-processor/src/processors/default_processor.rs b/rust/sdk-processor/src/processors/default_processor.rs
index e27dd0d0b..e58181831 100644
--- a/rust/sdk-processor/src/processors/default_processor.rs
+++ b/rust/sdk-processor/src/processors/default_processor.rs
@@ -107,7 +107,7 @@ impl ProcessorTrait for DefaultProcessor {
         };
         let default_storer = DefaultStorer::new(self.db_pool.clone(), processor_config);
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs
index 124ed0eb9..6bb7ffb8e 100644
--- a/rust/sdk-processor/src/processors/events_processor.rs
+++ b/rust/sdk-processor/src/processors/events_processor.rs
@@ -102,7 +102,7 @@ impl ProcessorTrait for EventsProcessor {
         let events_extractor = EventsExtractor {};
         let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config);
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs
index a7c16779d..3dd03dee5 100644
--- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs
+++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs
@@ -106,7 +106,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
             deprecated_table_flags,
         );
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
         // Connect processor steps together
diff --git a/rust/sdk-processor/src/processors/monitoring_processor.rs b/rust/sdk-processor/src/processors/monitoring_processor.rs
index c18d81838..20d7bc65e 100644
--- a/rust/sdk-processor/src/processors/monitoring_processor.rs
+++ b/rust/sdk-processor/src/processors/monitoring_processor.rs
@@ -98,7 +98,7 @@ impl ProcessorTrait for MonitoringProcessor {
         })
         .await?;
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
diff --git a/rust/sdk-processor/src/processors/objects_processor.rs b/rust/sdk-processor/src/processors/objects_processor.rs
index d69945dd3..f63b50665 100644
--- a/rust/sdk-processor/src/processors/objects_processor.rs
+++ b/rust/sdk-processor/src/processors/objects_processor.rs
@@ -131,7 +131,7 @@ impl ProcessorTrait for ObjectsProcessor {
             ObjectsStorer::new(self.db_pool.clone(), per_table_chunk_sizes.clone());
 
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
         // Connect processor steps together
diff --git a/rust/sdk-processor/src/processors/stake_processor.rs b/rust/sdk-processor/src/processors/stake_processor.rs
index 4ae5e322a..c1e7eea9a 100644
--- a/rust/sdk-processor/src/processors/stake_processor.rs
+++ b/rust/sdk-processor/src/processors/stake_processor.rs
@@ -130,7 +130,7 @@ impl ProcessorTrait for StakeProcessor {
         );
         let storer = StakeStorer::new(self.db_pool.clone(), processor_config.clone());
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
         // Connect processor steps together
diff --git a/rust/sdk-processor/src/processors/token_v2_processor.rs b/rust/sdk-processor/src/processors/token_v2_processor.rs
index 0f75af7c2..fac50cf3b 100644
--- a/rust/sdk-processor/src/processors/token_v2_processor.rs
+++ b/rust/sdk-processor/src/processors/token_v2_processor.rs
@@ -127,7 +127,7 @@ impl ProcessorTrait for TokenV2Processor {
         );
         let token_v2_storer = TokenV2Storer::new(self.db_pool.clone(), processor_config.clone());
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
         // Connect processor steps together
diff --git a/rust/sdk-processor/src/processors/user_transaction_processor.rs b/rust/sdk-processor/src/processors/user_transaction_processor.rs
index 73d08f5e3..80e4670ad 100644
--- a/rust/sdk-processor/src/processors/user_transaction_processor.rs
+++ b/rust/sdk-processor/src/processors/user_transaction_processor.rs
@@ -104,7 +104,7 @@ impl ProcessorTrait for UserTransactionProcessor {
         let user_txn_extractor = UserTransactionExtractor::new(deprecated_tables);
         let user_txn_storer = UserTransactionStorer::new(self.db_pool.clone(), processor_config);
         let version_tracker = VersionTrackerStep::new(
-            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+            get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false),
             DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
         );
diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs
index 325299b1d..ef176f212 100644
--- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs
+++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs
@@ -38,28 +38,46 @@ impl Uploadable for GCSUploader {
         &mut self,
         buffer: ParquetTypeStructs,
     ) -> anyhow::Result<(), ProcessorError> {
-        let table_name = buffer.get_table_name();
-
         let result = match buffer {
             ParquetTypeStructs::Transaction(transactions) => {
-                self.upload_generic(&transactions[..], ParquetTypeEnum::Transaction, table_name)
-                    .await
+                self.upload_generic(
+                    &transactions[..],
+                    ParquetTypeEnum::Transaction,
+                    &ParquetTypeEnum::Transaction.to_string(),
+                )
+                .await
             },
             ParquetTypeStructs::MoveResource(resources) => {
-                self.upload_generic(&resources[..], ParquetTypeEnum::MoveResource, table_name)
-                    .await
+                self.upload_generic(
+                    &resources[..],
+                    ParquetTypeEnum::MoveResource,
+                    &ParquetTypeEnum::MoveResource.to_string(),
+                )
+                .await
             },
             ParquetTypeStructs::WriteSetChange(changes) => {
-                self.upload_generic(&changes[..], ParquetTypeEnum::WriteSetChange, table_name)
-                    .await
+                self.upload_generic(
+                    &changes[..],
+                    ParquetTypeEnum::WriteSetChange,
+                    &ParquetTypeEnum::WriteSetChange.to_string(),
+                )
+                .await
             },
             ParquetTypeStructs::TableItem(items) => {
-                self.upload_generic(&items[..], ParquetTypeEnum::TableItem, table_name)
-                    .await
+                self.upload_generic(
+                    &items[..],
+                    ParquetTypeEnum::TableItem,
+                    &ParquetTypeEnum::TableItem.to_string(),
+                )
+                .await
             },
             ParquetTypeStructs::MoveModule(modules) => {
-                self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name)
-                    .await
+                self.upload_generic(
+                    &modules[..],
+                    ParquetTypeEnum::MoveModule,
+                    &ParquetTypeEnum::MoveModule.to_string(),
+                )
+                .await
             },
         };
 
@@ -141,7 +159,7 @@ impl GCSUploader {
         &mut self,
         data: &[ParquetType],
         parquet_type: ParquetTypeEnum,
-        table_name: &'static str,
+        table_name: &str,
     ) -> anyhow::Result<()>
     where
         ParquetType: HasVersion + GetTimeStamp + HasParquetSchema,
@@ -184,6 +202,13 @@ impl GCSUploader {
         )
         .await?;
 
+        debug!(
+            "Uploaded parquet to GCS for table: {}, start_version: {}, end_version: {}",
+            table_name,
+            data[0].version(),
+            data[data.len() - 1].version()
+        );
+
         Ok(())
     }
 }
diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs
index 18c449997..91d0d9f88 100644
--- a/rust/sdk-processor/src/steps/common/mod.rs
+++ b/rust/sdk-processor/src/steps/common/mod.rs
@@ -1,5 +1,6 @@
 pub mod gcs_uploader;
 pub mod parquet_buffer_step;
+pub mod parquet_version_tracker_step;
 pub mod processor_status_saver;
 
 pub use processor_status_saver::get_processor_status_saver;
diff --git a/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs b/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs
new file mode 100644
index 000000000..f9c869d4d
--- /dev/null
+++ b/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs
@@ -0,0 +1,166 @@
+use crate::parquet_processors::ParquetTypeEnum;
+use anyhow::Result;
+use aptos_indexer_processor_sdk::{
+    traits::{
+        pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
+    },
+    types::transaction_context::{TransactionContext, TransactionMetadata},
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use std::collections::HashMap;
+use tracing::debug;
+
+/// The `ParquetProcessorStatusSaver` trait object should be implemented in order to save the latest successfully
+/// processed transaction version to storage, i.e., persisting the `processor_status` to storage.
+#[async_trait]
+pub trait ParquetProcessorStatusSaver {
+    async fn save_parquet_processor_status(
+        &self,
+        last_success_batch: &TransactionContext<()>,
+        table_name: &str,
+    ) -> Result<(), ProcessorError>;
+}
+
+/// Tracks the versioned processing of sequential transactions, ensuring no gaps
+/// occur between them.
+///
+/// Important: this step assumes ordered transactions. Please use the `OrderByVersionStep` before this step
+/// if the transactions are not ordered.
+pub struct ParquetVersionTrackerStep<S>
+where
+    Self: Sized + Send + 'static,
+    S: ParquetProcessorStatusSaver + Send + 'static,
+{
+    // Last successful batch of sequentially processed transactions. Includes metadata to write to storage.
+    last_success_batch: HashMap<ParquetTypeEnum, TransactionContext<()>>,
+    polling_interval_secs: u64,
+    processor_status_saver: S,
+}
+
+impl<S> ParquetVersionTrackerStep<S>
+where
+    Self: Sized + Send + 'static,
+    S: ParquetProcessorStatusSaver + Send + 'static,
+{
+    pub fn new(processor_status_saver: S, polling_interval_secs: u64) -> Self {
+        Self {
+            last_success_batch: HashMap::new(),
+            processor_status_saver,
+            polling_interval_secs,
+        }
+    }
+
+    async fn save_processor_status(&mut self) -> Result<(), ProcessorError> {
+        for (parquet_type, last_success_batch) in &self.last_success_batch {
+            let table_name = parquet_type.to_string();
+            self.processor_status_saver
+                .save_parquet_processor_status(last_success_batch, &table_name)
+                .await?;
+        }
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl<S> Processable for ParquetVersionTrackerStep<S>
+where
+    Self: Sized + Send + 'static,
+    S: ParquetProcessorStatusSaver + Send + 'static,
+{
+    type Input = HashMap<ParquetTypeEnum, TransactionMetadata>;
+    type Output = HashMap<ParquetTypeEnum, TransactionMetadata>;
+    type RunType = PollableAsyncRunType;
+
+    async fn process(
+        &mut self,
+        current_batch: TransactionContext<Self::Input>,
+    ) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
+        let mut processed_data = HashMap::new();
+
+        // Check for a version gap before processing each key-value pair.
+        for (parquet_type, current_metadata) in &current_batch.data {
+            // We keep one last_success_batch entry per parquet type; if an entry exists for the
+            // current parquet type, the incoming batch must start right after its end version.
+            debug!(
+                "checking for parquet_type: {:?} with start version {}, end_version {}",
+                parquet_type.to_string(),
+                current_metadata.start_version,
+                current_metadata.end_version
+            );
+            if let Some(last_success) = self.last_success_batch.get(parquet_type) {
+                if last_success.metadata.end_version + 1 != current_metadata.start_version {
+                    return Err(ProcessorError::ProcessError {
+                        message: format!(
+                            "Gap detected for {:?} starting from version: {}",
+                            &parquet_type.to_string(),
+                            current_metadata.start_version
+                        ),
+                    });
+                }
+            }
+
+            processed_data.insert(*parquet_type, current_metadata.clone());
+
+            // Update last_success_batch for the current key
+            self.last_success_batch
+                .entry(*parquet_type)
+                .and_modify(|e| {
+                    e.data = ();
+                    e.metadata = current_metadata.clone();
+                })
+                .or_insert(TransactionContext {
+                    data: (),
+                    metadata: current_metadata.clone(),
+                });
+        }
+
+        // Pass through the current batch with updated metadata
+        Ok(Some(TransactionContext {
+            data: processed_data,
+            metadata: current_batch.metadata.clone(),
+        }))
+    }
+
+    async fn cleanup(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
+        // Save the last successful batch to the database
+        self.save_processor_status().await?;
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl<S> PollableAsyncStep for ParquetVersionTrackerStep<S>
+where
+    Self: Sized + Send + Sync + 'static,
+    S: ParquetProcessorStatusSaver + Send + Sync + 'static,
+{
+    fn poll_interval(&self) -> std::time::Duration {
+        std::time::Duration::from_secs(self.polling_interval_secs)
+    }
+
+    async fn poll(
+        &mut self,
+    ) -> Result<
+        Option<Vec<TransactionContext<HashMap<ParquetTypeEnum, TransactionMetadata>>>>,
+        ProcessorError,
+    > {
+        // TODO: Add metrics for gap count
+        self.save_processor_status().await?;
+        // Nothing should be returned
+        Ok(None)
+    }
+}
+
+impl<S> NamedStep for ParquetVersionTrackerStep<S>
+where
+    Self: Sized + Send + 'static,
+    S: ParquetProcessorStatusSaver + Send + 'static,
+{
+    fn name(&self) -> String {
+        "ParquetVersionTrackerStep".to_string()
+    }
+}
diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs
index 50bb39ae3..2b283e6c8 100644
--- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs
+++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs
@@ -4,6 +4,7 @@ use crate::{
         backfill_processor_status::{BackfillProcessorStatus, BackfillStatus},
         processor_status::ProcessorStatus,
     },
+    steps::common::parquet_version_tracker_step::ParquetProcessorStatusSaver,
     utils::database::{execute_with_better_error, ArcDbPool},
 };
 use anyhow::Result;
@@ -19,6 +20,7 @@ use processor::schema::{backfill_processor_status, processor_status};
 pub fn get_processor_status_saver(
     conn_pool: ArcDbPool,
     config: IndexerProcessorConfig,
+    is_parquet: bool,
 ) -> ProcessorStatusSaverEnum {
     if let Some(backfill_config) = config.backfill_config {
         let txn_stream_cfg = config.transaction_stream_config;
@@ -33,9 +35,16 @@ pub fn get_processor_status_saver(
         }
     } else {
         let processor_name = config.processor_config.name().to_string();
-        ProcessorStatusSaverEnum::Default {
-            conn_pool,
-            processor_name,
+        if is_parquet {
+            ProcessorStatusSaverEnum::Parquet {
+                conn_pool,
+                processor_name,
+            }
+        } else {
+            ProcessorStatusSaverEnum::Default {
+                conn_pool,
+                processor_name,
+            }
         }
     }
 }
@@ -51,6 +60,10 @@ pub enum ProcessorStatusSaverEnum {
         backfill_start_version: Option<u64>,
         backfill_end_version: Option<u64>,
     },
+    Parquet {
+        conn_pool: ArcDbPool,
+        processor_name: String,
+    },
 }
 
 #[async_trait]
@@ -58,6 +71,32 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
     async fn save_processor_status(
         &self,
         last_success_batch: &TransactionContext<()>,
+    ) -> Result<(), ProcessorError> {
+        self.save_processor_status_with_optional_table_names(last_success_batch, None)
+            .await
+    }
+}
+
+#[async_trait]
+impl ParquetProcessorStatusSaver for ProcessorStatusSaverEnum {
+    async fn save_parquet_processor_status(
+        &self,
+        last_success_batch: &TransactionContext<()>,
+        table_name: &str,
+    ) -> Result<(), ProcessorError> {
+        self.save_processor_status_with_optional_table_names(
+            last_success_batch,
+            Some(table_name.to_string()),
+        )
+        .await
+    }
+}
+
+impl ProcessorStatusSaverEnum {
+    async fn save_processor_status_with_optional_table_names(
+        &self,
+        last_success_batch: &TransactionContext<()>,
+        table_name: Option<String>,
     ) -> Result<(), ProcessorError> {
         let end_timestamp = last_success_batch
             .metadata
@@ -70,8 +109,14 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
                 conn_pool,
                 processor_name,
             } => {
+                let processor_name = if table_name.is_some() {
+                    format!("{}_{}", processor_name, table_name.unwrap())
+                } else {
+                    processor_name.clone()
+                };
+
                 let status = ProcessorStatus {
-                    processor: processor_name.clone(),
+                    processor: processor_name,
                    last_success_version: last_success_batch.metadata.end_version as i64,
                     last_transaction_timestamp: end_timestamp,
                 };
@@ -92,7 +137,7 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
                     )),
                     Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
                 )
-                    .await?;
+                .await?;
 
                 Ok(())
             },
@@ -140,11 +185,12 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum {
                         backfill_processor_status::backfill_end_version
                             .eq(excluded(backfill_processor_status::backfill_end_version)),
                     )),
-                        Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "),
+                    Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "),
                 )
-                    .await?;
+                .await?;
                 Ok(())
             },
+            _ => Ok(()),
         }
     }
 }
diff --git a/rust/sdk-processor/src/utils/mod.rs b/rust/sdk-processor/src/utils/mod.rs
index 252b6f0b2..eb4865fc3 100644
--- a/rust/sdk-processor/src/utils/mod.rs
+++ b/rust/sdk-processor/src/utils/mod.rs
@@ -1,4 +1,5 @@
 pub mod chain_id;
 pub mod database;
 pub mod parquet_extractor_helper;
+pub mod parquet_processor_table_mapping;
 pub mod starting_version;
diff --git a/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs b/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs
new file mode 100644
index 000000000..4353e944c
--- /dev/null
+++ b/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs
@@ -0,0 +1,19 @@
+use lazy_static::lazy_static;
+use std::collections::{HashMap, HashSet};
+
+lazy_static! {
+    pub static ref VALID_TABLE_NAMES: HashMap<&'static str, HashSet<String>> = {
+        let mut map = HashMap::new();
+        map.insert(
+            "parquet_default_processor",
+            HashSet::from([
+                "move_resources".to_string(),
+                "transactions".to_string(),
+                "write_set_changes".to_string(),
+                "table_items".to_string(),
+                "move_modules".to_string(),
+            ]),
+        );
+        map
+    };
+}
diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs
index 8a69ab77d..d0b0351cf 100644
--- a/rust/sdk-processor/src/utils/starting_version.rs
+++ b/rust/sdk-processor/src/utils/starting_version.rs
@@ -49,12 +49,11 @@ pub async fn get_starting_version(
 pub async fn get_min_last_success_version_parquet(
     indexer_processor_config: &IndexerProcessorConfig,
     conn_pool: ArcDbPool,
-    processor_names: Vec<String>,
+    table_names: Vec<String>,
 ) -> Result<u64> {
-    let min_processed_version =
-        get_min_processed_version_from_db(conn_pool.clone(), processor_names)
-            .await
-            .context("Failed to get minimum last success version from DB")?;
+    let min_processed_version = get_min_processed_version_from_db(conn_pool.clone(), table_names)
+        .await
+        .context("Failed to get minimum last success version from DB")?;
 
     // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set.
     Ok(min_processed_version.unwrap_or(
@@ -65,14 +64,19 @@ pub async fn get_min_last_success_version_parquet(
     ))
 }
 
+/// Get the minimum last success version from the database for the given processors.
+///
+/// This should return the minimum of the last success versions of the processors in the list.
+/// If any of the tables handled by the parquet processor has no entry, it should use 0 as a default value.
+/// To avoid skipping any versions, the minimum last success version should be used as the starting version.
 async fn get_min_processed_version_from_db(
     conn_pool: ArcDbPool,
-    processor_names: Vec<String>,
+    table_names: Vec<String>,
 ) -> Result<Option<u64>> {
     let mut queries = Vec::new();
 
     // Spawn all queries concurrently with separate connections
-    for processor_name in processor_names {
+    for processor_name in table_names {
         let conn_pool = conn_pool.clone();
         let processor_name = processor_name.clone();