From e98b828ed39fafb133c384e0f9e5a56458e0b40d Mon Sep 17 00:00:00 2001
From: yuunlimm
Date: Mon, 4 Nov 2024 09:57:58 -0800
Subject: [PATCH] fanout builder failing with send trait

---
 rust/Cargo.lock                               |   4 +
 .../bq_analytics/generic_parquet_processor.rs |   6 +-
 .../default_models/parquet_move_resources.rs  |   2 +-
 rust/sdk-processor/Cargo.toml                 |   6 +
 .../src/config/processor_config.rs            |   8 +-
 .../parquet_default_processor.rs              | 238 +++++++++++++-
 .../default_processor/default_extractor.rs    |   1 -
 rust/sdk-processor/src/steps/mod.rs           |   1 +
 .../steps/parquet_default_processor/mod.rs    |   2 +
 .../parquet_default_extractor.rs              | 124 +++++++
 .../timed_size_buffer.rs                      | 307 ++++++++++++++++++
 11 files changed, 683 insertions(+), 16 deletions(-)
 create mode 100644 rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
 create mode 100644 rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
 create mode 100644 rust/sdk-processor/src/steps/parquet_default_processor/timed_size_buffer.rs

diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 17a7dfbed..426c50dc2 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -4181,6 +4181,7 @@ name = "sdk-processor"
 version = "0.1.0"
 dependencies = [
  "ahash",
+ "allocative",
  "anyhow",
  "aptos-indexer-processor-sdk",
  "aptos-indexer-processor-sdk-server-framework",
@@ -4196,12 +4197,15 @@ dependencies = [
  "field_count",
  "futures",
  "futures-util",
+ "google-cloud-storage",
  "hex",
  "jemallocator",
  "kanal",
  "lazy_static",
  "native-tls",
  "num_cpus",
+ "parquet",
+ "parquet_derive",
  "postgres-native-tls",
  "processor",
  "rayon",
diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
index b213fdeba..0fe6127bd 100644
--- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs
+++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -54,10 +54,10 @@ where
 }
 
 pub struct ParquetHandler<ParquetType>
-where
+ where
     ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
     for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
-{
+ {
     pub schema: Arc<Type>,
     pub writer: SerializedFileWriter<Vec<u8>>,
     pub buffer: Vec<ParquetType>,
@@ -72,7 +72,7 @@ where
     pub last_upload_time: Instant,
     pub processor_name: String,
 }
-fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
+pub fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
     let props = WriterProperties::builder()
         .set_compression(parquet::basic::Compression::LZ4)
         .build();
diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs b/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs
index adffb194c..b7c1a1a4a 100644
--- a/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs
+++ b/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs
@@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
 
 #[derive(
     Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter,
-)]
+ )]
 pub struct MoveResource {
     pub txn_version: i64,
     pub write_set_change_index: i64,
diff --git a/rust/sdk-processor/Cargo.toml b/rust/sdk-processor/Cargo.toml
index 6859a36df..0dcbdd5ac 100644
--- a/rust/sdk-processor/Cargo.toml
+++ b/rust/sdk-processor/Cargo.toml
@@ -42,6 +42,12 @@ native-tls = { workspace = true }
 postgres-native-tls = { workspace = true }
 tokio-postgres = { workspace = true }
 
+google-cloud-storage = { workspace = true }
+# Parquet support
+parquet = { workspace = true }
+parquet_derive = { workspace = true }
+allocative = { workspace = true }
+
 [features]
 libpq = ["diesel/postgres"]
 # When using the default features we enable the diesel/postgres feature. We configure
diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs
index db245d1a4..7fe536976 100644
--- a/rust/sdk-processor/src/config/processor_config.rs
+++ b/rust/sdk-processor/src/config/processor_config.rs
@@ -119,10 +119,8 @@ pub struct ParquetDefaultProcessorConfig {
     pub bucket_name: String,
     #[serde(default)]
     pub bucket_root: String,
-    #[serde(
-        default = "ParquetDefaultProcessorConfig::default_parquet_handler_response_channel_size"
-    )]
-    pub parquet_handler_response_channel_size: usize,
+    #[serde(default = "ParquetDefaultProcessorConfig::default_channel_size")]
+    pub channel_size: usize,
     #[serde(default = "ParquetDefaultProcessorConfig::default_max_buffer_size")]
     pub max_buffer_size: usize,
     #[serde(default = "ParquetDefaultProcessorConfig::default_parquet_upload_interval")]
