From 723b3f98ba40dbcad548bc5661fb89df1f768a5e Mon Sep 17 00:00:00 2001 From: Sanjay Raveendran Date: Wed, 27 Nov 2024 13:33:42 -0800 Subject: [PATCH] Cleanup --- .../store/account/verification_store.rs | 16 ++--- src/storage/store/engine.rs | 62 ++++++++++++------- src/storage/store/stores.rs | 2 +- src/utils/factory.rs | 5 +- 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/src/storage/store/account/verification_store.rs b/src/storage/store/account/verification_store.rs index 7bd4209..810e819 100644 --- a/src/storage/store/account/verification_store.rs +++ b/src/storage/store/account/verification_store.rs @@ -1,21 +1,17 @@ use tracing::info; use super::{ - get_many_messages_as_bytes, get_message, make_fid_key, make_message_primary_key, make_ts_hash, - make_user_key, message_decode, read_fid_key, + make_fid_key, make_ts_hash, make_user_key, message_decode, read_fid_key, store::{Store, StoreDef}, - MessagesPage, StoreEventHandler, FID_BYTES, PAGE_SIZE_MAX, TS_HASH_LENGTH, + MessagesPage, StoreEventHandler, FID_BYTES, TS_HASH_LENGTH, }; +use crate::storage::util::increment_vec_u8; use crate::{ core::error::HubError, - proto::msg::{ - link_body::Target, Protocol, SignatureScheme, VerificationAddAddressBody, - VerificationRemoveBody, - }, - storage::{store::account::delete_message_transaction, util::vec_to_u8_24}, + proto::msg::{Protocol, SignatureScheme, VerificationAddAddressBody, VerificationRemoveBody}, + storage::store::account::delete_message_transaction, }; use crate::{proto::msg::message_data::Body, storage::db::PageOptions}; -use crate::{proto::msg::LinkBody, storage::util::increment_vec_u8}; use crate::{ proto::msg::MessageData, storage::constants::{RootPrefix, UserPostfix}, @@ -24,7 +20,7 @@ use crate::{ proto::msg::{Message, MessageType}, storage::db::{RocksDB, RocksDbTransactionBatch}, }; -use std::{borrow::Borrow, convert::TryInto, sync::Arc}; +use std::sync::Arc; #[derive(Clone)] pub struct VerificationStoreDef { diff --git a/src/storage/store/engine.rs b/src/storage/store/engine.rs index 8e721b0..1d6e6cc 100644 --- a/src/storage/store/engine.rs +++ b/src/storage/store/engine.rs @@ -271,13 +271,29 @@ impl ShardEngine { ) -> Result, EngineError> { let mut events = vec![]; for snapchain_txn in transactions { - let (_, txn_events) = self.replay_snapchain_txn(snapchain_txn, txn_batch)?; + let (account_root, txn_events) = self.replay_snapchain_txn(snapchain_txn, txn_batch)?; + // Reject early if account roots fail to match (shard roots will definitely fail) + if &account_root != &snapchain_txn.account_root { + warn!( + fid = snapchain_txn.fid, + new_account_root = hex::encode(&account_root), + tx_account_root = hex::encode(&snapchain_txn.account_root), + "Account root mismatch" + ); + return Err(EngineError::HashMismatch); + } events.extend(txn_events); } let root1 = self.stores.trie.root_hash()?; if &root1 != shard_root { + warn!( + shard_id = self.shard_id, + new_shard_root = hex::encode(&root1), + tx_shard_root = hex::encode(shard_root), + "Shard root mismatch" + ); return Err(EngineError::HashMismatch); } @@ -296,6 +312,27 @@ impl ShardEngine { let mut events = vec![]; let mut message_types = HashSet::new(); + // System messages first, then user messages and finally prunes + for msg in &snapchain_txn.system_messages { + if let Some(onchain_event) = &msg.on_chain_event { + let event = self + .stores + .onchain_event_store + .merge_onchain_event(onchain_event.clone(), txn_batch); + + match event { + Ok(hub_event) => { + self.update_trie(&hub_event, txn_batch)?; + events.push(hub_event.clone()); + system_messages_count += 1; + } + Err(err) => { + warn!("Error merging onchain event: {:?}", err); + } + } + } + } + for msg in &snapchain_txn.user_messages { // Errors are validated based on the shard root let result = self.merge_message(msg, txn_batch); @@ -338,26 +375,6 @@ impl ShardEngine { } } - for msg in &snapchain_txn.system_messages { - if let Some(onchain_event) = &msg.on_chain_event { - let event = self - .stores - .onchain_event_store - .merge_onchain_event(onchain_event.clone(), txn_batch); - - match event { - Ok(hub_event) => { - self.update_trie(&hub_event, txn_batch)?; - events.push(hub_event.clone()); - system_messages_count += 1; - } - Err(err) => { - warn!("Error merging onchain event: {:?}", err); - } - } - } - } - let account_root = self.stores.trie.get_hash( &self.db, txn_batch, @@ -369,7 +386,8 @@ impl ShardEngine { num_system_messages = total_system_messages, user_messages_merged = user_messages_count, system_messages_merged = system_messages_count, - account_root = hex::encode(&account_root), + new_account_root = hex::encode(&account_root), + tx_account_root = hex::encode(&snapchain_txn.account_root), "Replayed transaction" ); diff --git a/src/storage/store/stores.rs b/src/storage/store/stores.rs index 3884516..33544c7 100644 --- a/src/storage/store/stores.rs +++ b/src/storage/store/stores.rs @@ -2,7 +2,7 @@ use crate::proto::msg::MessageType; use crate::storage::db::{RocksDB, RocksDbTransactionBatch}; use crate::storage::store::account::{ CastStore, CastStoreDef, IntoU8, LinkStore, OnchainEventStorageError, OnchainEventStore, Store, - StoreDef, StoreEventHandler, + StoreEventHandler, }; use crate::storage::store::shard::ShardStore; use crate::storage::trie::merkle_trie; diff --git a/src/utils/factory.rs b/src/utils/factory.rs index c1d5322..05e6d6a 100644 --- a/src/utils/factory.rs +++ b/src/utils/factory.rs @@ -347,10 +347,7 @@ pub mod messages_factory { } pub mod verifications { - use message::{ - reaction_body::Target, ReactionBody, ReactionType, VerificationAddAddressBody, - VerificationRemoveBody, - }; + use message::{VerificationAddAddressBody, VerificationRemoveBody}; use super::*;