Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add parquet_fungible_asset_processor #456

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ pub mod v2_fungible_asset_activities;
pub mod v2_fungible_asset_balances;
pub mod v2_fungible_asset_utils;
pub mod v2_fungible_metadata;

// parquet models
pub mod parquet_coin_supply;
pub mod parquet_v2_fungible_asset_balances;
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::default_models::move_tables::TableItem,
utils::util::{hash_str, APTOS_COIN_TYPE_STR},
};
use allocative_derive::Allocative;
use anyhow::Context;
use aptos_protos::transaction::v1::WriteTableItem;
use bigdecimal::BigDecimal;
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

const APTOS_COIN_SUPPLY_TABLE_HANDLE: &str =
"0x1b854694ae746cdbd8d44186ca4929b2b337df21d1c74633be19b2710552fdca";
const APTOS_COIN_SUPPLY_TABLE_KEY: &str =
"0x619dc29a0aac8fa146714058e8dd6d2d0f3bdf5f6331907bf91f3acd81e6935";

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct CoinSupply {
pub txn_version: i64,
pub coin_type_hash: String,
pub coin_type: String,
pub supply: String, // it is a string representation of the u128
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
}

impl NamedTable for CoinSupply {
const TABLE_NAME: &'static str = "coin_supply";
}

impl HasVersion for CoinSupply {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for CoinSupply {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

impl CoinSupply {
/// Currently only supports aptos_coin. Aggregator table detail is in CoinInfo which for aptos coin appears during genesis.
/// We query for the aggregator table details (handle and key) once upon indexer initiation and use it to fetch supply.
pub fn from_write_table_item(
write_table_item: &WriteTableItem,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<Self>> {
if let Some(data) = &write_table_item.data {
// Return early if not aggregator table type
if !(data.key_type == "address" && data.value_type == "u128") {
return Ok(None);
}
// Return early if not aggregator table handle
if write_table_item.handle.as_str() != APTOS_COIN_SUPPLY_TABLE_HANDLE {
return Ok(None);
}

// Convert to TableItem model. Some fields are just placeholders
let (table_item_model, _) =
TableItem::from_write_table_item(write_table_item, 0, txn_version, 0);

// Return early if not aptos coin aggregator key
let table_key = table_item_model.decoded_key.as_str().unwrap();
if table_key != APTOS_COIN_SUPPLY_TABLE_KEY {
return Ok(None);
}
// Everything matches. Get the coin supply
let supply = table_item_model
.decoded_value
.as_ref()
.unwrap()
.as_str()
.unwrap()
.parse::<BigDecimal>()
.context(format!(
"cannot parse string as u128: {:?}, version {}",
table_item_model.decoded_value.as_ref(),
txn_version
))?;
return Ok(Some(Self {
txn_version,
coin_type_hash: hash_str(APTOS_COIN_TYPE_STR),
coin_type: APTOS_COIN_TYPE_STR.to_string(),
supply: supply.to_string(),
block_timestamp: txn_timestamp,
}));
}
Ok(None)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
db::common::models::{
coin_models::coin_utils::{CoinInfoType, CoinResource},
fungible_asset_models::{
v2_fungible_asset_activities::EventToCoinType,
v2_fungible_asset_balances::{
get_primary_fungible_store_address, CurrentFungibleAssetBalance,
},
v2_fungible_asset_utils::FungibleAssetStore,
},
object_models::v2_object_utils::ObjectAggregatedDataMapping,
token_v2_models::v2_token_utils::TokenStandard,
},
utils::util::standardize_address,
};
use ahash::AHashMap;
use allocative_derive::Allocative;
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
use bigdecimal::{BigDecimal, Zero};
use field_count::FieldCount;
use parquet_derive::ParquetRecordWriter;
use serde::{Deserialize, Serialize};

#[derive(
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
)]
pub struct FungibleAssetBalance {
pub txn_version: i64,
pub write_set_change_index: i64,
pub storage_id: String,
pub owner_address: String,
pub asset_type: String,
pub is_primary: bool,
pub is_frozen: bool,
pub amount: String, // it is a string representation of the u128
#[allocative(skip)]
pub block_timestamp: chrono::NaiveDateTime,
pub token_standard: String,
}

impl NamedTable for FungibleAssetBalance {
const TABLE_NAME: &'static str = "fungible_asset_balances";
}

impl HasVersion for FungibleAssetBalance {
fn version(&self) -> i64 {
self.txn_version
}
}

impl GetTimeStamp for FungibleAssetBalance {
fn get_timestamp(&self) -> chrono::NaiveDateTime {
self.block_timestamp
}
}

impl FungibleAssetBalance {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do i understand correctly that this code is duplicated but with a different output(FungibleAssetBalance in Parquet vs FungibleAssetBalance in disiel)? can we abstract the transformation?

/// Basically just need to index FA Store, but we'll need to look up FA metadata
pub async fn get_v2_from_write_resource(
write_resource: &WriteResource,
write_set_change_index: i64,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
object_metadatas: &ObjectAggregatedDataMapping,
) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance)>> {
if let Some(inner) = &FungibleAssetStore::from_write_resource(write_resource, txn_version)?
{
let storage_id = standardize_address(write_resource.address.as_str());
// Need to get the object of the store
if let Some(object_data) = object_metadatas.get(&storage_id) {
let object = &object_data.object.object_core;
let owner_address = object.get_owner_address();
let asset_type = inner.metadata.get_reference_address();
let is_primary = Self::is_primary(&owner_address, &asset_type, &storage_id);

let concurrent_balance = object_data
.concurrent_fungible_asset_balance
.as_ref()
.map(|concurrent_fungible_asset_balance| {
concurrent_fungible_asset_balance.balance.value.clone()
});

let coin_balance = Self {
txn_version,
write_set_change_index,
storage_id: storage_id.clone(),
owner_address: owner_address.clone(),
asset_type: asset_type.clone(),
is_primary,
is_frozen: inner.frozen,
amount: concurrent_balance
.clone()
.unwrap_or_else(|| inner.balance.clone())
.to_string(),
block_timestamp: txn_timestamp,
token_standard: TokenStandard::V2.to_string(),
};
let current_coin_balance = CurrentFungibleAssetBalance {
storage_id,
owner_address,
asset_type: asset_type.clone(),
is_primary,
is_frozen: inner.frozen,
amount: concurrent_balance.unwrap_or_else(|| inner.balance.clone()),
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
token_standard: TokenStandard::V2.to_string(),
};
return Ok(Some((coin_balance, current_coin_balance)));
}
}

Ok(None)
}

pub fn get_v1_from_delete_resource(
delete_resource: &DeleteResource,
write_set_change_index: i64,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance, EventToCoinType)>> {
if let Some(CoinResource::CoinStoreDeletion) =
&CoinResource::from_delete_resource(delete_resource, txn_version)?
{
let coin_info_type = &CoinInfoType::from_move_type(
&delete_resource.r#type.as_ref().unwrap().generic_type_params[0],
delete_resource.type_str.as_ref(),
txn_version,
);
if let Some(coin_type) = coin_info_type.get_coin_type_below_max() {
let owner_address = standardize_address(delete_resource.address.as_str());
let storage_id =
CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
let coin_balance = Self {
txn_version,
write_set_change_index,
storage_id: storage_id.clone(),
owner_address: owner_address.clone(),
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: false,
amount: "0".to_string(),
block_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
let current_coin_balance = CurrentFungibleAssetBalance {
storage_id,
owner_address,
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: false,
amount: BigDecimal::zero(),
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
return Ok(Some((
coin_balance,
current_coin_balance,
AHashMap::default(),
)));
}
}
Ok(None)
}

/// Getting coin balances from resources for v1
/// If the fully qualified coin type is too long (currently 1000 length), we exclude from indexing
pub fn get_v1_from_write_resource(
write_resource: &WriteResource,
write_set_change_index: i64,
txn_version: i64,
txn_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, CurrentFungibleAssetBalance, EventToCoinType)>> {
if let Some(CoinResource::CoinStoreResource(inner)) =
&CoinResource::from_write_resource(write_resource, txn_version)?
{
let coin_info_type = &CoinInfoType::from_move_type(
&write_resource.r#type.as_ref().unwrap().generic_type_params[0],
write_resource.type_str.as_ref(),
txn_version,
);
if let Some(coin_type) = coin_info_type.get_coin_type_below_max() {
let owner_address = standardize_address(write_resource.address.as_str());
let storage_id =
CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
let coin_balance = Self {
txn_version,
write_set_change_index,
storage_id: storage_id.clone(),
owner_address: owner_address.clone(),
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: inner.frozen,
amount: inner.coin.value.clone().to_string(),
block_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
let current_coin_balance = CurrentFungibleAssetBalance {
storage_id,
owner_address,
asset_type: coin_type.clone(),
is_primary: true,
is_frozen: inner.frozen,
amount: inner.coin.value.clone(),
last_transaction_version: txn_version,
last_transaction_timestamp: txn_timestamp,
token_standard: TokenStandard::V1.to_string(),
};
let event_to_coin_mapping: EventToCoinType = AHashMap::from([
(
inner.withdraw_events.guid.id.get_standardized(),
coin_type.clone(),
),
(inner.deposit_events.guid.id.get_standardized(), coin_type),
]);
return Ok(Some((
coin_balance,
current_coin_balance,
event_to_coin_mapping,
)));
}
}
Ok(None)
}

/// Primary store address are derived from the owner address and object address in this format: sha3_256([source | object addr | 0xFC]).
/// This function expects the addresses to have length 66
pub fn is_primary(
owner_address: &str,
metadata_address: &str,
fungible_store_address: &str,
) -> bool {
fungible_store_address
== get_primary_fungible_store_address(owner_address, metadata_address).unwrap()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn get_paired_metadata_address(coin_type_name: &str) -> String {
}
}

fn get_primary_fungible_store_address(
pub fn get_primary_fungible_store_address(
owner_address: &str,
metadata_address: &str,
) -> anyhow::Result<String> {
Expand Down
6 changes: 1 addition & 5 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
mut fungible_asset_balances,
mut current_fungible_asset_balances,
current_unified_fungible_asset_balances,
mut coin_supply,
coin_supply,
) = parse_v2_coin(&transactions).await;

let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
Expand All @@ -389,10 +389,6 @@ impl ProcessorTrait for FungibleAssetProcessor {
current_fungible_asset_balances.clear();
}

if self.deprecated_tables.contains(TableFlags::COIN_SUPPLY) {
coin_supply.clear();
}

let tx_result = insert_to_db(
self.get_pool(),
self.name(),
Expand Down
Loading
Loading