Skip to content

Commit

Permalink
[EI-463] Move coin supply to fa processor (#382)
Browse files Browse the repository at this point in the history
* Move coin supply to fa processor

* rebase

---------

Co-authored-by: bowenyang007 <[email protected]>
  • Loading branch information
rtso and bowenyang007 authored Jun 9, 2024
1 parent 539e974 commit e7cefdb
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 112 deletions.
18 changes: 1 addition & 17 deletions rust/processor/src/models/coin_models/coin_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use super::{
coin_balances::{CoinBalance, CurrentCoinBalance},
coin_infos::CoinInfo,
coin_supply::CoinSupply,
coin_utils::{CoinEvent, EventGuidResource},
};
use crate::{
Expand Down Expand Up @@ -82,7 +81,6 @@ impl CoinActivity {
Vec<CoinBalance>,
AHashMap<CoinType, CoinInfo>,
AHashMap<CurrentCoinBalancePK, CurrentCoinBalance>,
Vec<CoinSupply>,
) {
// All the items we want to track
let mut coin_activities = Vec::new();
Expand All @@ -92,7 +90,7 @@ impl CoinActivity {
AHashMap::new();
// This will help us get the coin type when we see coin deposit/withdraw events for coin activities
let mut all_event_to_coin_type: EventToCoinType = AHashMap::new();
let mut all_coin_supply = Vec::new();

// Extracts events and user request from genesis and user transactions. Other transactions won't have coin events
let txn_data = match transaction.txn_data.as_ref() {
Some(data) => data,
Expand All @@ -116,7 +114,6 @@ impl CoinActivity {

// The rest are fields common to all transactions
let txn_version = transaction.version as i64;
let txn_epoch = transaction.epoch as i64;
let block_height = transaction.block_height as i64;
let transaction_info = transaction
.info
Expand Down Expand Up @@ -170,15 +167,6 @@ impl CoinActivity {
(None, None)
};

let maybe_coin_supply = if let WriteSetChangeEnum::WriteTableItem(table_item) =
wsc.change.as_ref().unwrap()
{
CoinSupply::from_write_table_item(table_item, txn_version, txn_timestamp, txn_epoch)
.unwrap()
} else {
None
};

if let Some(coin_info) = maybe_coin_info {
coin_infos.insert(coin_info.coin_type.clone(), coin_info);
}
Expand All @@ -195,9 +183,6 @@ impl CoinActivity {
coin_balances.push(coin_balance);
all_event_to_coin_type.extend(event_to_coin_type);
}
if let Some(coin_supply) = maybe_coin_supply {
all_coin_supply.push(coin_supply);
}
}
for (index, event) in events.iter().enumerate() {
let event_type = event.type_str.clone();
Expand All @@ -222,7 +207,6 @@ impl CoinActivity {
coin_balances,
coin_infos,
current_coin_balances,
all_coin_supply,
)
}

Expand Down
43 changes: 4 additions & 39 deletions rust/processor/src/processors/coin_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::{
coin_activities::CoinActivity,
coin_balances::{CoinBalance, CurrentCoinBalance},
coin_infos::CoinInfo,
coin_supply::CoinSupply,
},
fungible_asset_models::v2_fungible_asset_activities::CurrentCoinBalancePK,
},
Expand Down Expand Up @@ -61,7 +60,6 @@ async fn insert_to_db(
coin_infos: &[CoinInfo],
coin_balances: &[CoinBalance],
current_coin_balances: &[CurrentCoinBalance],
coin_supply: &[CoinSupply],
per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
tracing::trace!(
Expand Down Expand Up @@ -98,15 +96,9 @@ async fn insert_to_db(
per_table_chunk_sizes,
),
);
let cs = execute_in_chunks(
conn,
insert_coin_supply_query,
coin_supply,
get_config_table_chunk_size::<CoinSupply>("coin_supply", per_table_chunk_sizes),
);

let (ca_res, ci_res, cb_res, ccb_res, cs_res) = tokio::join!(ca, ci, cb, ccb, cs);
for res in [ca_res, ci_res, cb_res, ccb_res, cs_res] {
let (ca_res, ci_res, cb_res, ccb_res) = tokio::join!(ca, ci, cb, ccb);
for res in [ca_res, ci_res, cb_res, ccb_res] {
res?;
}
Ok(())
Expand Down Expand Up @@ -206,23 +198,6 @@ fn insert_current_coin_balances_query(
)
}

fn insert_coin_supply_query(
items_to_insert: Vec<CoinSupply>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Option<&'static str>,
) {
use schema::coin_supply::dsl::*;

(
diesel::insert_into(schema::coin_supply::table)
.values(items_to_insert)
.on_conflict((transaction_version, coin_type_hash))
.do_nothing(),
None,
)
}

#[async_trait]
impl ProcessorTrait for CoinProcessor {
fn name(&self) -> &'static str {
Expand All @@ -244,26 +219,18 @@ impl ProcessorTrait for CoinProcessor {
all_coin_infos,
all_coin_balances,
all_current_coin_balances,
all_coin_supply,
) = tokio::task::spawn_blocking(move || {
let mut all_coin_activities = vec![];
let mut all_coin_balances = vec![];
let mut all_coin_infos: AHashMap<String, CoinInfo> = AHashMap::new();
let mut all_current_coin_balances: AHashMap<CurrentCoinBalancePK, CurrentCoinBalance> =
AHashMap::new();
let mut all_coin_supply = vec![];

for txn in &transactions {
let (
mut coin_activities,
mut coin_balances,
coin_infos,
current_coin_balances,
mut coin_supply,
) = CoinActivity::from_transaction(txn);
let (mut coin_activities, mut coin_balances, coin_infos, current_coin_balances) =
CoinActivity::from_transaction(txn);
all_coin_activities.append(&mut coin_activities);
all_coin_balances.append(&mut coin_balances);
all_coin_supply.append(&mut coin_supply);
// For coin infos, we only want to keep the first version, so insert only if key is not present already
for (key, value) in coin_infos {
all_coin_infos.entry(key).or_insert(value);
Expand All @@ -286,7 +253,6 @@ impl ProcessorTrait for CoinProcessor {
all_coin_infos,
all_coin_balances,
all_current_coin_balances,
all_coin_supply,
)
})
.await
Expand All @@ -304,7 +270,6 @@ impl ProcessorTrait for CoinProcessor {
&all_coin_infos,
&all_coin_balances,
&all_current_coin_balances,
&all_coin_supply,
&self.per_table_chunk_sizes,
)
.await;
Expand Down
Loading

0 comments on commit e7cefdb

Please sign in to comment.