@@ -135,7 +133,7 @@ pub struct ParquetDefaultProcessorConfig {
 impl ParquetDefaultProcessorConfig {
     /// Make the default very large on purpose so that by default it's not chunked
     /// This prevents any unexpected changes in behavior
-    pub const fn default_parquet_handler_response_channel_size() -> usize {
+    pub const fn default_channel_size() -> usize {
         100_000
     }
 
diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
index 4799794c6..665c06407 100644
--- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
+++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+use std::time::Duration;
 use crate::{
     config::{
         db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
@@ -13,6 +15,38 @@ use anyhow::Context;
 use aptos_indexer_processor_sdk::{
     aptos_indexer_transaction_stream::TransactionStream, traits::processor_trait::ProcessorTrait,
 };
+use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
+use aptos_indexer_processor_sdk::builder::ProcessorBuilder;
+use aptos_indexer_processor_sdk::common_steps::{DEFAULT_UPDATE_PROCESSOR_STATUS_SECS, TransactionStreamStep, VersionTrackerStep};
+use aptos_indexer_processor_sdk::traits::IntoRunnableStep;
+use crate::steps::common::get_processor_status_saver;
+use google_cloud_storage::{
+    client::{Client as GCSClient, ClientConfig as GcsClientConfig},
+    http::Error as StorageError,
+};
+use tracing::{debug, info};
+use processor::db::common::models::default_models::parquet_move_resources::MoveResource;
+use crate::steps::parquet_default_processor::timed_size_buffer::TableConfig;
+use crate::steps::parquet_default_processor::timed_size_buffer::TimedSizeBufferStep;
+use crate::steps::parquet_default_processor::parquet_default_extractor::ParquetDefaultExtractor;
+use processor::db::common::models::default_models::parquet_write_set_changes::WriteSetChangeModel;
+use processor::db::common::models::default_models::parquet_move_tables::TableItem;
+use processor::db::common::models::default_models::parquet_transactions::Transaction as ParquetTransaction;
+use processor::db::common::models::default_models::parquet_move_modules::MoveModule;
+use aptos_indexer_processor_sdk::test::steps::pass_through_step::PassThroughStep;
+use aptos_indexer_processor_sdk::traits::RunnableAsyncStep;
+use aptos_indexer_processor_sdk::types::transaction_context::TransactionContext;
+use aptos_indexer_processor_sdk::utils::errors::ProcessorError;
+use aptos_indexer_processor_sdk::traits::async_step::AsyncStep;
+use aptos_indexer_processor_sdk::traits::processable::Processable;
+use aptos_indexer_processor_sdk::traits::NamedStep;
+use async_trait::async_trait;
+use aptos_indexer_processor_sdk::aptos_protos::transaction::v1::Transaction;
+use aptos_indexer_processor_sdk::traits::RunnableStepWithInputReceiver;
+use aptos_indexer_processor_sdk::instrumented_channel::instrumented_bounded_channel;
+
+
+const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
 
 pub struct ParquetDefaultProcessor {
     pub config: IndexerProcessorConfig,
@@ -44,6 +78,14 @@ impl ParquetDefaultProcessor {
     }
 }
 
+type Input = (
+    Vec<ParquetTransaction>,
+    Vec<MoveResource>,
+    Vec<WriteSetChangeModel>,
+    Vec<TableItem>,
+    Vec<MoveModule>,
+);
+
 #[async_trait::async_trait]
 impl ProcessorTrait for ParquetDefaultProcessor {
     fn name(&self) -> &'static str {
@@ -66,8 +108,8 @@ impl ProcessorTrait for ParquetDefaultProcessor {
         let is_backfill = self.config.backfill_config.is_some();
 
         // Query the starting version
-        let _starting_version = if is_backfill {
-            get_starting_version(&self.config, self.db_pool.clone()).await?;
+        let starting_version = if is_backfill {
+            get_starting_version(&self.config, self.db_pool.clone()).await?
         } else {
             // Regular mode logic: Fetch the minimum last successful version across all relevant tables
             let table_names = self
@@ -79,7 +121,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
                 self.config.processor_config.name()
             ))?;
             get_min_last_success_version_parquet(&self.config, self.db_pool.clone(), table_names)
-                .await?;
+                .await?
         };
 
         // Check and update the ledger chain id to ensure we're indexing the correct chain
@@ -89,7 +131,7 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             .await?;
         check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;
 
-        let _parquet_processor_config = match self.config.processor_config.clone() {
+        let parquet_processor_config = match self.config.processor_config.clone() {
             ProcessorConfig::ParquetDefaultProcessor(parquet_processor_config) => {
                 parquet_processor_config
             },
@@ -101,7 +143,191 @@ impl ProcessorTrait for ParquetDefaultProcessor {
             },
         };
 
-        // Define processor steps
-        Ok(())
+        println!("===============Starting version: {}===============", starting_version);
+
+        // Define processor transaction stream config
+        let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
+            starting_version: Some(starting_version),
+            ..self.config.transaction_stream_config.clone()
+        })
+        .await?;
+        //
+        // // TODO: look at the config to dynamically set the opt_in_tables, tables
+        let parquet_default_extractor = ParquetDefaultExtractor {
+            opt_in_tables: None
+            // : parquet_processor_config.tables.iter().map(|s| s.to_string()).collect(),
+        };
+
+        let credentials = parquet_processor_config.google_application_credentials.clone();
+
+        if let Some(credentials) = credentials {
+            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
+        }
+
+        let gcs_config = GcsClientConfig::default()
+            .with_auth()
+            .await
+            .expect("Failed to create GCS client config");
+
+        let gcs_client = Arc::new(GCSClient::new(gcs_config));
+
+
+        let move_resource_step = TimedSizeBufferStep::<Input, MoveResource>::new(
+            Duration::from_secs(parquet_processor_config.parquet_upload_interval),
+            TableConfig {
+                table_name: "move_resources".to_string(),
+                bucket_name: parquet_processor_config.bucket_name.clone(),
+                bucket_root: parquet_processor_config.bucket_root.clone(),
+                max_size: parquet_processor_config.max_buffer_size,
+            },
+            gcs_client.clone(),
+            self.name(),
+        );
+
+        // TODO: add other steps later
+        // let move_module_step = TimedSizeBufferStep::<Input, MoveModule>::new(
+        //     Duration::from_secs(parquet_processor_config.parquet_upload_interval),
+        //     TableConfig {
+        //         table_name: "move_modules".to_string(),
+        //         bucket_name: parquet_processor_config.bucket_name.clone(),
+        //         bucket_root: parquet_processor_config.bucket_root.clone(),
+        //         max_size: parquet_processor_config.max_buffer_size,
+        //     },
+        //     gcs_client.clone(),
+        // );
+
+        let channel_size = parquet_processor_config.channel_size;
+        // let version_tracker = VersionTrackerStep::new(
+        //     get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
+        //     DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
+        // );
+
+        // TODO: Figure out how to make this work
+        //
+        let (input_sender, input_receiver) = instrumented_bounded_channel("input", 1);
+
+        let input_step = RunnableStepWithInputReceiver::new(
+            input_receiver,
+            RunnableAsyncStep::new(PassThroughStep::default()),
+        );
+        let mut fanout_builder = ProcessorBuilder::new_with_inputless_first_step(
+            input_step
+            // transaction_stream.into_runnable_step(),
+        ).fanout_broadcast(1);
+
+        // let builder = builder.connect_to(parquet_default_extractor.into_runnable_step(), channel_size);
+        // let mut fanout_builder = builder.
+        //     fanout_broadcast(1);
+
+        let (first_builder, first_output_receiver) = fanout_builder
+            .get_processor_builder()
+            .unwrap()
+            .connect_to(
+                RunnableAsyncStep::new(PassThroughStep::new_named("FanoutStep1".to_string())),
+                channel_size
+            )
+            .end_and_return_output_receiver(channel_size);
+
+        let test_step = TestStep;
+        let test_step = RunnableAsyncStep::new(test_step);
+
+        let (_, buffer_receiver) = ProcessorBuilder::new_with_fanin_step_with_receivers(
+            vec![
+                (first_output_receiver, first_builder.graph),
+            ],
+            RunnableAsyncStep::new(PassThroughStep::new_named("FaninStep".to_string())),
+            channel_size,
+        )
+        .connect_to(test_step, channel_size)
+        // .connect_to(version_tracker.into_runnable_step(), channel_size)
+        .end_and_return_output_receiver(channel_size);
+
+        // let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
+        //     transaction_stream.into_runnable_step(),
+        // )
+        // .connect_to(parquet_default_extractor.into_runnable_step(), channel_size)
+        // .connect_to(move_resource_step.into_runnable_step(), channel_size)
+        // .connect_to(version_tracker.into_runnable_step(), channel_size)
+        // .end_and_return_output_receiver(channel_size);
+
+        // (Optional) Parse the results
+        loop {
+            match buffer_receiver.recv().await {
+                Ok(txn_context) => {
+                    debug!(
+                        "Finished processing versions [{:?}, {:?}]",
+                        txn_context.metadata.start_version, txn_context.metadata.end_version,
+                    );
+                },
+                Err(e) => {
+                    info!("No more transactions in channel: {:?}", e);
+                    break Ok(());
+                },
+            }
+        }
+    }
+}
+
+pub struct TestStep;
+
+impl AsyncStep for TestStep {}
+
+impl NamedStep for TestStep {
+    fn name(&self) -> String {
+        "TestStep".to_string()
+    }
+}
+
+#[async_trait]
+impl Processable for TestStep {
+    type Input = Vec<Transaction>;
+    type Output = ();
+    type RunType = ();
+
+    async fn process(
+        &mut self,
+        item: TransactionContext<Vec<Transaction>>,
+    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
+        println!("processtedprocesstedprocesstedprocesstedprocessted");
+        Ok(None)
+        // let processed = item.data.into_iter().map(|i| TestStruct { i }).collect();
+        // Ok(Some(TransactionContext {
+        //     data: processed,
+        //     metadata: item.metadata,
+        // }))
+    }
+}
+
+pub trait ExtractResources<ParquetType> {
+    fn extract(&self) -> Vec<ParquetType>;
+}
+
+impl ExtractResources<ParquetTransaction> for Input {
+    fn extract(&self) -> Vec<ParquetTransaction> {
+        self.0.clone()
+    }
+}
+
+impl ExtractResources<MoveResource> for Input {
+    fn extract(&self) -> Vec<MoveResource> {
+        self.1.clone()
+    }
+}
+
+impl ExtractResources<WriteSetChangeModel> for Input {
+    fn extract(&self) -> Vec<WriteSetChangeModel> {
+        self.2.clone()
+    }
+}
+
+impl ExtractResources<TableItem> for Input {
+    fn extract(&self) -> Vec<TableItem> {
+        self.3.clone()
+    }
+}
+
+impl ExtractResources<MoveModule> for Input {
+    fn extract(&self) -> Vec<MoveModule> {
+        self.4.clone()
     }
 }
diff --git a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
index b4cd700db..e7428c839 100644
--- a/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
+++ b/rust/sdk-processor/src/steps/default_processor/default_extractor.rs
@@ -13,7 +13,6 @@ use processor::{
     processors::default_processor::process_transactions, worker::TableFlags,
 };
 
-pub const MIN_TRANSACTIONS_PER_RAYON_JOB: usize = 64;
 
 pub struct DefaultExtractor
 where
diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs
index a1632e818..0f07f11dc 100644
--- a/rust/sdk-processor/src/steps/mod.rs
+++ b/rust/sdk-processor/src/steps/mod.rs
@@ -4,5 +4,6 @@ pub mod common;
 pub mod default_processor;
 pub mod events_processor;
 pub mod fungible_asset_processor;
+pub mod parquet_default_processor;
 pub mod stake_processor;
 pub mod token_v2_processor;
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
new file mode 100644
index 000000000..2b938d987
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/mod.rs
@@ -0,0 +1,2 @@
+pub mod parquet_default_extractor;
+pub mod timed_size_buffer;
\ No newline at end of file
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
new file mode 100644
index 000000000..fd5ffeff7
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/parquet_default_extractor.rs
@@ -0,0 +1,124 @@
+use ahash::AHashMap;
+use aptos_indexer_processor_sdk::{
+    aptos_protos::transaction::v1::Transaction,
+    traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
+    types::transaction_context::TransactionContext,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use processor::db::common::models::default_models::{
+    parquet_move_modules::MoveModule,
+    parquet_move_resources::MoveResource,
+    parquet_move_tables::TableItem,
+    parquet_transactions::{Transaction as ParquetTransaction, TransactionModel},
+    parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
+};
+use processor::bq_analytics::generic_parquet_processor::HasParquetSchema;
+
+pub struct ParquetDefaultExtractor
+where
+    Self: Processable + Send + Sized + 'static,
+{
+    pub opt_in_tables: Option<Vec<String>>,
+}
+
+#[async_trait]
+impl Processable for ParquetDefaultExtractor {
+    type Input = Vec<Transaction>;
+    type Output = (
+        Vec<ParquetTransaction>,
+        Vec<MoveResource>,
+        Vec<WriteSetChangeModel>,
+        Vec<TableItem>,
+        Vec<MoveModule>,
+    );
+    type RunType = AsyncRunType;
+
+    async fn process(
+        &mut self,
+        transactions: TransactionContext<Self::Input>,
+    ) -> anyhow::Result<
+        Option<
+            TransactionContext<(
+                Vec<ParquetTransaction>,
+                Vec<MoveResource>,
+                Vec<WriteSetChangeModel>,
+                Vec<TableItem>,
+                Vec<MoveModule>,
+            )>,
+        >,
+        ProcessorError,
+    > {
+        println!("Processing transactions-----------------------------------------");
+        let backfill_mode = self.opt_in_tables.is_some();
+
+        let (move_resources, write_set_changes, parquet_transactions, table_items, move_modules) =
+            process_transactions(transactions.data);
+        tracing::info!("Processed {} transactions", parquet_transactions.len());
+        println!("Processed {} transactions", parquet_transactions.len());
+        // Output the processed transactions
+        Ok(Some(TransactionContext {
+            data: (
+                parquet_transactions,
+                move_resources,
+                write_set_changes,
+                table_items,
+                move_modules,
+            ),
+            metadata: transactions.metadata,
+
+        }))
+    }
+}
+
+pub fn process_transactions(
+    transactions: Vec<Transaction>,
+) -> (
+    Vec<MoveResource>,
+    Vec<WriteSetChangeModel>,
+    Vec<ParquetTransaction>,
+    Vec<TableItem>,
+    Vec<MoveModule>,
+) {
+    // this will be removed in the future.
+    let mut transaction_version_to_struct_count = AHashMap::new();
+    let (txns, _block_metadata_txns, write_set_changes, wsc_details) =
+        TransactionModel::from_transactions(
+            &transactions,
+            &mut transaction_version_to_struct_count,
+        );
+
+    let mut move_modules = vec![];
+    let mut move_resources = vec![];
+    let mut table_items = vec![];
+
+    for detail in wsc_details {
+        match detail {
+            WriteSetChangeDetail::Module(module) => {
+                move_modules.push(module);
+            },
+            WriteSetChangeDetail::Resource(resource) => {
+                move_resources.push(resource);
+            },
+            WriteSetChangeDetail::Table(item, _current_item, _) => {
+                table_items.push(item);
+            },
+        }
+    }
+
+    (
+        move_resources,
+        write_set_changes,
+        txns,
+        table_items,
+        move_modules,
+    )
+}
+
+impl AsyncStep for ParquetDefaultExtractor {}
+
+impl NamedStep for ParquetDefaultExtractor {
+    fn name(&self) -> String {
+        "ParquetDefaultExtractor".to_string()
+    }
+}
\ No newline at end of file
diff --git a/rust/sdk-processor/src/steps/parquet_default_processor/timed_size_buffer.rs b/rust/sdk-processor/src/steps/parquet_default_processor/timed_size_buffer.rs
new file mode 100644
index 000000000..0980c878c
--- /dev/null
+++ b/rust/sdk-processor/src/steps/parquet_default_processor/timed_size_buffer.rs
@@ -0,0 +1,307 @@
+use anyhow::{Context, Result};
+use aptos_indexer_processor_sdk::{
+    traits::{
+        pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
+    },
+    types::transaction_context::TransactionContext,
+    utils::errors::ProcessorError,
+};
+use async_trait::async_trait;
+use google_cloud_storage::{
+    client::Client as GCSClient,
+    http::objects::upload::{Media, UploadObjectRequest, UploadType},
+};
+use std::time::{Duration, Instant};
+use std::sync::Arc;
+use processor::bq_analytics::generic_parquet_processor::{GetTimeStamp, HasParquetSchema, HasVersion, NamedTable};
+use allocative::Allocative;
+use parquet::{
+    file::{properties::WriterProperties, writer::SerializedFileWriter},
+    record::RecordWriter,
+    schema::types::Type,
+};
+use std::marker::PhantomData;
+use std::path::PathBuf;
+use tracing::{debug, error, info};
+use processor::bq_analytics::gcs_handler::upload_parquet_to_gcs;
+use processor::bq_analytics::ParquetProcessingResult;
+use processor::gap_detectors::ProcessingResult;
+use processor::utils::counters::PARQUET_STRUCT_SIZE;
+use processor::utils::util::naive_datetime_to_timestamp;
+use processor::bq_analytics::generic_parquet_processor::create_new_writer;
+use crate::parquet_processors::parquet_default_processor::ExtractResources;
+
+
+pub struct TableConfig {
+    pub table_name: String,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub max_size: usize,
+}
+
+pub struct TimedSizeBufferStep<Input, ParquetType>
+where
+    Input: Send + 'static + Sized,
+    ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative + Send + Sync,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    pub internal_buffer: Vec<ParquetType>,
+    pub internal_buffer_size_bytes: usize,
+    pub writer: SerializedFileWriter<Vec<u8>>,
+    pub schema: Arc<Type>,
+
+    pub poll_interval: Duration,
+    pub table_config: TableConfig,
+    pub gcs_client: Arc<GCSClient>,
+    _marker: PhantomData<Input>, // Use PhantomData to indicate that Input is relevant
+    pub processor_name: String,
+}
+
+impl<Input, ParquetType> TimedSizeBufferStep<Input, ParquetType>
+where
+    Input: Send + 'static + Sized,
+    ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative + Send + Sync + GetTimeStamp,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    pub fn new(poll_interval: Duration, table_config: TableConfig, gcs_client: Arc<GCSClient>, processor_name: &str) -> Self {
+        let schema = ParquetType::schema();
+        let writer = create_new_writer(schema.clone()).unwrap();
+        Self {
+            internal_buffer: Vec::new(),
+            writer,
+            internal_buffer_size_bytes: 0,
+            schema,
+            poll_interval,
+            table_config,
+            gcs_client,
+            _marker: PhantomData,
+            processor_name: processor_name.to_string(),
+        }
+    }
+
+    fn create_new_writer(&self) -> Result<SerializedFileWriter<Vec<u8>>> {
+        create_new_writer(self.schema.clone())
+    }
+
+    fn close_writer(&mut self) -> Result<SerializedFileWriter<Vec<u8>>> {
+        let new_writer = self.create_new_writer()?;
+        let old_writer = std::mem::replace(&mut self.writer, new_writer);
+        Ok(old_writer)
+    }
+
+    async fn upload_to_gcs(&self) -> Result<(), ProcessorError> {
+        // let parquet_data = self.convert_to_parquet(&self.internal_buffer)?;
+
+        let object_name = format!(
+            "{}/data_{}.parquet",
+            self.table_config.table_name,
+            chrono::Utc::now().timestamp()
+        );
+
+        // self.gcs_client.upload_object(&self.table_config.bucket_name, &object_name, parquet_data).await?;
+
+        Ok(())
+    }
+
+    // Conversion logic specific to each table based on `table_config`
+    fn convert_to_parquet(
+        &self,
+        data: &[ParquetType],
+    ) -> Result<Vec<u8>, ProcessorError>
+    where
+        ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable + Send + Sync,
+        for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+    {
+        Ok(vec![])
+    }
+
+    async fn upload_buffer(&mut self) -> Result<()> {
+        // This is to cover the case when interval duration has passed but buffer is empty
+        if self.internal_buffer.is_empty() {
+            debug!("Buffer is empty, skipping upload.");
+            return Ok(());
+        }
+        let start_version = self
+            .internal_buffer
+            .first()
+            .context("Buffer is not empty but has no first element")?
+            .version();
+        let last = self
+            .internal_buffer
+            .last()
+            .context("Buffer is not empty but has no last element")?;
+        let end_version = last.version();
+        let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());
+
+        let struct_buffer = std::mem::take(&mut self.internal_buffer);
+
+        let mut row_group_writer = self
+            .writer
+            .next_row_group()
+            .context("Failed to get row group")?;
+
+        struct_buffer
+            .as_slice()
+            .write_to_row_group(&mut row_group_writer)
+            .context("Failed to write to row group")?;
+        row_group_writer
+            .close()
+            .context("Failed to close row group")?;
+
+        let old_writer = self.close_writer().context("Failed to close writer")?;
+        let upload_buffer = old_writer
+            .into_inner()
+            .context("Failed to get inner buffer")?;
+
+        let bucket_root = PathBuf::from(&self.table_config.bucket_root);
+
+        upload_parquet_to_gcs(
+            &self.gcs_client,
+            upload_buffer,
+            ParquetType::TABLE_NAME,
+            &self.table_config.bucket_name,
+            &bucket_root,
+            self.processor_name.clone(),
+        )
+        .await?;
+
+        self.internal_buffer_size_bytes = 0;
+
+
+        // update metadata with this
+        // let parquet_processing_result = ParquetProcessingResult {
+        //     start_version,
+        //     end_version,
+        //     last_transaction_timestamp: Some(last_transaction_timestamp),
+        //     txn_version_to_struct_count: None,
+        //     parquet_processed_structs: Some(parquet_processed_transactions),
+        //     table_name: ParquetType::TABLE_NAME.to_string(),
+        // };
+        info!(
+            table_name = ParquetType::TABLE_NAME,
+            start_version = start_version,
+            end_version = end_version,
+            "Uploaded parquet to GCS and sending result to gap detector."
+        );
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl<Input, ParquetType> Processable for TimedSizeBufferStep<Input, ParquetType>
+where
+    Input: Send + Sync + 'static + Sized + ExtractResources<ParquetType>,
+    ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable + Send + Sync,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    type Input = Input;
+    type Output = ();
+    type RunType = PollableAsyncRunType;
+
+    async fn process(
+        &mut self,
+        item: TransactionContext<Input>,
+    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
+        // grab the correct parquetType from the Input. match on the Input type = parquetType somehow?
+        let parquet_types = item.data.extract();
+        //
+        // println!("Processing transactions-----------------------------------------");
+        // for resource in resources {
+        //     println!("Processing resource {:?}", resource);
+        //     // if resource.schema() == self.schema {
+        //     //     self.internal_buffer.push(resource.clone());
+        //     // }
+        // }
+
+        tracing::info!("Processed {} parquet_types", parquet_types.len());
+        for parquet_type in parquet_types {
+            let size_of_struct = allocative::size_of_unique(&parquet_type);
+            // TODO: uncomment later when telemetry is ready.
+            // PARQUET_STRUCT_SIZE
+            //     .with_label_values(&[&processor_name, ParquetType::TABLE_NAME])
+            //     .set(size_of_struct as i64);
+            self.internal_buffer_size_bytes += size_of_struct;
+            self.internal_buffer.push(parquet_type);
+
+            if self.internal_buffer_size_bytes >= self.table_config.max_size {
+                println!("exceeded max buffer size");
+                debug!(
+                    table_name = ParquetType::TABLE_NAME,
+                    buffer_size = self.internal_buffer_size_bytes,
+                    max_buffer_size = self.table_config.max_size,
+                    "Max buffer size reached, uploading to GCS."
+                );
+                if let Err(e) = self.upload_buffer().await {
+                    error!("Failed to upload buffer: {}", e);
+                    return Err(ProcessorError::ProcessError {
+                        message: format!("Failed to upload buffer: {}", e),
+                    });
+                }
+                // self.last_upload_time = Instant::now();
+            }
+            // println!("Processed {} parquet_types", parquet_types.len());
+        }
+        println!("current buffer size: {}", self.internal_buffer_size_bytes);
+        // TODO: add metrics later when telemetry is ready.
+        Ok(None)
+    }
+
+    async fn cleanup(
+        &mut self,
+    ) -> Result<Option<Vec<TransactionContext<()>>>, ProcessorError> {
+        // if !self.internal_buffer.is_empty() {
+        //     self.upload_to_gcs().await?;
+        // }
+        // Ok(Some(std::mem::take(&mut self.internal_buffer)))
+        // info!(
+        //     "Time has elapsed more than {} since last upload for {}",
+        //     self.upload_interval.as_secs(),
+        //     ParquetType::TABLE_NAME
+        // );
+        // if let Err(e) = self.upload_buffer(gcs_client).await {
+        //     error!("Failed to upload buffer: {}", e);
+        //     return Err(e);
+        // }
+        // self.last_upload_time = Instant::now();
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl<Input, ParquetType> PollableAsyncStep for TimedSizeBufferStep<Input, ParquetType>
+where
+    Input: Send + Sync + 'static + Sized + ExtractResources<ParquetType>,
+    ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable + Send + Sync,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    fn poll_interval(&self) -> Duration {
+        self.poll_interval
+    }
+
+    async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<()>>>, ProcessorError> {
+        // info!(
+        //     "Time has elapsed more than {} since last upload for {}",
+        //     self.upload_interval.as_secs(),
+        //     ParquetType::TABLE_NAME
+        // );
+        // if let Err(e) = self.upload_buffer(gcs_client).await {
+        //     error!("Failed to upload buffer: {}", e);
+        //     return Err(e);
+        // }
+        // self.last_upload_time = Instant::now();
+        println!("d================================Polling after 1 second=");
+        Ok(None)
+    }
+}
+
+impl<Input, ParquetType> NamedStep for TimedSizeBufferStep<Input, ParquetType>
+where
+    Input: Send + Sync + 'static + Sized + ExtractResources<ParquetType>,
+    ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable + Send + Sync,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    fn name(&self) -> String {
+        format!("TimedSizeBuffer: {}", self.table_config.table_name)
+    }
+}