From c6c73055663873e28f372f762b398f0df21bf4c5 Mon Sep 17 00:00:00 2001
From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com>
Date: Tue, 9 Jul 2024 10:40:46 -0700
Subject: [PATCH] use option to replace expect with context and avoid creating two gap detectors (#428)

* use option to replace expect with context and avoid creating two gap detectors

* add trait param for gap detector loop

* lint

* - add a new version of standardize_address that takes in a slice, to avoid having to encode beforehand in order to call this function.
- reduced the duplicated lines of code
- Use context instead of unwrapping with unsafe assumptions
- make the gap detector trait fn return a Result type without requiring an unnecessary error type

* lint
---
 .../processor/src/bq_analytics/gcs_handler.rs |  23 ++-
 .../bq_analytics/generic_parquet_processor.rs |  30 ++--
 .../src/bq_analytics/parquet_handler.rs       |  13 +-
 .../default_models/parquet_move_resources.rs  |  95 ++++++------
 .../default_models/parquet_move_tables.rs     |   9 +-
 .../default_models/parquet_transactions.rs    |   8 +-
 .../parquet_write_set_changes.rs              | 142 ++++++++++++------
 .../default_models/write_set_changes.rs       |  37 ++---
 .../v2_fungible_asset_balances.rs             |   1 +
 .../src/gap_detectors/gap_detector.rs         |   9 +-
 rust/processor/src/gap_detectors/mod.rs       |  67 +++++----
 .../src/gap_detectors/parquet_gap_detector.rs |  16 +-
 .../processors/parquet_default_processor.rs   |  17 +--
 rust/processor/src/utils/util.rs              |  20 +++
 rust/processor/src/worker.rs                  |  86 +++++------
 15 files changed, 323 insertions(+), 250 deletions(-)

diff --git a/rust/processor/src/bq_analytics/gcs_handler.rs b/rust/processor/src/bq_analytics/gcs_handler.rs
index c038152a6..65f578f95 100644
--- a/rust/processor/src/bq_analytics/gcs_handler.rs
+++ b/rust/processor/src/bq_analytics/gcs_handler.rs
@@ -6,22 +6,22 @@ use google_cloud_storage::{
     http::objects::upload::{Media, UploadObjectRequest, UploadType},
 };
 use hyper::Body;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use tokio::io::AsyncReadExt; // for read_to_end()
 use tokio::{
     fs::File as TokioFile,
     time::{sleep, timeout, Duration},
 };
 use tracing::{debug, error, info};
-const BUCKET_REGULAR_TRAFFIC: &str = "devnet-airflow-continue";
 const MAX_RETRIES: usize = 3;
 const INITIAL_DELAY_MS: u64 = 500;
 const TIMEOUT_SECONDS: u64 = 300;
 pub async fn upload_parquet_to_gcs(
     client: &GCSClient,
-    file_path: &PathBuf,
+    file_path: &Path,
     table_name: &str,
     bucket_name: &str,
+    bucket_root: &Path,
 ) -> Result<(), ParquetProcessorError> {
     let mut file = TokioFile::open(&file_path)
         .await
@@ -54,13 +54,8 @@ pub async fn upload_parquet_to_gcs(
     let highwater_s = start_of_month.timestamp_millis();
     let highwater_ms = now.timestamp_millis();
     let counter = 0; // THIS NEEDS TO BE REPLACED OR REIMPLEMENTED WITH ACTUAL LOGIC TO ENSURE FILE UNIQUENESS.
- let object_name: PathBuf = generate_parquet_file_path( - BUCKET_REGULAR_TRAFFIC, - table_name, - highwater_s, - highwater_ms, - counter, - ); + let object_name: PathBuf = + generate_parquet_file_path(bucket_root, table_name, highwater_s, highwater_ms, counter); let file_name = object_name.to_str().unwrap().to_owned(); let upload_type: UploadType = UploadType::Simple(Media::new(file_name.clone())); @@ -108,14 +103,14 @@ pub async fn upload_parquet_to_gcs( } fn generate_parquet_file_path( - gcs_bucket_root: &str, + gcs_bucket_root: &Path, table: &str, highwater_s: i64, highwater_ms: i64, counter: u32, ) -> PathBuf { - PathBuf::from(format!( - "{}/{}/{}/{}_{}.parquet", - gcs_bucket_root, table, highwater_s, highwater_ms, counter + gcs_bucket_root.join(format!( + "{}/{}/{}_{}.parquet", + table, highwater_s, highwater_ms, counter )) } diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs index f10c21259..f8fdbea97 100644 --- a/rust/processor/src/bq_analytics/generic_parquet_processor.rs +++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs @@ -2,7 +2,10 @@ use super::ParquetProcessingResult; use crate::{ bq_analytics::gcs_handler::upload_parquet_to_gcs, gap_detectors::ProcessingResult, - utils::counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE}, + utils::{ + counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE}, + util::naive_datetime_to_timestamp, + }, }; use ahash::AHashMap; use allocative::Allocative; @@ -24,9 +27,6 @@ use uuid::Uuid; #[derive(Debug, Default, Clone)] pub struct ParquetDataGeneric { pub data: Vec, - pub first_txn_version: u64, - pub last_txn_version: u64, - pub last_transaction_timestamp: Option, pub transaction_version_to_struct_count: AHashMap, } @@ -42,6 +42,10 @@ pub trait HasParquetSchema { fn schema() -> Arc; } +pub trait GetTimeStamp { + fn get_timestamp(&self) -> chrono::NaiveDateTime; +} + /// Auto-implement this for all types that implement `Default` and `RecordWriter` impl HasParquetSchema for ParquetType where @@ -56,7 +60,7 @@ where pub struct ParquetHandler where - ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative, + ParquetType: NamedTable + NamedTable + HasVersion + HasParquetSchema + 'static + Allocative, for<'a> &'a [ParquetType]: RecordWriter, { pub schema: Arc, @@ -66,6 +70,7 @@ where pub transaction_version_to_struct_count: AHashMap, pub bucket_name: String, + pub bucket_root: String, pub gap_detector_sender: kanal::AsyncSender, pub file_path: String, } @@ -93,7 +98,7 @@ fn create_new_writer( impl ParquetHandler where - ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative, + ParquetType: Allocative + GetTimeStamp + HasVersion + HasParquetSchema + 'static + NamedTable, for<'a> &'a [ParquetType]: RecordWriter, { fn create_new_writer(&self) -> Result> { @@ -110,8 +115,9 @@ where pub fn new( bucket_name: String, + bucket_root: String, gap_detector_sender: kanal::AsyncSender, - schema: Arc, + schema: Arc, ) -> Result { // had to append unique id to avoid concurrent write issues let file_path = format!("{}_{}.parquet", ParquetType::TABLE_NAME, Uuid::new_v4()); @@ -123,6 +129,7 @@ where buffer_size_bytes: 0, transaction_version_to_struct_count: AHashMap::new(), bucket_name, + bucket_root, gap_detector_sender, schema, file_path, @@ -135,7 +142,6 @@ where changes: ParquetDataGeneric, max_buffer_size: usize, ) -> Result<()> { - let last_transaction_timestamp = changes.last_transaction_timestamp; 
let parquet_structs = changes.data; self.transaction_version_to_struct_count .extend(changes.transaction_version_to_struct_count); @@ -152,7 +158,9 @@ where // for now, it's okay to go little above the buffer_size, given that we will keep max size as 200 MB if self.buffer_size_bytes >= max_buffer_size { let start_version = self.buffer.first().unwrap().version(); - let end_version = self.buffer.last().unwrap().version(); + let last = self.buffer.last().unwrap(); + let end_version = last.version(); + let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp()); let txn_version_to_struct_count = process_struct_count_map( &self.buffer, @@ -182,11 +190,13 @@ where end_version = end_version, "Max buffer size reached, uploading to GCS." ); + let bucket_root = PathBuf::from(&self.bucket_root); let upload_result = upload_parquet_to_gcs( gcs_client, &new_file_path, ParquetType::TABLE_NAME, &self.bucket_name, + &bucket_root, ) .await; self.buffer_size_bytes = 0; @@ -197,7 +207,7 @@ where let parquet_processing_result = ParquetProcessingResult { start_version, end_version, - last_transaction_timestamp: last_transaction_timestamp.clone(), + last_transaction_timestamp: Some(last_transaction_timestamp), txn_version_to_struct_count, }; diff --git a/rust/processor/src/bq_analytics/parquet_handler.rs b/rust/processor/src/bq_analytics/parquet_handler.rs index 785bdb8bd..301317e62 100644 --- a/rust/processor/src/bq_analytics/parquet_handler.rs +++ b/rust/processor/src/bq_analytics/parquet_handler.rs @@ -1,6 +1,6 @@ use crate::{ bq_analytics::generic_parquet_processor::{ - HasParquetSchema, HasVersion, NamedTable, ParquetDataGeneric, + GetTimeStamp, HasParquetSchema, HasVersion, NamedTable, ParquetDataGeneric, ParquetHandler as GenericParquetHandler, }, gap_detectors::ProcessingResult, @@ -17,11 +17,19 @@ pub fn create_parquet_handler_loop( new_gap_detector_sender: AsyncSender, processor_name: &str, bucket_name: String, + bucket_root: String, parquet_handler_response_channel_size: usize, max_buffer_size: usize, ) -> AsyncSender> where - ParquetType: NamedTable + HasVersion + HasParquetSchema + Send + Sync + 'static + Allocative, + ParquetType: NamedTable + + GetTimeStamp + + HasVersion + + HasParquetSchema + + Send + + Sync + + 'static + + Allocative, for<'a> &'a [ParquetType]: RecordWriter, { let processor_name = processor_name.to_owned(); @@ -38,6 +46,7 @@ where let mut parquet_manager = GenericParquetHandler::new( bucket_name.clone(), + bucket_root.clone(), new_gap_detector_sender.clone(), ParquetType::schema(), ) diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs b/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs index 134127add..adffb194c 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs @@ -4,7 +4,7 @@ #![allow(clippy::extra_unused_lifetimes)] use crate::{ - bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, utils::util::standardize_address, }; use allocative_derive::Allocative; @@ -45,7 +45,14 @@ impl HasVersion for MoveResource { } } +impl GetTimeStamp for MoveResource { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + pub struct MoveStructTag { + #[allow(dead_code)] resource_address: String, pub module: String, pub fun: String, @@ -59,14 +66,15 @@ impl 
MoveResource { txn_version: i64, block_height: i64, block_timestamp: chrono::NaiveDateTime, - ) -> Self { - let parsed_data = Self::convert_move_struct_tag( - write_resource - .r#type - .as_ref() - .expect("MoveStructTag Not Exists."), - ); - Self { + ) -> Result> { + let move_struct_tag = match write_resource.r#type.as_ref() { + Some(t) => t, + None => return Ok(None), + }; + + let parsed_data = convert_move_struct_tag(move_struct_tag); + + let move_resource = Self { txn_version, block_height, write_set_change_index, @@ -81,7 +89,8 @@ impl MoveResource { hex::encode(write_resource.state_key_hash.as_slice()).as_str(), ), block_timestamp, - } + }; + Ok(Some(move_resource)) } pub fn from_delete_resource( @@ -90,14 +99,13 @@ impl MoveResource { txn_version: i64, block_height: i64, block_timestamp: chrono::NaiveDateTime, - ) -> Self { - let parsed_data = Self::convert_move_struct_tag( - delete_resource - .r#type - .as_ref() - .expect("MoveStructTag Not Exists."), - ); - Self { + ) -> Result> { + let move_struct_tag = match delete_resource.r#type.as_ref() { + Some(t) => t, + None => return Ok(None), + }; + let parsed_data = convert_move_struct_tag(move_struct_tag); + let move_resource = Self { txn_version, block_height, write_set_change_index, @@ -112,42 +120,25 @@ impl MoveResource { hex::encode(delete_resource.state_key_hash.as_slice()).as_str(), ), block_timestamp, - } - } - - pub fn convert_move_struct_tag(struct_tag: &MoveStructTagPB) -> MoveStructTag { - MoveStructTag { - resource_address: standardize_address(struct_tag.address.as_str()), - module: struct_tag.module.to_string(), - fun: struct_tag.name.to_string(), - generic_type_params: struct_tag - .generic_type_params - .iter() - .map(|move_type| -> Result> { - Ok(Some( - serde_json::to_string(move_type).context("Failed to parse move type")?, - )) - }) - .collect::>>() - .unwrap_or(None), - } - } - - pub fn get_outer_type_from_resource(write_resource: &WriteResource) -> String { - let move_struct_tag = - Self::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap()); - - format!( - "{}::{}::{}", - move_struct_tag.get_address(), - move_struct_tag.module, - move_struct_tag.fun, - ) + }; + Ok(Some(move_resource)) } } -impl MoveStructTag { - pub fn get_address(&self) -> String { - standardize_address(self.resource_address.as_str()) +pub fn convert_move_struct_tag(struct_tag: &MoveStructTagPB) -> MoveStructTag { + MoveStructTag { + resource_address: standardize_address(struct_tag.address.as_str()), + module: struct_tag.module.to_string(), + fun: struct_tag.name.to_string(), + generic_type_params: struct_tag + .generic_type_params + .iter() + .map(|move_type| -> Result> { + Ok(Some( + serde_json::to_string(move_type).context("Failed to parse move type")?, + )) + }) + .collect::>>() + .unwrap_or(None), } } diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs index 014f00fef..2243277d1 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs @@ -4,7 +4,7 @@ #![allow(clippy::extra_unused_lifetimes)] use crate::{ - bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, utils::util::{hash_str, standardize_address}, }; use allocative_derive::Allocative; @@ -38,6 +38,13 @@ impl HasVersion for TableItem { self.txn_version } } + +impl 
GetTimeStamp for TableItem { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + #[derive(Clone, Debug, Deserialize, FieldCount, Serialize)] pub struct CurrentTableItem { pub table_handle: String, diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs index 7a18d621a..3a9af5a4a 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs @@ -10,7 +10,7 @@ use super::{ parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, }; use crate::{ - bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::{get_clean_payload, get_clean_writeset, get_payload_type, standardize_address}, @@ -60,6 +60,12 @@ impl HasVersion for Transaction { } } +impl GetTimeStamp for Transaction { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + impl Transaction { fn from_transaction_info( info: &TransactionInfo, diff --git a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs index 8507a20cb..bab8628f6 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs @@ -9,10 +9,11 @@ use super::{ parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, }; use crate::{ - bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, - utils::util::standardize_address, + bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable}, + utils::util::{standardize_address, standardize_address_from_bytes}, }; use allocative_derive::Allocative; +use anyhow::Context; use aptos_protos::transaction::v1::{ write_set_change::{Change as WriteSetChangeEnum, Type as WriteSetChangeTypeEnum}, WriteSetChange as WriteSetChangePB, @@ -58,6 +59,12 @@ impl Default for WriteSetChange { } } +impl GetTimeStamp for WriteSetChange { + fn get_timestamp(&self) -> chrono::NaiveDateTime { + self.block_timestamp + } +} + impl WriteSetChange { pub fn from_write_set_change( write_set_change: &WriteSetChangePB, @@ -65,14 +72,14 @@ impl WriteSetChange { txn_version: i64, block_height: i64, block_timestamp: chrono::NaiveDateTime, - ) -> (Self, WriteSetChangeDetail) { + ) -> anyhow::Result> { let change_type = Self::get_write_set_change_type(write_set_change); let change = write_set_change .change .as_ref() .expect("WriteSetChange must have a change"); match change { - WriteSetChangeEnum::WriteModule(inner) => ( + WriteSetChangeEnum::WriteModule(inner) => Ok(Some(( Self { txn_version, state_key_hash: standardize_address( @@ -80,7 +87,7 @@ impl WriteSetChange { ), block_height, change_type, - resource_address: standardize_address(&inner.address.to_string()), + resource_address: standardize_address(&inner.address), write_set_change_index, block_timestamp, }, @@ -90,8 +97,8 @@ impl WriteSetChange { txn_version, block_height, )), - ), - WriteSetChangeEnum::DeleteModule(inner) => ( + ))), + WriteSetChangeEnum::DeleteModule(inner) => Ok(Some(( Self { txn_version, state_key_hash: standardize_address( @@ -99,7 +106,7 @@ impl WriteSetChange { ), block_height, change_type, 
- resource_address: standardize_address(&inner.address.to_string()), + resource_address: standardize_address(&inner.address), write_set_change_index, block_timestamp, }, @@ -109,47 +116,71 @@ impl WriteSetChange { txn_version, block_height, )), - ), - WriteSetChangeEnum::WriteResource(inner) => ( - Self { - txn_version, - state_key_hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), - block_height, - change_type, - resource_address: standardize_address(&inner.address.to_string()), - write_set_change_index, - block_timestamp, - }, - WriteSetChangeDetail::Resource(MoveResource::from_write_resource( + ))), + WriteSetChangeEnum::WriteResource(inner) => { + let resource_option = MoveResource::from_write_resource( inner, write_set_change_index, txn_version, block_height, block_timestamp, - )), - ), - WriteSetChangeEnum::DeleteResource(inner) => ( - Self { - txn_version, - state_key_hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), - block_height, - change_type, - resource_address: standardize_address(&inner.address.to_string()), - write_set_change_index, - block_timestamp, - }, - WriteSetChangeDetail::Resource(MoveResource::from_delete_resource( + ); + + resource_option + .unwrap() + .context(format!( + "Failed to parse move resource, version {}", + txn_version + )) + .map(|resource| { + Some(( + Self { + txn_version, + state_key_hash: standardize_address_from_bytes( + inner.state_key_hash.as_slice(), + ), + block_height, + change_type, + resource_address: standardize_address(&inner.address), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Resource(resource), + )) + }) + }, + WriteSetChangeEnum::DeleteResource(inner) => { + let resource_option = MoveResource::from_delete_resource( inner, write_set_change_index, txn_version, block_height, block_timestamp, - )), - ), + ); + + resource_option + .unwrap() + .context(format!( + "Failed to parse move resource, version {}", + txn_version + )) + .map(|resource| { + Some(( + Self { + txn_version, + state_key_hash: standardize_address_from_bytes( + inner.state_key_hash.as_slice(), + ), + block_height, + change_type, + resource_address: standardize_address(&inner.address), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Resource(resource), + )) + }) + }, WriteSetChangeEnum::WriteTableItem(inner) => { let (ti, cti) = TableItem::from_write_table_item( inner, @@ -158,7 +189,7 @@ impl WriteSetChange { block_height, block_timestamp, ); - ( + Ok(Some(( Self { txn_version, state_key_hash: standardize_address( @@ -175,7 +206,7 @@ impl WriteSetChange { cti, Some(TableMetadata::from_write_table_item(inner)), ), - ) + ))) }, WriteSetChangeEnum::DeleteTableItem(inner) => { let (ti, cti) = TableItem::from_delete_table_item( @@ -185,7 +216,7 @@ impl WriteSetChange { block_height, block_timestamp, ); - ( + Ok(Some(( Self { txn_version, state_key_hash: standardize_address( @@ -198,7 +229,7 @@ impl WriteSetChange { block_timestamp, }, WriteSetChangeDetail::Table(ti, cti, None), - ) + ))) }, } } @@ -209,21 +240,32 @@ impl WriteSetChange { block_height: i64, timestamp: chrono::NaiveDateTime, ) -> (Vec, Vec) { - write_set_changes + let results: Vec<(Self, WriteSetChangeDetail)> = write_set_changes .iter() .enumerate() - .map(|(write_set_change_index, write_set_change)| { - Self::from_write_set_change( + .filter_map(|(write_set_change_index, write_set_change)| { + match Self::from_write_set_change( write_set_change, write_set_change_index as i64, 
txn_version, block_height, timestamp, - ) + ) { + Ok(Some((change, detail))) => Some((change, detail)), + Ok(None) => None, + Err(e) => { + tracing::error!( + "Failed to convert write set change: {:?} with error: {:?}", + write_set_change, + e + ); + panic!("Failed to convert write set change.") + }, + } }) - .collect::>() - .into_iter() - .unzip() + .collect::>(); + + results.into_iter().unzip() } fn get_write_set_change_type(t: &WriteSetChangePB) -> String { diff --git a/rust/processor/src/db/common/models/default_models/write_set_changes.rs b/rust/processor/src/db/common/models/default_models/write_set_changes.rs index c28a97c51..11a2e36db 100644 --- a/rust/processor/src/db/common/models/default_models/write_set_changes.rs +++ b/rust/processor/src/db/common/models/default_models/write_set_changes.rs @@ -9,7 +9,10 @@ use super::{ move_tables::{CurrentTableItem, TableItem, TableMetadata}, transactions::Transaction, }; -use crate::{schema::write_set_changes, utils::util::standardize_address}; +use crate::{ + schema::write_set_changes, + utils::util::{standardize_address, standardize_address_from_bytes}, +}; use aptos_protos::transaction::v1::{ write_set_change::{Change as WriteSetChangeEnum, Type as WriteSetChangeTypeEnum}, WriteSetChange as WriteSetChangePB, @@ -49,12 +52,10 @@ impl WriteSetChange { WriteSetChangeEnum::WriteModule(inner) => ( Self { transaction_version, - hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), + hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), transaction_block_height, type_, - address: standardize_address(&inner.address.to_string()), + address: standardize_address(&inner.address), index, }, WriteSetChangeDetail::Module(MoveModule::from_write_module( @@ -67,12 +68,10 @@ impl WriteSetChange { WriteSetChangeEnum::DeleteModule(inner) => ( Self { transaction_version, - hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), + hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), transaction_block_height, type_, - address: standardize_address(&inner.address.to_string()), + address: standardize_address(&inner.address), index, }, WriteSetChangeDetail::Module(MoveModule::from_delete_module( @@ -85,12 +84,10 @@ impl WriteSetChange { WriteSetChangeEnum::WriteResource(inner) => ( Self { transaction_version, - hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), + hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), transaction_block_height, type_, - address: standardize_address(&inner.address.to_string()), + address: standardize_address(&inner.address), index, }, WriteSetChangeDetail::Resource(MoveResource::from_write_resource( @@ -103,12 +100,10 @@ impl WriteSetChange { WriteSetChangeEnum::DeleteResource(inner) => ( Self { transaction_version, - hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), + hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), transaction_block_height, type_, - address: standardize_address(&inner.address.to_string()), + address: standardize_address(&inner.address), index, }, WriteSetChangeDetail::Resource(MoveResource::from_delete_resource( @@ -128,9 +123,7 @@ impl WriteSetChange { ( Self { transaction_version, - hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), + hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), transaction_block_height, type_, address: String::default(), 
@@ -153,9 +146,7 @@ impl WriteSetChange { ( Self { transaction_version, - hash: standardize_address( - hex::encode(inner.state_key_hash.as_slice()).as_str(), - ), + hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), transaction_block_height, type_, address: String::default(), diff --git a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs index 33819b470..42e317f18 100644 --- a/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs +++ b/rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs @@ -165,6 +165,7 @@ impl FungibleAssetBalance { let asset_type = inner.metadata.get_reference_address(); let is_primary = Self::is_primary(&owner_address, &asset_type, &storage_id); + #[allow(clippy::useless_asref)] let concurrent_balance = object_data .concurrent_fungible_asset_balance .as_ref() diff --git a/rust/processor/src/gap_detectors/gap_detector.rs b/rust/processor/src/gap_detectors/gap_detector.rs index c5707f798..b4b6d44e5 100644 --- a/rust/processor/src/gap_detectors/gap_detector.rs +++ b/rust/processor/src/gap_detectors/gap_detector.rs @@ -2,14 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - gap_detectors::{GapDetectorResult, ProcessingResult}, + gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult}, processors::DefaultProcessingResult, }; use ahash::AHashMap; - -pub trait GapDetectorTrait { - fn process_versions(&mut self, result: ProcessingResult) -> anyhow::Result; -} +use anyhow::Result; pub struct DefaultGapDetector { next_version_to_process: u64, @@ -24,7 +21,7 @@ pub struct DefaultGapDetectorResult { } impl GapDetectorTrait for DefaultGapDetector { - fn process_versions(&mut self, result: ProcessingResult) -> anyhow::Result { + fn process_versions(&mut self, result: ProcessingResult) -> Result { match result { ProcessingResult::DefaultProcessingResult(result) => { // Check for gaps diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs index d0b35c621..07ef999f1 100644 --- a/rust/processor/src/gap_detectors/mod.rs +++ b/rust/processor/src/gap_detectors/mod.rs @@ -1,17 +1,17 @@ use crate::{ bq_analytics::ParquetProcessingResult, gap_detectors::{ - gap_detector::{DefaultGapDetector, DefaultGapDetectorResult, GapDetectorTrait}, + gap_detector::{DefaultGapDetector, DefaultGapDetectorResult}, parquet_gap_detector::{ParquetFileGapDetector, ParquetFileGapDetectorResult}, }, processors::{DefaultProcessingResult, Processor, ProcessorTrait}, utils::counters::{PARQUET_PROCESSOR_DATA_GAP_COUNT, PROCESSOR_DATA_GAP_COUNT}, worker::PROCESSOR_SERVICE_TYPE, }; +use anyhow::Result; use enum_dispatch::enum_dispatch; use kanal::AsyncReceiver; use tracing::{error, info}; - pub mod gap_detector; pub mod parquet_gap_detector; @@ -26,20 +26,37 @@ pub enum GapDetector { ParquetFileGapDetector, } -#[enum_dispatch(GapDetectorTrait)] pub enum GapDetectorResult { - DefaultGapDetectorResult, - ParquetFileGapDetectorResult, + DefaultGapDetectorResult(DefaultGapDetectorResult), + ParquetFileGapDetectorResult(ParquetFileGapDetectorResult), } + +/// Trait for gap detectors +/// +/// This trait defines the interface for gap detectors used in the processors. +/// Gap detectors are responsible for identifying gaps in data based on processed transactions. 
+/// Implementations of this trait must provide the functionality to process versions +/// and return a result indicating the presence of any gaps. +/// +/// Implementations should: +/// - Be thread-safe (hence the `Send` bound). +/// - Define how to process versions and detect gaps. +/// - Return a `GapDetectorResult` indicating the outcome of the processing. +/// +#[enum_dispatch] +pub trait GapDetectorTrait: Send { + fn process_versions(&mut self, result: ProcessingResult) -> Result; +} + pub enum ProcessingResult { DefaultProcessingResult(DefaultProcessingResult), ParquetProcessingResult(ParquetProcessingResult), } pub async fn create_gap_detector_status_tracker_loop( + mut gap_detector: GapDetector, gap_detector_receiver: AsyncReceiver, processor: Processor, - starting_version: u64, gap_detection_batch_size: u64, ) { let processor_name = processor.name(); @@ -49,13 +66,11 @@ pub async fn create_gap_detector_status_tracker_loop( "[Parser] Starting gap detector task", ); - let mut default_gap_detector = DefaultGapDetector::new(starting_version); - let mut parquet_gap_detector = ParquetFileGapDetector::new(starting_version); let mut last_update_time = std::time::Instant::now(); loop { match gap_detector_receiver.recv().await { Ok(ProcessingResult::DefaultProcessingResult(result)) => { - match default_gap_detector + match gap_detector .process_versions(ProcessingResult::DefaultProcessingResult(result)) { Ok(res) => { @@ -112,7 +127,7 @@ pub async fn create_gap_detector_status_tracker_loop( service_type = PROCESSOR_SERVICE_TYPE, "[ParquetGapDetector] received parquet gap detector task", ); - match parquet_gap_detector + match gap_detector .process_versions(ProcessingResult::ParquetProcessingResult(result)) { Ok(res) => { @@ -132,24 +147,20 @@ pub async fn create_gap_detector_status_tracker_loop( // We don't panic as everything downstream will panic if it doesn't work/receive } - if let Some(res_last_success_batch) = res.last_success_batch { - if last_update_time.elapsed().as_secs() - >= UPDATE_PROCESSOR_STATUS_SECS - { - tracing::info!("Updating last processed version"); - processor - .update_last_processed_version( - res_last_success_batch.end_version as u64, - res_last_success_batch - .last_transaction_timestamp - .clone(), - ) - .await - .unwrap(); - last_update_time = std::time::Instant::now(); - } else { - tracing::info!("Not Updating last processed version"); - } + if last_update_time.elapsed().as_secs() + >= UPDATE_PROCESSOR_STATUS_SECS + { + tracing::info!("Updating last processed version"); + processor + .update_last_processed_version( + res.start_version, + res.last_transaction_timestamp, + ) + .await + .unwrap(); + last_update_time = std::time::Instant::now(); + } else { + tracing::info!("Not Updating last processed version"); } }, _ => { diff --git a/rust/processor/src/gap_detectors/parquet_gap_detector.rs b/rust/processor/src/gap_detectors/parquet_gap_detector.rs index c0b5ae6cc..19d5a1d30 100644 --- a/rust/processor/src/gap_detectors/parquet_gap_detector.rs +++ b/rust/processor/src/gap_detectors/parquet_gap_detector.rs @@ -1,17 +1,14 @@ // // Copyright © Aptos Foundation // // SPDX-License-Identifier: Apache-2.0 -use crate::{ - bq_analytics::ParquetProcessingResult, - gap_detectors::{gap_detector::GapDetectorTrait, GapDetectorResult, ProcessingResult}, -}; +use crate::gap_detectors::{GapDetectorResult, GapDetectorTrait, ProcessingResult}; use ahash::AHashMap; +use anyhow::Result; use std::cmp::max; use tracing::{debug, info}; pub struct ParquetFileGapDetector { 
next_version_to_process: i64, - last_success_batch: Option, version_counters: AHashMap, max_version: i64, } @@ -19,21 +16,21 @@ pub struct ParquetFileGapDetector { pub struct ParquetFileGapDetectorResult { pub next_version_to_process: u64, pub num_gaps: u64, - pub last_success_batch: Option, + pub start_version: u64, + pub last_transaction_timestamp: Option, } impl ParquetFileGapDetector { pub fn new(starting_version: u64) -> Self { Self { next_version_to_process: starting_version as i64, - last_success_batch: None, version_counters: AHashMap::new(), max_version: 0, } } } impl GapDetectorTrait for ParquetFileGapDetector { - fn process_versions(&mut self, result: ProcessingResult) -> anyhow::Result { + fn process_versions(&mut self, result: ProcessingResult) -> Result { // Update counts of structures for each transaction version let result = match result { ProcessingResult::ParquetProcessingResult(r) => r, @@ -82,7 +79,8 @@ impl GapDetectorTrait for ParquetFileGapDetector { ParquetFileGapDetectorResult { next_version_to_process: self.next_version_to_process as u64, num_gaps: (self.max_version - self.next_version_to_process) as u64, - last_success_batch: self.last_success_batch.clone(), + start_version: result.start_version as u64, + last_transaction_timestamp: result.last_transaction_timestamp, }, )) } diff --git a/rust/processor/src/processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_default_processor.rs index 0b8a30890..a56257a54 100644 --- a/rust/processor/src/processors/parquet_default_processor.rs +++ b/rust/processor/src/processors/parquet_default_processor.rs @@ -31,6 +31,7 @@ const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; pub struct DefaultParquetProcessorConfig { pub google_application_credentials: Option, pub bucket_name: String, + pub bucket_root: String, pub parquet_handler_response_channel_size: usize, pub max_buffer_size: usize, } @@ -59,6 +60,7 @@ impl DefaultParquetProcessor { new_gap_detector_sender.clone(), ProcessorName::DefaultParquetProcessor.into(), config.bucket_name.clone(), + config.bucket_root.clone(), config.parquet_handler_response_channel_size, config.max_buffer_size, ); @@ -67,6 +69,7 @@ impl DefaultParquetProcessor { new_gap_detector_sender.clone(), ProcessorName::DefaultParquetProcessor.into(), config.bucket_name.clone(), + config.bucket_root.clone(), config.parquet_handler_response_channel_size, config.max_buffer_size, ); @@ -75,6 +78,7 @@ impl DefaultParquetProcessor { new_gap_detector_sender.clone(), ProcessorName::DefaultParquetProcessor.into(), config.bucket_name.clone(), + config.bucket_root.clone(), config.parquet_handler_response_channel_size, config.max_buffer_size, ); @@ -83,6 +87,7 @@ impl DefaultParquetProcessor { new_gap_detector_sender.clone(), ProcessorName::DefaultParquetProcessor.into(), config.bucket_name.clone(), + config.bucket_root.clone(), config.parquet_handler_response_channel_size, config.max_buffer_size, ); @@ -132,10 +137,7 @@ impl ProcessorTrait for DefaultParquetProcessor { let mr_parquet_data = ParquetDataGeneric { data: mr, - last_transaction_timestamp: last_transaction_timestamp.clone(), transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), - first_txn_version: start_version, - last_txn_version: end_version, }; self.move_resource_sender @@ -145,10 +147,7 @@ impl ProcessorTrait for DefaultParquetProcessor { let wsc_parquet_data = ParquetDataGeneric { data: wsc, - last_transaction_timestamp: last_transaction_timestamp.clone(), 
            transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
-            first_txn_version: start_version,
-            last_txn_version: end_version,
         };
         self.wsc_sender
             .send(wsc_parquet_data)
@@ -157,10 +156,7 @@ impl ProcessorTrait for DefaultParquetProcessor {
 
         let t_parquet_data = ParquetDataGeneric {
             data: t,
-            last_transaction_timestamp: last_transaction_timestamp.clone(),
             transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
-            first_txn_version: start_version,
-            last_txn_version: end_version,
         };
         self.transaction_sender
             .send(t_parquet_data)
@@ -169,10 +165,7 @@ impl ProcessorTrait for DefaultParquetProcessor {
 
         let ti_parquet_data = ParquetDataGeneric {
             data: ti,
-            last_transaction_timestamp: last_transaction_timestamp.clone(),
             transaction_version_to_struct_count: transaction_version_to_struct_count.clone(),
-            first_txn_version: start_version,
-            last_txn_version: end_version,
         };
 
         self.ti_sender
diff --git a/rust/processor/src/utils/util.rs b/rust/processor/src/utils/util.rs
index e00688586..4550de820 100644
--- a/rust/processor/src/utils/util.rs
+++ b/rust/processor/src/utils/util.rs
@@ -15,6 +15,7 @@ use aptos_protos::{
     util::timestamp::Timestamp,
 };
 use bigdecimal::{BigDecimal, Signed, ToPrimitive, Zero};
+use chrono::NaiveDateTime;
 use lazy_static::lazy_static;
 use serde::{Deserialize, Deserializer, Serialize};
 use serde_json::Value;
@@ -74,6 +75,16 @@ pub fn standardize_address(handle: &str) -> String {
     }
 }
 
+/// Standardizes all addresses and table handles to length 66 (0x followed by a 64-char hash), taking the raw bytes as a slice so callers do not have to hex-encode first.
+pub fn standardize_address_from_bytes(bytes: &[u8]) -> String {
+    let encoded_bytes = hex::encode(bytes);
+    if let Some(handle) = encoded_bytes.strip_prefix("0x") {
+        format!("0x{:0>64}", handle)
+    } else {
+        format!("0x{:0>64}", encoded_bytes)
+    }
+}
+
 pub fn hash_str(val: &str) -> String {
     hex::encode(sha2::Sha256::digest(val.as_bytes()))
 }
@@ -267,6 +280,13 @@ fn get_clean_script_payload(payload: &ScriptPayload, version: i64) -> ScriptPayl
     }
 }
 
+pub fn naive_datetime_to_timestamp(ndt: NaiveDateTime) -> Timestamp {
+    Timestamp {
+        seconds: ndt.and_utc().timestamp(),
+        nanos: ndt.and_utc().timestamp_subsec_nanos() as i32,
+    }
+}
+
 pub fn parse_timestamp(ts: &Timestamp, version: i64) -> chrono::NaiveDateTime {
     let final_ts = if ts.seconds >= MAX_TIMESTAMP_SECS {
         Timestamp {
diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs
index be519a660..443d72c9a 100644
--- a/rust/processor/src/worker.rs
+++ b/rust/processor/src/worker.rs
@@ -4,7 +4,10 @@ use crate::{
     config::IndexerGrpcHttp2Config,
     db::common::models::{ledger_info::LedgerInfo, processor_status::ProcessorStatusQuery},
-    gap_detectors::{create_gap_detector_status_tracker_loop, ProcessingResult},
+    gap_detectors::{
+        create_gap_detector_status_tracker_loop, gap_detector::DefaultGapDetector,
+        parquet_gap_detector::ParquetFileGapDetector, GapDetector, ProcessingResult,
+    },
     grpc_stream::TransactionsPBResponse,
     processors::{
         account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
@@ -298,44 +301,35 @@ impl Worker {
         // Create a gap detector task that will panic if there is a gap in the processing
         let (gap_detector_sender, gap_detector_receiver) =
             kanal::bounded_async::(BUFFER_SIZE);
-        let (processor, gap_detection_batch_size, gap_detector_sender) =
-            if self.processor_config.is_parquet_processor() {
-                let processor = build_processor(
-                    &self.processor_config,
-
self.per_table_chunk_sizes.clone(), - self.deprecated_tables, - self.db_pool.clone(), - Some(gap_detector_sender.clone()), - ); - let gap_detection_batch_size: u64 = self.parquet_gap_detection_batch_size; - ( - processor, - gap_detection_batch_size, - Some(gap_detector_sender), - ) - } else { - let processor = build_processor( - &self.processor_config, - self.per_table_chunk_sizes.clone(), - self.deprecated_tables, - self.db_pool.clone(), - None, - ); - let gap_detection_batch_size = self.gap_detection_batch_size; + let is_parquet_processor = self.processor_config.is_parquet_processor(); + let (maybe_gap_detector_sender, gap_detection_batch_size) = if is_parquet_processor { + let gap_detection_batch_size: u64 = self.parquet_gap_detection_batch_size; + (Some(gap_detector_sender.clone()), gap_detection_batch_size) + } else { + let gap_detection_batch_size = self.gap_detection_batch_size; + (None, gap_detection_batch_size) + }; - ( - processor, - gap_detection_batch_size, - Some(gap_detector_sender), - ) - }; + let processor = build_processor( + &self.processor_config, + self.per_table_chunk_sizes.clone(), + self.deprecated_tables, + self.db_pool.clone(), + maybe_gap_detector_sender, + ); + + let gap_detector = if is_parquet_processor { + GapDetector::ParquetFileGapDetector(ParquetFileGapDetector::new(starting_version)) + } else { + GapDetector::DefaultGapDetector(DefaultGapDetector::new(starting_version)) + }; tokio::spawn(async move { create_gap_detector_status_tracker_loop( + gap_detector, gap_detector_receiver, processor, - starting_version, gap_detection_batch_size, ) .await; @@ -382,7 +376,7 @@ impl Worker { &self, task_index: usize, receiver: kanal::AsyncReceiver, - gap_detector_sender: Option>, + gap_detector_sender: AsyncSender, ) -> JoinHandle<()> { let processor_name = self.processor_config.name(); let stream_address = self.indexer_grpc_data_service_address.to_string(); @@ -390,13 +384,23 @@ impl Worker { let auth_token = self.auth_token.clone(); // Build the processor based on the config. - let processor = build_processor( - &self.processor_config, - self.per_table_chunk_sizes.clone(), - self.deprecated_tables, - self.db_pool.clone(), - gap_detector_sender.clone(), - ); + let processor = if self.processor_config.is_parquet_processor() { + build_processor( + &self.processor_config, + self.per_table_chunk_sizes.clone(), + self.deprecated_tables, + self.db_pool.clone(), + Some(gap_detector_sender.clone()), + ) + } else { + build_processor( + &self.processor_config, + self.per_table_chunk_sizes.clone(), + self.deprecated_tables, + self.db_pool.clone(), + None, + ) + }; let concurrent_tasks = self.number_concurrent_processing_tasks; @@ -612,8 +616,6 @@ impl Worker { .set(processing_result.db_insertion_duration_in_secs); gap_detector_sender - .as_ref() - .unwrap() .send(ProcessingResult::DefaultProcessingResult( processing_result, ))
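// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): a minimal, standalone
// version of the `standardize_address_from_bytes` helper that this change adds
// to rust/processor/src/utils/util.rs, plus the call-site simplification it
// enables. The `main` harness and the sample byte array are hypothetical; the
// helper body mirrors the diff and assumes the `hex` crate is available, as it
// is in the processor crate.

fn standardize_address_from_bytes(bytes: &[u8]) -> String {
    // Hex-encode the raw bytes (e.g. a state_key_hash) and left-pad to a
    // 0x-prefixed, 64-character value, so callers no longer hex-encode before
    // standardizing the address themselves.
    let encoded_bytes = hex::encode(bytes);
    if let Some(stripped) = encoded_bytes.strip_prefix("0x") {
        format!("0x{:0>64}", stripped)
    } else {
        format!("0x{:0>64}", encoded_bytes)
    }
}

fn main() {
    // Hypothetical input standing in for `inner.state_key_hash.as_slice()`.
    let state_key_hash: [u8; 3] = [0xab, 0xcd, 0xef];

    // Before this patch, call sites read:
    //     standardize_address(hex::encode(inner.state_key_hash.as_slice()).as_str())
    // After this patch, they pass the slice directly:
    let hash = standardize_address_from_bytes(&state_key_hash);

    assert_eq!(hash.len(), 66); // "0x" + 64 hex characters
    assert!(hash.ends_with("abcdef"));
    println!("{hash}");
}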