From 823215e7f66e37f003bed6df979534de8ede6487 Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Wed, 13 Nov 2024 20:41:05 -0800 Subject: [PATCH 1/3] [SDK-parquet] add parquet version tracker --- .../bq_analytics/generic_parquet_processor.rs | 10 +- .../src/processors/default_processor.rs | 2 +- .../src/config/processor_config.rs | 1 - .../src/parquet_processors/mod.rs | 78 ++++++++ .../parquet_default_processor.rs | 140 ++++++-------- .../src/steps/common/gcs_uploader.rs | 52 +++-- .../steps/common/processor_status_saver.rs | 67 +++++++ .../generic_parquet_buffer_handler.rs | 171 +++++++++++++++++ .../steps/parquet_default_processor/mod.rs | 1 + .../parquet_version_tracker_step.rs | 177 ++++++++++++++++++ .../src/utils/starting_version.rs | 18 +- 11 files changed, 606 insertions(+), 111 deletions(-) create mode 100644 rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs create mode 100644 rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs index b213fdeba..ba800b2ee 100644 --- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs +++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs @@ -54,10 +54,10 @@ where } pub struct ParquetHandler -where + where ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative, for<'a> &'a [ParquetType]: RecordWriter, -{ + { pub schema: Arc, pub writer: SerializedFileWriter>, pub buffer: Vec, @@ -72,7 +72,8 @@ where pub last_upload_time: Instant, pub processor_name: String, } -fn create_new_writer(schema: Arc) -> Result>> { + +pub fn create_new_writer(schema: Arc) -> Result>> { let props = WriterProperties::builder() .set_compression(parquet::basic::Compression::LZ4) .build(); @@ -83,7 +84,8 @@ fn create_new_writer(schema: Arc) -> Result>> impl ParquetHandler where - ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, + ParquetType: Allocative + + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, for<'a> &'a [ParquetType]: RecordWriter, { fn create_new_writer(&self) -> Result>> { diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index fd6bfcf24..e8882aed3 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -288,7 +288,7 @@ impl ProcessorTrait for DefaultProcessor { } } -// TODO: we can further optimize this by passing in a falg to selectively parse only the required data (e.g. table_items for parquet) +// TODO: we can further optimize this by passing in a flag to selectively parse only the required data (e.g. table_items for parquet) /// Processes a list of transactions and extracts relevant data into different models. 
/// /// This function iterates over a list of transactions, extracting block metadata transactions, diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 57d1af2c8..e7002d38f 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -73,7 +73,6 @@ impl ProcessorConfig { ProcessorConfig::ParquetDefaultProcessor(config) => { // Get the processor name as a prefix let prefix = self.name(); - // Collect valid table names from `ParquetTypeEnum` into a set for quick lookup let valid_table_names: HashSet = ParquetTypeEnum::iter().map(|e| e.to_string()).collect(); diff --git a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 218c9d8ac..9174c6cea 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -1,16 +1,30 @@ +use crate::{ + config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig}, + steps::{ + common::gcs_uploader::{create_new_writer, GCSUploader}, + parquet_default_processor::size_buffer::ParquetBufferStep, + }, + utils::database::{new_db_pool, ArcDbPool}, +}; use aptos_indexer_processor_sdk::utils::errors::ProcessorError; +use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; +use parquet::schema::types::Type; use processor::db::parquet::models::default_models::{ parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, parquet_write_set_changes::WriteSetChangeModel, }; use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use strum::{Display, EnumIter}; pub mod parquet_default_processor; +const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; + /// Enum representing the different types of Parquet files that can be processed. 
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Display, EnumIter)] +#[strum(serialize_all = "snake_case")] #[cfg_attr( test, derive(strum::EnumDiscriminants), @@ -123,6 +137,70 @@ impl ParquetTypeStructs { } } +async fn initialize_gcs_client(credentials: Option) -> Arc { + if let Some(credentials) = credentials { + std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); + } + + let gcs_config = GcsClientConfig::default() + .with_auth() + .await + .expect("Failed to create GCS client config"); + + Arc::new(GCSClient::new(gcs_config)) +} + +async fn initialize_database_pool(config: &DbConfig) -> anyhow::Result { + match config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + Ok(conn_pool) + }, + } +} + +async fn initialize_parquet_buffer_step( + gcs_client: Arc, + parquet_processor_config: ParquetDefaultProcessorConfig, + parquet_type_to_schemas: HashMap>, + processor_name: String, +) -> anyhow::Result> { + let parquet_type_to_writer = parquet_type_to_schemas + .iter() + .map(|(key, schema)| { + let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); + (*key, writer) + }) + .collect(); + + let buffer_uploader = GCSUploader::new( + gcs_client, + parquet_type_to_schemas, + parquet_type_to_writer, + parquet_processor_config.bucket_name.clone(), + parquet_processor_config.bucket_root.clone(), + processor_name, + )?; + + let default_size_buffer_step = ParquetBufferStep::new( + Duration::from_secs(parquet_processor_config.parquet_upload_interval), + buffer_uploader, + parquet_processor_config.max_buffer_size, + ); + + Ok(default_size_buffer_step) +} + #[cfg(test)] mod test { use super::*; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index c6c0e5e62..3da28f9e7 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -3,28 +3,32 @@ use crate::{ db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, processor_config::ProcessorConfig, }, - parquet_processors::ParquetTypeEnum, + parquet_processors::{ + initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, + ParquetTypeEnum, + }, steps::{ - common::{ - gcs_uploader::{create_new_writer, GCSUploader}, - parquet_buffer_step::ParquetBufferStep, + common::{processor_status_saver::get_parquet_processor_status_saver, + parquet_buffer_step::ParquetBufferStep, + }, + parquet_default_processor::{ + parquet_default_extractor::ParquetDefaultExtractor, + parquet_version_tracker_step::ParquetVersionTrackerStep, }, - parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor, }, utils::{ chain_id::check_or_update_chain_id, - database::{new_db_pool, run_migrations, ArcDbPool}, - starting_version::{get_min_last_success_version_parquet, get_starting_version}, + database::{run_migrations, ArcDbPool}, + starting_version::get_min_last_success_version_parquet, }, }; use anyhow::Context; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, builder::ProcessorBuilder, - common_steps::TransactionStreamStep, + common_steps::{TransactionStreamStep, 
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS}, traits::{processor_trait::ProcessorTrait, IntoRunnableStep}, }; -use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; use processor::{ bq_analytics::generic_parquet_processor::HasParquetSchema, @@ -35,38 +39,18 @@ use processor::{ }, worker::TableFlags, }; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; use tracing::{debug, info}; -const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; - pub struct ParquetDefaultProcessor { pub config: IndexerProcessorConfig, - pub db_pool: ArcDbPool, // for processor status + pub db_pool: ArcDbPool, } impl ParquetDefaultProcessor { pub async fn new(config: IndexerProcessorConfig) -> anyhow::Result { - match config.db_config { - DbConfig::PostgresConfig(ref postgres_config) => { - let conn_pool = new_db_pool( - &postgres_config.connection_string, - Some(postgres_config.db_pool_size), - ) - .await - .map_err(|e| { - anyhow::anyhow!( - "Failed to create connection pool for PostgresConfig: {:?}", - e - ) - })?; - - Ok(Self { - config, - db_pool: conn_pool, - }) - }, - } + let db_pool = initialize_database_pool(&config.db_config).await?; + Ok(Self { config, db_pool }) } } @@ -88,25 +72,6 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, } - // Determine the processing mode (backfill or regular) - let is_backfill = self.config.backfill_config.is_some(); - - // TODO: Revisit when parquet version tracker is available. - // Query the starting version - let starting_version = if is_backfill { - get_starting_version(&self.config, self.db_pool.clone()).await? - } else { - // Regular mode logic: Fetch the minimum last successful version across all relevant tables - let table_names = self - .config - .processor_config - .get_table_names() - .context("Failed to get table names for the processor")?; - - get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) - .await? - }; - // Check and update the ledger chain id to ensure we're indexing the correct chain let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) .await? @@ -126,6 +91,22 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, }; + // TODO: Revisit when parquet version tracker is available. + // Query the starting version + let table_names = if let Some(backfill_config) = &self.config.backfill_config { + // for backfill we will only backfill one table per job + vec![backfill_config.backfill_alias.clone()] + } else { + self.config + .processor_config + .get_table_names() + .context("Failed to get table names for the processor")? 
+ }; + + let starting_version = + get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) + .await?; + // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), @@ -137,20 +118,12 @@ impl ProcessorTrait for ParquetDefaultProcessor { opt_in_tables: TableFlags::empty(), }; - let credentials = parquet_processor_config - .google_application_credentials - .clone(); - - if let Some(credentials) = credentials { - std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); - } - - let gcs_config = GcsClientConfig::default() - .with_auth() - .await - .expect("Failed to create GCS client config"); - - let gcs_client = Arc::new(GCSClient::new(gcs_config)); + let gcs_client = initialize_gcs_client( + parquet_processor_config + .google_application_credentials + .clone(), + ) + .await; let parquet_type_to_schemas: HashMap> = [ (ParquetTypeEnum::MoveResource, MoveResource::schema()), @@ -165,37 +138,34 @@ impl ProcessorTrait for ParquetDefaultProcessor { .into_iter() .collect(); - let parquet_type_to_writer = parquet_type_to_schemas - .iter() - .map(|(key, schema)| { - let writer = create_new_writer(schema.clone()).expect("Failed to create writer"); - (*key, writer) - }) - .collect(); - - let buffer_uploader = GCSUploader::new( + let default_size_buffer_step = initialize_parquet_buffer_step( gcs_client.clone(), + parquet_processor_config.clone(), parquet_type_to_schemas, - parquet_type_to_writer, - parquet_processor_config.bucket_name.clone(), - parquet_processor_config.bucket_root.clone(), self.name().to_string(), - )?; + ) + .await + .unwrap_or_else(|e| { + panic!("Failed to initialize parquet buffer step: {:?}", e); + }); + + let parquet_version_tracker_step = ParquetVersionTrackerStep::new( + get_parquet_processor_status_saver(self.db_pool.clone(), self.config.clone()), + DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, + ); let channel_size = parquet_processor_config.channel_size; - let default_size_buffer_step = ParquetBufferStep::new( - Duration::from_secs(parquet_processor_config.parquet_upload_interval), - buffer_uploader, - parquet_processor_config.max_buffer_size, - ); - // Connect processor steps together let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( transaction_stream.into_runnable_step(), ) .connect_to(parquet_default_extractor.into_runnable_step(), channel_size) .connect_to(default_size_buffer_step.into_runnable_step(), channel_size) + .connect_to( + parquet_version_tracker_step.into_runnable_step(), + channel_size, + ) .end_and_return_output_receiver(channel_size); loop { diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs index 325299b1d..034843909 100644 --- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -25,6 +25,7 @@ pub struct GCSUploader { } #[async_trait] +#[warn(dead_code)] pub trait Uploadable { async fn handle_buffer( &mut self, @@ -38,28 +39,46 @@ impl Uploadable for GCSUploader { &mut self, buffer: ParquetTypeStructs, ) -> anyhow::Result<(), ProcessorError> { - let table_name = buffer.get_table_name(); - let result = match buffer { ParquetTypeStructs::Transaction(transactions) => { - self.upload_generic(&transactions[..], ParquetTypeEnum::Transaction, table_name) - .await + self.upload_generic( + &transactions[..], + ParquetTypeEnum::Transaction, + 
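// to_string() uses the snake_case Display derived on ParquetTypeEnum (e.g. "transaction" here). +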
&ParquetTypeEnum::Transaction.to_string(), + ) + .await }, ParquetTypeStructs::MoveResource(resources) => { - self.upload_generic(&resources[..], ParquetTypeEnum::MoveResource, table_name) - .await + self.upload_generic( + &resources[..], + ParquetTypeEnum::MoveResource, + &ParquetTypeEnum::MoveResource.to_string(), + ) + .await }, ParquetTypeStructs::WriteSetChange(changes) => { - self.upload_generic(&changes[..], ParquetTypeEnum::WriteSetChange, table_name) - .await + self.upload_generic( + &changes[..], + ParquetTypeEnum::WriteSetChange, + &ParquetTypeEnum::WriteSetChange.to_string(), + ) + .await }, ParquetTypeStructs::TableItem(items) => { - self.upload_generic(&items[..], ParquetTypeEnum::TableItem, table_name) - .await + self.upload_generic( + &items[..], + ParquetTypeEnum::TableItem, + &ParquetTypeEnum::TableItem.to_string(), + ) + .await }, ParquetTypeStructs::MoveModule(modules) => { - self.upload_generic(&modules[..], ParquetTypeEnum::MoveModule, table_name) - .await + self.upload_generic( + &modules[..], + ParquetTypeEnum::MoveModule, + &ParquetTypeEnum::MoveModule.to_string(), + ) + .await }, }; @@ -141,7 +160,7 @@ impl GCSUploader { &mut self, data: &[ParquetType], parquet_type: ParquetTypeEnum, - table_name: &'static str, + table_name: &str, ) -> anyhow::Result<()> where ParquetType: HasVersion + GetTimeStamp + HasParquetSchema, @@ -184,6 +203,13 @@ impl GCSUploader { ) .await?; + println!( + "Uploaded parquet to GCS for table: {}, start_version: {}, end_version: {}", + table_name, + data[0].version(), + data[data.len() - 1].version() + ); + Ok(()) } } diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index 50bb39ae3..dc66c725c 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -4,6 +4,7 @@ use crate::{ backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, processor_status::ProcessorStatus, }, + steps::parquet_default_processor::parquet_version_tracker_step::ParquetProcessorStatusSaver, utils::database::{execute_with_better_error, ArcDbPool}, }; use anyhow::Result; @@ -40,6 +41,17 @@ pub fn get_processor_status_saver( } } +pub fn get_parquet_processor_status_saver( + conn_pool: ArcDbPool, + config: IndexerProcessorConfig, +) -> ParquetProcessorStatusSaverEnum { + let processor_name = config.processor_config.name().to_string(); + ParquetProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } +} + pub enum ProcessorStatusSaverEnum { Default { conn_pool: ArcDbPool, @@ -53,6 +65,61 @@ pub enum ProcessorStatusSaverEnum { }, } +pub enum ParquetProcessorStatusSaverEnum { + Default { + conn_pool: ArcDbPool, + processor_name: String, + }, +} + +#[async_trait] +impl ParquetProcessorStatusSaver for ParquetProcessorStatusSaverEnum { + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError> { + let end_timestamp = last_success_batch + .metadata + .end_transaction_timestamp + .as_ref() + .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) + .map(|t| t.naive_utc()); + match self { + ParquetProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } => { + let status = ProcessorStatus { + processor: processor_name.clone() + "_" + table_name, + last_success_version: last_success_batch.metadata.end_version as i64, + last_transaction_timestamp: 
end_timestamp, + }; + + // Save regular processor status to the database + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(processor_status::table) + .values(&status) + .on_conflict(processor_status::processor) + .do_update() + .set(( + processor_status::last_success_version + .eq(excluded(processor_status::last_success_version)), + processor_status::last_updated.eq(excluded(processor_status::last_updated)), + processor_status::last_transaction_timestamp + .eq(excluded(processor_status::last_transaction_timestamp)), + )), + Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), + ) + .await?; + + Ok(()) + }, + } + } +} + #[async_trait] impl ProcessorStatusSaver for ProcessorStatusSaverEnum { async fn save_processor_status( diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs b/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs new file mode 100644 index 000000000..197913f19 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs @@ -0,0 +1,171 @@ +use allocative::Allocative; +use anyhow::Context; +use aptos_indexer_processor_sdk::{ + common_steps::timed_size_buffer_step::BufferHandler, + traits::parquet_extract_trait::{GetTimeStamp, HasParquetSchema, HasVersion, NamedTable}, + types::transaction_context::TransactionMetadata, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use google_cloud_storage::client::Client as GCSClient; +use parquet::{ + file::{properties::WriterProperties, writer::SerializedFileWriter}, + record::RecordWriter, + schema::types::Type, +}; +use processor::{ + bq_analytics::gcs_handler::upload_parquet_to_gcs, utils::util::naive_datetime_to_timestamp, +}; +use std::{marker::PhantomData, path::PathBuf, sync::Arc}; +use tracing::{debug, error}; + +pub struct GenericParquetBufferHandler +where + ParquetType: NamedTable + + NamedTable + + HasVersion + + HasParquetSchema + + 'static + + Allocative + + GetTimeStamp, + for<'a> &'a [ParquetType]: RecordWriter, +{ + pub schema: Arc, + pub writer: SerializedFileWriter>, + pub bucket_name: String, + pub bucket_root: String, + pub processor_name: String, + _marker: PhantomData, +} + +#[async_trait] +impl BufferHandler for GenericParquetBufferHandler +where + ParquetType: NamedTable + + NamedTable + + HasVersion + + HasParquetSchema + + 'static + + Allocative + + Send + + GetTimeStamp, + for<'a> &'a [ParquetType]: RecordWriter, +{ + async fn handle_buffer( + &mut self, + gcs_client: &GCSClient, + buffer: Vec, + metadata: &mut TransactionMetadata, + ) -> anyhow::Result<(), ProcessorError> { + if let Err(e) = self.upload_buffer(gcs_client, buffer, metadata).await { + error!("Failed to upload buffer: {}", e); + return Err(ProcessorError::ProcessError { + message: format!("Failed to upload buffer: {}", e), + }); + } + Ok(()) + } +} + +pub fn create_new_writer(schema: Arc) -> anyhow::Result>> { + let props = WriterProperties::builder() + .set_compression(parquet::basic::Compression::LZ4) + .build(); + let props_arc = Arc::new(props); + + SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer") +} + +impl GenericParquetBufferHandler +where + ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, + for<'a> &'a [ParquetType]: RecordWriter, +{ + fn create_new_writer(&self) -> anyhow::Result>> { + 
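// Delegates to the shared factory in the processor crate, which applies the same LZ4 writer properties. +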
processor::bq_analytics::generic_parquet_processor::create_new_writer(self.schema.clone()) + } + + fn close_writer(&mut self) -> anyhow::Result>> { + let new_writer = self.create_new_writer()?; + let old_writer = std::mem::replace(&mut self.writer, new_writer); + Ok(old_writer) + } + + pub fn new( + bucket_name: String, + bucket_root: String, + schema: Arc, + processor_name: String, + ) -> anyhow::Result { + // had to append unique id to avoid concurrent write issues + let writer = + processor::bq_analytics::generic_parquet_processor::create_new_writer(schema.clone())?; + + Ok(Self { + writer, + bucket_name, + bucket_root, + schema, + processor_name, + _marker: PhantomData, + }) + } + + async fn upload_buffer( + &mut self, + gcs_client: &GCSClient, + buffer: Vec, + metadata: &mut TransactionMetadata, + ) -> anyhow::Result<()> { + if buffer.is_empty() { + debug!("Buffer is empty, skipping upload."); + return Ok(()); + } + let first = buffer + .first() + .context("Buffer is not empty but has no first element")?; + let first_transaction_timestamp = naive_datetime_to_timestamp(first.get_timestamp()); + let start_version = first.version(); + let last = buffer + .last() + .context("Buffer is not empty but has no last element")?; + let end_version = last.version(); + let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp()); + let mut row_group_writer = self + .writer + .next_row_group() + .context("Failed to get row group")?; + + buffer + .as_slice() + .write_to_row_group(&mut row_group_writer) + .context("Failed to write to row group")?; + row_group_writer + .close() + .context("Failed to close row group")?; + + let old_writer = self.close_writer().context("Failed to close writer")?; + let upload_buffer = old_writer + .into_inner() + .context("Failed to get inner buffer")?; + + let bucket_root = PathBuf::from(&self.bucket_root); + + upload_parquet_to_gcs( + gcs_client, + upload_buffer, + ParquetType::TABLE_NAME, + &self.bucket_name, + &bucket_root, + self.processor_name.clone(), + ) + .await?; + + metadata.start_version = start_version as u64; + metadata.end_version = end_version as u64; + metadata.start_transaction_timestamp = Some(first_transaction_timestamp); + metadata.end_transaction_timestamp = Some(last_transaction_timestamp); + + Ok(()) + } +} diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs index 35ba1ba68..d5ab7b760 100644 --- a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs +++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs @@ -1 +1,2 @@ pub mod parquet_default_extractor; +pub mod parquet_version_tracker_step; diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs new file mode 100644 index 000000000..b83609c29 --- /dev/null +++ b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs @@ -0,0 +1,177 @@ +use crate::parquet_processors::ParquetTypeEnum; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{ + pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable, + }, + types::transaction_context::{TransactionContext, TransactionMetadata}, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use std::collections::HashMap; + +// pub const DEFAULT_UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; +/// The 
`ParquetProcessorStatusSaver` trait should be implemented in order to save the latest successfully /// processed transaction version to storage. I.e., persisting the `processor_status` to storage. +#[async_trait] +pub trait ParquetProcessorStatusSaver { + // T represents the transaction type that the processor is tracking. + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError>; +} + +/// Tracks the versioned processing of sequential transactions, ensuring no gaps +/// occur between them. +/// +/// Important: this step assumes ordered transactions. Please use the `OrderByVersionStep` before this step +/// if the transactions are not ordered. +pub struct ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + // Last successful batch of sequentially processed transactions. Includes metadata to write to storage. + last_success_batch: HashMap>, + polling_interval_secs: u64, + processor_status_saver: S, +} + +impl ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + pub fn new(processor_status_saver: S, polling_interval_secs: u64) -> Self { + Self { + last_success_batch: HashMap::new(), + processor_status_saver, + polling_interval_secs, + } + } + + async fn save_processor_status(&mut self) -> Result<(), ProcessorError> { + // this shouldn't be over 5. + println!( + "Len of last_success_batch: {}", + self.last_success_batch.len() + ); + for (parquet_type, last_success_batch) in &self.last_success_batch { + let table_name = parquet_type.to_string(); + println!("Saving processor status for table: {}", table_name); + self.processor_status_saver + .save_parquet_processor_status(last_success_batch, &table_name) + .await?; + } + Ok(()) + } +} + +#[async_trait] +impl Processable for ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + type Input = HashMap; + type Output = HashMap; + type RunType = PollableAsyncRunType; + + async fn process( + &mut self, + current_batch: TransactionContext, + ) -> Result>, ProcessorError> { + // Initialize a new map to store the processed metadata + let mut processed_data = HashMap::new(); + + // Check for version gap before processing each key-value pair + let upload_result = current_batch.data; + println!("Upload result len {}", upload_result.len()); + for (parquet_type, current_metadata) in &upload_result { + // we need to have a map of last_success_batch for parquet-Type as well.
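+ // e.g. (illustrative numbers): if the stored batch for a table ends at version 499, the next batch must start at 500; any other start is reported as a gap below.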
+ // if there is a last_success_batch for the current parquet-Type then we need to check the version gap + println!( + "checking for parquet_type: {:?} with start version {}, end_version {}", + parquet_type.to_string(), + current_metadata.start_version, + current_metadata.end_version + ); + if let Some(last_success) = self.last_success_batch.get(parquet_type) { + if last_success.metadata.end_version + 1 != current_metadata.start_version { + println!("Gap detected for {:?} starting from version: {} when the stored end_version is {}", &parquet_type.to_string(), current_metadata.start_version, last_success.metadata.end_version); + return Err(ProcessorError::ProcessError { + message: format!( + "Gap detected for {:?} starting from version: {}", + &parquet_type.to_string(), + current_metadata.start_version + ), + }); + } + } + + processed_data.insert(*parquet_type, current_metadata.clone()); + + // Update last_success_batch for the current key + self.last_success_batch + .entry(*parquet_type) + .and_modify(|e| { + e.data = (); + e.metadata = current_metadata.clone(); + }) + .or_insert(TransactionContext { + data: (), + metadata: current_metadata.clone(), + }); + } + + // Pass through the current batch with updated metadata + Ok(Some(TransactionContext { + data: processed_data, + metadata: current_batch.metadata.clone(), + })) + } + + async fn cleanup( + &mut self, + ) -> Result>>, ProcessorError> { + // Save the last successful batch to the database + self.save_processor_status().await?; + Ok(None) + } +} + +#[async_trait] +impl PollableAsyncStep for ParquetVersionTrackerStep +where + Self: Sized + Send + Sync + 'static, + S: ParquetProcessorStatusSaver + Send + Sync + 'static, +{ + fn poll_interval(&self) -> std::time::Duration { + std::time::Duration::from_secs(self.polling_interval_secs) + } + + async fn poll( + &mut self, + ) -> Result< + Option>>>, + ProcessorError, + > { + // TODO: Add metrics for gap count + self.save_processor_status().await?; + // Nothing should be returned + Ok(None) + } +} + +impl NamedStep for ParquetVersionTrackerStep +where + Self: Sized + Send + 'static, + S: ParquetProcessorStatusSaver + Send + 'static, +{ + fn name(&self) -> String { + "ParquetVersionTrackerStep".to_string() + } +} diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 8a69ab77d..d0b0351cf 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -49,12 +49,11 @@ pub async fn get_starting_version( pub async fn get_min_last_success_version_parquet( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, - processor_names: Vec, + table_names: Vec, ) -> Result { - let min_processed_version = - get_min_processed_version_from_db(conn_pool.clone(), processor_names) - .await - .context("Failed to get minimum last success version from DB")?; + let min_processed_version = get_min_processed_version_from_db(conn_pool.clone(), table_names) + .await + .context("Failed to get minimum last success version from DB")?; // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. Ok(min_processed_version.unwrap_or( @@ -65,14 +64,19 @@ pub async fn get_min_last_success_version_parquet( )) } +/// Get the minimum last success version from the database for the given processors. +/// +/// This should return the minimum of the last success version of the processors in the list. 
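+/// For example (illustrative numbers), if `transactions` is checkpointed at version 900 and `move_resources` at 750, the processor should resume from 750 so that no `move_resources` rows are skipped.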
+/// If any of the tables handled by the parquet processor has no entry, it should use 0 as a default value. +/// To avoid skipping any versions, the minimum of the last success version should be used as the starting version. async fn get_min_processed_version_from_db( conn_pool: ArcDbPool, - processor_names: Vec, + table_names: Vec, ) -> Result> { let mut queries = Vec::new(); // Spawn all queries concurrently with separate connections - for processor_name in processor_names { + for processor_name in table_names { let conn_pool = conn_pool.clone(); let processor_name = processor_name.clone(); From b2767841375286ab47f2554fd24cbc6b3be89b5d Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Fri, 15 Nov 2024 10:21:17 -0800 Subject: [PATCH 2/3] fix conflict --- .../bq_analytics/generic_parquet_processor.rs | 7 +- .../src/config/processor_config.rs | 95 ++++++---- .../src/parquet_processors/mod.rs | 28 ++- .../parquet_default_processor.rs | 33 ++-- .../src/steps/common/gcs_uploader.rs | 3 +- rust/sdk-processor/src/steps/common/mod.rs | 2 + .../common/parquet_processor_status_saver.rs | 152 ++++++++++++++++ .../parquet_version_tracker_step.rs | 17 +- .../steps/common/processor_status_saver.rs | 67 ------- .../generic_parquet_buffer_handler.rs | 171 ------------------ .../steps/parquet_default_processor/mod.rs | 1 - rust/sdk-processor/src/utils/mod.rs | 1 + .../utils/parquet_processor_table_mapping.rs | 19 ++ 13 files changed, 280 insertions(+), 316 deletions(-) create mode 100644 rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs rename rust/sdk-processor/src/steps/{parquet_default_processor => common}/parquet_version_tracker_step.rs (87%) delete mode 100644 rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs create mode 100644 rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs index ba800b2ee..5151c5cc0 100644 --- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs +++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs @@ -54,10 +54,10 @@ where } pub struct ParquetHandler - where +where ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative, for<'a> &'a [ParquetType]: RecordWriter, - { +{ pub schema: Arc, pub writer: SerializedFileWriter>, pub buffer: Vec, @@ -84,8 +84,7 @@ pub fn create_new_writer(schema: Arc) -> Result ParquetHandler where - ParquetType: Allocative + - GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, + ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, for<'a> &'a [ParquetType]: RecordWriter, { fn create_new_writer(&self) -> Result>> { diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index e7002d38f..d2d5a2d98 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -1,14 +1,13 @@ use crate::{ - parquet_processors::ParquetTypeEnum, processors::{ ans_processor::AnsProcessorConfig, objects_processor::ObjectsProcessorConfig, stake_processor::StakeProcessorConfig, token_v2_processor::TokenV2ProcessorConfig, }, + utils::parquet_processor_table_mapping::VALID_TABLE_NAMES, }; use ahash::AHashMap; use serde::{Deserialize, Serialize}; use std::collections::HashSet; -use strum::IntoEnumIterator; /// This enum captures the configs for all 
the different processors that are defined. /// @@ -72,28 +71,18 @@ impl ProcessorConfig { match self { ProcessorConfig::ParquetDefaultProcessor(config) => { // Get the processor name as a prefix - let prefix = self.name(); - // Collect valid table names from `ParquetTypeEnum` into a set for quick lookup - let valid_table_names: HashSet = - ParquetTypeEnum::iter().map(|e| e.to_string()).collect(); + let processor_name = self.name(); - // Validate and map table names with prefix - let mut validated_table_names = Vec::new(); - for table_name in &config.tables { - // Ensure the table name is a valid `ParquetTypeEnum` variant - if !valid_table_names.contains(table_name) { - return Err(anyhow::anyhow!( - "Invalid table name '{}'. Expected one of: {:?}", - table_name, - valid_table_names - )); - } + let valid_table_names = VALID_TABLE_NAMES + .get(processor_name) + .ok_or_else(|| anyhow::anyhow!("Processor type not recognized"))?; - // Append the prefix to the validated table name - validated_table_names.push(Self::format_table_name(prefix, table_name)); - } - - Ok(validated_table_names) + // Use the helper function for validation and mapping + Self::validate_and_prefix_table_names( + &config.backfill_table, + valid_table_names, + processor_name, + ) }, _ => Err(anyhow::anyhow!( "Invalid parquet processor config: {:?}", @@ -106,6 +95,26 @@ impl ProcessorConfig { fn format_table_name(prefix: &str, table_name: &str) -> String { format!("{}.{}", prefix, table_name) } + + fn validate_and_prefix_table_names( + table_names: &HashSet, + valid_table_names: &HashSet, + prefix: &str, + ) -> anyhow::Result> { + table_names + .iter() + .map(|table_name| { + if !valid_table_names.contains(table_name) { + return Err(anyhow::anyhow!( + "Invalid table name '{}'. Expected one of: {:?}", + table_name, + valid_table_names + )); + } + Ok(Self::format_table_name(prefix, table_name)) + }) + .collect() + } } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -154,9 +163,9 @@ pub struct ParquetDefaultProcessorConfig { pub max_buffer_size: usize, #[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")] pub parquet_upload_interval: u64, - // list of table names to backfill. Using HashSet for fast lookups, and for future extensibility. + // Set of table name to backfill. Using HashSet for fast lookups, and for future extensibility. 
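+ // e.g. in the processor's YAML config (illustrative): `backfill_table: ["transactions", "move_resources"]`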
#[serde(default)] - pub tables: HashSet, + pub backfill_table: HashSet, } impl ParquetDefaultProcessorConfig { @@ -184,7 +193,10 @@ mod tests { #[test] fn test_valid_table_names() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::from(["MoveResource".to_string(), "Transaction".to_string()]), + backfill_table: HashSet::from([ + "move_resources".to_string(), + "transactions".to_string(), + ]), bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, @@ -199,7 +211,7 @@ mod tests { let table_names = result.unwrap(); let table_names: HashSet = table_names.into_iter().collect(); let expected_names: HashSet = - ["Transaction".to_string(), "MoveResource".to_string()] + ["transactions".to_string(), "move_resources".to_string()] .iter() .map(|e| format!("parquet_default_processor.{}", e)) .collect(); @@ -209,7 +221,7 @@ mod tests { #[test] fn test_invalid_table_name() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::from(["InvalidTable".to_string(), "Transaction".to_string()]), + backfill_table: HashSet::from(["InvalidTable".to_string(), "transactions".to_string()]), bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, @@ -229,7 +241,7 @@ mod tests { #[test] fn test_empty_tables() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::new(), + backfill_table: HashSet::new(), bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, @@ -247,7 +259,7 @@ mod tests { #[test] fn test_duplicate_table_names() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: HashSet::from(["Transaction".to_string(), "Transaction".to_string()]), + backfill_table: HashSet::from(["transactions".to_string(), "transactions".to_string()]), bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, @@ -261,14 +273,20 @@ mod tests { let table_names = result.unwrap(); assert_eq!(table_names, vec![ - "parquet_default_processor.Transaction".to_string(), + "parquet_default_processor.transactions".to_string(), ]); } #[test] - fn test_all_enum_table_names() { + fn test_all_table_names() { let config = ProcessorConfig::ParquetDefaultProcessor(ParquetDefaultProcessorConfig { - tables: ParquetTypeEnum::iter().map(|e| e.to_string()).collect(), + backfill_table: HashSet::from([ + "move_resources".to_string(), + "transactions".to_string(), + "write_set_changes".to_string(), + "table_items".to_string(), + "move_modules".to_string(), + ]), bucket_name: "bucket_name".to_string(), bucket_root: "bucket_root".to_string(), google_application_credentials: None, @@ -281,9 +299,16 @@ mod tests { assert!(result.is_ok()); let table_names = result.unwrap(); - let expected_names: HashSet = ParquetTypeEnum::iter() - .map(|e| format!("parquet_default_processor.{}", e)) - .collect(); + let expected_names: HashSet = [ + "move_resources".to_string(), + "transactions".to_string(), + "write_set_changes".to_string(), + "table_items".to_string(), + "move_modules".to_string(), + ] + .iter() + .map(|e| format!("parquet_default_processor.{}", e)) + .collect(); let table_names: HashSet = table_names.into_iter().collect(); assert_eq!(table_names, expected_names); } diff --git 
a/rust/sdk-processor/src/parquet_processors/mod.rs b/rust/sdk-processor/src/parquet_processors/mod.rs index 9174c6cea..7f0febebd 100644 --- a/rust/sdk-processor/src/parquet_processors/mod.rs +++ b/rust/sdk-processor/src/parquet_processors/mod.rs @@ -1,18 +1,21 @@ use crate::{ config::{db_config::DbConfig, processor_config::ParquetDefaultProcessorConfig}, - steps::{ - common::gcs_uploader::{create_new_writer, GCSUploader}, - parquet_default_processor::size_buffer::ParquetBufferStep, + steps::common::{ + gcs_uploader::{create_new_writer, GCSUploader}, + parquet_buffer_step::ParquetBufferStep, }, utils::database::{new_db_pool, ArcDbPool}, }; use aptos_indexer_processor_sdk::utils::errors::ProcessorError; use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig}; use parquet::schema::types::Type; -use processor::db::parquet::models::default_models::{ - parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, - parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, - parquet_write_set_changes::WriteSetChangeModel, +use processor::{ + db::parquet::models::default_models::{ + parquet_move_modules::MoveModule, parquet_move_resources::MoveResource, + parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, + parquet_write_set_changes::WriteSetChangeModel, + }, + worker::TableFlags, }; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, sync::Arc, time::Duration}; @@ -201,6 +204,17 @@ async fn initialize_parquet_buffer_step( Ok(default_size_buffer_step) } +fn set_backfill_table_flag(table_names: Vec) -> TableFlags { + let mut backfill_table = TableFlags::empty(); + + for table_name in table_names { + if let Some(flag) = TableFlags::from_name(&table_name) { + backfill_table |= flag; + } + } + backfill_table +} + #[cfg(test)] mod test { use super::*; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index 3da28f9e7..125b2c44a 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -5,16 +5,14 @@ use crate::{ }, parquet_processors::{ initialize_database_pool, initialize_gcs_client, initialize_parquet_buffer_step, - ParquetTypeEnum, + set_backfill_table_flag, ParquetTypeEnum, }, steps::{ - common::{processor_status_saver::get_parquet_processor_status_saver, - parquet_buffer_step::ParquetBufferStep, - }, - parquet_default_processor::{ - parquet_default_extractor::ParquetDefaultExtractor, + common::{ + parquet_processor_status_saver::get_parquet_processor_status_saver, parquet_version_tracker_step::ParquetVersionTrackerStep, }, + parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor, }, utils::{ chain_id::check_or_update_chain_id, @@ -37,7 +35,6 @@ use processor::{ parquet_move_tables::TableItem, parquet_transactions::Transaction as ParquetTransaction, parquet_write_set_changes::WriteSetChangeModel, }, - worker::TableFlags, }; use std::{collections::HashMap, sync::Arc}; use tracing::{debug, info}; @@ -91,11 +88,13 @@ impl ProcessorTrait for ParquetDefaultProcessor { }, }; - // TODO: Revisit when parquet version tracker is available. 
// Query the starting version - let table_names = if let Some(backfill_config) = &self.config.backfill_config { - // for backfill we will only backfill one table per job - vec![backfill_config.backfill_alias.clone()] + let table_names = if self.config.backfill_config.is_some() { + parquet_processor_config + .backfill_table + .clone() + .into_iter() + .collect() } else { self.config .processor_config @@ -103,9 +102,12 @@ impl ProcessorTrait for ParquetDefaultProcessor { .context("Failed to get table names for the processor")? }; - let starting_version = - get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names) - .await?; + let starting_version = get_min_last_success_version_parquet( + &self.config, + self.db_pool.clone(), + table_names.clone(), + ) + .await?; // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { @@ -114,8 +116,9 @@ impl ProcessorTrait for ParquetDefaultProcessor { }) .await?; + let backfill_table = set_backfill_table_flag(table_names); let parquet_default_extractor = ParquetDefaultExtractor { - opt_in_tables: TableFlags::empty(), + opt_in_tables: backfill_table, }; let gcs_client = initialize_gcs_client( diff --git a/rust/sdk-processor/src/steps/common/gcs_uploader.rs b/rust/sdk-processor/src/steps/common/gcs_uploader.rs index 034843909..ef176f212 100644 --- a/rust/sdk-processor/src/steps/common/gcs_uploader.rs +++ b/rust/sdk-processor/src/steps/common/gcs_uploader.rs @@ -25,7 +25,6 @@ pub struct GCSUploader { } #[async_trait] -#[warn(dead_code)] pub trait Uploadable { async fn handle_buffer( &mut self, @@ -203,7 +202,7 @@ impl GCSUploader { ) .await?; - println!( + debug!( "Uploaded parquet to GCS for table: {}, start_version: {}, end_version: {}", table_name, data[0].version(), diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs index 18c449997..3662c5d22 100644 --- a/rust/sdk-processor/src/steps/common/mod.rs +++ b/rust/sdk-processor/src/steps/common/mod.rs @@ -1,5 +1,7 @@ pub mod gcs_uploader; pub mod parquet_buffer_step; +pub mod parquet_processor_status_saver; +pub mod parquet_version_tracker_step; pub mod processor_status_saver; pub use processor_status_saver::get_processor_status_saver; diff --git a/rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs b/rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs new file mode 100644 index 000000000..aebb999ff --- /dev/null +++ b/rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs @@ -0,0 +1,152 @@ +use crate::{ + config::indexer_processor_config::IndexerProcessorConfig, + db::common::models::{ + backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, + processor_status::ProcessorStatus, + }, + steps::common::parquet_version_tracker_step::ParquetProcessorStatusSaver, + utils::database::{execute_with_better_error, ArcDbPool}, +}; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + types::transaction_context::TransactionContext, + utils::{errors::ProcessorError, time::parse_timestamp}, +}; +use async_trait::async_trait; +use diesel::{upsert::excluded, ExpressionMethods}; +use processor::schema::{backfill_processor_status, processor_status}; + +pub fn get_parquet_processor_status_saver( + conn_pool: ArcDbPool, + config: IndexerProcessorConfig, +) -> ParquetProcessorStatusSaverEnum { + if let Some(backfill_config) = config.backfill_config { + println!("Backfill config: {:?}", backfill_config); + let 
txn_stream_cfg = config.transaction_stream_config; + let backfill_start_version = txn_stream_cfg.starting_version; + let backfill_end_version = txn_stream_cfg.request_ending_version; + let backfill_alias = backfill_config.backfill_alias.clone(); + ParquetProcessorStatusSaverEnum::Backfill { + conn_pool, + backfill_alias, + backfill_start_version, + backfill_end_version, + } + } else { + let processor_name = config.processor_config.name().to_string(); + ParquetProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } + } +} + +pub enum ParquetProcessorStatusSaverEnum { + Default { + conn_pool: ArcDbPool, + processor_name: String, + }, + Backfill { + conn_pool: ArcDbPool, + backfill_alias: String, + backfill_start_version: Option, + backfill_end_version: Option, + }, +} + +#[async_trait] +impl ParquetProcessorStatusSaver for ParquetProcessorStatusSaverEnum { + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError> { + let end_timestamp = last_success_batch + .metadata + .end_transaction_timestamp + .as_ref() + .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) + .map(|t| t.naive_utc()); + match self { + ParquetProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } => { + let status = ProcessorStatus { + processor: processor_name.clone() + "_" + table_name, + last_success_version: last_success_batch.metadata.end_version as i64, + last_transaction_timestamp: end_timestamp, + }; + + // Save regular processor status to the database + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(processor_status::table) + .values(&status) + .on_conflict(processor_status::processor) + .do_update() + .set(( + processor_status::last_success_version + .eq(excluded(processor_status::last_success_version)), + processor_status::last_updated.eq(excluded(processor_status::last_updated)), + processor_status::last_transaction_timestamp + .eq(excluded(processor_status::last_transaction_timestamp)), + )), + Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), + ) + .await?; + + Ok(()) + }, + ParquetProcessorStatusSaverEnum::Backfill { + conn_pool, + backfill_alias, + backfill_start_version, + backfill_end_version, + } => { + let last_success_version = last_success_batch.metadata.end_version as i64; + let backfill_status = if backfill_end_version.is_some_and(|backfill_end_version| { + last_success_version >= backfill_end_version as i64 + }) { + BackfillStatus::Complete + } else { + BackfillStatus::InProgress + }; + let backfill_end_version_mapped = backfill_end_version.map(|v| v as i64); + let status = BackfillProcessorStatus { + backfill_alias: backfill_alias.clone(), + backfill_status, + last_success_version, + last_transaction_timestamp: end_timestamp, + backfill_start_version: backfill_start_version.unwrap_or(0) as i64, + backfill_end_version: backfill_end_version_mapped, + }; + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(backfill_processor_status::table) + .values(&status) + .on_conflict(backfill_processor_status::backfill_alias) + .do_update() + .set(( + backfill_processor_status::backfill_status + .eq(excluded(backfill_processor_status::backfill_status)), + backfill_processor_status::last_success_version + .eq(excluded(backfill_processor_status::last_success_version)), + backfill_processor_status::last_updated + .eq(excluded(backfill_processor_status::last_updated)),
backfill_processor_status::last_transaction_timestamp.eq(excluded( + backfill_processor_status::last_transaction_timestamp, + )), + backfill_processor_status::backfill_start_version + .eq(excluded(backfill_processor_status::backfill_start_version)), + backfill_processor_status::backfill_end_version + .eq(excluded(backfill_processor_status::backfill_end_version)), + )), + Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "), + ) + .await?; + Ok(()) + }, + } + } +} diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs b/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs similarity index 87% rename from rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs rename to rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs index b83609c29..f9c869d4d 100644 --- a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_version_tracker_step.rs +++ b/rust/sdk-processor/src/steps/common/parquet_version_tracker_step.rs @@ -9,14 +9,13 @@ use aptos_indexer_processor_sdk::{ }; use async_trait::async_trait; use std::collections::HashMap; +use tracing::debug; -// pub const DEFAULT_UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; /// The `ParquetProcessorStatusSaver` trait should be implemented in order to save the latest successfully /// processed transaction version to storage. I.e., persisting the `processor_status` to storage. #[async_trait] pub trait ParquetProcessorStatusSaver { - // T represents the transaction type that the processor is tracking. async fn save_parquet_processor_status( &self, last_success_batch: &TransactionContext<()>, @@ -54,14 +53,8 @@ where } async fn save_processor_status(&mut self) -> Result<(), ProcessorError> { - // this shouldn't be over 5. - println!( - "Len of last_success_batch: {}", - self.last_success_batch.len() - ); for (parquet_type, last_success_batch) in &self.last_success_batch { let table_name = parquet_type.to_string(); - println!("Saving processor status for table: {}", table_name); self.processor_status_saver .save_parquet_processor_status(last_success_batch, &table_name) .await?; } Ok(()) } @@ -84,16 +77,13 @@ where &mut self, current_batch: TransactionContext, ) -> Result>, ProcessorError> { - // Initialize a new map to store the processed metadata let mut processed_data = HashMap::new(); // Check for version gap before processing each key-value pair - let upload_result = current_batch.data; - println!("Upload result len {}", upload_result.len()); - for (parquet_type, current_metadata) in &upload_result { + for (parquet_type, current_metadata) in &current_batch.data { // we need to have a map of last_success_batch for parquet-Type as well.
// if there is a last_success_batch for the current parquet-Type then we need to check the version gap - println!( + debug!( "checking for parquet_type: {:?} with start version {}, end_version {}", parquet_type.to_string(), current_metadata.start_version, @@ -101,7 +91,6 @@ where ); if let Some(last_success) = self.last_success_batch.get(parquet_type) { if last_success.metadata.end_version + 1 != current_metadata.start_version { - println!("Gap detected for {:?} starting from version: {} when the stored end_version is {}", &parquet_type.to_string(), current_metadata.start_version, last_success.metadata.end_version); return Err(ProcessorError::ProcessError { message: format!( "Gap detected for {:?} starting from version: {}", diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index dc66c725c..50bb39ae3 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -4,7 +4,6 @@ use crate::{ backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, processor_status::ProcessorStatus, }, - steps::parquet_default_processor::parquet_version_tracker_step::ParquetProcessorStatusSaver, utils::database::{execute_with_better_error, ArcDbPool}, }; use anyhow::Result; @@ -41,17 +40,6 @@ pub fn get_processor_status_saver( } } -pub fn get_parquet_processor_status_saver( - conn_pool: ArcDbPool, - config: IndexerProcessorConfig, -) -> ParquetProcessorStatusSaverEnum { - let processor_name = config.processor_config.name().to_string(); - ParquetProcessorStatusSaverEnum::Default { - conn_pool, - processor_name, - } -} - pub enum ProcessorStatusSaverEnum { Default { conn_pool: ArcDbPool, @@ -65,61 +53,6 @@ pub enum ProcessorStatusSaverEnum { }, } -pub enum ParquetProcessorStatusSaverEnum { - Default { - conn_pool: ArcDbPool, - processor_name: String, - }, -} - -#[async_trait] -impl ParquetProcessorStatusSaver for ParquetProcessorStatusSaverEnum { - async fn save_parquet_processor_status( - &self, - last_success_batch: &TransactionContext<()>, - table_name: &str, - ) -> Result<(), ProcessorError> { - let end_timestamp = last_success_batch - .metadata - .end_transaction_timestamp - .as_ref() - .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) - .map(|t| t.naive_utc()); - match self { - ParquetProcessorStatusSaverEnum::Default { - conn_pool, - processor_name, - } => { - let status = ProcessorStatus { - processor: processor_name.clone() + "_" + table_name, - last_success_version: last_success_batch.metadata.end_version as i64, - last_transaction_timestamp: end_timestamp, - }; - - // Save regular processor status to the database - execute_with_better_error( - conn_pool.clone(), - diesel::insert_into(processor_status::table) - .values(&status) - .on_conflict(processor_status::processor) - .do_update() - .set(( - processor_status::last_success_version - .eq(excluded(processor_status::last_success_version)), - processor_status::last_updated.eq(excluded(processor_status::last_updated)), - processor_status::last_transaction_timestamp - .eq(excluded(processor_status::last_transaction_timestamp)), - )), - Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), - ) - .await?; - - Ok(()) - }, - } - } -} - #[async_trait] impl ProcessorStatusSaver for ProcessorStatusSaverEnum { async fn save_processor_status( diff --git 
a/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs b/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs deleted file mode 100644 index 197913f19..000000000 --- a/rust/sdk-processor/src/steps/parquet_default_processor/generic_parquet_buffer_handler.rs +++ /dev/null @@ -1,171 +0,0 @@ -use allocative::Allocative; -use anyhow::Context; -use aptos_indexer_processor_sdk::{ - common_steps::timed_size_buffer_step::BufferHandler, - traits::parquet_extract_trait::{GetTimeStamp, HasParquetSchema, HasVersion, NamedTable}, - types::transaction_context::TransactionMetadata, - utils::errors::ProcessorError, -}; -use async_trait::async_trait; -use google_cloud_storage::client::Client as GCSClient; -use parquet::{ - file::{properties::WriterProperties, writer::SerializedFileWriter}, - record::RecordWriter, - schema::types::Type, -}; -use processor::{ - bq_analytics::gcs_handler::upload_parquet_to_gcs, utils::util::naive_datetime_to_timestamp, -}; -use std::{marker::PhantomData, path::PathBuf, sync::Arc}; -use tracing::{debug, error}; - -pub struct GenericParquetBufferHandler -where - ParquetType: NamedTable - + NamedTable - + HasVersion - + HasParquetSchema - + 'static - + Allocative - + GetTimeStamp, - for<'a> &'a [ParquetType]: RecordWriter, -{ - pub schema: Arc, - pub writer: SerializedFileWriter>, - pub bucket_name: String, - pub bucket_root: String, - pub processor_name: String, - _marker: PhantomData, -} - -#[async_trait] -impl BufferHandler for GenericParquetBufferHandler -where - ParquetType: NamedTable - + NamedTable - + HasVersion - + HasParquetSchema - + 'static - + Allocative - + Send - + GetTimeStamp, - for<'a> &'a [ParquetType]: RecordWriter, -{ - async fn handle_buffer( - &mut self, - gcs_client: &GCSClient, - buffer: Vec, - metadata: &mut TransactionMetadata, - ) -> anyhow::Result<(), ProcessorError> { - if let Err(e) = self.upload_buffer(gcs_client, buffer, metadata).await { - error!("Failed to upload buffer: {}", e); - return Err(ProcessorError::ProcessError { - message: format!("Failed to upload buffer: {}", e), - }); - } - Ok(()) - } -} - -pub fn create_new_writer(schema: Arc) -> anyhow::Result>> { - let props = WriterProperties::builder() - .set_compression(parquet::basic::Compression::LZ4) - .build(); - let props_arc = Arc::new(props); - - SerializedFileWriter::new(Vec::new(), schema, props_arc).context("Failed to create new writer") -} - -impl GenericParquetBufferHandler -where - ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, - for<'a> &'a [ParquetType]: RecordWriter, -{ - fn create_new_writer(&self) -> anyhow::Result>> { - processor::bq_analytics::generic_parquet_processor::create_new_writer(self.schema.clone()) - } - - fn close_writer(&mut self) -> anyhow::Result>> { - let new_writer = self.create_new_writer()?; - let old_writer = std::mem::replace(&mut self.writer, new_writer); - Ok(old_writer) - } - - pub fn new( - bucket_name: String, - bucket_root: String, - schema: Arc, - processor_name: String, - ) -> anyhow::Result { - // had to append unique id to avoid concurrent write issues - let writer = - processor::bq_analytics::generic_parquet_processor::create_new_writer(schema.clone())?; - - Ok(Self { - writer, - bucket_name, - bucket_root, - schema, - processor_name, - _marker: PhantomData, - }) - } - - async fn upload_buffer( - &mut self, - gcs_client: &GCSClient, - buffer: Vec, - metadata: &mut TransactionMetadata, - ) -> anyhow::Result<()> { - if 
buffer.is_empty() { - debug!("Buffer is empty, skipping upload."); - return Ok(()); - } - let first = buffer - .first() - .context("Buffer is not empty but has no first element")?; - let first_transaction_timestamp = naive_datetime_to_timestamp(first.get_timestamp()); - let start_version = first.version(); - let last = buffer - .last() - .context("Buffer is not empty but has no last element")?; - let end_version = last.version(); - let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp()); - let mut row_group_writer = self - .writer - .next_row_group() - .context("Failed to get row group")?; - - buffer - .as_slice() - .write_to_row_group(&mut row_group_writer) - .context("Failed to write to row group")?; - row_group_writer - .close() - .context("Failed to close row group")?; - - let old_writer = self.close_writer().context("Failed to close writer")?; - let upload_buffer = old_writer - .into_inner() - .context("Failed to get inner buffer")?; - - let bucket_root = PathBuf::from(&self.bucket_root); - - upload_parquet_to_gcs( - gcs_client, - upload_buffer, - ParquetType::TABLE_NAME, - &self.bucket_name, - &bucket_root, - self.processor_name.clone(), - ) - .await?; - - metadata.start_version = start_version as u64; - metadata.end_version = end_version as u64; - metadata.start_transaction_timestamp = Some(first_transaction_timestamp); - metadata.end_transaction_timestamp = Some(last_transaction_timestamp); - - Ok(()) - } -} diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs index d5ab7b760..35ba1ba68 100644 --- a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs +++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs @@ -1,2 +1 @@ pub mod parquet_default_extractor; -pub mod parquet_version_tracker_step; diff --git a/rust/sdk-processor/src/utils/mod.rs b/rust/sdk-processor/src/utils/mod.rs index 252b6f0b2..eb4865fc3 100644 --- a/rust/sdk-processor/src/utils/mod.rs +++ b/rust/sdk-processor/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod chain_id; pub mod database; pub mod parquet_extractor_helper; +pub mod parquet_processor_table_mapping; pub mod starting_version; diff --git a/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs b/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs new file mode 100644 index 000000000..4353e944c --- /dev/null +++ b/rust/sdk-processor/src/utils/parquet_processor_table_mapping.rs @@ -0,0 +1,19 @@ +use lazy_static::lazy_static; +use std::collections::{HashMap, HashSet}; + +lazy_static! 
{ + pub static ref VALID_TABLE_NAMES: HashMap<&'static str, HashSet<String>> = { + let mut map = HashMap::new(); + map.insert( + "parquet_default_processor", + HashSet::from([ + "move_resources".to_string(), + "transactions".to_string(), + "write_set_changes".to_string(), + "table_items".to_string(), + "move_modules".to_string(), + ]), + ); + map + }; +} From 9d249dfc49cda2c9e9138091e74864a47bace116 Mon Sep 17 00:00:00 2001 From: yuunlimm Date: Fri, 15 Nov 2024 15:09:57 -0800 Subject: [PATCH 3/3] refactor parquet processor status saver into processor status saver --- .../parquet_default_processor.rs | 4 +- .../account_transactions_processor.rs | 2 +- .../src/processors/ans_processor.rs | 2 +- .../src/processors/default_processor.rs | 2 +- .../src/processors/events_processor.rs | 2 +- .../processors/fungible_asset_processor.rs | 2 +- .../src/processors/monitoring_processor.rs | 2 +- .../src/processors/objects_processor.rs | 2 +- .../src/processors/stake_processor.rs | 2 +- .../src/processors/token_v2_processor.rs | 2 +- .../processors/user_transaction_processor.rs | 2 +- rust/sdk-processor/src/steps/common/mod.rs | 1 - .../common/parquet_processor_status_saver.rs | 152 ------------------ .../steps/common/processor_status_saver.rs | 60 ++++++- 14 files changed, 65 insertions(+), 172 deletions(-) delete mode 100644 rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index 125b2c44a..d7ddf44ea 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -9,8 +9,8 @@ use crate::{ }, steps::{ common::{ - parquet_processor_status_saver::get_parquet_processor_status_saver, parquet_version_tracker_step::ParquetVersionTrackerStep, + processor_status_saver::get_processor_status_saver, }, parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor, }, @@ -153,7 +153,7 @@ impl ProcessorTrait for ParquetDefaultProcessor { }); let parquet_version_tracker_step = ParquetVersionTrackerStep::new( - get_parquet_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), true), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); diff --git a/rust/sdk-processor/src/processors/account_transactions_processor.rs b/rust/sdk-processor/src/processors/account_transactions_processor.rs index 7260c3c02..e1c11e539 100644 --- a/rust/sdk-processor/src/processors/account_transactions_processor.rs +++ b/rust/sdk-processor/src/processors/account_transactions_processor.rs @@ -103,7 +103,7 @@ impl ProcessorTrait for AccountTransactionsProcessor { let acc_txns_storer = AccountTransactionsStorer::new(self.db_pool.clone(), processor_config); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); diff --git a/rust/sdk-processor/src/processors/ans_processor.rs b/rust/sdk-processor/src/processors/ans_processor.rs index fbc5605a8..7fde412a0 100644 --- a/rust/sdk-processor/src/processors/ans_processor.rs +++ b/rust/sdk-processor/src/processors/ans_processor.rs @@ -118,7 +118,7 @@ impl ProcessorTrait for AnsProcessor { AnsExtractor::new(deprecated_table_flags, self.config.processor_config.clone()); let
acc_txns_storer = AnsStorer::new(self.db_pool.clone(), processor_config); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); diff --git a/rust/sdk-processor/src/processors/default_processor.rs b/rust/sdk-processor/src/processors/default_processor.rs index e27dd0d0b..e58181831 100644 --- a/rust/sdk-processor/src/processors/default_processor.rs +++ b/rust/sdk-processor/src/processors/default_processor.rs @@ -107,7 +107,7 @@ impl ProcessorTrait for DefaultProcessor { }; let default_storer = DefaultStorer::new(self.db_pool.clone(), processor_config); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 124ed0eb9..6bb7ffb8e 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -102,7 +102,7 @@ impl ProcessorTrait for EventsProcessor { let events_extractor = EventsExtractor {}; let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs index a7c16779d..3dd03dee5 100644 --- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -106,7 +106,7 @@ impl ProcessorTrait for FungibleAssetProcessor { deprecated_table_flags, ); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); // Connect processor steps together diff --git a/rust/sdk-processor/src/processors/monitoring_processor.rs b/rust/sdk-processor/src/processors/monitoring_processor.rs index c18d81838..20d7bc65e 100644 --- a/rust/sdk-processor/src/processors/monitoring_processor.rs +++ b/rust/sdk-processor/src/processors/monitoring_processor.rs @@ -98,7 +98,7 @@ impl ProcessorTrait for MonitoringProcessor { }) .await?; let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); diff --git a/rust/sdk-processor/src/processors/objects_processor.rs b/rust/sdk-processor/src/processors/objects_processor.rs index d69945dd3..f63b50665 100644 --- a/rust/sdk-processor/src/processors/objects_processor.rs +++ b/rust/sdk-processor/src/processors/objects_processor.rs @@ -131,7 +131,7 @@ impl ProcessorTrait for ObjectsProcessor { ObjectsStorer::new(self.db_pool.clone(), per_table_chunk_sizes.clone()); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + 
get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); // Connect processor steps together diff --git a/rust/sdk-processor/src/processors/stake_processor.rs b/rust/sdk-processor/src/processors/stake_processor.rs index 4ae5e322a..c1e7eea9a 100644 --- a/rust/sdk-processor/src/processors/stake_processor.rs +++ b/rust/sdk-processor/src/processors/stake_processor.rs @@ -130,7 +130,7 @@ impl ProcessorTrait for StakeProcessor { ); let storer = StakeStorer::new(self.db_pool.clone(), processor_config.clone()); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); // Connect processor steps together diff --git a/rust/sdk-processor/src/processors/token_v2_processor.rs b/rust/sdk-processor/src/processors/token_v2_processor.rs index 0f75af7c2..fac50cf3b 100644 --- a/rust/sdk-processor/src/processors/token_v2_processor.rs +++ b/rust/sdk-processor/src/processors/token_v2_processor.rs @@ -127,7 +127,7 @@ impl ProcessorTrait for TokenV2Processor { ); let token_v2_storer = TokenV2Storer::new(self.db_pool.clone(), processor_config.clone()); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); // Connect processor steps together diff --git a/rust/sdk-processor/src/processors/user_transaction_processor.rs b/rust/sdk-processor/src/processors/user_transaction_processor.rs index 73d08f5e3..80e4670ad 100644 --- a/rust/sdk-processor/src/processors/user_transaction_processor.rs +++ b/rust/sdk-processor/src/processors/user_transaction_processor.rs @@ -104,7 +104,7 @@ impl ProcessorTrait for UserTransactionProcessor { let user_txn_extractor = UserTransactionExtractor::new(deprecated_tables); let user_txn_storer = UserTransactionStorer::new(self.db_pool.clone(), processor_config); let version_tracker = VersionTrackerStep::new( - get_processor_status_saver(self.db_pool.clone(), self.config.clone()), + get_processor_status_saver(self.db_pool.clone(), self.config.clone(), false), DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, ); diff --git a/rust/sdk-processor/src/steps/common/mod.rs b/rust/sdk-processor/src/steps/common/mod.rs index 3662c5d22..91d0d9f88 100644 --- a/rust/sdk-processor/src/steps/common/mod.rs +++ b/rust/sdk-processor/src/steps/common/mod.rs @@ -1,6 +1,5 @@ pub mod gcs_uploader; pub mod parquet_buffer_step; -pub mod parquet_processor_status_saver; pub mod parquet_version_tracker_step; pub mod processor_status_saver; diff --git a/rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs b/rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs deleted file mode 100644 index aebb999ff..000000000 --- a/rust/sdk-processor/src/steps/common/parquet_processor_status_saver.rs +++ /dev/null @@ -1,152 +0,0 @@ -use crate::{ - config::indexer_processor_config::IndexerProcessorConfig, - db::common::models::{ - backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, - processor_status::ProcessorStatus, - }, - steps::common::parquet_version_tracker_step::ParquetProcessorStatusSaver, - utils::database::{execute_with_better_error, ArcDbPool}, -}; -use anyhow::Result; -use aptos_indexer_processor_sdk::{ - types::transaction_context::TransactionContext, - 
utils::{errors::ProcessorError, time::parse_timestamp}, -}; -use async_trait::async_trait; -use diesel::{upsert::excluded, ExpressionMethods}; -use processor::schema::{backfill_processor_status, processor_status}; - -pub fn get_parquet_processor_status_saver( - conn_pool: ArcDbPool, - config: IndexerProcessorConfig, -) -> ParquetProcessorStatusSaverEnum { - if let Some(backfill_config) = config.backfill_config { - println!("Backfill config: {:?}", backfill_config); - let txn_stream_cfg = config.transaction_stream_config; - let backfill_start_version = txn_stream_cfg.starting_version; - let backfill_end_version = txn_stream_cfg.request_ending_version; - let backfill_alias = backfill_config.backfill_alias.clone(); - ParquetProcessorStatusSaverEnum::Backfill { - conn_pool, - backfill_alias, - backfill_start_version, - backfill_end_version, - } - } else { - let processor_name = config.processor_config.name().to_string(); - ParquetProcessorStatusSaverEnum::Default { - conn_pool, - processor_name, - } - } -} - -pub enum ParquetProcessorStatusSaverEnum { - Default { - conn_pool: ArcDbPool, - processor_name: String, - }, - Backfill { - conn_pool: ArcDbPool, - backfill_alias: String, - backfill_start_version: Option, - backfill_end_version: Option, - }, -} - -#[async_trait] -impl ParquetProcessorStatusSaver for ParquetProcessorStatusSaverEnum { - async fn save_parquet_processor_status( - &self, - last_success_batch: &TransactionContext<()>, - table_name: &str, - ) -> Result<(), ProcessorError> { - let end_timestamp = last_success_batch - .metadata - .end_transaction_timestamp - .as_ref() - .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) - .map(|t| t.naive_utc()); - match self { - ParquetProcessorStatusSaverEnum::Default { - conn_pool, - processor_name, - } => { - let status = ProcessorStatus { - processor: processor_name.clone() + "_" + table_name, - last_success_version: last_success_batch.metadata.end_version as i64, - last_transaction_timestamp: end_timestamp, - }; - - // Save regular processor status to the database - execute_with_better_error( - conn_pool.clone(), - diesel::insert_into(processor_status::table) - .values(&status) - .on_conflict(processor_status::processor) - .do_update() - .set(( - processor_status::last_success_version - .eq(excluded(processor_status::last_success_version)), - processor_status::last_updated.eq(excluded(processor_status::last_updated)), - processor_status::last_transaction_timestamp - .eq(excluded(processor_status::last_transaction_timestamp)), - )), - Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), - ) - .await?; - - Ok(()) - }, - ParquetProcessorStatusSaverEnum::Backfill { - conn_pool, - backfill_alias, - backfill_start_version, - backfill_end_version, - } => { - let lst_success_version = last_success_batch.metadata.end_version as i64; - let backfill_status = if backfill_end_version.is_some_and(|backfill_end_version| { - lst_success_version >= backfill_end_version as i64 - }) { - BackfillStatus::Complete - } else { - BackfillStatus::InProgress - }; - let backfill_end_version_mapped = backfill_end_version.map(|v| v as i64); - let status = BackfillProcessorStatus { - backfill_alias: backfill_alias.clone(), - backfill_status, - last_success_version: lst_success_version, - last_transaction_timestamp: end_timestamp, - backfill_start_version: backfill_start_version.unwrap_or(0) as i64, - backfill_end_version: backfill_end_version_mapped, - }; - execute_with_better_error( - conn_pool.clone(), - 
diesel::insert_into(backfill_processor_status::table) - .values(&status) - .on_conflict(backfill_processor_status::backfill_alias) - .do_update() - .set(( - backfill_processor_status::backfill_status - .eq(excluded(backfill_processor_status::backfill_status)), - backfill_processor_status::last_success_version - .eq(excluded(backfill_processor_status::last_success_version)), - backfill_processor_status::last_updated - .eq(excluded(backfill_processor_status::last_updated)), - backfill_processor_status::last_transaction_timestamp.eq(excluded( - backfill_processor_status::last_transaction_timestamp, - )), - backfill_processor_status::backfill_start_version - .eq(excluded(backfill_processor_status::backfill_start_version)), - backfill_processor_status::backfill_end_version - .eq(excluded(backfill_processor_status::backfill_end_version)), - )), - Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "), - ) - .await?; - Ok(()) - }, - } - } -} diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index 50bb39ae3..2b283e6c8 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -4,6 +4,7 @@ use crate::{ backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, processor_status::ProcessorStatus, }, + steps::common::parquet_version_tracker_step::ParquetProcessorStatusSaver, utils::database::{execute_with_better_error, ArcDbPool}, }; use anyhow::Result; @@ -19,6 +20,7 @@ use processor::schema::{backfill_processor_status, processor_status}; pub fn get_processor_status_saver( conn_pool: ArcDbPool, config: IndexerProcessorConfig, + is_parquet: bool, ) -> ProcessorStatusSaverEnum { if let Some(backfill_config) = config.backfill_config { let txn_stream_cfg = config.transaction_stream_config; @@ -33,9 +35,16 @@ pub fn get_processor_status_saver( } } else { let processor_name = config.processor_config.name().to_string(); - ProcessorStatusSaverEnum::Default { - conn_pool, - processor_name, + if is_parquet { + ProcessorStatusSaverEnum::Parquet { + conn_pool, + processor_name, + } + } else { + ProcessorStatusSaverEnum::Default { + conn_pool, + processor_name, + } } } } @@ -51,6 +60,10 @@ pub enum ProcessorStatusSaverEnum { backfill_start_version: Option<u64>, backfill_end_version: Option<u64>, }, + Parquet { + conn_pool: ArcDbPool, + processor_name: String, + }, } #[async_trait] @@ -58,6 +71,32 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { async fn save_processor_status( &self, last_success_batch: &TransactionContext<()>, + ) -> Result<(), ProcessorError> { + self.save_processor_status_with_optional_table_names(last_success_batch, None) + .await + } +} + +#[async_trait] +impl ParquetProcessorStatusSaver for ProcessorStatusSaverEnum { + async fn save_parquet_processor_status( + &self, + last_success_batch: &TransactionContext<()>, + table_name: &str, + ) -> Result<(), ProcessorError> { + self.save_processor_status_with_optional_table_names( + last_success_batch, + Some(table_name.to_string()), + ) + .await + } +} + +impl ProcessorStatusSaverEnum { + async fn save_processor_status_with_optional_table_names( + &self, + last_success_batch: &TransactionContext<()>, + table_name: Option<String>, + ) -> Result<(), ProcessorError> { let end_timestamp = last_success_batch .metadata @@ -70,8 +109,14 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { conn_pool, processor_name, } => { + let
processor_name = if table_name.is_some() { + format!("{}_{}", processor_name, table_name.unwrap()) + } else { + processor_name.clone() + }; + let status = ProcessorStatus { - processor: processor_name.clone(), + processor: processor_name, last_success_version: last_success_batch.metadata.end_version as i64, last_transaction_timestamp: end_timestamp, }; @@ -92,7 +137,7 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { )), Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), ) - .await?; + .await?; Ok(()) }, @@ -140,11 +185,12 @@ impl ProcessorStatusSaver for ProcessorStatusSaverEnum { backfill_processor_status::backfill_end_version .eq(excluded(backfill_processor_status::backfill_end_version)), )), - Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "), + Some(" WHERE backfill_processor_status.last_success_version <= EXCLUDED.last_success_version "), ) - .await?; + .await?; Ok(()) }, + _ => Ok(()), } } }
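
Reviewer note on the version tracker introduced in this series: the invariant `ParquetVersionTrackerStep` enforces is per-table contiguity, i.e. each incoming batch for a parquet table must start exactly one version after the last batch successfully tracked for that table, and the first batch for a table is accepted unchecked. Below is a minimal, self-contained sketch of that check; the enum variants and the metadata struct are simplified stand-ins for the SDK's `ParquetTypeEnum` and `TransactionMetadata`, not the real types.

use std::collections::HashMap;

// Simplified stand-ins for the SDK types (hypothetical, for illustration only).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ParquetTypeEnum {
    MoveResources,
    Transactions,
}

#[derive(Clone, Copy, Debug)]
struct TransactionMetadata {
    start_version: u64,
    end_version: u64,
}

/// Per-table contiguity check: an incoming batch must start exactly one
/// version after the last batch successfully uploaded for that table.
fn check_version_gap(
    last_success: &HashMap<ParquetTypeEnum, TransactionMetadata>,
    parquet_type: ParquetTypeEnum,
    current: &TransactionMetadata,
) -> Result<(), String> {
    if let Some(prev) = last_success.get(&parquet_type) {
        if prev.end_version + 1 != current.start_version {
            return Err(format!(
                "Gap detected for {:?} starting from version: {}",
                parquet_type, current.start_version
            ));
        }
    }
    Ok(())
}

fn main() {
    let mut last_success = HashMap::new();
    last_success.insert(
        ParquetTypeEnum::Transactions,
        TransactionMetadata { start_version: 0, end_version: 99 },
    );

    // Contiguous batch: accepted.
    let next = TransactionMetadata { start_version: 100, end_version: 199 };
    assert!(check_version_gap(&last_success, ParquetTypeEnum::Transactions, &next).is_ok());

    // Batch that skips versions: rejected, so the tracker never records past a gap.
    let gapped = TransactionMetadata { start_version: 150, end_version: 199 };
    assert!(check_version_gap(&last_success, ParquetTypeEnum::Transactions, &gapped).is_err());

    // No prior entry for this table: the first batch is accepted without a check.
    assert!(check_version_gap(&last_success, ParquetTypeEnum::MoveResources, &next).is_ok());
}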
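The refactor in PATCH 3 routes both sinks through one saver with two behaviors worth calling out: for parquet, the status row key becomes the processor name suffixed with the table name (`format!("{}_{}", processor_name, table_name)` in the diff), and every upsert is guarded by `WHERE ... last_success_version <= EXCLUDED.last_success_version` so a saved version can only move forward. A hedged sketch of both, as plain functions rather than the Diesel query above:

/// Status-row key: one row per (processor, table) for parquet, the bare
/// processor name otherwise. Mirrors the naming logic added in the diff.
fn status_row_key(processor_name: &str, table_name: Option<&str>) -> String {
    match table_name {
        Some(table) => format!("{}_{}", processor_name, table),
        None => processor_name.to_string(),
    }
}

/// Mirrors the SQL guard on the upsert: a stale or replayed batch can
/// never move the saved version backwards.
fn apply_status_update(saved_version: &mut i64, incoming_version: i64) {
    if *saved_version <= incoming_version {
        *saved_version = incoming_version;
    }
}

fn main() {
    assert_eq!(
        status_row_key("parquet_default_processor", Some("move_resources")),
        "parquet_default_processor_move_resources"
    );
    assert_eq!(status_row_key("events_processor", None), "events_processor");

    let mut saved = 100;
    apply_status_update(&mut saved, 90); // stale update is ignored
    assert_eq!(saved, 100);
    apply_status_update(&mut saved, 150); // newer update advances the row
    assert_eq!(saved, 150);
}

The per-table key is what lets a single `processor_status` table track five parquet tables that flush on independent schedules; the guard makes the upsert idempotent under retries.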
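Finally, the backfill arm (kept through the refactor) marks a backfill `Complete` once the last saved version reaches the configured end version, and `InProgress` otherwise. A small sketch of that rule, using a stand-in enum rather than the crate's `BackfillStatus`:

#[derive(Debug, PartialEq)]
enum BackfillStatus {
    InProgress,
    Complete,
}

/// Mirrors the diff's rule: complete once the last saved version reaches
/// the configured end version, if one was configured at all.
fn backfill_status(last_success_version: i64, backfill_end_version: Option<u64>) -> BackfillStatus {
    if backfill_end_version.is_some_and(|end| last_success_version >= end as i64) {
        BackfillStatus::Complete
    } else {
        BackfillStatus::InProgress
    }
}

fn main() {
    assert_eq!(backfill_status(999, Some(1000)), BackfillStatus::InProgress);
    assert_eq!(backfill_status(1000, Some(1000)), BackfillStatus::Complete);
    // No end version configured: the backfill never self-reports complete.
    assert_eq!(backfill_status(1_000_000, None), BackfillStatus::InProgress);
}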