Skip to content

Commit

Permalink
Simplify account_transaction_processor and use rayon to speed it up. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 authored Sep 5, 2024
1 parent 82ae5ff commit 836d83a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ use crate::{
schema::account_transactions,
utils::{counters::PROCESSOR_UNKNOWN_TYPE_COUNT, util::standardize_address},
};
use ahash::AHashMap;
use aptos_protos::transaction::v1::{
transaction::TxnData, write_set_change::Change, DeleteResource, Event, Transaction,
WriteResource,
};
use ahash::AHashSet;
use aptos_protos::transaction::v1::{transaction::TxnData, write_set_change::Change, Transaction};
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

Expand All @@ -39,7 +36,7 @@ impl AccountTransaction {
/// We will also consider transactions that the account signed or is part of a multi sig / multi agent.
/// TODO: recursively find the parent account of an object
/// TODO: include table items in the detection path
pub fn from_transaction(transaction: &Transaction) -> AHashMap<AccountTransactionPK, Self> {
pub fn get_accounts(transaction: &Transaction) -> AHashSet<String> {
let txn_version = transaction.version as i64;
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
Expand All @@ -51,7 +48,7 @@ impl AccountTransaction {
transaction_version = transaction.version,
"Transaction data doesn't exist",
);
return AHashMap::new();
return AHashSet::new();
},
};
let transaction_info = transaction.info.as_ref().unwrap_or_else(|| {
Expand All @@ -73,82 +70,43 @@ impl AccountTransaction {
TxnData::BlockMetadata(inner) => (&inner.events, vec![]),
TxnData::Validator(inner) => (&inner.events, vec![]),
_ => {
return AHashMap::new();
return AHashSet::new();
},
};
let mut account_transactions = AHashMap::new();
for sig in &signatures {
account_transactions.insert((sig.signer.clone(), txn_version), Self {
transaction_version: txn_version,
account_address: sig.signer.clone(),
});
let mut accounts = AHashSet::new();
for sig in signatures {
accounts.insert(sig.signer);
}
for event in events {
account_transactions.extend(Self::from_event(event, txn_version));
// Record event account address. We don't really have to worry about objects here
// because it'll be taken care of in the resource section.
accounts.insert(standardize_address(
event.key.as_ref().unwrap().account_address.as_str(),
));
}
for wsc in wscs {
match wsc.change.as_ref().unwrap() {
Change::DeleteResource(res) => {
account_transactions
.extend(Self::from_delete_resource(res, txn_version).unwrap());
// Record resource account.
// TODO: If the resource is an object, then we need to look for the latest
// owner. This isn't really possible right now given we have parallel threads
// so it'll be very difficult to ensure that we have the correct latest owner.
accounts.insert(standardize_address(res.address.as_str()));
},
Change::WriteResource(res) => {
account_transactions
.extend(Self::from_write_resource(res, txn_version).unwrap());
// Record resource account. If the resource is an object, then we record the
// owner as well.
// This handles partial deletes as well.
accounts.insert(standardize_address(res.address.as_str()));
if let Some(inner) =
&ObjectWithMetadata::from_write_resource(res, txn_version).unwrap()
{
accounts.insert(inner.object_core.get_owner_address());
}
},
_ => {},
}
}
account_transactions
}

/// Base case, record event account address. We don't really have to worry about
/// objects here because it'll be taken care of in the resource section
fn from_event(event: &Event, txn_version: i64) -> AHashMap<AccountTransactionPK, Self> {
let account_address =
standardize_address(event.key.as_ref().unwrap().account_address.as_str());
AHashMap::from([((account_address.clone(), txn_version), Self {
transaction_version: txn_version,
account_address,
})])
}

/// Base case, record resource account. If the resource is an object, then we record the owner as well
/// This handles partial deletes as well
fn from_write_resource(
write_resource: &WriteResource,
txn_version: i64,
) -> anyhow::Result<AHashMap<AccountTransactionPK, Self>> {
let mut result = AHashMap::new();
let account_address = standardize_address(write_resource.address.as_str());
result.insert((account_address.clone(), txn_version), Self {
transaction_version: txn_version,
account_address,
});
if let Some(inner) = &ObjectWithMetadata::from_write_resource(write_resource, txn_version)?
{
result.insert((inner.object_core.get_owner_address(), txn_version), Self {
transaction_version: txn_version,
account_address: inner.object_core.get_owner_address(),
});
}
Ok(result)
}

/// Base case, record resource account.
/// TODO: If the resource is an object, then we need to look for the latest owner. This isn't really possible
/// right now given we have parallel threads so it'll be very difficult to ensure that we have the correct
/// latest owner
fn from_delete_resource(
delete_resource: &DeleteResource,
txn_version: i64,
) -> anyhow::Result<AHashMap<AccountTransactionPK, Self>> {
let mut result = AHashMap::new();
let account_address = standardize_address(delete_resource.address.as_str());
result.insert((account_address.clone(), txn_version), Self {
transaction_version: txn_version,
account_address,
});
Ok(result)
accounts
}
}
32 changes: 18 additions & 14 deletions rust/processor/src/processors/account_transactions_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use anyhow::bail;
use aptos_protos::transaction::v1::Transaction;
use async_trait::async_trait;
use diesel::{pg::Pg, query_builder::QueryFragment};
use rayon::prelude::*;
use std::fmt::Debug;
use tracing::error;

Expand Down Expand Up @@ -101,20 +102,23 @@ impl ProcessorTrait for AccountTransactionsProcessor {
let processing_start = std::time::Instant::now();
let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();

let mut account_transactions = AHashMap::new();

for txn in &transactions {
account_transactions.extend(AccountTransaction::from_transaction(txn));
}
let mut account_transactions = account_transactions
.into_values()
.collect::<Vec<AccountTransaction>>();

// Sort by PK
account_transactions.sort_by(|a, b| {
(&a.transaction_version, &a.account_address)
.cmp(&(&b.transaction_version, &b.account_address))
});
let account_transactions: Vec<_> = transactions
.into_par_iter()
.map(|txn| {
let transaction_version = txn.version as i64;
let accounts = AccountTransaction::get_accounts(&txn);
accounts
.into_iter()
.map(|account_address| AccountTransaction {
transaction_version,
account_address,
})
.collect()
})
.collect::<Vec<Vec<_>>>()
.into_iter()
.flatten()
.collect();

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
let db_insertion_start = std::time::Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ impl Worker {

let num_processed = (last_txn_version - first_txn_version) + 1;

debug!(
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
first_txn_version,
Expand Down

0 comments on commit 836d83a

Please sign in to comment.