From cf46a81238eefaeff1850b94894b51ae25d31a6d Mon Sep 17 00:00:00 2001 From: musitdev Date: Thu, 22 Aug 2024 22:03:34 +0200 Subject: [PATCH 1/8] add sleep time between gRpc request --- rust/Cargo.lock | 6 +++--- rust/processor/src/config.rs | 7 +++++++ rust/processor/src/grpc_stream.rs | 7 +++++++ rust/processor/src/worker.rs | 5 +++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index a49389540..33c27a563 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -148,7 +148,7 @@ dependencies = [ [[package]] name = "aptos-profiler" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/aptos-core?rev=074c4bac6cab49c0e6deaff99ce8d2814ab6e812#074c4bac6cab49c0e6deaff99ce8d2814ab6e812" +source = "git+https://github.com/movementlabsxyz/aptos-core?rev=338f9a1bcc06f62ce4a4994f1642b9a61b631ee0#338f9a1bcc06f62ce4a4994f1642b9a61b631ee0" dependencies = [ "anyhow", "backtrace", @@ -161,7 +161,7 @@ dependencies = [ [[package]] name = "aptos-protos" version = "1.3.0" -source = "git+https://github.com/movementlabsxyz/aptos-core?rev=074c4bac6cab49c0e6deaff99ce8d2814ab6e812#074c4bac6cab49c0e6deaff99ce8d2814ab6e812" +source = "git+https://github.com/movementlabsxyz/aptos-core?rev=338f9a1bcc06f62ce4a4994f1642b9a61b631ee0#338f9a1bcc06f62ce4a4994f1642b9a61b631ee0" dependencies = [ "futures-core", "pbjson", @@ -173,7 +173,7 @@ dependencies = [ [[package]] name = "aptos-system-utils" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/aptos-core?rev=074c4bac6cab49c0e6deaff99ce8d2814ab6e812#074c4bac6cab49c0e6deaff99ce8d2814ab6e812" +source = "git+https://github.com/movementlabsxyz/aptos-core?rev=338f9a1bcc06f62ce4a4994f1642b9a61b631ee0#338f9a1bcc06f62ce4a4994f1642b9a61b631ee0" dependencies = [ "anyhow", "aptos-profiler", diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 785130334..310ad45f4 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -14,6 +14,7 @@ use url::Url; pub const QUERY_DEFAULT_RETRIES: u32 = 5; pub const QUERY_DEFAULT_RETRY_DELAY_MS: u64 = 500; +pub const DEFAULT_SLEEP_TIME_BETWENN_REQUEST_MS: u64 = 10; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] @@ -55,9 +56,14 @@ pub struct IndexerGrpcProcessorConfig { // String vector for deprecated tables to skip db writes #[serde(default)] pub deprecated_tables: HashSet, + #[serde(default = "IndexerGrpcProcessorConfig::default_sleep_time_between_request")] + pub default_sleep_time_between_request: u64, } impl IndexerGrpcProcessorConfig { + pub const fn default_sleep_time_between_request() -> u64 { + DEFAULT_SLEEP_TIME_BETWENN_REQUEST_MS + } pub const fn default_gap_detection_batch_size() -> u64 { DEFAULT_GAP_DETECTION_BATCH_SIZE } @@ -103,6 +109,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { self.transaction_filter.clone(), self.grpc_response_item_timeout_in_secs, self.deprecated_tables.clone(), + self.default_sleep_time_between_request, ) .await .context("Failed to build worker")?; diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index e1674dcd0..2f7be14fd 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -316,6 +316,7 @@ pub async fn create_fetcher_loop( transaction_filter: crate::transaction_filter::TransactionFilter, // The number of transactions per protobuf batch pb_channel_txn_chunk_size: usize, + sleep_time_between_request: u64, ) { info!( processor_name = processor_name, @@ -359,6 +360,12 @@ pub async fn create_fetcher_loop( let mut send_ma = MovingAverage::new(3000); loop { + //Add a sleep to slow down indexer query + tokio::time::sleep(tokio::time::Duration::from_millis( + sleep_time_between_request, + )) + .await; + let is_success = match tokio::time::timeout( indexer_grpc_response_item_timeout_secs, resp_stream.next(), diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 7ca85cc50..9e4631d83 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -126,6 +126,7 @@ pub struct Worker { pub transaction_filter: TransactionFilter, pub grpc_response_item_timeout_in_secs: u64, pub deprecated_tables: TableFlags, + pub sleep_time_between_request: u64, } impl Worker { @@ -149,6 +150,7 @@ impl Worker { transaction_filter: TransactionFilter, grpc_response_item_timeout_in_secs: u64, deprecated_tables: HashSet, + sleep_time_between_request: u64, ) -> Result { let processor_name = processor_config.name(); info!(processor_name = processor_name, "[Parser] Kicking off"); @@ -194,6 +196,7 @@ impl Worker { transaction_filter, grpc_response_item_timeout_in_secs, deprecated_tables: deprecated_tables_flags, + sleep_time_between_request, }) } @@ -281,6 +284,7 @@ impl Worker { let transaction_filter = self.transaction_filter.clone(); let grpc_response_item_timeout = std::time::Duration::from_secs(self.grpc_response_item_timeout_in_secs); + let sleep_time_between_request = self.sleep_time_between_request; let fetcher_task = tokio::spawn(async move { info!( processor_name = processor_name, @@ -303,6 +307,7 @@ impl Worker { processor_name.to_string(), transaction_filter, pb_channel_txn_chunk_size, + sleep_time_between_request, ) .await }); From 05dac9772269abe3672b1ea5c0fd260dbe8ab12a Mon Sep 17 00:00:00 2001 From: musitdev Date: Thu, 22 Aug 2024 22:16:03 +0200 Subject: [PATCH 2/8] remove the sleep is sleep time is 0 --- rust/processor/src/grpc_stream.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 2f7be14fd..23ed56733 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -361,10 +361,12 @@ pub async fn create_fetcher_loop( loop { //Add a sleep to slow down indexer query - tokio::time::sleep(tokio::time::Duration::from_millis( - sleep_time_between_request, - )) - .await; + if sleep_time_between_request != 0 { + tokio::time::sleep(tokio::time::Duration::from_millis( + sleep_time_between_request, + )) + .await; + } let is_success = match tokio::time::timeout( indexer_grpc_response_item_timeout_secs, From 91662926da84e1eaf5973928b4812ebe43721107 Mon Sep 17 00:00:00 2001 From: musitdev Date: Sat, 7 Sep 2024 17:20:25 +0200 Subject: [PATCH 3/8] remove panic if a writeset has no data. Ignore it --- .../parquet_write_set_changes.rs | 19 ++++----- .../default_models/write_set_changes.rs | 39 +++++++++---------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs index 0c98243c2..daddb11f9 100644 --- a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs +++ b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs @@ -61,12 +61,8 @@ impl WriteSetChange { block_timestamp: chrono::NaiveDateTime, ) -> anyhow::Result> { let change_type = Self::get_write_set_change_type(write_set_change); - let change = write_set_change - .change - .as_ref() - .expect("WriteSetChange must have a change"); - match change { - WriteSetChangeEnum::WriteModule(inner) => Ok(Some(( + match write_set_change.change.as_ref() { + Some(WriteSetChangeEnum::WriteModule(inner)) => Ok(Some(( Self { txn_version, state_key_hash: standardize_address( @@ -86,7 +82,7 @@ impl WriteSetChange { block_timestamp, )), ))), - WriteSetChangeEnum::DeleteModule(inner) => Ok(Some(( + Some(WriteSetChangeEnum::DeleteModule(inner)) => Ok(Some(( Self { txn_version, state_key_hash: standardize_address( @@ -106,7 +102,7 @@ impl WriteSetChange { block_timestamp, )), ))), - WriteSetChangeEnum::WriteResource(inner) => { + Some(WriteSetChangeEnum::WriteResource(inner)) => { let resource_option = MoveResource::from_write_resource( inner, write_set_change_index, @@ -138,7 +134,7 @@ impl WriteSetChange { )) }) }, - WriteSetChangeEnum::DeleteResource(inner) => { + Some(WriteSetChangeEnum::DeleteResource(inner)) => { let resource_option = MoveResource::from_delete_resource( inner, write_set_change_index, @@ -170,7 +166,7 @@ impl WriteSetChange { )) }) }, - WriteSetChangeEnum::WriteTableItem(inner) => { + Some(WriteSetChangeEnum::WriteTableItem(inner)) => { let (ti, cti) = TableItem::from_write_table_item( inner, write_set_change_index, @@ -197,7 +193,7 @@ impl WriteSetChange { ), ))) }, - WriteSetChangeEnum::DeleteTableItem(inner) => { + Some(WriteSetChangeEnum::DeleteTableItem(inner)) => { let (ti, cti) = TableItem::from_delete_table_item( inner, write_set_change_index, @@ -220,6 +216,7 @@ impl WriteSetChange { WriteSetChangeDetail::Table(ti, cti, None), ))) }, + None => Ok(None), } } diff --git a/rust/processor/src/db/common/models/default_models/write_set_changes.rs b/rust/processor/src/db/common/models/default_models/write_set_changes.rs index 11a2e36db..c0aa6044e 100644 --- a/rust/processor/src/db/common/models/default_models/write_set_changes.rs +++ b/rust/processor/src/db/common/models/default_models/write_set_changes.rs @@ -41,15 +41,11 @@ impl WriteSetChange { index: i64, transaction_version: i64, transaction_block_height: i64, - ) -> (Self, WriteSetChangeDetail) { + ) -> Option<(Self, WriteSetChangeDetail)> { let type_ = Self::get_write_set_change_type(write_set_change); - let change = write_set_change - .change - .as_ref() - .expect("WriteSetChange must have a change"); - match change { - WriteSetChangeEnum::WriteModule(inner) => ( + match write_set_change.change.as_ref() { + Some(WriteSetChangeEnum::WriteModule(inner)) => Some(( Self { transaction_version, hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), @@ -64,8 +60,8 @@ impl WriteSetChange { transaction_version, transaction_block_height, )), - ), - WriteSetChangeEnum::DeleteModule(inner) => ( + )), + Some(WriteSetChangeEnum::DeleteModule(inner)) => Some(( Self { transaction_version, hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), @@ -80,8 +76,8 @@ impl WriteSetChange { transaction_version, transaction_block_height, )), - ), - WriteSetChangeEnum::WriteResource(inner) => ( + )), + Some(WriteSetChangeEnum::WriteResource(inner)) => Some(( Self { transaction_version, hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), @@ -96,8 +92,8 @@ impl WriteSetChange { transaction_version, transaction_block_height, )), - ), - WriteSetChangeEnum::DeleteResource(inner) => ( + )), + Some(WriteSetChangeEnum::DeleteResource(inner)) => Some(( Self { transaction_version, hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), @@ -112,15 +108,15 @@ impl WriteSetChange { transaction_version, transaction_block_height, )), - ), - WriteSetChangeEnum::WriteTableItem(inner) => { + )), + Some(WriteSetChangeEnum::WriteTableItem(inner)) => { let (ti, cti) = TableItem::from_write_table_item( inner, index, transaction_version, transaction_block_height, ); - ( + Some(( Self { transaction_version, hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), @@ -134,16 +130,16 @@ impl WriteSetChange { cti, Some(TableMetadata::from_write_table_item(inner)), ), - ) + )) }, - WriteSetChangeEnum::DeleteTableItem(inner) => { + Some(WriteSetChangeEnum::DeleteTableItem(inner)) => { let (ti, cti) = TableItem::from_delete_table_item( inner, index, transaction_version, transaction_block_height, ); - ( + Some(( Self { transaction_version, hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()), @@ -153,8 +149,9 @@ impl WriteSetChange { index, }, WriteSetChangeDetail::Table(ti, cti, None), - ) + )) }, + None => None, } } @@ -166,7 +163,7 @@ impl WriteSetChange { write_set_changes .iter() .enumerate() - .map(|(index, write_set_change)| { + .filter_map(|(index, write_set_change)| { Self::from_write_set_change( write_set_change, index as i64, From a11a7ef5346e11a827cec7394a7a3cc4461af820 Mon Sep 17 00:00:00 2001 From: musitdev Date: Sun, 8 Sep 2024 00:11:06 +0200 Subject: [PATCH 4/8] remove all unwrap on writeset.data --- .../models/coin_models/coin_activities.rs | 6 +- .../stake_models/current_delegated_voter.rs | 67 ++++++++++--------- .../models/stake_models/delegator_balances.rs | 8 ++- .../models/stake_models/delegator_pools.rs | 5 +- .../models/stake_models/staking_pool_voter.rs | 24 +++++-- .../models/token_models/collection_datas.rs | 3 + .../models/token_models/token_claims.rs | 6 ++ .../common/models/token_models/token_datas.rs | 3 + .../db/common/models/token_models/tokens.rs | 12 +++- .../token_v2_models/v1_token_royalty.rs | 3 + .../models/token_v2_models/v2_collections.rs | 3 + .../models/token_v2_models/v2_token_datas.rs | 3 + .../token_v2_models/v2_token_ownerships.rs | 6 ++ .../src/processors/nft_metadata_processor.rs | 13 +++- .../parquet_fungible_asset_processor.rs | 20 +++++- .../src/processors/stake_processor.rs | 18 ++++- .../src/processors/token_v2_processor.rs | 19 +++++- rust/processor/src/utils/util.rs | 6 ++ 18 files changed, 170 insertions(+), 55 deletions(-) diff --git a/rust/processor/src/db/common/models/coin_models/coin_activities.rs b/rust/processor/src/db/common/models/coin_models/coin_activities.rs index e7467761f..630d75267 100644 --- a/rust/processor/src/db/common/models/coin_models/coin_activities.rs +++ b/rust/processor/src/db/common/models/coin_models/coin_activities.rs @@ -149,7 +149,11 @@ impl CoinActivity { } // Need coin info from move resources - for wsc in &transaction_info.changes { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { let (maybe_coin_info, maybe_coin_balance_data) = if let WriteSetChangeEnum::WriteResource(write_resource) = &wsc.change.as_ref().unwrap() diff --git a/rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs b/rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs index e87dcde71..d07111a4a 100644 --- a/rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs +++ b/rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs @@ -81,20 +81,21 @@ impl CurrentDelegatedVoter { ) -> anyhow::Result { let mut delegated_voter_map: CurrentDelegatedVoterMap = AHashMap::new(); - let table_item_data = write_table_item.data.as_ref().unwrap(); - let table_handle = standardize_address(&write_table_item.handle); - if let Some(VoteDelegationTableItem::VoteDelegationVector(vote_delegation_vector)) = - VoteDelegationTableItem::from_table_item_type( - table_item_data.value_type.as_str(), - &table_item_data.value, - txn_version, - )? - { - let pool_address = match vote_delegation_handle_to_pool_address.get(&table_handle) { - Some(pool_address) => pool_address.clone(), - None => { - // look up from db - Self::get_delegation_pool_address_by_table_handle(conn, &table_handle, query_retries, query_retry_delay_ms).await + if write_table_item.data.is_some() { + let table_item_data = write_table_item.data.as_ref().unwrap(); + let table_handle = standardize_address(&write_table_item.handle); + if let Some(VoteDelegationTableItem::VoteDelegationVector(vote_delegation_vector)) = + VoteDelegationTableItem::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? + { + let pool_address = match vote_delegation_handle_to_pool_address.get(&table_handle) { + Some(pool_address) => pool_address.clone(), + None => { + // look up from db + Self::get_delegation_pool_address_by_table_handle(conn, &table_handle, query_retries, query_retry_delay_ms).await .unwrap_or_else(|_| { tracing::error!( transaction_version = txn_version, @@ -103,28 +104,30 @@ impl CurrentDelegatedVoter { ); "".to_string() }) - }, - }; - if !pool_address.is_empty() { - for inner in vote_delegation_vector { - let delegator_address = inner.get_delegator_address(); - let voter = inner.value.get_voter(); - let pending_voter = inner.value.get_pending_voter(); + }, + }; + if !pool_address.is_empty() { + for inner in vote_delegation_vector { + let delegator_address = inner.get_delegator_address(); + let voter = inner.value.get_voter(); + let pending_voter = inner.value.get_pending_voter(); - let delegated_voter = CurrentDelegatedVoter { - delegator_address: delegator_address.clone(), - delegation_pool_address: pool_address.clone(), - voter: Some(voter.clone()), - pending_voter: Some(pending_voter.clone()), - last_transaction_timestamp: txn_timestamp, - last_transaction_version: txn_version, - table_handle: Some(table_handle.clone()), - }; - delegated_voter_map - .insert((pool_address.clone(), delegator_address), delegated_voter); + let delegated_voter = CurrentDelegatedVoter { + delegator_address: delegator_address.clone(), + delegation_pool_address: pool_address.clone(), + voter: Some(voter.clone()), + pending_voter: Some(pending_voter.clone()), + last_transaction_timestamp: txn_timestamp, + last_transaction_version: txn_version, + table_handle: Some(table_handle.clone()), + }; + delegated_voter_map + .insert((pool_address.clone(), delegator_address), delegated_voter); + } } } } + Ok(delegated_voter_map) } diff --git a/rust/processor/src/db/common/models/stake_models/delegator_balances.rs b/rust/processor/src/db/common/models/stake_models/delegator_balances.rs index 54790f43b..da9534ed6 100644 --- a/rust/processor/src/db/common/models/stake_models/delegator_balances.rs +++ b/rust/processor/src/db/common/models/stake_models/delegator_balances.rs @@ -409,7 +409,7 @@ impl CurrentDelegatorBalance { let changes = &transaction.info.as_ref().unwrap().changes; // Do a first pass to get the mapping of active_share table handles to staking pool resource let txn_version = transaction.version as i64; - for wsc in changes { + for wsc in changes.iter().filter(|wsc| wsc.change.is_some()) { if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { if let Some(map) = Self::get_inactive_pool_to_staking_pool_mapping(write_resource, txn_version) @@ -428,7 +428,11 @@ impl CurrentDelegatorBalance { } } // Now make a pass through table items to get the actual delegator balances - for (index, wsc) in changes.iter().enumerate() { + for (index, wsc) in changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { let maybe_delegator_balance = match wsc.change.as_ref().unwrap() { Change::DeleteTableItem(table_item) => { if let Some((balance, current_balance)) = diff --git a/rust/processor/src/db/common/models/stake_models/delegator_pools.rs b/rust/processor/src/db/common/models/stake_models/delegator_pools.rs index 6c5613990..f5461de89 100644 --- a/rust/processor/src/db/common/models/stake_models/delegator_pools.rs +++ b/rust/processor/src/db/common/models/stake_models/delegator_pools.rs @@ -122,7 +122,7 @@ impl DelegatorPool { .as_ref() .expect("Transaction info doesn't exist!") .changes; - for wsc in changes { + for wsc in changes.into_iter().filter(|wsc| wsc.change.is_some()) { if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { let maybe_write_resource = Self::from_write_resource(write_resource, txn_version)?; @@ -174,6 +174,9 @@ impl DelegatorPool { write_table_item: &WriteTableItem, txn_version: i64, ) -> anyhow::Result> { + if write_table_item.data.is_some() { + return Ok(None); + } let table_item_data = write_table_item.data.as_ref().unwrap(); if let Some(StakeTableItem::Pool(inner)) = &StakeTableItem::from_table_item_type( diff --git a/rust/processor/src/db/common/models/stake_models/staking_pool_voter.rs b/rust/processor/src/db/common/models/stake_models/staking_pool_voter.rs index 67ecf4d45..70cc4302a 100644 --- a/rust/processor/src/db/common/models/stake_models/staking_pool_voter.rs +++ b/rust/processor/src/db/common/models/stake_models/staking_pool_voter.rs @@ -29,19 +29,29 @@ impl CurrentStakingPoolVoter { let mut staking_pool_voters = AHashMap::new(); let txn_version = transaction.version as i64; - for wsc in &transaction.info.as_ref().unwrap().changes { + for wsc in transaction + .info + .as_ref() + .unwrap() + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { if let Some(StakeResource::StakePool(inner)) = StakeResource::from_write_resource(write_resource, txn_version)? { let staking_pool_address = standardize_address(&write_resource.address.to_string()); - staking_pool_voters.insert(staking_pool_address.clone(), Self { - staking_pool_address, - voter_address: inner.get_delegated_voter(), - last_transaction_version: txn_version, - operator_address: inner.get_operator_address(), - }); + staking_pool_voters.insert( + staking_pool_address.clone(), + Self { + staking_pool_address, + voter_address: inner.get_delegated_voter(), + last_transaction_version: txn_version, + operator_address: inner.get_operator_address(), + }, + ); } } } diff --git a/rust/processor/src/db/common/models/token_models/collection_datas.rs b/rust/processor/src/db/common/models/token_models/collection_datas.rs index 977fa8fc8..7d2f6899f 100644 --- a/rust/processor/src/db/common/models/token_models/collection_datas.rs +++ b/rust/processor/src/db/common/models/token_models/collection_datas.rs @@ -89,6 +89,9 @@ impl CollectionData { query_retries: u32, query_retry_delay_ms: u64, ) -> anyhow::Result> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_collection_data = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/db/common/models/token_models/token_claims.rs b/rust/processor/src/db/common/models/token_models/token_claims.rs index f418aede5..2568f11bf 100644 --- a/rust/processor/src/db/common/models/token_models/token_claims.rs +++ b/rust/processor/src/db/common/models/token_models/token_claims.rs @@ -59,6 +59,9 @@ impl CurrentTokenPendingClaim { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, ) -> anyhow::Result> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_offer = match TokenWriteSet::from_table_item_type( @@ -137,6 +140,9 @@ impl CurrentTokenPendingClaim { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, ) -> anyhow::Result> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_offer = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/db/common/models/token_models/token_datas.rs b/rust/processor/src/db/common/models/token_models/token_datas.rs index 65bc7d3b8..ba95ea457 100644 --- a/rust/processor/src/db/common/models/token_models/token_datas.rs +++ b/rust/processor/src/db/common/models/token_models/token_datas.rs @@ -72,6 +72,9 @@ impl TokenData { txn_version: i64, txn_timestamp: chrono::NaiveDateTime, ) -> anyhow::Result> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token_data = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/db/common/models/token_models/tokens.rs b/rust/processor/src/db/common/models/token_models/tokens.rs index 90c993e0b..ffddb6211 100644 --- a/rust/processor/src/db/common/models/token_models/tokens.rs +++ b/rust/processor/src/db/common/models/token_models/tokens.rs @@ -127,7 +127,11 @@ impl Token { .as_ref() .expect("Transaction info doesn't exist!"); - for wsc in &transaction_info.changes { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { // Basic token and ownership data let (maybe_token_w_ownership, maybe_token_data, maybe_collection_data) = match wsc.change.as_ref().unwrap() { @@ -232,6 +236,9 @@ impl Token { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, ) -> anyhow::Result, Option)>> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token = match TokenWriteSet::from_table_item_type( @@ -290,6 +297,9 @@ impl Token { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, ) -> anyhow::Result, Option)>> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token_id = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/db/common/models/token_v2_models/v1_token_royalty.rs b/rust/processor/src/db/common/models/token_v2_models/v1_token_royalty.rs index f7e1cb124..2f274d5fc 100644 --- a/rust/processor/src/db/common/models/token_v2_models/v1_token_royalty.rs +++ b/rust/processor/src/db/common/models/token_v2_models/v1_token_royalty.rs @@ -50,6 +50,9 @@ impl CurrentTokenRoyaltyV1 { transaction_version: i64, transaction_timestamp: chrono::NaiveDateTime, ) -> anyhow::Result> { + if write_table_item.data.is_none() { + return Ok(None); + } let table_item_data = write_table_item.data.as_ref().unwrap(); let maybe_token_data = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/db/common/models/token_v2_models/v2_collections.rs b/rust/processor/src/db/common/models/token_v2_models/v2_collections.rs index 75cce2cd3..8910c88c8 100644 --- a/rust/processor/src/db/common/models/token_v2_models/v2_collections.rs +++ b/rust/processor/src/db/common/models/token_v2_models/v2_collections.rs @@ -215,6 +215,9 @@ impl CollectionV2 { query_retries: u32, query_retry_delay_ms: u64, ) -> anyhow::Result> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_collection_data = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/db/common/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/db/common/models/token_v2_models/v2_token_datas.rs index 3be211eb0..ce9d27be2 100644 --- a/rust/processor/src/db/common/models/token_v2_models/v2_token_datas.rs +++ b/rust/processor/src/db/common/models/token_v2_models/v2_token_datas.rs @@ -220,6 +220,9 @@ impl TokenDataV2 { write_set_change_index: i64, txn_timestamp: chrono::NaiveDateTime, ) -> anyhow::Result> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token_data = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/db/common/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/db/common/models/token_v2_models/v2_token_ownerships.rs index ecc26f539..dc9be5471 100644 --- a/rust/processor/src/db/common/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/db/common/models/token_v2_models/v2_token_ownerships.rs @@ -457,6 +457,9 @@ impl TokenOwnershipV2 { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, ) -> anyhow::Result)>> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token = match TokenWriteSet::from_table_item_type( @@ -537,6 +540,9 @@ impl TokenOwnershipV2 { txn_timestamp: chrono::NaiveDateTime, table_handle_to_owner: &TableHandleToOwner, ) -> anyhow::Result)>> { + if table_item.data.is_none() { + return Ok(None); + } let table_item_data = table_item.data.as_ref().unwrap(); let maybe_token_id = match TokenWriteSet::from_table_item_type( diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index 4fcb9a922..4605827a3 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -235,7 +235,11 @@ async fn parse_v2_token( let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!"); let mut token_v2_metadata_helper: ObjectAggregatedDataMapping = AHashMap::new(); - for wsc in transaction_info.changes.iter() { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { if let Some(object) = ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() @@ -264,7 +268,12 @@ async fn parse_v2_token( } } - for (index, wsc) in transaction_info.changes.iter().enumerate() { + for (index, wsc) in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { let wsc_index = index as i64; match wsc.change.as_ref().unwrap() { Change::WriteTableItem(table_item) => { diff --git a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs index 31ebcd35a..5370ba8b6 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_fungible_asset_processor.rs @@ -179,7 +179,11 @@ async fn parse_v2_coin( // First loop to get all objects // Need to do a first pass to get all the objects - for wsc in transaction_info.changes.iter() { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { if let Some(object) = ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() @@ -195,7 +199,12 @@ async fn parse_v2_coin( } } - for (index, wsc) in transaction_info.changes.iter().enumerate() { + for (index, wsc) in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { if let Some((balance, _, _)) = FungibleAssetBalance::get_v1_from_write_resource( write_resource, @@ -230,7 +239,12 @@ async fn parse_v2_coin( } // Loop to handle all the other changes - for (index, wsc) in transaction_info.changes.iter().enumerate() { + for (index, wsc) in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { match wsc.change.as_ref().unwrap() { Change::WriteResource(write_resource) => { if let Some((balance, _)) = FungibleAssetBalance::get_v2_from_write_resource( diff --git a/rust/processor/src/processors/stake_processor.rs b/rust/processor/src/processors/stake_processor.rs index d623704d1..3822eb29c 100644 --- a/rust/processor/src/processors/stake_processor.rs +++ b/rust/processor/src/processors/stake_processor.rs @@ -441,7 +441,11 @@ impl ProcessorTrait for StakeProcessor { let txn_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); let transaction_info = txn.info.as_ref().expect("Transaction info doesn't exist!"); // adding some metadata for subsequent parsing - for wsc in &transaction_info.changes { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { if let Some(DelegationVoteGovernanceRecordsResource::GovernanceRecords(inner)) = DelegationVoteGovernanceRecordsResource::from_write_resource( @@ -484,7 +488,11 @@ impl ProcessorTrait for StakeProcessor { all_current_delegator_balances.extend(current_delegator_balances); // this write table item indexing is to get delegator address, table handle, and voter & pending voter - for wsc in &transaction_info.changes { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteTableItem(write_table_item) = wsc.change.as_ref().unwrap() { let voter_map = CurrentDelegatedVoter::from_write_table_item( write_table_item, @@ -503,7 +511,11 @@ impl ProcessorTrait for StakeProcessor { } // we need one last loop to prefill delegators that got in before the delegated voting contract was deployed - for wsc in &transaction_info.changes { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteTableItem(write_table_item) = wsc.change.as_ref().unwrap() { if let Some(voter) = CurrentDelegatedVoter::get_delegators_pre_contract_deployment( diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index 95645dd8d..e218f7fcd 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -785,7 +785,11 @@ async fn parse_v2_token( let mut tokens_minted: TokenV2Minted = AHashSet::new(); // Need to do a first pass to get all the objects - for wsc in transaction_info.changes.iter() { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { if let Some(object) = ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() @@ -802,7 +806,11 @@ async fn parse_v2_token( } // Need to do a second pass to get all the structs related to the object - for wsc in transaction_info.changes.iter() { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { let address = standardize_address(&wr.address.to_string()); if let Some(aggregated_data) = token_v2_metadata_helper.get_mut(&address) { @@ -918,7 +926,12 @@ async fn parse_v2_token( } } - for (index, wsc) in transaction_info.changes.iter().enumerate() { + for (index, wsc) in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { let wsc_index = index as i64; match wsc.change.as_ref().unwrap() { Change::WriteTableItem(table_item) => { diff --git a/rust/processor/src/utils/util.rs b/rust/processor/src/utils/util.rs index 4550de820..0a266bc9a 100644 --- a/rust/processor/src/utils/util.rs +++ b/rust/processor/src/utils/util.rs @@ -222,8 +222,14 @@ pub fn get_clean_payload(payload: &TransactionPayload, version: i64) -> Option Option { + if writeset.write_set.is_none() { + return None; + } match writeset.write_set.as_ref().unwrap() { WriteSetType::ScriptWriteSet(inner) => { + if inner.script.is_none() { + return None; + } let payload = inner.script.as_ref().unwrap(); Some( serde_json::to_value(get_clean_script_payload(payload, version)).unwrap_or_else( From 3902b82e847b83d1c7410d3cb5a13b676b88c87a Mon Sep 17 00:00:00 2001 From: musitdev Date: Sun, 8 Sep 2024 12:27:00 +0200 Subject: [PATCH 5/8] remove some missing unwrap --- .../account_transactions.rs | 2 +- .../src/db/common/models/token_models/tokens.rs | 5 ++++- rust/processor/src/processors/ans_processor.rs | 13 +++++++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs b/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs index 2d13ad948..383b587e0 100644 --- a/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs +++ b/rust/processor/src/db/common/models/account_transaction_models/account_transactions.rs @@ -92,7 +92,7 @@ impl AccountTransaction { for event in events { account_transactions.extend(Self::from_event(event, txn_version)); } - for wsc in wscs { + for wsc in wscs.iter().filter(|wsc| wsc.change.is_some()) { match wsc.change.as_ref().unwrap() { Change::DeleteResource(res) => { account_transactions diff --git a/rust/processor/src/db/common/models/token_models/tokens.rs b/rust/processor/src/db/common/models/token_models/tokens.rs index ffddb6211..c0ac01081 100644 --- a/rust/processor/src/db/common/models/token_models/tokens.rs +++ b/rust/processor/src/db/common/models/token_models/tokens.rs @@ -364,7 +364,10 @@ impl TableMetadataForToken { .info .as_ref() .expect("Transaction info doesn't exist!"); - for wsc in &transaction_info.changes { + for wsc in transaction_info + .changes + .iterfilter(|wsc| wsc.change.is_some()) + { if let WriteSetChangeEnum::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { diff --git a/rust/processor/src/processors/ans_processor.rs b/rust/processor/src/processors/ans_processor.rs index addf6972e..a9177911f 100644 --- a/rust/processor/src/processors/ans_processor.rs +++ b/rust/processor/src/processors/ans_processor.rs @@ -547,7 +547,11 @@ fn parse_ans( } // Parse V2 ANS subdomain exts - for wsc in transaction_info.changes.iter() { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { match wsc.change.as_ref().unwrap() { WriteSetChange::WriteResource(write_resource) => { if let Some(subdomain_ext) = SubdomainExtV2::from_write_resource( @@ -569,7 +573,12 @@ fn parse_ans( } // Parse V1 ANS write set changes - for (wsc_index, wsc) in transaction_info.changes.iter().enumerate() { + for (wsc_index, wsc) in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { match wsc.change.as_ref().unwrap() { WriteSetChange::WriteTableItem(table_item) => { if let Some((current_ans_lookup, ans_lookup)) = From 3e65c3604f57676bb5026f4f1949737ddaec1855 Mon Sep 17 00:00:00 2001 From: musitdev Date: Sun, 8 Sep 2024 13:44:22 +0200 Subject: [PATCH 6/8] correct build --- .../db/common/models/token_models/tokens.rs | 3 ++- .../processors/fungible_asset_processor.rs | 20 ++++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/rust/processor/src/db/common/models/token_models/tokens.rs b/rust/processor/src/db/common/models/token_models/tokens.rs index c0ac01081..49e3ba72e 100644 --- a/rust/processor/src/db/common/models/token_models/tokens.rs +++ b/rust/processor/src/db/common/models/token_models/tokens.rs @@ -366,7 +366,8 @@ impl TableMetadataForToken { .expect("Transaction info doesn't exist!"); for wsc in transaction_info .changes - .iterfilter(|wsc| wsc.change.is_some()) + .iter() + .filter(|wsc| wsc.change.is_some()) { if let WriteSetChangeEnum::WriteResource(write_resource) = wsc.change.as_ref().unwrap() diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index b2642dc38..fb7a921c6 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -502,7 +502,11 @@ async fn parse_v2_coin( // First loop to get all objects // Need to do a first pass to get all the objects - for wsc in transaction_info.changes.iter() { + for wsc in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { if let Some(object) = ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() @@ -519,7 +523,12 @@ async fn parse_v2_coin( } // Loop to get the metadata relevant to parse v1 and v2. // As an optimization, we also handle v1 balances in the process - for (index, wsc) in transaction_info.changes.iter().enumerate() { + for (index, wsc) in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() { if let Some((balance, current_balance, event_to_coin)) = FungibleAssetBalance::get_v1_from_write_resource( @@ -662,7 +671,12 @@ async fn parse_v2_coin( } // Loop to handle all the other changes - for (index, wsc) in transaction_info.changes.iter().enumerate() { + for (index, wsc) in transaction_info + .changes + .iter() + .filter(|wsc| wsc.change.is_some()) + .enumerate() + { match wsc.change.as_ref().unwrap() { Change::WriteResource(write_resource) => { if let Some(fa_metadata) = From 00265e2332a1ce17b2cc5d5a94cc61be96276594 Mon Sep 17 00:00:00 2001 From: musitdev Date: Sun, 8 Sep 2024 14:16:48 +0200 Subject: [PATCH 7/8] remove panic on Tx save Error --- rust/processor/src/worker.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 9e4631d83..ae95188df 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -536,10 +536,11 @@ impl Worker { PROCESSOR_ERRORS_COUNT .with_label_values(&[processor_name]) .inc(); - panic!( - "[Parser][T#{}] Error processing '{:}' transactions: {:?}", - task_index, processor_name, e - ); + // panic!( + // "[Parser][T#{}] Error processing '{:}' transactions: {:?}", + // task_index, processor_name, e + // ); + continue; }, }; @@ -844,12 +845,13 @@ pub async fn do_processor( // If the Tx processing abort because of KKey violation, skip the Tx. // As several Tx has been processed replay all the Tx one by one to // allow not present Tx to be saved. - if let Some(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::UniqueViolation, - msg, - )) = err.downcast_ref::() + // if let Some(diesel::result::Error::DatabaseError( + // diesel::result::DatabaseErrorKind::UniqueViolation, + // msg, + // )) = err.downcast_ref::() { - tracing::warn!("Unique Constraint violation replay the Tx: msg: {msg:?}"); + // tracing::warn!("Unique Constraint violation replay the Tx: msg: {msg:?}"); + tracing::warn!("Error during Tx processing, replay all Tx one by one: err: {err:?}"); //replay all Tx one by one let mut last_transaction_timestamp = None; for tx in transactions_pb.transactions { @@ -874,9 +876,10 @@ pub async fn do_processor( last_transaction_timestamp, }, )) - } else { - Err(err) } + // } else { + // Err(err) + // } } else { processed_result }; From 1d1d7c72a01a0b5a55477f8fdca59e17c9ce2e58 Mon Sep 17 00:00:00 2001 From: musitdev Date: Sun, 8 Sep 2024 14:42:50 +0200 Subject: [PATCH 8/8] remove panic on Tx save Error --- rust/processor/src/worker.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index ae95188df..389da017f 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -845,13 +845,12 @@ pub async fn do_processor( // If the Tx processing abort because of KKey violation, skip the Tx. // As several Tx has been processed replay all the Tx one by one to // allow not present Tx to be saved. - // if let Some(diesel::result::Error::DatabaseError( - // diesel::result::DatabaseErrorKind::UniqueViolation, - // msg, - // )) = err.downcast_ref::() + if let Some(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + msg, + )) = err.downcast_ref::() { - // tracing::warn!("Unique Constraint violation replay the Tx: msg: {msg:?}"); - tracing::warn!("Error during Tx processing, replay all Tx one by one: err: {err:?}"); + tracing::warn!("Unique Constraint violation replay the Tx: msg: {msg:?}"); //replay all Tx one by one let mut last_transaction_timestamp = None; for tx in transactions_pb.transactions { @@ -876,10 +875,9 @@ pub async fn do_processor( last_transaction_timestamp, }, )) + } else { + Err(err) } - // } else { - // Err(err) - // } } else { processed_result };