diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
index ba800b2e..5151c5cc 100644
--- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs
+++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -54,10 +54,10 @@ where
 }
 
 pub struct ParquetHandler<ParquetType>
-    where
+where
     ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
     for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
-    {
+{
     pub schema: Arc<Type>,
     pub writer: SerializedFileWriter<Vec<u8>>,
     pub buffer: Vec<ParquetType>,
@@ -84,8 +84,7 @@ pub fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
 
 impl<ParquetType> ParquetHandler<ParquetType>
 where
-    ParquetType: Allocative +
-        GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
+    ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
     for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
 {
     fn create_new_writer(&self) -> Result<SerializedFileWriter<Vec<u8>>> {
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index e7002d38..ddeab825 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -1,14 +1,10 @@
-use crate::{
-    parquet_processors::ParquetTypeEnum,
-    processors::{
-        ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
-        stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
-    },
+use crate::processors::{
+    ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig,
+    stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig,
 };
 use ahash::AHashMap;
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
-use strum::IntoEnumIterator;
 
 /// This enum captures the configs for all the different processors that are defined.
 ///
@@ -74,13 +70,18 @@ impl ProcessorConfig {
         // Get the processor name as a prefix
         let prefix = self.name();
-        // Collect valid table names from `ParquetTypeEnum` into a set for quick lookup
-        let valid_table_names: HashSet<String> =
-            ParquetTypeEnum::iter().map(|e| e.to_string()).collect();
+        // Collect the supported table names into a set for quick lookup
+        let valid_table_names = HashSet::from([
+            "move_resources".to_string(),
+            "transactions".to_string(),
+            "write_set_changes".to_string(),
+            "table_items".to_string(),
+            "move_modules".to_string(),
+        ]);
 
         // Validate and map table names with prefix
         let mut validated_table_names = Vec::new();
         for table_name in &config.tables {
-            // Ensure the table name is a valid `ParquetTypeEnum` variant
+            // Ensure the table name is one of the supported tables
             if !valid_table_names.contains(table_name) {
                 return Err(anyhow::anyhow!(
                     "Invalid table name '{}'. Expected one of: {:?}",
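The hardcoded set above replaces the `ParquetTypeEnum::iter()` lookup, so the validator and the tests below now each carry their own copy of the same five names. A minimal sketch of one way to keep the copies from drifting, assuming nothing beyond std; the constant and helper are illustrative, not part of this diff:

```rust
use std::collections::HashSet;

// Hypothetical shared constant: a single source of truth for the five
// parquet table names used by both the validator and the tests.
const VALID_TABLE_NAMES: [&str; 5] = [
    "move_resources",
    "transactions",
    "write_set_changes",
    "table_items",
    "move_modules",
];

// Builds the lookup set the validation loop checks against.
fn valid_table_names() -> HashSet<String> {
    VALID_TABLE_NAMES.iter().map(|s| s.to_string()).collect()
}
```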
@@ -184,7 +187,7 @@ mod tests {
     #[test]
     fn test_valid_table_names() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]),
+            tables: HashSet::from(["move_resources".to_string(), "transactions".to_string()]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -199,7 +202,7 @@ mod tests {
         let table_names = result.unwrap();
         let table_names: HashSet<String> = table_names.into_iter().collect();
         let expected_names: HashSet<String> =
-            ["Transaction".to_string(), "MoveResource".to_string()]
+            ["transactions".to_string(), "move_resources".to_string()]
                 .iter()
                 .map(|e| format!("parquet_default_processor.{}", e))
                 .collect();
@@ -209,7 +212,7 @@ mod tests {
     #[test]
     fn test_invalid_table_name() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]),
+            tables: HashSet::from(["InvalidTable".to_string(), "transactions".to_string()]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -247,7 +250,7 @@ mod tests {
     #[test]
     fn test_duplicate_table_names() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]),
+            tables: HashSet::from(["transactions".to_string(), "transactions".to_string()]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -261,14 +264,20 @@ mod tests {
         let table_names = result.unwrap();
 
         assert_eq!(table_names, vec![
-            "parquet_default_processor.Transaction".to_string(),
+            "parquet_default_processor.transactions".to_string(),
         ]);
     }
 
     #[test]
-    fn test_all_enum_table_names() {
+    fn test_all_table_names() {
         let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig {
-            tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(),
+            tables: HashSet::from([
+                "move_resources".to_string(),
+                "transactions".to_string(),
+                "write_set_changes".to_string(),
+                "table_items".to_string(),
+                "move_modules".to_string(),
+            ]),
             bucket_name: "bucket_name".to_string(),
             bucket_root: "bucket_root".to_string(),
             google_application_credentials: None,
@@ -281,9 +290,16 @@ mod tests {
         assert!(result.is_ok());
         let table_names = result.unwrap();
 
-        let expected_names: HashSet<String> = ParquetTypeEnum::iter()
-            .map(|e| format!("parquet_default_processor.{}", e))
-            .collect();
+        let expected_names: HashSet<String> = [
+            "move_resources".to_string(),
+            "transactions".to_string(),
+            "write_set_changes".to_string(),
+            "table_items".to_string(),
+            "move_modules".to_string(),
+        ]
+        .iter()
+        .map(|e| format!("parquet_default_processor.{}", e))
+        .collect();
         let table_names: HashSet<String> = table_names.into_iter().collect();
         assert_eq!(table_names, expected_names);
     }
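The expectations in these tests all build fully-qualified names by prefixing the processor name onto each table name. A minimal sketch of that mapping, assuming nothing beyond std; the helper name is illustrative, not from this codebase:

```rust
// Mirrors the `format!("parquet_default_processor.{}", e)` the tests assert on:
// each validated table name is prefixed with the owning processor's name.
fn fully_qualified_table_names(prefix: &str, tables: &[&str]) -> Vec<String> {
    tables.iter().map(|t| format!("{}.{}", prefix, t)).collect()
}

fn main() {
    let names = fully_qualified_table_names("parquet_default_processor", &["transactions"]);
    assert_eq!(names, vec!["parquet_default_processor.transactions".to_string()]);
}
```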
diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs
index 9174c6ce..e35d803a 100644
--- a/rust/sdk-processor/src/parquet_processors/mod.rs
+++ b/rust/sdk-processor/src/parquet_processors/mod.rs
@@ -1,8 +1,8 @@
 use crate::{
     config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig},
-    steps::{
-        common::gcs_uploader::{create_new_writer, GCSUploader},
-        parquet_default_processor::size_buffer::ParquetBufferStep,
+    steps::common::{
+        gcs_uploader::{create_new_writer, GCSUploader},
+        parquet_buffer_step::ParquetBufferStep,
     },
     utils::database::{new_db_pool, ArcDbPool},
 };
diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
index 3da28f9e..001703aa 100644
--- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
+++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -8,13 +8,11 @@ use crate::{
         ParquetTypeEnum,
     },
     steps::{
-        common::{
-            processor_status_saver::get_parquet_processor_status_saver,
-            parquet_buffer_step::ParquetBufferStep,
-        },
-        parquet_default_processor::{
-            parquet_default_extractor::ParquetDefaultExtractor,
+        common::{
+            parquet_version_tracker_step::ParquetVersionTrackerStep,
+            processor_status_saver::get_parquet_processor_status_saver,
         },
+        parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor,
     },
     utils::{
         chain_id::check_or_update_chain_id,
diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs
index 18c44999..91d0d9f8 100644
--- a/rust/sdk-processor/src/steps/common/mod.rs
+++ b/rust/sdk-processor/src/steps/common/mod.rs
@@ -1,5 +1,6 @@
 pub mod gcs_uploader;
 pub mod parquet_buffer_step;
+pub mod parquet_version_tracker_step;
 pub mod processor_status_saver;
 
 pub use processor_status_saver::get_processor_status_saver;
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs b/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs
similarity index 100%
rename from rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs
rename to rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs
diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs
index dc66c725..00aabadd 100644
--- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs
+++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs
@@ -4,7 +4,7 @@ use crate::{
         backfill_processor_status::{BackfillProcessorStatus, BackfillStatus},
         processor_status::ProcessorStatus,
     },
-    steps::parquet_default_processor::parquet_version_tracker_step::ParquetProcessorStatusSaver,
+    steps::common::parquet_version_tracker_step::ParquetProcessorStatusSaver,
     utils::database::{execute_with_better_error, ArcDbPool},
 };
 use anyhow::Result;
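The next diff deletes `GenericParquetBufferHandler` outright. Its one non-obvious trick was rotating the in-memory parquet writer (see `close_writer` below): swap a fresh writer in, then finalize and upload the old one. A stripped-down sketch of that idiom in isolation; `Rotating` and `Writer` are placeholders, not types from this codebase, with `Writer` standing in for `SerializedFileWriter<Vec<u8>>`:

```rust
use std::mem;

// Illustrative only: holds the writer currently accumulating rows.
struct Rotating<Writer> {
    writer: Writer,
}

impl<Writer> Rotating<Writer> {
    // Swap a fresh writer in and return the old one, whose accumulated
    // bytes can then be finalized (into_inner) and uploaded.
    fn rotate(&mut self, fresh: Writer) -> Writer {
        mem::replace(&mut self.writer, fresh)
    }
}
```

The same pattern appears to live on in `processor::bq_analytics::generic_parquet_processor` (which this handler already delegated to for `create_new_writer`), which is presumably why the duplicate can go.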
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs b/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs
deleted file mode 100644
index 197913f1..00000000
--- a/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs
+++ /dev/null
@@ -1,171 +0,0 @@
-use allocative::Allocative;
-use anyhow::Context;
-use aptos_indexer_processor_sdk::{
-    common_steps::timed_size_buffer_step::BufferHandler,
-    traits::parquet_extract_trait::{GetTimeStamp, HasParquetSchema, HasVersion, NamedTable},
-    types::transaction_context::TransactionMetadata,
-    utils::errors::ProcessorError,
-};
-use async_trait::async_trait;
-use google_cloud_storage::client::Client as GCSClient;
-use parquet::{
-    file::{properties::WriterProperties, writer::SerializedFileWriter},
-    record::RecordWriter,
-    schema::types::Type,
-};
-use processor::{
-    bq_analytics::gcs_handler::upload_parquet_to_gcs, utils::util::naive_datetime_to_timestamp,
-};
-use std::{marker::PhantomData, path::PathBuf, sync::Arc};
-use tracing::{debug, error};
-
-pub struct GenericParquetBufferHandler<ParquetType>
-where
-    ParquetType: NamedTable
-        + NamedTable
-        + HasVersion
-        + HasParquetSchema
-        + 'static
-        + Allocative
-        + GetTimeStamp,
-    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
-{
-    pub schema: Arc<Type>,
-    pub writer: SerializedFileWriter<Vec<u8>>,
-    pub bucket_name: String,
-    pub bucket_root: String,
-    pub processor_name: String,
-    _marker: PhantomData<ParquetType>,
-}
-
-#[async_trait]
-impl<ParquetType> BufferHandler<ParquetType> for GenericParquetBufferHandler<ParquetType>
-where
-    ParquetType: NamedTable
-        + NamedTable
-        + HasVersion
-        + HasParquetSchema
-        + 'static
-        + Allocative
-        + Send
-        + GetTimeStamp,
-    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
-{
-    async fn handle_buffer(
-        &mut self,
-        gcs_client: &GCSClient,
-        buffer: Vec<ParquetType>,
-        metadata: &mut TransactionMetadata,
-    ) -> anyhow::Result<(), ProcessorError> {
-        if let Err(e) = self.upload_buffer(gcs_client, buffer, metadata).await {
-            error!("Failed to upload buffer: {}", e);
-            return Err(ProcessorError::ProcessError {
-                message: format!("Failed to upload buffer: {}", e),
-            });
-        }
-        Ok(())
-    }
-}
-
-pub fn create_new_writer(schema: Arc<Type>) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
-    let props = WriterProperties::builder()
-        .set_compression(parquet::basic::Compression::LZ4)
-        .build();
-    let props_arc = Arc::new(props);
-
-    SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer")
-}
-
-impl<ParquetType> GenericParquetBufferHandler<ParquetType>
-where
-    ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable,
-    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
-{
-    fn create_new_writer(&self) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
-        processor::bq_analytics::generic_parquet_processor::create_new_writer(self.schema.clone())
-    }
-
-    fn close_writer(&mut self) -> anyhow::Result<SerializedFileWriter<Vec<u8>>> {
-        let new_writer = self.create_new_writer()?;
-        let old_writer = std::mem::replace(&mut self.writer, new_writer);
-        Ok(old_writer)
-    }
-
-    pub fn new(
-        bucket_name: String,
-        bucket_root: String,
-        schema: Arc<Type>,
-        processor_name: String,
-    ) -> anyhow::Result<Self> {
-        // had to append unique id to avoid concurrent write issues
-        let writer =
-            processor::bq_analytics::generic_parquet_processor::create_new_writer(schema.clone())?;
-
-        Ok(Self {
-            writer,
-            bucket_name,
-            bucket_root,
-            schema,
-            processor_name,
-            _marker: PhantomData,
-        })
-    }
-
-    async fn upload_buffer(
-        &mut self,
-        gcs_client: &GCSClient,
-        buffer: Vec<ParquetType>,
-        metadata: &mut TransactionMetadata,
-    ) -> anyhow::Result<()> {
-        if buffer.is_empty() {
-            debug!("Buffer is empty, skipping upload.");
-            return Ok(());
-        }
-        let first = buffer
-            .first()
-            .context("Buffer is not empty but has no first element")?;
-        let first_transaction_timestamp = naive_datetime_to_timestamp(first.get_timestamp());
-        let start_version = first.version();
-        let last = buffer
-            .last()
-            .context("Buffer is not empty but has no last element")?;
-        let end_version = last.version();
-        let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());
-        let mut row_group_writer = self
-            .writer
-            .next_row_group()
-            .context("Failed to get row group")?;
-
-        buffer
-            .as_slice()
-            .write_to_row_group(&mut row_group_writer)
-            .context("Failed to write to row group")?;
-        row_group_writer
-            .close()
-            .context("Failed to close row group")?;
-
-        let old_writer = self.close_writer().context("Failed to close writer")?;
-        let upload_buffer = old_writer
-            .into_inner()
-            .context("Failed to get inner buffer")?;
-
-        let bucket_root = PathBuf::from(&self.bucket_root);
-
-        upload_parquet_to_gcs(
-            gcs_client,
-            upload_buffer,
-            ParquetType::TABLE_NAME,
-            &self.bucket_name,
-            &bucket_root,
-            self.processor_name.clone(),
-        )
-        .await?;
-
-        metadata.start_version = start_version as u64;
-        metadata.end_version = end_version as u64;
-        metadata.start_transaction_timestamp = Some(first_transaction_timestamp);
-        metadata.end_transaction_timestamp = Some(last_transaction_timestamp);
-
-        Ok(())
-    }
-}
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
index d5ab7b76..35ba1ba6 100644
--- a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -1,2 +1 @@
 pub mod parquet_default_extractor;
-pub mod parquet_version_tracker_step;
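For reference, the writer setup that the deleted `create_new_writer` configured (and that the retained copy in `processor::bq_analytics::generic_parquet_processor` presumably mirrors) is just LZ4-compressed `WriterProperties` over an in-memory `Vec<u8>` sink. A self-contained sketch, using only the parquet crate API visible in the deleted code above:

```rust
use parquet::file::properties::WriterProperties;
use std::sync::Arc;

// LZ4-compressed writer properties, as in the deleted create_new_writer:
// the resulting Arc is handed to SerializedFileWriter::new alongside the
// schema and an empty Vec<u8> sink.
fn lz4_writer_props() -> Arc<WriterProperties> {
    Arc::new(
        WriterProperties::builder()
            .set_compression(parquet::basic::Compression::LZ4)
            .build(),
    )
}
```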