From e35189535ecce84eb654bc1fb3ea02ab72b6f52d Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Fri, 28 Jun 2024 13:31:44 -0700 Subject: [PATCH] add skip db write flags for tables in parquet migration tranche 2 and 3 (#429) * add skip db write flags for tables in parquet migration tranche 2 and 3 * remove flags for transaction_metadata_processor * include coin_infos to deprecated table flags * remove flags from coin and token processor since we are deprecating those two --- rust/processor/README.md | 4 +- .../processor/src/processors/ans_processor.rs | 42 +++++++++++++-- .../src/processors/default_processor.rs | 6 +++ .../processors/fungible_asset_processor.rs | 34 ++++++++++-- .../src/processors/objects_processor.rs | 8 +++ .../src/processors/token_v2_processor.rs | 31 +++++++++-- .../processors/user_transaction_processor.rs | 13 ++++- rust/processor/src/worker.rs | 54 +++++++++++++++---- 8 files changed, 167 insertions(+), 25 deletions(-) diff --git a/rust/processor/README.md b/rust/processor/README.md index ad6338be6..f39073a5f 100644 --- a/rust/processor/README.md +++ b/rust/processor/README.md @@ -54,8 +54,8 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc - `starting_version`: start processor at starting_version. - `ending_version`: stop processor after ending_version. - `number_concurrent_processing_tasks`: number of tasks to parse and insert; 1 means sequential processing, otherwise, -- `deprecated_tables`: a list of tables to skip writing to alloyDB. - transactions are splitted into tasks and inserted with random order. +- `deprecated_tables`: a list of tables to skip writing to alloyDB. you can find a full list of deprecated tables [here](https://aptoslabs.notion.site/Deprecated-Tables-33518cfcff0543378289b2bf06001576?pvs=4) +transactions are splitted into tasks and inserted with random order. ### Use docker image for existing parsers(Only for **Unix/Linux**) diff --git a/rust/processor/src/processors/ans_processor.rs b/rust/processor/src/processors/ans_processor.rs index a984c8de7..addf6972e 100644 --- a/rust/processor/src/processors/ans_processor.rs +++ b/rust/processor/src/processors/ans_processor.rs @@ -17,6 +17,7 @@ use crate::{ database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, util::standardize_address, }, + worker::TableFlags, }; use ahash::AHashMap; use anyhow::bail; @@ -45,6 +46,7 @@ pub struct AnsProcessor { connection_pool: ArcDbPool, config: AnsProcessorConfig, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, } impl AnsProcessor { @@ -52,6 +54,7 @@ impl AnsProcessor { connection_pool: ArcDbPool, config: AnsProcessorConfig, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, ) -> Self { tracing::info!( ans_v1_primary_names_table_handle = config.ans_v1_primary_names_table_handle, @@ -63,6 +66,7 @@ impl AnsProcessor { connection_pool, config, per_table_chunk_sizes, + deprecated_tables, } } } @@ -372,14 +376,14 @@ impl ProcessorTrait for AnsProcessor { let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let ( - all_current_ans_lookups, - all_ans_lookups, - all_current_ans_primary_names, - all_ans_primary_names, + mut all_current_ans_lookups, + mut all_ans_lookups, + mut all_current_ans_primary_names, + mut all_ans_primary_names, all_current_ans_lookups_v2, all_ans_lookups_v2, all_current_ans_primary_names_v2, - all_ans_primary_names_v2, + mut all_ans_primary_names_v2, ) = parse_ans( &transactions, self.config.ans_v1_primary_names_table_handle.clone(), @@ -390,6 +394,34 @@ impl ProcessorTrait for AnsProcessor { let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); + if self + .deprecated_tables + .contains(TableFlags::ANS_PRIMARY_NAME) + { + all_ans_primary_names.clear(); + } + if self + .deprecated_tables + .contains(TableFlags::ANS_PRIMARY_NAME_V2) + { + all_ans_primary_names_v2.clear(); + } + if self.deprecated_tables.contains(TableFlags::ANS_LOOKUP) { + all_ans_lookups.clear(); + } + if self + .deprecated_tables + .contains(TableFlags::CURRENT_ANS_LOOKUP) + { + all_current_ans_lookups.clear(); + } + if self + .deprecated_tables + .contains(TableFlags::CURRENT_ANS_PRIMARY_NAME) + { + all_current_ans_primary_names.clear(); + } + // Insert values to db let tx_result = insert_to_db( self.get_pool(), diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index cf2bd3ce4..d3d6525f6 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -452,6 +452,12 @@ fn process_transactions( if flags.contains(TableFlags::TABLE_ITEMS) { table_items.clear(); } + if flags.contains(TableFlags::TABLE_METADATAS) { + table_metadata.clear(); + } + if flags.contains(TableFlags::MOVE_MODULES) { + move_modules.clear(); + } ( txns, diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 12c36600a..955bd0052 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -28,6 +28,7 @@ use crate::{ database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, util::{get_entry_function_from_user_request, standardize_address}, }, + worker::TableFlags, }; use ahash::AHashMap; use anyhow::bail; @@ -45,13 +46,19 @@ use tracing::error; pub struct FungibleAssetProcessor { connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, } impl FungibleAssetProcessor { - pub fn new(connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap) -> Self { + pub fn new( + connection_pool: ArcDbPool, + per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, + ) -> Self { Self { connection_pool, per_table_chunk_sizes, + deprecated_tables, } } } @@ -355,10 +362,10 @@ impl ProcessorTrait for FungibleAssetProcessor { let ( fungible_asset_activities, fungible_asset_metadata, - fungible_asset_balances, - current_fungible_asset_balances, + mut fungible_asset_balances, + mut current_fungible_asset_balances, current_unified_fungible_asset_balances, - coin_supply, + mut coin_supply, ) = parse_v2_coin(&transactions).await; let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); @@ -367,6 +374,25 @@ impl ProcessorTrait for FungibleAssetProcessor { let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances .into_iter() .partition(|x| x.is_primary.is_none()); + + if self + .deprecated_tables + .contains(TableFlags::FUNGIBLE_ASSET_BALANCES) + { + fungible_asset_balances.clear(); + } + + if self + .deprecated_tables + .contains(TableFlags::CURRENT_FUNGIBLE_ASSET_BALANCES) + { + current_fungible_asset_balances.clear(); + } + + if self.deprecated_tables.contains(TableFlags::COIN_SUPPLY) { + coin_supply.clear(); + } + let tx_result = insert_to_db( self.get_pool(), self.name(), diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index 9228692ab..4e8e6bea5 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -13,6 +13,7 @@ use crate::{ database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, util::standardize_address, }, + worker::TableFlags, IndexerGrpcProcessorConfig, }; use ahash::AHashMap; @@ -40,6 +41,7 @@ pub struct ObjectsProcessor { connection_pool: ArcDbPool, config: ObjectsProcessorConfig, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, } impl ObjectsProcessor { @@ -47,11 +49,13 @@ impl ObjectsProcessor { connection_pool: ArcDbPool, config: ObjectsProcessorConfig, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, ) -> Self { Self { connection_pool, config, per_table_chunk_sizes, + deprecated_tables, } } } @@ -265,6 +269,10 @@ impl ProcessorTrait for ObjectsProcessor { .collect::>(); all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + if self.deprecated_tables.contains(TableFlags::OBJECTS) { + all_objects.clear(); + } + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index 979e07693..7008c7059 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -36,6 +36,7 @@ use crate::{ database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool, DbPoolConnection}, util::{get_entry_function_from_user_request, parse_timestamp, standardize_address}, }, + worker::TableFlags, IndexerGrpcProcessorConfig, }; use ahash::{AHashMap, AHashSet}; @@ -64,6 +65,7 @@ pub struct TokenV2Processor { connection_pool: ArcDbPool, config: TokenV2ProcessorConfig, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, } impl TokenV2Processor { @@ -71,11 +73,13 @@ impl TokenV2Processor { connection_pool: ArcDbPool, config: TokenV2ProcessorConfig, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, ) -> Self { Self { connection_pool, config, per_table_chunk_sizes, + deprecated_tables, } } } @@ -594,16 +598,16 @@ impl ProcessorTrait for TokenV2Processor { let query_retry_delay_ms = self.config.query_retry_delay_ms; // Token V2 processing which includes token v1 let ( - collections_v2, - token_datas_v2, - token_ownerships_v2, + mut collections_v2, + mut token_datas_v2, + mut token_ownerships_v2, current_collections_v2, current_token_datas_v2, current_deleted_token_datas_v2, current_token_ownerships_v2, current_deleted_token_ownerships_v2, token_activities_v2, - current_token_v2_metadata, + mut current_token_v2_metadata, current_token_royalties_v1, current_token_claims, ) = parse_v2_token( @@ -618,6 +622,25 @@ impl ProcessorTrait for TokenV2Processor { let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); + if self + .deprecated_tables + .contains(TableFlags::TOKEN_OWNERSHIPS_V2) + { + token_ownerships_v2.clear(); + } + if self.deprecated_tables.contains(TableFlags::TOKEN_DATAS_V2) { + token_datas_v2.clear(); + } + if self.deprecated_tables.contains(TableFlags::COLLECTIONS_V2) { + collections_v2.clear(); + } + if self + .deprecated_tables + .contains(TableFlags::CURRENT_TOKEN_V2_METADATA) + { + current_token_v2_metadata.clear(); + } + let tx_result = insert_to_db( self.get_pool(), self.name(), diff --git a/rust/processor/src/processors/user_transaction_processor.rs b/rust/processor/src/processors/user_transaction_processor.rs index 08416488e..52437e852 100644 --- a/rust/processor/src/processors/user_transaction_processor.rs +++ b/rust/processor/src/processors/user_transaction_processor.rs @@ -12,6 +12,7 @@ use crate::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, }, + worker::TableFlags, }; use ahash::AHashMap; use anyhow::bail; @@ -28,13 +29,19 @@ use tracing::error; pub struct UserTransactionProcessor { connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, } impl UserTransactionProcessor { - pub fn new(connection_pool: ArcDbPool, per_table_chunk_sizes: AHashMap) -> Self { + pub fn new( + connection_pool: ArcDbPool, + per_table_chunk_sizes: AHashMap, + deprecated_tables: TableFlags, + ) -> Self { Self { connection_pool, per_table_chunk_sizes, + deprecated_tables, } } } @@ -177,6 +184,10 @@ impl ProcessorTrait for UserTransactionProcessor { } } + if self.deprecated_tables.contains(TableFlags::SIGNATURES) { + signatures.clear(); + } + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index bf5de00f7..be519a660 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -50,17 +50,48 @@ use url::Url; // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision // machines accordingly. -// TODO: Make this configurable pub const BUFFER_SIZE: usize = 300; pub const PROCESSOR_SERVICE_TYPE: &str = "processor"; bitflags! { #[derive(Debug, Clone, Copy)] pub struct TableFlags: u64 { - const TRANSACTIONS = 1; - const WRITE_SET_CHANGES = 2; - const MOVE_RESOURCES = 4; - const TABLE_ITEMS = 8; + const TRANSACTIONS = 1 << 0; + const WRITE_SET_CHANGES = 1 << 1; + const MOVE_RESOURCES = 1 << 2; + const TABLE_ITEMS = 1 << 3; + const TABLE_METADATAS = 1 << 4; + const MOVE_MODULES = 1 << 5; + + // Fungible asset + const FUNGIBLE_ASSET_BALANCES = 1 << 6; + const CURRENT_FUNGIBLE_ASSET_BALANCES = 1 << 7; + const COIN_SUPPLY = 1 << 8; + + // Objects + const OBJECTS = 1 << 9; + + // Ans + const CURRENT_ANS_LOOKUP = 1 << 10; + const CURRENT_ANS_PRIMARY_NAME = 1 << 11; + const ANS_PRIMARY_NAME_V2 = 1 << 12; + const ANS_LOOKUP = 1 << 13; + const ANS_PRIMARY_NAME = 1 << 14; + + // Coin + const COIN_ACTIVITIES = 1 << 15; + const COIN_BALANCES = 1 << 16; + const CURRENT_COIN_BALANCES = 1 << 17; + const COIN_INFOS = 1 << 18; + + // Token_v2 processor flags + const TOKEN_OWNERSHIPS_V2 = 1 << 19; + const TOKEN_DATAS_V2 = 1 << 20; + const COLLECTIONS_V2 = 1 << 21; + const CURRENT_TOKEN_V2_METADATA = 1 << 22; + + // User transaction + const SIGNATURES = 1 << 23; } } @@ -823,6 +854,7 @@ pub fn build_processor( db_pool, config.clone(), per_table_chunk_sizes, + deprecated_tables, )), ProcessorConfig::CoinProcessor => { Processor::from(CoinProcessor::new(db_pool, per_table_chunk_sizes)) @@ -835,9 +867,11 @@ pub fn build_processor( ProcessorConfig::EventsProcessor => { Processor::from(EventsProcessor::new(db_pool, per_table_chunk_sizes)) }, - ProcessorConfig::FungibleAssetProcessor => { - Processor::from(FungibleAssetProcessor::new(db_pool, per_table_chunk_sizes)) - }, + ProcessorConfig::FungibleAssetProcessor => Processor::from(FungibleAssetProcessor::new( + db_pool, + per_table_chunk_sizes, + deprecated_tables, + )), ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)), ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) @@ -846,6 +880,7 @@ pub fn build_processor( db_pool, config.clone(), per_table_chunk_sizes, + deprecated_tables, )), ProcessorConfig::StakeProcessor(config) => Processor::from(StakeProcessor::new( db_pool, @@ -861,12 +896,13 @@ pub fn build_processor( db_pool, config.clone(), per_table_chunk_sizes, + deprecated_tables, )), ProcessorConfig::TransactionMetadataProcessor => Processor::from( TransactionMetadataProcessor::new(db_pool, per_table_chunk_sizes), ), ProcessorConfig::UserTransactionProcessor => Processor::from( - UserTransactionProcessor::new(db_pool, per_table_chunk_sizes), + UserTransactionProcessor::new(db_pool, per_table_chunk_sizes, deprecated_tables), ), ProcessorConfig::DefaultParquetProcessor(config) => { Processor::from(DefaultParquetProcessor::new(