diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs index b213fdeba..ba800b2ee 100644 --- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs +++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs @@ -54,10 +54,10 @@ where } pub struct ParquetHandler -where + where ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative, for<'a> &'a [ParquetType]: RecordWriter, -{ + { pub schema: Arc, pub writer: SerializedFileWriter>, pub buffer: Vec, @@ -72,7 +72,8 @@ where pub last_upload_time: Instant, pub processor_name: String, } -fn create_new_writer(schema: Arc) -> Result>> { + +pub fn create_new_writer(schema: Arc) -> Result>> { let props = WriterProperties::builder() .set_compression(parquet::basic::Compression::LZ4) .build(); @@ -83,7 +84,8 @@ fn create_new_writer(schema: Arc) -> Result>> impl ParquetHandler where - ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, + ParquetType: Allocative + + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, for<'a> &'a [ParquetType]: RecordWriter, { fn create_new_writer(&self) -> Result>> { diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index fd6bfcf24..e8882aed3 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -288,7 +288,7 @@ impl ProcessorTrait for DefaultProcessor { } } -// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet) +// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet) /// Processes a list of transactions and extracts relevant data into different models. /// /// This function iterates over a list of transactions, extracting block metadata transactions, diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 57d1af2c8..e7002d38f 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -73,7 +73,6 @@ impl ProcessorConfig { ProcessorConfig::ParquetDefaultProcessor(config) => { // Get the processor name as a prefix let prefix = self.name(); - // Collect valid table names from `ParquetTypeEnum` into a set for quick lookup let valid_table_names: HashSet = ParquetTypeEnum::iter().map(|e| e.to_string()).collect(); diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 218c9d8ac..9174c6cea 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -1,16 +1,30 @@ +use crate::{ + config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig}, + steps::{ + common::gcs_uploader::{create_new_writer, GCSUploader}, + parquet_default_processor::size_buffer::ParquetBufferStep, + }, + utils::database::{new_db_pool, ArcDbPool}, +}; use aptos_indexer_processor_sdk::utils::errors::ProcessorError; +use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; +use parquet::schema::types::Type; use processor::db::parquet::models::default_models::{ parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, parquet_write_set_changes::WriteSetChangeModel, }; use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use strum::{Display, EnumIter}; pub mod parquet_default_processor; +const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; + /// Enum representing the different types of Parquet files that can be processed. #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)] +#[strum(serialize_all = "snake_case")] #[cfg_attr( test, derive(strum::EnumDiscriminants), @@ -123,6 +137,70 @@ impl ParquetTypeStructs { } } +async fn initialize_gcs_client(credentials: Option) -> Arc { + if let Some(credentials) = credentials { + std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); + } + + let gcs_config = GcsClientConfig::default() + .with_auth() + .await + .expect("Failed to create GCS client config"); + + Arc::new(GCSClient::new(gcs_config)) +} + +async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result { + match config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + Ok(conn_pool) + }, + } +} + +async fn initialize_parquet_buffer_step( + gcs_client: Arc, + parquet_processor_config: ParquetDefaultProcessorConfig, + parquet_type_to_schemas: HashMap>, + processor_name: String, +) -> anyhow::Result> { + let parquet_type_to_writer = parquet_type_to_schemas + .iter() + .map(|(key, schema)| { + let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); + (*key, writer) + }) + .collect(); + + let buffer_uploader = GCSUploader::new( + gcs_client, + parquet_type_to_schemas, + parquet_type_to_writer, + parquet_processor_config.bucket_name.clone(), + parquet_processor_config.bucket_root.clone(), + processor_name, + )?; + + let default_size_buffer_step = ParquetBufferStep::new( + Duration::from_secs(parquet_processor_config.parquet_upload_interval), + buffer_uploader, + parquet_processor_config.max_buffer_size, + ); + + Ok(default_size_buffer_step) +} + #[cfg(test)] mod test { use super::*; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index 4dbf84f7a..222b9f48c 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -3,27 +3,30 @@ use crate::{ db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, processor_config::ProcessorConfig, }, - parquet_processors::ParquetTypeEnum, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + ParquetTypeEnum, + }, steps::{ - common::gcs_uploader::{create_new_writer, GCSUploader}, + common::processor_status_saver::get_parquet_processor_status_saver, parquet_default_processor::{ - parquet_default_extractor::ParquetDefaultExtractor, size_buffer::ParquetBufferStep, + parquet_default_extractor::ParquetDefaultExtractor, + parquet_version_tracker_step::ParquetVersionTrackerStep, }, }, utils::{ chain_id::check_or_update_chain_id, - database::{new_db_pool, run_migrations, ArcDbPool}, - starting_version::{get_min_last_success_version_parquet, get_starting_version}, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, }, }; use anyhow::Context; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, - common_steps::TransactionStreamStep, + common_steps::{TransactionStreamStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, }; -use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; use processor::{ bq_analytics::generic_parquet_processor::HasParquetSchema, @@ -34,38 +37,18 @@ use processor::{ }, worker::TableFlags, }; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; use tracing::{debug, info}; -const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; - pub struct ParquetDefaultProcessor { pub config: IndexerProcessorConfig, - pub db_pool: ArcDbPool, // for processor status + pub db_pool: ArcDbPool, } impl ParquetDefaultProcessor { pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { - match config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - let conn_pool = new_db_pool( - &postgres_config.connection_string, - Some(postgres_config.db_pool_size), - ) - .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to create connection pool for PostgresConfig: {:?}", - e - ) - })?; - - Ok(Self { - config, - db_pool: conn_pool, - }) - }, - } + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) } } @@ -87,25 +70,6 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, } - // Determine the processing mode (backfill or regular) - let is_backfill = self.config.backfill_config.is_some(); - - // TODO: Revisit when parquet version tracker is available. - // Query the starting version - let starting_version = if is_backfill { - get_starting_version(&self.config, self.db_pool.clone()).await? - } else { - // Regular mode logic: Fetch the minimum last successful version across all relevant tables - let table_names = self - .config - .processor_config - .get_table_names() - .context("Failed to get table names for the processor")?; - - get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) - .await? - }; - // Check and update the ledger chain id to ensure we're indexing the correct chain let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) .await? @@ -125,6 +89,22 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, }; + // TODO: Revisit when parquet version tracker is available. + // Query the starting version + let table_names = if let Some(backfill_config) = &self.config.backfill_config { + // for backfill we will only backfill one table per job + vec![backfill_config.backfill_alias.clone()] + } else { + self.config + .processor_config + .get_table_names() + .context("Failed to get table names for the processor")? + }; + + let starting_version = + get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) + .await?; + // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), @@ -136,20 +116,12 @@ impl ProcessorTrait for ParquetDefaultProcessor { opt_in_tables: TableFlags::empty(), }; - let credentials = parquet_processor_config - .google_application_credentials - .clone(); - - if let Some(credentials) = credentials { - std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); - } - - let gcs_config = GcsClientConfig::default() - .with_auth() - .await - .expect("Failed to create GCS client config"); - - let gcs_client = Arc::new(GCSClient::new(gcs_config)); + let gcs_client = initialize_gcs_client( + parquet_processor_config + .google_application_credentials + .clone(), + ) + .await; let parquet_type_to_schemas: HashMap> = [ (ParquetTypeEnum::MoveResource, MoveResource::schema()), @@ -164,37 +136,34 @@ impl ProcessorTrait for ParquetDefaultProcessor { .into_iter() .collect(); - let parquet_type_to_writer = parquet_type_to_schemas - .iter() - .map(|(key, schema)| { - let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); - (*key, writer) - }) - .collect(); - - let buffer_uploader = GCSUploader::new( + let default_size_buffer_step = initialize_parquet_buffer_step( gcs_client.clone(), + parquet_processor_config.clone(), parquet_type_to_schemas, - parquet_type_to_writer, - parquet_processor_config.bucket_name.clone(), - parquet_processor_config.bucket_root.clone(), self.name().to_string(), - )?; - - let channel_size = parquet_processor_config.channel_size; - - let default_size_buffer_step = ParquetBufferStep::new( - Duration::from_secs(parquet_processor_config.parquet_upload_interval), - buffer_uploader, - parquet_processor_config.max_buffer_size, + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_parquet_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); + let channel_size = parquet_processor_config.channel_size; + // Connect processor steps together let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( transaction_stream.into_runnable_step(), ) .connect_to(parquet_default_extractor.into_runnable_step(), channel_size) .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) .end_and_return_output_receiver(channel_size); loop { diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs index dbe9fae41..49c9fc390 100644 --- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -25,6 +25,7 @@ pub struct GCSUploader { } #[async_trait] +#[warn(dead_code)] pub trait Uploadable { async fn handle_buffer( &mut self, @@ -38,29 +39,46 @@ impl Uploadable for GCSUploader { &mut self, buffer: ParquetTypeStructs, ) -> anyhow::Result<(), ProcessorError> { - // Directly call `upload_generic` for each buffer type - let table_name = buffer.get_table_name(); - let result = match buffer { ParquetTypeStructs::Transaction(transactions) => { - self.upload_generic(&transactions[..], ParquetTypeEnum::Transaction, table_name) - .await + self.upload_generic( + &transactions[..], + ParquetTypeEnum::Transaction, + &ParquetTypeEnum::Transaction.to_string(), + ) + .await }, ParquetTypeStructs::MoveResource(resources) => { - self.upload_generic(&resources[..], ParquetTypeEnum::MoveResource, table_name) - .await + self.upload_generic( + &resources[..], + ParquetTypeEnum::MoveResource, + &ParquetTypeEnum::MoveResource.to_string(), + ) + .await }, ParquetTypeStructs::WriteSetChange(changes) => { - self.upload_generic(&changes[..], ParquetTypeEnum::WriteSetChange, table_name) - .await + self.upload_generic( + &changes[..], + ParquetTypeEnum::WriteSetChange, + &ParquetTypeEnum::WriteSetChange.to_string(), + ) + .await }, ParquetTypeStructs::TableItem(items) => { - self.upload_generic(&items[..], ParquetTypeEnum::TableItem, table_name) - .await + self.upload_generic( + &items[..], + ParquetTypeEnum::TableItem, + &ParquetTypeEnum::TableItem.to_string(), + ) + .await }, ParquetTypeStructs::MoveModule(modules) => { - self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name) - .await + self.upload_generic( + &modules[..], + ParquetTypeEnum::MoveModule, + &ParquetTypeEnum::MoveModule.to_string(), + ) + .await }, }; @@ -137,7 +155,7 @@ impl GCSUploader { &mut self, data: &[ParquetType], parquet_type: ParquetTypeEnum, - table_name: &'static str, + table_name: &str, ) -> anyhow::Result<()> where ParquetType: HasVersion + GetTimeStamp + HasParquetSchema, @@ -180,6 +198,13 @@ impl GCSUploader { ) .await?; + println!( + "Uploaded parquet to GCS for table: {}, start_version: {}, end_version: {}", + table_name, + data[0].version(), + data[data.len() - 1].version() + ); + Ok(()) } } diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index 50bb39ae3..dc66c725c 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -4,6 +4,7 @@ use crate::{ backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, processor_status::ProcessorStatus, }, + steps::parquet_default_processor::parquet_version_tracker_step::ParquetProcessorStatusSaver, utils::database::{execute_with_better_error, ArcDbPool}, }; use anyhow::Result; @@ -40,6 +41,17 @@ pub fn get_processor_status_saver( } } +pub fn get_parquet_processor_status_saver( + conn_pool: ArcDbPool, + config: IndexerProcessorConfig, +) -> ParquetProcessorStatusSaverEnum { + let processor_name = config.processor_config.name().to_string(); + ParquetProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } +} + pub enum ProcessorStatusSaverEnum { Default { conn_pool: ArcDbPool, @@ -53,6 +65,61 @@ pub enum ProcessorStatusSaverEnum { }, } +pub enum ParquetProcessorStatusSaverEnum { + Default { + conn_pool: ArcDbPool, + processor_name: String, + }, +} + +#[async_trait] +impl ParquetProcessorStatusSaver for ParquetProcessorStatusSaverEnum { + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError> { + let end_timestamp = last_success_batch + .metadata + .end_transaction_timestamp + .as_ref() + .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) + .map(|t| t.naive_utc()); + match self { + ParquetProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } => { + let status = ProcessorStatus { + processor: processor_name.clone() + "_" + table_name, + last_success_version: last_success_batch.metadata.end_version as i64, + last_transaction_timestamp: end_timestamp, + }; + + // Save regular processor status to the database + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(processor_status::table) + .values(&status) + .on_conflict(processor_status::processor) + .do_update() + .set(( + processor_status::last_success_version + .eq(excluded(processor_status::last_success_version)), + processor_status::last_updated.eq(excluded(processor_status::last_updated)), + processor_status::last_transaction_timestamp + .eq(excluded(processor_status::last_transaction_timestamp)), + )), + Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), + ) + .await?; + + Ok(()) + }, + } + } +} + #[async_trait] impl ProcessorStatusSaver for ProcessorStatusSaverEnum { async fn save_processor_status( diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs b/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs new file mode 100644 index 000000000..197913f19 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs @@ -0,0 +1,171 @@ +use allocative::Allocative; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + common_steps::timed_size_buffer_step::BufferHandler, + traits::parquet_extract_trait::{GetTimeStamp, HasParquetSchema, HasVersion, NamedTable}, + types::transaction_context::TransactionMetadata, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use google_cloud_storage::client::Client as GCSClient; +use parquet::{ + file::{properties::WriterProperties, writer::SerializedFileWriter}, + record::RecordWriter, + schema::types::Type, +}; +use processor::{ + bq_analytics::gcs_handler::upload_parquet_to_gcs, utils::util::naive_datetime_to_timestamp, +}; +use std::{marker::PhantomData, path::PathBuf, sync::Arc}; +use tracing::{debug, error}; + +pub struct GenericParquetBufferHandler +where + ParquetType: NamedTable + + NamedTable + + HasVersion + + HasParquetSchema + + 'static + + Allocative + + GetTimeStamp, + for<'a> &'a [ParquetType]: RecordWriter, +{ + pub schema: Arc, + pub writer: SerializedFileWriter>, + pub bucket_name: String, + pub bucket_root: String, + pub processor_name: String, + _marker: PhantomData, +} + +#[async_trait] +impl BufferHandler for GenericParquetBufferHandler +where + ParquetType: NamedTable + + NamedTable + + HasVersion + + HasParquetSchema + + 'static + + Allocative + + Send + + GetTimeStamp, + for<'a> &'a [ParquetType]: RecordWriter, +{ + async fn handle_buffer( + &mut self, + gcs_client: &GCSClient, + buffer: Vec, + metadata: &mut TransactionMetadata, + ) -> anyhow::Result<(), ProcessorError> { + if let Err(e) = self.upload_buffer(gcs_client, buffer, metadata).await { + error!("Failed to upload buffer: {}", e); + return Err(ProcessorError::ProcessError { + message: format!("Failed to upload buffer: {}", e), + }); + } + Ok(()) + } +} + +pub fn create_new_writer(schema: Arc) -> anyhow::Result>> { + let props = WriterProperties::builder() + .set_compression(parquet::basic::Compression::LZ4) + .build(); + let props_arc = Arc::new(props); + + SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer") +} + +impl GenericParquetBufferHandler +where + ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, + for<'a> &'a [ParquetType]: RecordWriter, +{ + fn create_new_writer(&self) -> anyhow::Result>> { + processor::bq_analytics::generic_parquet_processor::create_new_writer(self.schema.clone()) + } + + fn close_writer(&mut self) -> anyhow::Result>> { + let new_writer = self.create_new_writer()?; + let old_writer = std::mem::replace(&mut self.writer, new_writer); + Ok(old_writer) + } + + pub fn new( + bucket_name: String, + bucket_root: String, + schema: Arc, + processor_name: String, + ) -> anyhow::Result { + // had to append unique id to avoid concurrent write issues + let writer = + processor::bq_analytics::generic_parquet_processor::create_new_writer(schema.clone())?; + + Ok(Self { + writer, + bucket_name, + bucket_root, + schema, + processor_name, + _marker: PhantomData, + }) + } + + async fn upload_buffer( + &mut self, + gcs_client: &GCSClient, + buffer: Vec, + metadata: &mut TransactionMetadata, + ) -> anyhow::Result<()> { + if buffer.is_empty() { + debug!("Buffer is empty, skipping upload."); + return Ok(()); + } + let first = buffer + .first() + .context("Buffer is not empty but has no first element")?; + let first_transaction_timestamp = naive_datetime_to_timestamp(first.get_timestamp()); + let start_version = first.version(); + let last = buffer + .last() + .context("Buffer is not empty but has no last element")?; + let end_version = last.version(); + let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp()); + let mut row_group_writer = self + .writer + .next_row_group() + .context("Failed to get row group")?; + + buffer + .as_slice() + .write_to_row_group(&mut row_group_writer) + .context("Failed to write to row group")?; + row_group_writer + .close() + .context("Failed to close row group")?; + + let old_writer = self.close_writer().context("Failed to close writer")?; + let upload_buffer = old_writer + .into_inner() + .context("Failed to get inner buffer")?; + + let bucket_root = PathBuf::from(&self.bucket_root); + + upload_parquet_to_gcs( + gcs_client, + upload_buffer, + ParquetType::TABLE_NAME, + &self.bucket_name, + &bucket_root, + self.processor_name.clone(), + ) + .await?; + + metadata.start_version = start_version as u64; + metadata.end_version = end_version as u64; + metadata.start_transaction_timestamp = Some(first_transaction_timestamp); + metadata.end_transaction_timestamp = Some(last_transaction_timestamp); + + Ok(()) + } +} diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs index fe2e64cf0..f2e5b2243 100644 --- a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs +++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs @@ -1,2 +1,3 @@ pub mod parquet_default_extractor; +pub mod parquet_version_tracker_step; pub mod size_buffer; diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs new file mode 100644 index 000000000..b83609c29 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs @@ -0,0 +1,177 @@ +use crate::parquet_processors::ParquetTypeEnum; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{ + pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable, + }, + types::transaction_context::{TransactionContext, TransactionMetadata}, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use std::collections::HashMap; + +// pub const DEFAULT_UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; +/// The `ParquetProcessorStatusSaver` trait object should be implemented in order to save the latest successfully +/// +/// processed transaction versino to storage. I.e., persisting the `processor_status` to storage. +#[async_trait] +pub trait ParquetProcessorStatusSaver { + // T represents the transaction type that the processor is tracking. + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError>; +} + +/// Tracks the versioned processing of sequential transactions, ensuring no gaps +/// occur between them. +/// +/// Important: this step assumes ordered transactions. Please use the `OrederByVersionStep` before this step +/// if the transactions are not ordered. +pub struct ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + // Last successful batch of sequentially processed transactions. Includes metadata to write to storage. + last_success_batch: HashMap>, + polling_interval_secs: u64, + processor_status_saver: S, +} + +impl ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + pub fn new(processor_status_saver: S, polling_interval_secs: u64) -> Self { + Self { + last_success_batch: HashMap::new(), + processor_status_saver, + polling_interval_secs, + } + } + + async fn save_processor_status(&mut self) -> Result<(), ProcessorError> { + // this shouldn't be over 5. + println!( + "Len of last_success_batch: {}", + self.last_success_batch.len() + ); + for (parquet_type, last_success_batch) in &self.last_success_batch { + let table_name = parquet_type.to_string(); + println!("Saving processor status for table: {}", table_name); + self.processor_status_saver + .save_parquet_processor_status(last_success_batch, &table_name) + .await?; + } + Ok(()) + } +} + +#[async_trait] +impl Processable for ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + type Input = HashMap; + type Output = HashMap; + type RunType = PollableAsyncRunType; + + async fn process( + &mut self, + current_batch: TransactionContext, + ) -> Result>, ProcessorError> { + // Initialize a new map to store the processed metadata + let mut processed_data = HashMap::new(); + + // Check for version gap before processing each key-value pair + let upload_result = current_batch.data; + println!("Upload result len {}", upload_result.len()); + for (parquet_type, current_metadata) in &upload_result { + // we need to have a map of last_sucess_bath for parquet-Type as well. + // if there is a last_success_batch for the current parquet-Type then we need to check the version gap + println!( + "checking for parquet_type: {:?} with start version {}, end_version {}", + parquet_type.to_string(), + current_metadata.start_version, + current_metadata.end_version + ); + if let Some(last_success) = self.last_success_batch.get(parquet_type) { + if last_success.metadata.end_version + 1 != current_metadata.start_version { + println!("Gap detected for {:?} starting from version: {} when the stored end_version is {}", &parquet_type.to_string(), current_metadata.start_version, last_success.metadata.end_version); + return Err(ProcessorError::ProcessError { + message: format!( + "Gap detected for {:?} starting from version: {}", + &parquet_type.to_string(), + current_metadata.start_version + ), + }); + } + } + + processed_data.insert(*parquet_type, current_metadata.clone()); + + // Update last_success_batch for the current key + self.last_success_batch + .entry(*parquet_type) + .and_modify(|e| { + e.data = (); + e.metadata = current_metadata.clone(); + }) + .or_insert(TransactionContext { + data: (), + metadata: current_metadata.clone(), + }); + } + + // Pass through the current batch with updated metadata + Ok(Some(TransactionContext { + data: processed_data, + metadata: current_batch.metadata.clone(), + })) + } + + async fn cleanup( + &mut self, + ) -> Result>>, ProcessorError> { + // Save the last successful batch to the database + self.save_processor_status().await?; + Ok(None) + } +} + +#[async_trait] +impl PollableAsyncStep for ParquetVersionTrackerStep +where + Self: Sized + Send + Sync + 'static, + S: ParquetProcessorStatusSaver + Send + Sync + 'static, +{ + fn poll_interval(&self) -> std::time::Duration { + std::time::Duration::from_secs(self.polling_interval_secs) + } + + async fn poll( + &mut self, + ) -> Result< + Option>>>, + ProcessorError, + > { + // TODO: Add metrics for gap count + self.save_processor_status().await?; + // Nothing should be returned + Ok(None) + } +} + +impl NamedStep for ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + fn name(&self) -> String { + "ParquetVersionTrackerStep".to_string() + } +} diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 8a69ab77d..d0b0351cf 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -49,12 +49,11 @@ pub async fn get_starting_version( pub async fn get_min_last_success_version_parquet( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, - processor_names: Vec, + table_names: Vec, ) -> Result { - let min_processed_version = - get_min_processed_version_from_db(conn_pool.clone(), processor_names) - .await - .context("Failed to get minimum last success version from DB")?; + let min_processed_version = get_min_processed_version_from_db(conn_pool.clone(), table_names) + .await + .context("Failed to get minimum last success version from DB")?; // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. Ok(min_processed_version.unwrap_or( @@ -65,14 +64,19 @@ pub async fn get_min_last_success_version_parquet( )) } +/// Get the minimum last success version from the database for the given processors. +/// +/// This should return the minimum of the last success version of the processors in the list. +/// If any of the tables handled by the parquet processor has no entry, it should use 0 as a default value. +/// To avoid skipping any versions, the minimum of the last success version should be used as the starting version. async fn get_min_processed_version_from_db( conn_pool: ArcDbPool, - processor_names: Vec, + table_names: Vec, ) -> Result> { let mut queries = Vec::new(); // Spawn all queries concurrently with separate connections - for processor_name in processor_names { + for processor_name in table_names { let conn_pool = conn_pool.clone(); let processor_name = processor_name.clone();