Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjayprabhu committed Nov 27, 2024
1 parent a7e3aba commit aec0006
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 59 deletions.
7 changes: 4 additions & 3 deletions src/core/message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::proto::msg as message;
use crate::proto::msg::MessageType;
use crate::proto::snapchain;

impl message::Message {
Expand All @@ -14,11 +15,11 @@ impl message::Message {
}
}

pub fn msg_type(&self) -> u8 {
pub fn msg_type(&self) -> MessageType {
if self.data.is_some() {
self.data.as_ref().unwrap().r#type as u8
MessageType::try_from(self.data.as_ref().unwrap().r#type).unwrap_or(MessageType::None)
} else {
0
MessageType::None
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/storage/store/account/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,10 @@ impl<T: StoreDef + Clone> Store<T> {

let mut count = current_count;

if count <= max_count {
return Ok(pruned_events); // Nothing to prune
}

let prefix = &make_message_primary_key(fid, self.store_def.postfix(), None);
self.db.for_each_iterator_by_prefix(
Some(prefix.to_vec()),
Expand Down
30 changes: 21 additions & 9 deletions src/storage/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl ShardEngine {

self.count("prepare_proposal.recv_messages", messages.len() as u64);

let mut snapchain_txns = self.create_transactions_from_mempool(messages);
let mut snapchain_txns = self.create_transactions_from_mempool(messages)?;
for snapchain_txn in &mut snapchain_txns {
let (account_root, _events) = self.replay_snapchain_txn(&snapchain_txn, txn_batch)?;
snapchain_txn.account_root = account_root;
Expand All @@ -210,7 +210,7 @@ impl ShardEngine {
fn create_transactions_from_mempool(
&mut self,
messages: Vec<MempoolMessage>,
) -> Vec<Transaction> {
) -> Result<Vec<Transaction>, EngineError> {
let mut transactions = vec![];

let grouped_messages = messages.iter().into_group_map_by(|msg| msg.fid());
Expand All @@ -222,25 +222,34 @@ impl ShardEngine {
system_messages: vec![],
user_messages: vec![],
};
let storage_slot = self
.stores
.onchain_event_store
.get_storage_slot_for_fid(fid)?;
for msg in messages {
match msg {
MempoolMessage::ValidatorMessage(msg) => {
transaction.system_messages.push(msg.clone());
}
MempoolMessage::UserMessage(msg) => {
transaction.user_messages.push(msg.clone());
// Only include messages for users that have storage
if storage_slot.is_active() {
transaction.user_messages.push(msg.clone());
}
}
}
}
transactions.push(transaction);
if !transaction.user_messages.is_empty() || !transaction.system_messages.is_empty() {
transactions.push(transaction);
}
}
info!(
transactions = transactions.len(),
messages = messages.len(),
fids = unique_fids,
"Created transactions from mempool"
);
transactions
Ok(transactions)
}

pub fn propose_state_change(&mut self, shard: u32) -> ShardStateChange {
Expand Down Expand Up @@ -295,9 +304,7 @@ impl ShardEngine {
self.update_trie(&event, txn_batch)?;
events.push(event.clone());
user_messages_count += 1;
let msg_type = MessageType::try_from(msg.msg_type() as i32)
.or(Err(EngineError::InvalidMessageType))?;
message_types.insert(msg_type);
message_types.insert(msg.msg_type());
}
Err(err) => {
warn!(
Expand All @@ -319,7 +326,6 @@ impl ShardEngine {
self.update_trie(&event, txn_batch)?;
events.push(event.clone());
}
info!(fid = fid, msg_type = msg_type.into_u8(), "Pruned messages");
}
Err(err) => {
warn!(
Expand Down Expand Up @@ -445,6 +451,12 @@ impl ShardEngine {
}
}?;

info!(
fid = fid,
msg_type = msg_type.into_u8(),
count = events.len(),
"Pruned messages"
);
Ok(events)
}

Expand Down
Loading

0 comments on commit aec0006

Please sign in to comment.