From 227fbab3cfcbf442ee38ad3b872cb164d91f79d5 Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Wed, 21 Sep 2022 01:34:26 -0700 Subject: [PATCH] add current token datas --- .../2022-09-04-194128_add_token_data/up.sql | 72 ++-- .../down.sql | 4 + .../up.sql | 67 +++ crates/indexer/src/database.rs | 48 ++- crates/indexer/src/indexer/tailer.rs | 28 +- .../src/indexer/transaction_processor.rs | 1 + crates/indexer/src/models/collection_datas.rs | 174 ++++---- crates/indexer/src/models/mod.rs | 1 + crates/indexer/src/models/token_datas.rs | 256 +++++------- crates/indexer/src/models/token_utils.rs | 160 ++++++++ crates/indexer/src/models/tokens.rs | 386 ++++++++++-------- .../src/processors/default_processor.rs | 10 + .../indexer/src/processors/token_processor.rs | 270 ++++++++++-- crates/indexer/src/schema.rs | 106 +++-- crates/indexer/src/util.rs | 7 + testsuite/smoke-test/src/indexer.rs | 27 +- 16 files changed, 1061 insertions(+), 556 deletions(-) create mode 100644 crates/indexer/migrations/2022-09-20-055651_add_current_token_data/down.sql create mode 100644 crates/indexer/migrations/2022-09-20-055651_add_current_token_data/up.sql create mode 100644 crates/indexer/src/models/token_utils.rs diff --git a/crates/indexer/migrations/2022-09-04-194128_add_token_data/up.sql b/crates/indexer/migrations/2022-09-04-194128_add_token_data/up.sql index 2b5aa8fbc9f87..22183c5982da3 100644 --- a/crates/indexer/migrations/2022-09-04-194128_add_token_data/up.sql +++ b/crates/indexer/migrations/2022-09-04-194128_add_token_data/up.sql @@ -1,63 +1,61 @@ -- Your SQL goes here -- tracks tokens per version CREATE TABLE tokens ( - creator_address VARCHAR(100) NOT NULL, - collection_name_hash VARCHAR(64) NOT NULL, - name_hash VARCHAR(64) NOT NULL, - collection_name TEXT NOT NULL, - name TEXT NOT NULL, + -- sha256 of creator + collection_name + name + token_data_id_hash VARCHAR(64) NOT NULL, property_version NUMERIC NOT NULL, transaction_version BIGINT NOT NULL, + creator_address VARCHAR(66) NOT NULL, + collection_name VARCHAR(128) NOT NULL, + name VARCHAR(128) NOT NULL, token_properties jsonb NOT NULL, inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), -- Constraints PRIMARY KEY ( - creator_address, - collection_name_hash, - name_hash, + token_data_id_hash, property_version, transaction_version ) ); +CREATE INDEX token_crea_cn_name_index ON tokens (creator_address, collection_name, name); CREATE INDEX token_insat_index ON tokens (inserted_at); -- tracks who owns tokens at certain version CREATE TABLE token_ownerships ( - creator_address VARCHAR(66) NOT NULL, - collection_name_hash VARCHAR(64) NOT NULL, - name_hash VARCHAR(64) NOT NULL, - collection_name TEXT NOT NULL, - name TEXT NOT NULL, + -- sha256 of creator + collection_name + name + token_data_id_hash VARCHAR(64) NOT NULL, property_version NUMERIC NOT NULL, transaction_version BIGINT NOT NULL, + table_handle VARCHAR(66) NOT NULL, + creator_address VARCHAR(66) NOT NULL, + collection_name VARCHAR(128) NOT NULL, + name VARCHAR(128) NOT NULL, owner_address VARCHAR(66), amount NUMERIC NOT NULL, - table_handle VARCHAR(66) NOT NULL, table_type TEXT, inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), -- Constraints PRIMARY KEY ( - creator_address, - collection_name_hash, - name_hash, + token_data_id_hash, property_version, transaction_version, table_handle ) ); -CREATE INDEX to_owner ON token_ownerships (owner_address); +CREATE INDEX to_owner_index ON token_ownerships (owner_address); +CREATE INDEX to_crea_cn_name_index ON token_ownerships (creator_address, collection_name, 
name); CREATE INDEX to_insat_index ON token_ownerships (inserted_at); -- tracks token metadata CREATE TABLE token_datas ( - creator_address VARCHAR(66) NOT NULL, - collection_name_hash VARCHAR(64) NOT NULL, - name_hash VARCHAR(64) NOT NULL, - collection_name TEXT NOT NULL, - name TEXT NOT NULL, + -- sha256 of creator + collection_name + name + token_data_id_hash VARCHAR(64) NOT NULL, transaction_version BIGINT NOT NULL, + creator_address VARCHAR(66) NOT NULL, + collection_name VARCHAR(128) NOT NULL, + name VARCHAR(128) NOT NULL, maximum NUMERIC NOT NULL, supply NUMERIC NOT NULL, largest_property_version NUMERIC NOT NULL, - metadata_uri TEXT NOT NULL, + metadata_uri VARCHAR(512) NOT NULL, payee_address VARCHAR(66) NOT NULL, royalty_points_numerator NUMERIC NOT NULL, royalty_points_denominator NUMERIC NOT NULL, @@ -69,22 +67,19 @@ CREATE TABLE token_datas ( default_properties jsonb NOT NULL, inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), -- Constraints - PRIMARY KEY ( - creator_address, - collection_name_hash, - name_hash, - transaction_version - ) + PRIMARY KEY (token_data_id_hash, transaction_version) ); +CREATE INDEX td_crea_cn_name_index ON token_datas (creator_address, collection_name, name); CREATE INDEX td_insat_index ON token_datas (inserted_at); -- tracks collection metadata CREATE TABLE collection_datas ( + -- sha256 of creator + collection_name + collection_data_id_hash VARCHAR(64) NOT NULL, + transaction_version BIGINT NOT NULL, creator_address VARCHAR(66) NOT NULL, - collection_name_hash VARCHAR(64) NOT NULL, - collection_name TEXT NOT NULL, + collection_name VARCHAR(128) NOT NULL, description TEXT NOT NULL, - transaction_version BIGINT NOT NULL, - metadata_uri TEXT NOT NULL, + metadata_uri VARCHAR(512) NOT NULL, supply NUMERIC NOT NULL, maximum NUMERIC NOT NULL, maximum_mutable BOOLEAN NOT NULL, @@ -92,10 +87,7 @@ CREATE TABLE collection_datas ( description_mutable BOOLEAN NOT NULL, inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), -- Constraints - PRIMARY KEY ( - creator_address, - collection_name_hash, - transaction_version - ) + PRIMARY KEY (collection_data_id_hash, transaction_version) ); -CREATE INDEX cd_insat_index ON collection_datas (inserted_at); +CREATE INDEX cd_crea_cn_index ON collection_datas (creator_address, collection_name); +CREATE INDEX cd_insat_index ON collection_datas (inserted_at); \ No newline at end of file diff --git a/crates/indexer/migrations/2022-09-20-055651_add_current_token_data/down.sql b/crates/indexer/migrations/2022-09-20-055651_add_current_token_data/down.sql new file mode 100644 index 0000000000000..fac2b2c3d5037 --- /dev/null +++ b/crates/indexer/migrations/2022-09-20-055651_add_current_token_data/down.sql @@ -0,0 +1,4 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS current_token_ownerships; +DROP TABLE IF EXISTS current_token_datas; +DROP TABLE IF EXISTS current_collection_datas; \ No newline at end of file diff --git a/crates/indexer/migrations/2022-09-20-055651_add_current_token_data/up.sql b/crates/indexer/migrations/2022-09-20-055651_add_current_token_data/up.sql new file mode 100644 index 0000000000000..10dd6e595c660 --- /dev/null +++ b/crates/indexer/migrations/2022-09-20-055651_add_current_token_data/up.sql @@ -0,0 +1,67 @@ +-- Your SQL goes here +-- tracks tokens in owner's tokenstore +CREATE TABLE current_token_ownerships ( + -- sha256 of creator + collection_name + name + token_data_id_hash VARCHAR(64) NOT NULL, + property_version NUMERIC NOT NULL, + owner_address VARCHAR(66) NOT NULL, + creator_address 
VARCHAR(66) NOT NULL, + collection_name VARCHAR(128) NOT NULL, + name VARCHAR(128) NOT NULL, + amount NUMERIC NOT NULL, + token_properties jsonb NOT NULL, + last_transaction_version BIGINT NOT NULL, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW(), + -- Constraints + PRIMARY KEY ( + token_data_id_hash, + property_version, + owner_address + ) +); +CREATE INDEX curr_to_crea_cn_name_index ON current_token_ownerships (creator_address, collection_name, name); +CREATE INDEX curr_to_owner_index ON current_token_ownerships (owner_address); +CREATE INDEX curr_to_insat_index ON current_token_ownerships (inserted_at); +-- tracks latest token metadata +CREATE TABLE current_token_datas ( + -- sha256 of creator + collection_name + name + token_data_id_hash VARCHAR(64) UNIQUE PRIMARY KEY NOT NULL, + creator_address VARCHAR(66) NOT NULL, + collection_name VARCHAR(128) NOT NULL, + name VARCHAR(128) NOT NULL, + maximum NUMERIC NOT NULL, + supply NUMERIC NOT NULL, + largest_property_version NUMERIC NOT NULL, + metadata_uri VARCHAR(512) NOT NULL, + payee_address VARCHAR(66) NOT NULL, + royalty_points_numerator NUMERIC NOT NULL, + royalty_points_denominator NUMERIC NOT NULL, + maximum_mutable BOOLEAN NOT NULL, + uri_mutable BOOLEAN NOT NULL, + description_mutable BOOLEAN NOT NULL, + properties_mutable BOOLEAN NOT NULL, + royalty_mutable BOOLEAN NOT NULL, + default_properties jsonb NOT NULL, + last_transaction_version BIGINT NOT NULL, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX curr_td_crea_cn_name_index ON current_token_datas (creator_address, collection_name, name); +CREATE INDEX curr_td_insat_index ON current_token_datas (inserted_at); +-- tracks latest collection metadata +CREATE TABLE current_collection_datas ( + -- sha256 of creator + collection_name + collection_data_id_hash VARCHAR(64) UNIQUE PRIMARY KEY NOT NULL, + creator_address VARCHAR(66) NOT NULL, + collection_name VARCHAR(128) NOT NULL, + description TEXT NOT NULL, + metadata_uri VARCHAR(512) NOT NULL, + supply NUMERIC NOT NULL, + maximum NUMERIC NOT NULL, + maximum_mutable BOOLEAN NOT NULL, + uri_mutable BOOLEAN NOT NULL, + description_mutable BOOLEAN NOT NULL, + last_transaction_version BIGINT NOT NULL, + inserted_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX curr_cd_crea_cn_index ON current_collection_datas (creator_address, collection_name); +CREATE INDEX curr_cd_insat_index ON current_collection_datas (inserted_at); \ No newline at end of file diff --git a/crates/indexer/src/database.rs b/crates/indexer/src/database.rs index a9d4cdaa838a9..368a1e75b50fd 100644 --- a/crates/indexer/src/database.rs +++ b/crates/indexer/src/database.rs @@ -5,15 +5,24 @@ #![allow(clippy::extra_unused_lifetimes)] use crate::util::remove_null_bytes; use diesel::{ - pg::PgConnection, + pg::{Pg, PgConnection}, + query_builder::{AstPass, Query, QueryFragment}, r2d2::{ConnectionManager, PoolError, PooledConnection}, - RunQueryDsl, + QueryResult, RunQueryDsl, }; use std::{cmp::min, sync::Arc}; pub type PgPool = diesel::r2d2::Pool>; pub type PgDbPool = Arc; pub type PgPoolConnection = PooledConnection>; +#[derive(QueryId)] +/// Using this will append a where clause at the end of the string upsert function, e.g. +/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... 
WHERE "transaction_version" = excluded."transaction_version" +/// This is needed when we want to maintain a table with only the latest state +pub struct UpsertFilterLatestTransactionQuery { + query: T, + where_clause: Option<&'static str>, +} pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX; @@ -60,19 +69,50 @@ pub fn execute_with_better_error< >( conn: &mut PgConnection, query: diesel::query_builder::InsertStatement, + mut additional_where_clause: Option<&'static str>, ) -> diesel::QueryResult where ::FromClause: diesel::query_builder::QueryFragment, { - let debug = diesel::debug_query::(&query).to_string(); + let original_query = diesel::debug_query::(&query).to_string(); + // This is needed because if we don't insert any row, then diesel makes a call like this + // SELECT 1 FROM TABLE WHERE 1=0 + if original_query.to_lowercase().contains("where") { + additional_where_clause = None; + } + let final_query = UpsertFilterLatestTransactionQuery { + query, + where_clause: additional_where_clause, + }; + let debug = diesel::debug_query::(&final_query).to_string(); aptos_logger::debug!("Executing query: {:?}", debug); - let res = query.execute(conn); + let res = final_query.execute(conn); if let Err(ref e) = res { aptos_logger::warn!("Error running query: {:?}\n{}", e, debug); } res } +/// Section below is required to modify the query. +impl Query for UpsertFilterLatestTransactionQuery { + type SqlType = T::SqlType; +} + +impl RunQueryDsl for UpsertFilterLatestTransactionQuery {} + +impl QueryFragment for UpsertFilterLatestTransactionQuery +where + T: QueryFragment, +{ + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + self.query.walk_ast(out.reborrow())?; + if let Some(w) = self.where_clause { + out.push_sql(w); + } + Ok(()) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/crates/indexer/src/indexer/tailer.rs b/crates/indexer/src/indexer/tailer.rs index 921f968849ff2..f972a4cd69322 100644 --- a/crates/indexer/src/indexer/tailer.rs +++ b/crates/indexer/src/indexer/tailer.rs @@ -105,6 +105,7 @@ impl Tailer { diesel::insert_into(ledger_infos::table).values(LedgerInfo { chain_id: new_chain_id, }), + None, ) .context(r#"Error updating chain_id!"#) .map(|_| new_chain_id as u64) @@ -316,28 +317,13 @@ mod test { } pub fn wipe_database(conn: &mut PgPoolConnection) { - for table in [ - "collection_datas", - "tokens", - "token_datas", - "token_ownerships", - "signatures", - "move_modules", - "move_resources", - "table_items", - "table_metadatas", - "write_set_changes", - "events", - "user_transactions", - "block_metadata_transactions", - "transactions", - "processor_statuses", - "ledger_infos", - "__diesel_schema_migrations", + for command in [ + "DROP SCHEMA public CASCADE", + "CREATE SCHEMA public", + "GRANT ALL ON SCHEMA public TO postgres", + "GRANT ALL ON SCHEMA public TO public", ] { - diesel::sql_query(format!("DROP TABLE IF EXISTS {} CASCADE", table)) - .execute(conn) - .unwrap(); + diesel::sql_query(command).execute(conn).unwrap(); } } diff --git a/crates/indexer/src/indexer/transaction_processor.rs b/crates/indexer/src/indexer/transaction_processor.rs index 248f397e4396e..5471bc0e4f9cc 100644 --- a/crates/indexer/src/indexer/transaction_processor.rs +++ b/crates/indexer/src/indexer/transaction_processor.rs @@ -160,6 +160,7 @@ pub trait TransactionProcessor: Send + Sync + Debug { dsl::details.eq(excluded(dsl::details)), dsl::last_updated.eq(excluded(dsl::last_updated)), )), + None, ) .expect("Error updating Processor Status!"); } diff --git 
a/crates/indexer/src/models/collection_datas.rs b/crates/indexer/src/models/collection_datas.rs index cd346c06cbccc..9f93840abea6a 100644 --- a/crates/indexer/src/models/collection_datas.rs +++ b/crates/indexer/src/models/collection_datas.rs @@ -6,8 +6,8 @@ #![allow(clippy::unused_unit)] use crate::{ - schema::collection_datas, - util::{hash_str, u64_to_bigdecimal}, + schema::{collection_datas, current_collection_datas}, + util::{hash_str, truncate_str}, }; use anyhow::Context; use aptos_api_types::WriteTableItem as APIWriteTableItem; @@ -15,17 +15,20 @@ use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; -use super::tokens::{TableHandleToOwner, TableMetadataForToken}; +use super::{ + token_utils::{CollectionDataIdType, TokenWriteSet}, + tokens::{TableHandleToOwner, TableMetadataForToken}, +}; #[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Queryable, Serialize)] -#[diesel(primary_key(creator_address, collection_name_hash, transaction_version))] +#[diesel(primary_key(collection_data_id_hash, transaction_version))] #[diesel(table_name = collection_datas)] pub struct CollectionData { + pub collection_data_id_hash: String, + pub transaction_version: i64, pub creator_address: String, - pub collection_name_hash: String, pub collection_name: String, pub description: String, - pub transaction_version: i64, pub metadata_uri: String, pub supply: BigDecimal, pub maximum: BigDecimal, @@ -36,95 +39,90 @@ pub struct CollectionData { pub inserted_at: chrono::NaiveDateTime, } +#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Queryable, Serialize)] +#[diesel(primary_key(collection_data_id_hash))] +#[diesel(table_name = current_collection_datas)] +pub struct CurrentCollectionData { + pub collection_data_id_hash: String, + pub creator_address: String, + pub collection_name: String, + pub description: String, + pub metadata_uri: String, + pub supply: BigDecimal, + pub maximum: BigDecimal, + pub maximum_mutable: bool, + pub uri_mutable: bool, + pub description_mutable: bool, + pub last_transaction_version: i64, + // Default time columns + pub inserted_at: chrono::NaiveDateTime, +} + impl CollectionData { pub fn from_write_table_item( table_item: &APIWriteTableItem, txn_version: i64, table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let table_item_data = table_item.data.as_ref().unwrap(); - if table_item_data.value_type != "0x3::token::CollectionData" { - return Ok(None); - } - let value = &table_item_data.value; - let table_handle = table_item.handle.to_string(); - let creator_address = table_handle_to_owner - .get(&TableMetadataForToken::standardize_handle(&table_handle)) - .map(|table_metadata| table_metadata.owner_address.clone()) - .context(format!( - "version {} failed! collection creator resource was missing, table handle {} not in map {:?}", - txn_version, TableMetadataForToken::standardize_handle(&table_handle), table_handle_to_owner, - ))?; - let collection_name = value["name"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! name missing from collection {:?}", - txn_version, value - ))?; - let collection_name_hash = hash_str(&collection_name); + let maybe_collection_data = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? 
{ + Some(TokenWriteSet::CollectionData(inner)) => Some(inner), + _ => None, + }; + if let Some(collection_data) = maybe_collection_data { + let table_handle = table_item.handle.to_string(); + let creator_address = table_handle_to_owner + .get(&TableMetadataForToken::standardize_handle(&table_handle)) + .map(|table_metadata| table_metadata.owner_address.clone()) + .context(format!( + "version {} failed! collection creator resource was missing, table handle {} not in map {:?}", + txn_version, TableMetadataForToken::standardize_handle(&table_handle), table_handle_to_owner, + ))?; + let collection_data_id = CollectionDataIdType { + creator: creator_address, + name: collection_data.name, + }; + let collection_data_id_hash = hash_str(&collection_data_id.to_string()); + let collection_name = truncate_str(&collection_data_id.name, 128).to_string(); + let metadata_uri = truncate_str(&collection_data.uri, 512).to_string(); - Ok(Some(Self { - collection_name, - creator_address, - collection_name_hash, - description: value["description"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! description missing from collection {:?}", - txn_version, value - ))?, - transaction_version: txn_version, - metadata_uri: value["uri"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! uri missing from collection {:?}", - txn_version, value - ))?, - supply: value["supply"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! supply missing from collection {:?}", - txn_version, value - ))? - .context(format!( - "version {} failed! failed to parse supply {:?}", - txn_version, value["supply"] - ))?, - maximum: value["maximum"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! maximum missing from collection {:?}", - txn_version, value - ))? - .context(format!( - "version {} failed! failed to parse maximum {:?}", - txn_version, value["maximum"] - ))?, - maximum_mutable: value["mutability_config"]["maximum"] - .as_bool() - .context(format!( - "version {} failed! mutability_config.maximum missing {:?}", - txn_version, value - ))?, - uri_mutable: value["mutability_config"]["uri"] - .as_bool() - .context(format!( - "version {} failed! mutability_config.uri missing {:?}", - txn_version, value - ))?, - description_mutable: value["mutability_config"]["description"] - .as_bool() - .context(format!( - "version {} failed! 
mutability_config.description missing {:?}", - txn_version, value - ))?, - inserted_at: chrono::Utc::now().naive_utc(), - })) + Ok(Some(( + Self { + collection_data_id_hash: collection_data_id_hash.clone(), + collection_name: collection_name.clone(), + creator_address: collection_data_id.creator.clone(), + description: collection_data.description.clone(), + transaction_version: txn_version, + metadata_uri: metadata_uri.clone(), + supply: collection_data.supply.clone(), + maximum: collection_data.maximum.clone(), + maximum_mutable: collection_data.mutability_config.maximum, + uri_mutable: collection_data.mutability_config.uri, + description_mutable: collection_data.mutability_config.description, + inserted_at: chrono::Utc::now().naive_utc(), + }, + CurrentCollectionData { + collection_data_id_hash, + collection_name, + creator_address: collection_data_id.creator, + description: collection_data.description, + metadata_uri, + supply: collection_data.supply, + maximum: collection_data.maximum, + maximum_mutable: collection_data.mutability_config.maximum, + uri_mutable: collection_data.mutability_config.uri, + description_mutable: collection_data.mutability_config.description, + last_transaction_version: txn_version, + inserted_at: chrono::Utc::now().naive_utc(), + }, + ))) + } else { + Ok(None) + } } } diff --git a/crates/indexer/src/models/mod.rs b/crates/indexer/src/models/mod.rs index 39f3356b61009..1af2802a69ab8 100644 --- a/crates/indexer/src/models/mod.rs +++ b/crates/indexer/src/models/mod.rs @@ -11,6 +11,7 @@ pub mod move_tables; pub mod processor_statuses; pub mod signatures; pub mod token_datas; +pub mod token_utils; pub mod tokens; pub mod transactions; pub mod user_transactions; diff --git a/crates/indexer/src/models/token_datas.rs b/crates/indexer/src/models/token_datas.rs index c7541f07e19e0..072d920839bf4 100644 --- a/crates/indexer/src/models/token_datas.rs +++ b/crates/indexer/src/models/token_datas.rs @@ -5,9 +5,10 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] +use super::token_utils::TokenWriteSet; use crate::{ - schema::token_datas, - util::{hash_str, u64_to_bigdecimal}, + schema::{current_token_datas, token_datas}, + util::{hash_str, truncate_str}, }; use anyhow::Context; use aptos_api_types::WriteTableItem as APIWriteTableItem; @@ -16,15 +17,14 @@ use field_count::FieldCount; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Queryable, Serialize)] -#[diesel(primary_key(creator_address, collection_name_hash, name_hash, transaction_version))] +#[diesel(primary_key(token_data_id_hash, transaction_version))] #[diesel(table_name = token_datas)] pub struct TokenData { + pub token_data_id_hash: String, + pub transaction_version: i64, pub creator_address: String, - pub collection_name_hash: String, - pub name_hash: String, pub collection_name: String, pub name: String, - pub transaction_version: i64, pub maximum: BigDecimal, pub supply: BigDecimal, pub largest_property_version: BigDecimal, @@ -42,165 +42,115 @@ pub struct TokenData { pub inserted_at: chrono::NaiveDateTime, } -pub struct TokenDataId { +#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Queryable, Serialize)] +#[diesel(primary_key(token_data_id_hash))] +#[diesel(table_name = current_token_datas)] +pub struct CurrentTokenData { + pub token_data_id_hash: String, pub creator_address: String, pub collection_name: String, pub name: String, + pub maximum: bigdecimal::BigDecimal, + pub supply: bigdecimal::BigDecimal, + pub 
largest_property_version: bigdecimal::BigDecimal, + pub metadata_uri: String, + pub payee_address: String, + pub royalty_points_numerator: bigdecimal::BigDecimal, + pub royalty_points_denominator: bigdecimal::BigDecimal, + pub maximum_mutable: bool, + pub uri_mutable: bool, + pub description_mutable: bool, + pub properties_mutable: bool, + pub royalty_mutable: bool, + pub default_properties: serde_json::Value, + pub last_transaction_version: i64, + // Default time columns + pub inserted_at: chrono::NaiveDateTime, } impl TokenData { pub fn from_write_table_item( table_item: &APIWriteTableItem, txn_version: i64, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let table_item_data = table_item.data.as_ref().unwrap(); - if table_item_data.value_type != "0x3::token::TokenData" { - return Ok(None); - } - let key = &table_item_data.key; - let value = &table_item_data.value; - let token_data_id = Self::get_token_data_id_from_table_item_key(key, txn_version)?; - let collection_name_hash = hash_str(&token_data_id.collection_name); - let name_hash = hash_str(&token_data_id.name); + let maybe_token_data = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? { + Some(TokenWriteSet::TokenData(inner)) => Some(inner), + _ => None, + }; - Ok(Some(Self { - creator_address: token_data_id.creator_address, - collection_name_hash, - name_hash, - collection_name: token_data_id.collection_name, - name: token_data_id.name, - transaction_version: txn_version, - maximum: value["maximum"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! maximum missing from token data {:?}", - txn_version, value - ))? - .context(format!( - "version {} failed! failed to parse maximum {:?}", - txn_version, value["maximum"] - ))?, - supply: value["supply"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! supply missing from token data {:?}", - txn_version, value - ))? - .context(format!( - "version {} failed! failed to parse supply {:?}", - txn_version, value["maximum"] - ))?, - largest_property_version: value["largest_property_version"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! largest_property_version missing from token data {:?}", - txn_version, value - ))? - .context(format!( - "version {} failed! failed to parse largest_property_version {:?}", - txn_version, value["maximum"] - ))?, - metadata_uri: value["uri"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! uri missing from token data {:?}", - txn_version, value - ))?, - payee_address: value["royalty"]["payee_address"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! royalty.payee_address missing {:?}", - txn_version, value - ))?, - royalty_points_numerator: value["royalty"]["royalty_points_numerator"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! royalty.royalty_points_numerator missing {:?}", - txn_version, value - ))? - .context(format!( - "version {} failed! 
failed to parse royalty_points_numerator {:?}", - txn_version, value["royalty"]["royalty_points_numerator"] - ))?, - royalty_points_denominator: value["royalty"]["royalty_points_denominator"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! royalty.royalty_points_denominator missing {:?}", - txn_version, value - ))? - .context(format!( - "version {} failed! failed to parse royalty_points_denominator {:?}", - txn_version, value["royalty"]["royalty_points_denominator"] - ))?, - maximum_mutable: value["mutability_config"]["maximum"] - .as_bool() - .context(format!( - "version {} failed! mutability_config.maximum missing {:?}", - txn_version, value - ))?, - uri_mutable: value["mutability_config"]["uri"] - .as_bool() - .context(format!( - "version {} failed! mutability_config.uri missing {:?}", - txn_version, value - ))?, - description_mutable: value["mutability_config"]["description"] - .as_bool() - .context(format!( - "version {} failed! mutability_config.description missing {:?}", - txn_version, value - ))?, - properties_mutable: value["mutability_config"]["properties"].as_bool().context( - format!( - "version {} failed! mutability_config.properties missing {:?}", - txn_version, value - ), - )?, - royalty_mutable: value["mutability_config"]["royalty"] - .as_bool() - .context(format!( - "version {} failed! mutability_config.royalty missing {:?}", - txn_version, value - ))?, - default_properties: value["default_properties"].clone(), - inserted_at: chrono::Utc::now().naive_utc(), - })) - } + if let Some(token_data) = maybe_token_data { + let token_data_id = match TokenWriteSet::from_table_item_type( + table_item_data.key_type.as_str(), + &table_item_data.key, + txn_version, + )? { + Some(TokenWriteSet::TokenDataId(inner)) => Some(inner), + _ => None, + } + .context(format!( + "Could not get token data id from table item key_type: {}, key: {:?} version: {}", + table_item_data.key_type, table_item_data.key, txn_version + ))?; + let token_data_id_hash = hash_str(&token_data_id.to_string()); + let collection_name = truncate_str(&token_data_id.collection, 128).to_string(); + let name = truncate_str(&token_data_id.name, 128).to_string(); + let metadata_uri = truncate_str(&token_data.uri, 512).to_string(); - fn get_token_data_id_from_table_item_key( - key: &serde_json::Value, - txn_version: i64, - ) -> anyhow::Result { - Ok(TokenDataId { - creator_address: key["creator"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! creator missing from key {:?}", - txn_version, key - ))?, - collection_name: key["collection"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! collection missing from key {:?}", - txn_version, key - ))?, - name: key["name"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! 
name missing from key {:?}", - txn_version, key - ))?, - }) + Ok(Some(( + Self { + token_data_id_hash: token_data_id_hash.clone(), + creator_address: token_data_id.creator.clone(), + collection_name: collection_name.clone(), + name: name.clone(), + transaction_version: txn_version, + maximum: token_data.maximum.clone(), + supply: token_data.supply.clone(), + largest_property_version: token_data.largest_property_version.clone(), + metadata_uri: metadata_uri.clone(), + payee_address: token_data.royalty.payee_address.clone(), + royalty_points_numerator: token_data.royalty.royalty_points_numerator.clone(), + royalty_points_denominator: token_data + .royalty + .royalty_points_denominator + .clone(), + maximum_mutable: token_data.mutability_config.maximum, + uri_mutable: token_data.mutability_config.uri, + description_mutable: token_data.mutability_config.description, + properties_mutable: token_data.mutability_config.properties, + royalty_mutable: token_data.mutability_config.royalty, + default_properties: token_data.default_properties.clone(), + inserted_at: chrono::Utc::now().naive_utc(), + }, + CurrentTokenData { + token_data_id_hash, + creator_address: token_data_id.creator, + collection_name, + name, + maximum: token_data.maximum, + supply: token_data.supply, + largest_property_version: token_data.largest_property_version, + metadata_uri, + payee_address: token_data.royalty.payee_address, + royalty_points_numerator: token_data.royalty.royalty_points_numerator, + royalty_points_denominator: token_data.royalty.royalty_points_denominator, + maximum_mutable: token_data.mutability_config.maximum, + uri_mutable: token_data.mutability_config.uri, + description_mutable: token_data.mutability_config.description, + properties_mutable: token_data.mutability_config.properties, + royalty_mutable: token_data.mutability_config.royalty, + default_properties: token_data.default_properties, + last_transaction_version: txn_version, + inserted_at: chrono::Utc::now().naive_utc(), + }, + ))) + } else { + Ok(None) + } } } diff --git a/crates/indexer/src/models/token_utils.rs b/crates/indexer/src/models/token_utils.rs new file mode 100644 index 0000000000000..fce090994c9ba --- /dev/null +++ b/crates/indexer/src/models/token_utils.rs @@ -0,0 +1,160 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] + +use anyhow::Context; +use aptos_api_types::deserialize_from_string; +use serde::{Deserialize, Serialize}; +use std::fmt::{self, Formatter}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TokenDataIdType { + pub creator: String, + pub collection: String, + pub name: String, +} + +impl fmt::Display for TokenDataIdType { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}::{}::{}", self.creator, self.collection, self.name) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CollectionDataIdType { + pub creator: String, + pub name: String, +} + +impl fmt::Display for CollectionDataIdType { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}::{}", self.creator, self.name) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TokenIdType { + pub token_data_id: TokenDataIdType, + #[serde(deserialize_with = "deserialize_from_string")] + pub property_version: bigdecimal::BigDecimal, +} + +impl fmt::Display for TokenIdType { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}::{}", 
self.token_data_id, self.property_version) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TokenDataType { + pub default_properties: serde_json::Value, + pub description: String, + #[serde(deserialize_with = "deserialize_from_string")] + pub largest_property_version: bigdecimal::BigDecimal, + #[serde(deserialize_with = "deserialize_from_string")] + pub maximum: bigdecimal::BigDecimal, + pub mutability_config: TokenDataMutabilityConfigType, + pub name: String, + pub royalty: RoyaltyType, + #[serde(deserialize_with = "deserialize_from_string")] + pub supply: bigdecimal::BigDecimal, + pub uri: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TokenDataMutabilityConfigType { + pub description: bool, + pub maximum: bool, + pub properties: bool, + pub royalty: bool, + pub uri: bool, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct RoyaltyType { + pub payee_address: String, + #[serde(deserialize_with = "deserialize_from_string")] + pub royalty_points_denominator: bigdecimal::BigDecimal, + #[serde(deserialize_with = "deserialize_from_string")] + pub royalty_points_numerator: bigdecimal::BigDecimal, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct TokenType { + #[serde(deserialize_with = "deserialize_from_string")] + pub amount: bigdecimal::BigDecimal, + pub id: TokenIdType, + pub token_properties: serde_json::Value, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CollectionDataType { + pub description: String, + #[serde(deserialize_with = "deserialize_from_string")] + pub maximum: bigdecimal::BigDecimal, + pub mutability_config: CollectionDataMutabilityConfigType, + pub name: String, + #[serde(deserialize_with = "deserialize_from_string")] + pub supply: bigdecimal::BigDecimal, + pub uri: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CollectionDataMutabilityConfigType { + pub description: bool, + pub maximum: bool, + pub uri: bool, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum TokenWriteSet { + TokenDataId(TokenDataIdType), + TokenId(TokenIdType), + TokenData(TokenDataType), + Token(TokenType), + CollectionData(CollectionDataType), +} + +impl TokenWriteSet { + pub fn from_table_item_type( + data_type: &str, + data: &serde_json::Value, + txn_version: i64, + ) -> anyhow::Result> { + match data_type { + "0x3::token::TokenDataId" => serde_json::from_value(data.clone()) + .map(|inner| Some(TokenWriteSet::TokenDataId(inner))) + .context(format!( + "version {} failed! failed to parse type {}, data {:?}", + txn_version, data_type, data + )), + "0x3::token::TokenId" => serde_json::from_value(data.clone()) + .map(|inner| Some(TokenWriteSet::TokenId(inner))) + .context(format!( + "version {} failed! failed to parse type {}, data {:?}", + txn_version, data_type, data + )), + "0x3::token::TokenData" => serde_json::from_value(data.clone()) + .map(|inner| Some(TokenWriteSet::TokenData(inner))) + .context(format!( + "version {} failed! failed to parse type {}, data {:?}", + txn_version, data_type, data + )), + "0x3::token::Token" => serde_json::from_value(data.clone()) + .map(|inner| Some(TokenWriteSet::Token(inner))) + .context(format!( + "version {} failed! failed to parse type {}, data {:?}", + txn_version, data_type, data + )), + "0x3::token::CollectionData" => serde_json::from_value(data.clone()) + .map(|inner| Some(TokenWriteSet::CollectionData(inner))) + .context(format!( + "version {} failed! 
failed to parse type {}, data {:?}", + txn_version, data_type, data + )), + _ => Ok(None), + } + } +} diff --git a/crates/indexer/src/models/tokens.rs b/crates/indexer/src/models/tokens.rs index 931e93bb6cc65..8797bda4131ae 100644 --- a/crates/indexer/src/models/tokens.rs +++ b/crates/indexer/src/models/tokens.rs @@ -8,13 +8,14 @@ use std::collections::HashMap; use super::{ - collection_datas::CollectionData, + collection_datas::{CollectionData, CurrentCollectionData}, move_resources::MoveResource, - token_datas::{TokenData, TokenDataId}, + token_datas::{CurrentTokenData, TokenData}, + token_utils::TokenWriteSet, }; use crate::{ - schema::{token_ownerships, tokens}, - util::{ensure_not_negative, hash_str, u64_to_bigdecimal}, + schema::{current_token_ownerships, token_ownerships, tokens}, + util::{ensure_not_negative, hash_str, truncate_str}, }; use anyhow::Context; use aptos_api_types::{ @@ -22,27 +23,29 @@ use aptos_api_types::{ WriteResource as APIWriteResource, WriteSetChange as APIWriteSetChange, WriteTableItem as APIWriteTableItem, }; +use aptos_logger::warn; use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +type TableHandle = String; +type OwnerAddress = String; +type TableType = String; +pub type TableHandleToOwner = HashMap; +pub type TokenDataIdHash = String; +// PK of current_token_ownerships, i.e. token_data_id_hash + property_version + owner_address +pub type CurrentTokenOwnershipPK = (TokenDataIdHash, BigDecimal, OwnerAddress); + #[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Queryable, Serialize)] -#[diesel(primary_key( - creator_address, - collection_name_hash, - name_hash, - property_version, - transaction_version -))] +#[diesel(primary_key(token_data_id_hash, property_version, transaction_version))] #[diesel(table_name = tokens)] pub struct Token { + pub token_data_id_hash: String, + pub property_version: BigDecimal, + pub transaction_version: i64, pub creator_address: String, - pub collection_name_hash: String, - pub name_hash: String, pub collection_name: String, pub name: String, - pub property_version: BigDecimal, - pub transaction_version: i64, pub token_properties: serde_json::Value, // Default time columns pub inserted_at: chrono::NaiveDateTime, @@ -50,43 +53,56 @@ pub struct Token { #[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Queryable, Serialize)] #[diesel(primary_key( - creator_address, - collection_name_hash, - name_hash, + token_data_id_hash, property_version, transaction_version, table_handle ))] #[diesel(table_name = token_ownerships)] pub struct TokenOwnership { + pub token_data_id_hash: String, + pub property_version: BigDecimal, + pub transaction_version: i64, + pub table_handle: String, pub creator_address: String, - pub collection_name_hash: String, - pub name_hash: String, pub collection_name: String, pub name: String, - pub property_version: BigDecimal, - pub transaction_version: i64, pub owner_address: Option, pub amount: BigDecimal, - pub table_handle: String, pub table_type: Option, // Default time columns pub inserted_at: chrono::NaiveDateTime, } +#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Queryable, Serialize)] +#[diesel(primary_key(token_data_id_hash, property_version, owner_address))] +#[diesel(table_name = current_token_ownerships)] +pub struct CurrentTokenOwnership { + pub token_data_id_hash: String, + pub property_version: BigDecimal, + pub owner_address: String, + pub creator_address: String, + pub collection_name: String, + pub 
name: String, + pub amount: BigDecimal, + pub token_properties: serde_json::Value, + pub last_transaction_version: i64, + // Default time columns + pub inserted_at: chrono::NaiveDateTime, +} + #[derive(Debug)] pub struct TableMetadataForToken { pub owner_address: OwnerAddress, pub table_type: TableType, } -type TableHandle = String; -type OwnerAddress = String; -type TableType = String; -pub type TableHandleToOwner = HashMap; impl Token { /// We can find token data from write sets in user transactions. Table items will contain metadata for collections /// and tokens. To find ownership, we have to look in write resource write sets for who owns those table handles + /// + /// We also will compute current versions of the token tables which are at a higher granularity than the transactional tables (only + /// state at the last transaction will be tracked, hence using hashmap to dedupe) pub fn from_transaction( transaction: &APITransaction, ) -> ( @@ -94,6 +110,9 @@ impl Token { Vec, Vec, Vec, + HashMap, + HashMap, + HashMap, ) { if let APITransaction::UserTransaction(user_txn) = transaction { let mut tokens = vec![]; @@ -101,6 +120,15 @@ impl Token { let mut token_datas = vec![]; let mut collection_datas = vec![]; + let mut current_token_ownerships: HashMap< + CurrentTokenOwnershipPK, + CurrentTokenOwnership, + > = HashMap::new(); + let mut current_token_datas: HashMap = + HashMap::new(); + let mut current_collection_datas: HashMap = + HashMap::new(); + let txn_version = user_txn.info.version.0 as i64; let mut table_handle_to_owner: TableHandleToOwner = HashMap::new(); for wsc in &user_txn.info.changes { @@ -145,20 +173,48 @@ impl Token { ), _ => (None, None, None), }; - if let Some((token, token_ownership)) = maybe_token_w_ownership { + if let Some((token, token_ownership, maybe_current_token_ownership)) = + maybe_token_w_ownership + { tokens.push(token); token_ownerships.push(token_ownership); + if let Some(current_token_ownership) = maybe_current_token_ownership { + current_token_ownerships.insert( + ( + current_token_ownership.token_data_id_hash.clone(), + current_token_ownership.property_version.clone(), + current_token_ownership.owner_address.clone(), + ), + current_token_ownership, + ); + } } - if let Some(token_data) = maybe_token_data { + if let Some((token_data, current_token_data)) = maybe_token_data { token_datas.push(token_data); + current_token_datas.insert( + current_token_data.token_data_id_hash.clone(), + current_token_data, + ); } - if let Some(collection_data) = maybe_collection_data { + if let Some((collection_data, current_collection_data)) = maybe_collection_data { collection_datas.push(collection_data); + current_collection_datas.insert( + current_collection_data.collection_data_id_hash.clone(), + current_collection_data, + ); } } - return (tokens, token_ownerships, token_datas, collection_datas); + return ( + tokens, + token_ownerships, + token_datas, + collection_datas, + current_token_ownerships, + current_token_datas, + current_collection_datas, + ); } - (vec![], vec![], vec![], vec![]) + Default::default() } /// Get token from write table item. 
Table items don't have address of the table so we need to look it up in the table_handle_to_owner mapping @@ -168,66 +224,82 @@ impl Token { table_item: &APIWriteTableItem, txn_version: i64, table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result> { + ) -> anyhow::Result)>> { let table_item_data = table_item.data.as_ref().unwrap(); - if table_item_data.key_type != "0x3::token::TokenId" { - return Ok(None); - } - let table_handle = - TableMetadataForToken::standardize_handle(&table_item.handle.to_string()); - let (owner_address, table_type) = table_handle_to_owner - .get(&table_handle) - .map(|table_metadata| { - ( - Some(table_metadata.owner_address.clone()), - Some(table_metadata.table_type.clone()), - ) - }) - .unwrap_or((None, None)); - let key = &table_item_data.key; - let token_data_id = Self::get_token_data_id_from_table_item_key(key, txn_version)?; - let property_version = Self::get_property_version_from_table_item_key(key, txn_version)?; - let collection_name_hash = hash_str(&token_data_id.collection_name); - let name_hash = hash_str(&token_data_id.name); + let maybe_token = match TokenWriteSet::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? { + Some(TokenWriteSet::Token(inner)) => Some(inner), + _ => None, + }; + + if let Some(token) = maybe_token { + let table_handle = + TableMetadataForToken::standardize_handle(&table_item.handle.to_string()); + let table_handle_metadata = table_handle_to_owner.get(&table_handle); + let (owner_address, table_type) = match table_handle_metadata { + Some(metadata) => ( + Some(metadata.owner_address.clone()), + Some(metadata.table_type.clone()), + ), + None => { + warn!( + "Missing table handle metadata for token. Version: {}, table handle for TokenStore: {}, all metadata: {:?}", + txn_version, table_handle, table_handle_to_owner + ); + (None, None) + } + }; + let token_id = token.id; + let token_data_id = token_id.token_data_id; + let token_data_id_hash = hash_str(&token_data_id.to_string()); + let collection_name = truncate_str(&token_data_id.collection, 128).to_string(); + let name = truncate_str(&token_data_id.name, 128).to_string(); + + let curr_token_ownership = match &owner_address { + Some(owner_address) => Some(CurrentTokenOwnership { + token_data_id_hash: token_data_id_hash.clone(), + property_version: token_id.property_version.clone(), + owner_address: owner_address.clone(), + creator_address: token_data_id.creator.clone(), + collection_name: collection_name.clone(), + name: name.clone(), + amount: ensure_not_negative(token.amount.clone()), + token_properties: token.token_properties.clone(), + last_transaction_version: txn_version, + inserted_at: chrono::Utc::now().naive_utc(), + }), + None => None, + }; - if table_item_data.value_type == "0x3::token::Token" { - let value = &table_item_data.value; Ok(Some(( Self { - creator_address: token_data_id.creator_address.clone(), - collection_name_hash: collection_name_hash.clone(), - name_hash: name_hash.clone(), - collection_name: token_data_id.collection_name.clone(), - name: token_data_id.name.clone(), - property_version: property_version.clone(), + token_data_id_hash: token_data_id_hash.clone(), + creator_address: token_data_id.creator.clone(), + collection_name: collection_name.clone(), + name: name.clone(), + property_version: token_id.property_version.clone(), transaction_version: txn_version, - token_properties: value["token_properties"].clone(), + token_properties: token.token_properties.clone(), inserted_at: 
chrono::Utc::now().naive_utc(), }, TokenOwnership { - creator_address: token_data_id.creator_address.clone(), - collection_name: token_data_id.collection_name.clone(), - collection_name_hash, - name_hash, - name: token_data_id.name, - property_version, + token_data_id_hash, + creator_address: token_data_id.creator, + collection_name, + name, + property_version: token_id.property_version, transaction_version: txn_version, owner_address, - amount: value["amount"] - .as_str() - .map(|s| -> anyhow::Result { - Ok(ensure_not_negative(u64_to_bigdecimal(s.parse::()?))) - }) - .context(format!( - "version {} failed! amount missing from token {:?}", - txn_version, value - ))? - .context(format!("failed to parse amount {:?}", value["amount"]))?, + amount: ensure_not_negative(token.amount), table_handle, table_type, inserted_at: chrono::Utc::now().naive_utc(), }, + curr_token_ownership, ))) } else { Ok(None) @@ -240,102 +312,80 @@ impl Token { table_item: &APIDeleteTableItem, txn_version: i64, table_handle_to_owner: &TableHandleToOwner, - ) -> anyhow::Result> { + ) -> anyhow::Result)>> { let table_item_data = table_item.data.as_ref().unwrap(); - if table_item_data.key_type != "0x3::token::TokenId" { - return Ok(None); - } - let table_handle = - TableMetadataForToken::standardize_handle(&table_item.handle.to_string()); - let (owner_address, table_type) = table_handle_to_owner - .get(&table_handle) - .map(|table_metadata| { - ( - Some(table_metadata.owner_address.clone()), - Some(table_metadata.table_type.clone()), - ) - }) - .unwrap_or((None, None)); - let key = &table_item_data.key; - let token_data_id = Self::get_token_data_id_from_table_item_key(key, txn_version)?; - let property_version = Self::get_property_version_from_table_item_key(key, txn_version)?; - let collection_name_hash = hash_str(&token_data_id.collection_name); - let name_hash = hash_str(&token_data_id.name); + let maybe_token_id = match TokenWriteSet::from_table_item_type( + table_item_data.key_type.as_str(), + &table_item_data.key, + txn_version, + )? 
{ + Some(TokenWriteSet::TokenId(inner)) => Some(inner), + _ => None, + }; - Ok(Some(( - Self { - creator_address: token_data_id.creator_address.clone(), - collection_name: token_data_id.collection_name.clone(), - name: token_data_id.name.clone(), - collection_name_hash: collection_name_hash.clone(), - name_hash: name_hash.clone(), - property_version: property_version.clone(), - transaction_version: txn_version, - token_properties: serde_json::Value::Null, - inserted_at: chrono::Utc::now().naive_utc(), - }, - TokenOwnership { - creator_address: token_data_id.creator_address.clone(), - collection_name: token_data_id.collection_name.clone(), - name: token_data_id.name, - collection_name_hash, - name_hash, - property_version, - transaction_version: txn_version, - owner_address, - amount: BigDecimal::default(), - table_handle, - table_type, - inserted_at: chrono::Utc::now().naive_utc(), - }, - ))) - } + if let Some(token_id) = maybe_token_id { + let table_handle = + TableMetadataForToken::standardize_handle(&table_item.handle.to_string()); + let (owner_address, table_type) = table_handle_to_owner + .get(&table_handle) + .map(|table_metadata| { + ( + Some(table_metadata.owner_address.clone()), + Some(table_metadata.table_type.clone()), + ) + }) + .unwrap_or((None, None)); + let token_data_id = token_id.token_data_id; + let token_data_id_hash = hash_str(&token_data_id.to_string()); + let collection_name = truncate_str(&token_data_id.collection, 128).to_string(); + let name = truncate_str(&token_data_id.name, 128).to_string(); - fn get_property_version_from_table_item_key( - key: &serde_json::Value, - txn_version: i64, - ) -> anyhow::Result { - key["property_version"] - .as_str() - .map(|s| -> anyhow::Result { Ok(u64_to_bigdecimal(s.parse::()?)) }) - .context(format!( - "version {} failed! token_data_id.property_version missing from token id {:?}", - txn_version, key - ))? - .context(format!( - "version {} failed! failed to parse property_version {:?}", - txn_version, key["property_version"] - )) - } + let curr_token_ownership = match &owner_address { + Some(owner_address) => Some(CurrentTokenOwnership { + token_data_id_hash: token_data_id_hash.clone(), + property_version: token_id.property_version.clone(), + owner_address: owner_address.clone(), + creator_address: token_data_id.creator.clone(), + collection_name: collection_name.clone(), + name: name.clone(), + amount: BigDecimal::default(), + token_properties: serde_json::Value::Null, + last_transaction_version: txn_version, + inserted_at: chrono::Utc::now().naive_utc(), + }), + None => None, + }; - fn get_token_data_id_from_table_item_key( - key: &serde_json::Value, - txn_version: i64, - ) -> anyhow::Result { - Ok(TokenDataId { - creator_address: key["token_data_id"]["creator"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! token_data_id.creator missing from token_id {:?}", - txn_version, key - ))?, - collection_name: key["token_data_id"]["collection"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! token_data_id.collection missing from token_id {:?}", - txn_version, key - ))?, - name: key["token_data_id"]["name"] - .as_str() - .map(|s| s.to_string()) - .context(format!( - "version {} failed! 
name missing from token_id {:?}", - txn_version, key - ))?, - }) + Ok(Some(( + Self { + token_data_id_hash: token_data_id_hash.clone(), + creator_address: token_data_id.creator.clone(), + collection_name: collection_name.clone(), + name: name.clone(), + property_version: token_id.property_version.clone(), + transaction_version: txn_version, + token_properties: serde_json::Value::Null, + inserted_at: chrono::Utc::now().naive_utc(), + }, + TokenOwnership { + token_data_id_hash, + creator_address: token_data_id.creator, + collection_name, + name, + property_version: token_id.property_version, + transaction_version: txn_version, + owner_address, + amount: BigDecimal::default(), + table_handle, + table_type, + inserted_at: chrono::Utc::now().naive_utc(), + }, + curr_token_ownership, + ))) + } else { + Ok(None) + } } } diff --git a/crates/indexer/src/processors/default_processor.rs b/crates/indexer/src/processors/default_processor.rs index 627348afeb57c..d16ed728fd50c 100644 --- a/crates/indexer/src/processors/default_processor.rs +++ b/crates/indexer/src/processors/default_processor.rs @@ -118,6 +118,7 @@ fn insert_transactions( .values(&txns[start_ind..end_ind]) .on_conflict(version) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -165,6 +166,7 @@ fn insert_user_transactions_w_sigs( ut_schema::entry_function_id_str.eq(excluded(ut_schema::entry_function_id_str)), ut_schema::inserted_at.eq(excluded(ut_schema::inserted_at)), )), + None, ) { Ok(_) => {} Err(e) => { @@ -185,6 +187,7 @@ fn insert_user_transactions_w_sigs( sig_schema::is_sender_primary, )) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -217,6 +220,7 @@ fn insert_block_metadata_transactions( .values(&bmt[start_ind..end_ind]) .on_conflict(version) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -239,6 +243,7 @@ fn insert_events(conn: &mut PgConnection, ev: &[EventModel]) -> Result<(), diese .values(&ev[start_ind..end_ind]) .on_conflict((account_address, creation_number, sequence_number)) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -264,6 +269,7 @@ fn insert_write_set_changes( .values(&wscs[start_ind..end_ind]) .on_conflict((transaction_version, index)) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -296,6 +302,7 @@ fn insert_move_modules( .values(&modules[start_ind..end_ind]) .on_conflict((transaction_version, write_set_change_index)) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -328,6 +335,7 @@ fn insert_move_resources( .values(&resources[start_ind..end_ind]) .on_conflict((transaction_version, write_set_change_index)) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -371,6 +379,7 @@ fn insert_table_data( .values(&items[start_ind..end_ind]) .on_conflict((ti::transaction_version, ti::write_set_change_index)) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -386,6 +395,7 @@ fn insert_table_data( .values(&metadata_nonnull[start_ind..end_ind]) .on_conflict(tm::handle) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { diff --git a/crates/indexer/src/processors/token_processor.rs b/crates/indexer/src/processors/token_processor.rs index bd03e612067f1..ae697eb9d753b 100644 --- a/crates/indexer/src/processors/token_processor.rs +++ b/crates/indexer/src/processors/token_processor.rs @@ -2,23 +2,27 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - database::{clean_data_for_db, execute_with_better_error, get_chunks, PgDbPool}, + database::{ + clean_data_for_db, execute_with_better_error, get_chunks, PgDbPool, PgPoolConnection, + }, indexer::{ errors::TransactionProcessingError, 
processing_result::ProcessingResult, transaction_processor::TransactionProcessor, }, models::{ - collection_datas::CollectionData, - token_datas::TokenData, - tokens::{Token, TokenOwnership}, + collection_datas::{CollectionData, CurrentCollectionData}, + token_datas::{CurrentTokenData, TokenData}, + tokens::{ + CurrentTokenOwnership, CurrentTokenOwnershipPK, Token, TokenDataIdHash, TokenOwnership, + }, }, schema, }; use aptos_api_types::Transaction; use async_trait::async_trait; -use diesel::{result::Error, PgConnection}; +use diesel::{pg::upsert::excluded, result::Error, ExpressionMethods, PgConnection}; use field_count::FieldCount; -use std::fmt::Debug; +use std::{collections::HashMap, fmt::Debug}; pub const NAME: &str = "token_processor"; pub struct TokenTransactionProcessor { @@ -42,8 +46,28 @@ impl Debug for TokenTransactionProcessor { } } -fn insert_to_db( +fn insert_to_db_impl( conn: &mut PgConnection, + tokens: &[Token], + token_ownerships: &[TokenOwnership], + token_datas: &[TokenData], + collection_datas: &[CollectionData], + current_token_ownerships: &[CurrentTokenOwnership], + current_token_datas: &[CurrentTokenData], + current_collection_datas: &[CurrentCollectionData], +) -> Result<(), diesel::result::Error> { + insert_tokens(conn, tokens)?; + insert_token_datas(conn, token_datas)?; + insert_token_ownerships(conn, token_ownerships)?; + insert_collection_datas(conn, collection_datas)?; + insert_current_token_ownerships(conn, current_token_ownerships)?; + insert_current_token_datas(conn, current_token_datas)?; + insert_current_collection_datas(conn, current_collection_datas)?; + Ok(()) +} + +fn insert_to_db( + conn: &mut PgPoolConnection, name: &'static str, start_version: u64, end_version: u64, @@ -51,6 +75,9 @@ fn insert_to_db( token_ownerships: Vec, token_datas: Vec, collection_datas: Vec, + current_token_ownerships: Vec, + current_token_datas: Vec, + current_collection_datas: Vec, ) -> Result<(), diesel::result::Error> { aptos_logger::trace!( name = name, @@ -62,11 +89,16 @@ fn insert_to_db( .build_transaction() .read_write() .run::<_, Error, _>(|pg_conn| { - insert_tokens(pg_conn, &tokens)?; - insert_token_datas(pg_conn, &token_datas)?; - insert_token_ownerships(pg_conn, &token_ownerships)?; - insert_collection_datas(pg_conn, &collection_datas)?; - Ok(()) + insert_to_db_impl( + pg_conn, + &tokens, + &token_ownerships, + &token_datas, + &collection_datas, + ¤t_token_ownerships, + ¤t_token_datas, + ¤t_collection_datas, + ) }) { Ok(_) => Ok(()), Err(_) => conn @@ -77,12 +109,20 @@ fn insert_to_db( let token_datas = clean_data_for_db(token_datas, true); let token_ownerships = clean_data_for_db(token_ownerships, true); let collection_datas = clean_data_for_db(collection_datas, true); + let current_token_ownerships = clean_data_for_db(current_token_ownerships, true); + let current_token_datas = clean_data_for_db(current_token_datas, true); + let current_collection_datas = clean_data_for_db(current_collection_datas, true); - insert_tokens(pg_conn, &tokens)?; - insert_token_datas(pg_conn, &token_datas)?; - insert_token_ownerships(pg_conn, &token_ownerships)?; - insert_collection_datas(pg_conn, &collection_datas)?; - Ok(()) + insert_to_db_impl( + pg_conn, + &tokens, + &token_ownerships, + &token_datas, + &collection_datas, + ¤t_token_ownerships, + ¤t_token_datas, + ¤t_collection_datas, + ) }), } } @@ -99,14 +139,9 @@ fn insert_tokens( conn, diesel::insert_into(schema::tokens::table) .values(&tokens_to_insert[start_ind..end_ind]) - .on_conflict(( - creator_address, - 
collection_name_hash, - name_hash, - property_version, - transaction_version, - )) + .on_conflict((token_data_id_hash, property_version, transaction_version)) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -133,14 +168,13 @@ fn insert_token_ownerships( diesel::insert_into(schema::token_ownerships::table) .values(&token_ownerships_to_insert[start_ind..end_ind]) .on_conflict(( - creator_address, - collection_name_hash, - name_hash, + token_data_id_hash, property_version, transaction_version, table_handle, )) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -163,13 +197,9 @@ fn insert_token_datas( conn, diesel::insert_into(schema::token_datas::table) .values(&token_datas_to_insert[start_ind..end_ind]) - .on_conflict(( - creator_address, - collection_name_hash, - name_hash, - transaction_version, - )) + .on_conflict((token_data_id_hash, transaction_version)) .do_nothing(), + None, ) { Ok(_) => {} Err(e) => { @@ -195,8 +225,129 @@ fn insert_collection_datas( conn, diesel::insert_into(schema::collection_datas::table) .values(&collection_datas_to_insert[start_ind..end_ind]) - .on_conflict((creator_address, collection_name_hash, transaction_version)) + .on_conflict((collection_data_id_hash, transaction_version)) .do_nothing(), + None, + ) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + } + Ok(()) +} + +fn insert_current_token_ownerships( + conn: &mut PgConnection, + items_to_insert: &[CurrentTokenOwnership], +) -> Result<(), diesel::result::Error> { + use schema::current_token_ownerships::dsl::*; + + let chunks = get_chunks(items_to_insert.len(), CurrentTokenOwnership::field_count()); + + for (start_ind, end_ind) in chunks { + match execute_with_better_error( + conn, + diesel::insert_into(schema::current_token_ownerships::table) + .values(&items_to_insert[start_ind..end_ind]) + .on_conflict((token_data_id_hash, property_version, owner_address)) + .do_update() + .set(( + creator_address.eq(excluded(creator_address)), + collection_name.eq(excluded(collection_name)), + name.eq(excluded(name)), + amount.eq(excluded(amount)), + token_properties.eq(excluded(token_properties)), + last_transaction_version.eq(excluded(last_transaction_version)), + inserted_at.eq(excluded(inserted_at)), + )), + Some(" WHERE current_token_ownerships.last_transaction_version < excluded.last_transaction_version "), + ) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + } + Ok(()) +} + +fn insert_current_token_datas( + conn: &mut PgConnection, + items_to_insert: &[CurrentTokenData], +) -> Result<(), diesel::result::Error> { + use schema::current_token_datas::dsl::*; + + let chunks = get_chunks(items_to_insert.len(), CurrentTokenData::field_count()); + + for (start_ind, end_ind) in chunks { + match execute_with_better_error( + conn, + diesel::insert_into(schema::current_token_datas::table) + .values(&items_to_insert[start_ind..end_ind]) + .on_conflict(token_data_id_hash) + .do_update() + .set(( + creator_address.eq(excluded(creator_address)), + collection_name.eq(excluded(collection_name)), + name.eq(excluded(name)), + maximum.eq(excluded(maximum)), + supply.eq(excluded(supply)), + largest_property_version.eq(excluded(largest_property_version)), + metadata_uri.eq(excluded(metadata_uri)), + payee_address.eq(excluded(payee_address)), + royalty_points_numerator.eq(excluded(royalty_points_numerator)), + royalty_points_denominator.eq(excluded(royalty_points_denominator)), + maximum_mutable.eq(excluded(maximum_mutable)), + uri_mutable.eq(excluded(uri_mutable)), + 
                    description_mutable.eq(excluded(description_mutable)),
+                    properties_mutable.eq(excluded(properties_mutable)),
+                    royalty_mutable.eq(excluded(royalty_mutable)),
+                    default_properties.eq(excluded(default_properties)),
+                    last_transaction_version.eq(excluded(last_transaction_version)),
+                    inserted_at.eq(excluded(inserted_at)),
+                )),
+            Some(" WHERE current_token_datas.last_transaction_version < excluded.last_transaction_version "),
+        ) {
+            Ok(_) => {}
+            Err(e) => {
+                return Err(e);
+            }
+        }
+    }
+    Ok(())
+}
+
+fn insert_current_collection_datas(
+    conn: &mut PgConnection,
+    items_to_insert: &[CurrentCollectionData],
+) -> Result<(), diesel::result::Error> {
+    use schema::current_collection_datas::dsl::*;
+
+    let chunks = get_chunks(items_to_insert.len(), CurrentCollectionData::field_count());
+
+    for (start_ind, end_ind) in chunks {
+        match execute_with_better_error(
+            conn,
+            diesel::insert_into(schema::current_collection_datas::table)
+                .values(&items_to_insert[start_ind..end_ind])
+                .on_conflict(collection_data_id_hash)
+                .do_update()
+                .set((
+                    creator_address.eq(excluded(creator_address)),
+                    collection_name.eq(excluded(collection_name)),
+                    description.eq(excluded(description)),
+                    metadata_uri.eq(excluded(metadata_uri)),
+                    supply.eq(excluded(supply)),
+                    maximum.eq(excluded(maximum)),
+                    maximum_mutable.eq(excluded(maximum_mutable)),
+                    uri_mutable.eq(excluded(uri_mutable)),
+                    description_mutable.eq(excluded(description_mutable)),
+                    last_transaction_version.eq(excluded(last_transaction_version)),
+                    inserted_at.eq(excluded(inserted_at)),
+                )),
+            Some(" WHERE current_collection_datas.last_transaction_version < excluded.last_transaction_version "),
         ) {
             Ok(_) => {}
             Err(e) => {
@@ -223,15 +374,59 @@ impl TransactionProcessor for TokenTransactionProcessor {
         let mut all_token_ownerships = vec![];
         let mut all_token_datas = vec![];
         let mut all_collection_datas = vec![];
+
+        let mut all_current_token_ownerships: HashMap<
+            CurrentTokenOwnershipPK,
+            CurrentTokenOwnership,
+        > = HashMap::new();
+        let mut all_current_token_datas: HashMap<TokenDataIdHash, CurrentTokenData> =
+            HashMap::new();
+        let mut all_current_collection_datas: HashMap<TokenDataIdHash, CurrentCollectionData> =
+            HashMap::new();
+
         for txn in transactions {
-            let (mut tokens, mut token_ownerships, mut token_datas, mut collection_datas) =
-                Token::from_transaction(&txn);
+            let (
+                mut tokens,
+                mut token_ownerships,
+                mut token_datas,
+                mut collection_datas,
+                current_token_ownerships,
+                current_token_datas,
+                current_collection_datas,
+            ) = Token::from_transaction(&txn);
             all_tokens.append(&mut tokens);
             all_token_ownerships.append(&mut token_ownerships);
             all_token_datas.append(&mut token_datas);
             all_collection_datas.append(&mut collection_datas);
+            // Given versions will always be increasing here, we can just override current values
+            all_current_token_ownerships.extend(current_token_ownerships);
+            all_current_token_datas.extend(current_token_datas);
+            all_current_collection_datas.extend(current_collection_datas);
         }
+        let mut all_current_token_ownerships = all_current_token_ownerships
+            .into_iter()
+            .map(|(_, v)| v)
+            .collect::<Vec<CurrentTokenOwnership>>();
+        let mut all_current_token_datas = all_current_token_datas
+            .into_iter()
+            .map(|(_, v)| v)
+            .collect::<Vec<CurrentTokenData>>();
+        let mut all_current_collection_datas = all_current_collection_datas
+            .into_iter()
+            .map(|(_, v)| v)
+            .collect::<Vec<CurrentCollectionData>>();
+        all_current_token_ownerships.sort_by(|a, b| {
+            (&a.token_data_id_hash, &a.property_version, &a.owner_address).cmp(&(
+                &b.token_data_id_hash,
+                &b.property_version,
+                &b.owner_address,
+            ))
+        });
+        all_current_token_datas.sort_by(|a, b|
a.token_data_id_hash.cmp(&b.token_data_id_hash)); + all_current_collection_datas + .sort_by(|a, b| a.collection_data_id_hash.cmp(&b.collection_data_id_hash)); + let mut conn = self.get_conn(); let tx_result = insert_to_db( &mut conn, @@ -242,6 +437,9 @@ impl TransactionProcessor for TokenTransactionProcessor { all_token_ownerships, all_token_datas, all_collection_datas, + all_current_token_ownerships, + all_current_token_datas, + all_current_collection_datas, ); match tx_result { Ok(_) => Ok(ProcessingResult::new( diff --git a/crates/indexer/src/schema.rs b/crates/indexer/src/schema.rs index 90ce86cf5764b..188cd23381352 100644 --- a/crates/indexer/src/schema.rs +++ b/crates/indexer/src/schema.rs @@ -17,13 +17,13 @@ table! { } table! { - collection_datas (creator_address, collection_name_hash, transaction_version) { + collection_datas (collection_data_id_hash, transaction_version) { + collection_data_id_hash -> Varchar, + transaction_version -> Int8, creator_address -> Varchar, - collection_name_hash -> Varchar, - collection_name -> Text, + collection_name -> Varchar, description -> Text, - transaction_version -> Int8, - metadata_uri -> Text, + metadata_uri -> Varchar, supply -> Numeric, maximum -> Numeric, maximum_mutable -> Bool, @@ -33,6 +33,62 @@ table! { } } +table! { + current_collection_datas (collection_data_id_hash) { + collection_data_id_hash -> Varchar, + creator_address -> Varchar, + collection_name -> Varchar, + description -> Text, + metadata_uri -> Varchar, + supply -> Numeric, + maximum -> Numeric, + maximum_mutable -> Bool, + uri_mutable -> Bool, + description_mutable -> Bool, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + +table! { + current_token_datas (token_data_id_hash) { + token_data_id_hash -> Varchar, + creator_address -> Varchar, + collection_name -> Varchar, + name -> Varchar, + maximum -> Numeric, + supply -> Numeric, + largest_property_version -> Numeric, + metadata_uri -> Varchar, + payee_address -> Varchar, + royalty_points_numerator -> Numeric, + royalty_points_denominator -> Numeric, + maximum_mutable -> Bool, + uri_mutable -> Bool, + description_mutable -> Bool, + properties_mutable -> Bool, + royalty_mutable -> Bool, + default_properties -> Jsonb, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + +table! { + current_token_ownerships (token_data_id_hash, property_version, owner_address) { + token_data_id_hash -> Varchar, + property_version -> Numeric, + owner_address -> Varchar, + creator_address -> Varchar, + collection_name -> Varchar, + name -> Varchar, + amount -> Numeric, + token_properties -> Jsonb, + last_transaction_version -> Int8, + inserted_at -> Timestamp, + } +} + table! { events (account_address, creation_number, sequence_number) { sequence_number -> Int8, @@ -138,17 +194,16 @@ table! { } table! { - token_datas (creator_address, collection_name_hash, name_hash, transaction_version) { - creator_address -> Varchar, - collection_name_hash -> Varchar, - name_hash -> Varchar, - collection_name -> Text, - name -> Text, + token_datas (token_data_id_hash, transaction_version) { + token_data_id_hash -> Varchar, transaction_version -> Int8, + creator_address -> Varchar, + collection_name -> Varchar, + name -> Varchar, maximum -> Numeric, supply -> Numeric, largest_property_version -> Numeric, - metadata_uri -> Text, + metadata_uri -> Varchar, payee_address -> Varchar, royalty_points_numerator -> Numeric, royalty_points_denominator -> Numeric, @@ -163,31 +218,29 @@ table! { } table! 
{
-    token_ownerships (creator_address, collection_name_hash, name_hash, property_version, transaction_version, table_handle) {
-        creator_address -> Varchar,
-        collection_name_hash -> Varchar,
-        name_hash -> Varchar,
-        collection_name -> Text,
-        name -> Text,
+    token_ownerships (token_data_id_hash, property_version, transaction_version, table_handle) {
+        token_data_id_hash -> Varchar,
         property_version -> Numeric,
         transaction_version -> Int8,
+        table_handle -> Varchar,
+        creator_address -> Varchar,
+        collection_name -> Varchar,
+        name -> Varchar,
         owner_address -> Nullable<Varchar>,
         amount -> Numeric,
-        table_handle -> Varchar,
         table_type -> Nullable<Text>,
         inserted_at -> Timestamp,
     }
 }
 
 table! {
-    tokens (creator_address, collection_name_hash, name_hash, property_version, transaction_version) {
-        creator_address -> Varchar,
-        collection_name_hash -> Varchar,
-        name_hash -> Varchar,
-        collection_name -> Text,
-        name -> Text,
+    tokens (token_data_id_hash, property_version, transaction_version) {
+        token_data_id_hash -> Varchar,
         property_version -> Numeric,
         transaction_version -> Int8,
+        creator_address -> Varchar,
+        collection_name -> Varchar,
+        name -> Varchar,
         token_properties -> Jsonb,
         inserted_at -> Timestamp,
     }
@@ -246,6 +299,9 @@ table! {
 allow_tables_to_appear_in_same_query!(
     block_metadata_transactions,
     collection_datas,
+    current_collection_datas,
+    current_token_datas,
+    current_token_ownerships,
     events,
     ledger_infos,
     move_modules,
diff --git a/crates/indexer/src/util.rs b/crates/indexer/src/util.rs
index adeeab112d5fd..25c9eed34f39b 100644
--- a/crates/indexer/src/util.rs
+++ b/crates/indexer/src/util.rs
@@ -13,6 +13,13 @@ pub fn hash_str(val: &str) -> String {
     hex::encode(sha2::Sha256::digest(val.as_bytes()))
 }
 
+pub fn truncate_str(val: &str, max_chars: usize) -> &str {
+    match val.char_indices().nth(max_chars) {
+        None => val,
+        Some((idx, _)) => &val[..idx],
+    }
+}
+
 pub fn u64_to_bigdecimal(val: u64) -> BigDecimal {
     BigDecimal::from(val)
 }
diff --git a/testsuite/smoke-test/src/indexer.rs b/testsuite/smoke-test/src/indexer.rs
index 60bdc4340699f..91018cd86db37 100644
--- a/testsuite/smoke-test/src/indexer.rs
+++ b/testsuite/smoke-test/src/indexer.rs
@@ -12,28 +12,13 @@ use forge::{AptosPublicInfo, Result, Swarm};
 use std::sync::Arc;
 
 pub fn wipe_database(conn: &mut PgPoolConnection) {
-    for table in [
-        "collection_datas",
-        "tokens",
-        "token_datas",
-        "token_ownerships",
-        "signatures",
-        "move_modules",
-        "move_resources",
-        "table_items",
-        "table_metadatas",
-        "write_set_changes",
-        "events",
-        "user_transactions",
-        "block_metadata_transactions",
-        "transactions",
-        "processor_statuses",
-        "ledger_infos",
-        "__diesel_schema_migrations",
+    for command in [
+        "DROP SCHEMA public CASCADE",
+        "CREATE SCHEMA public",
+        "GRANT ALL ON SCHEMA public TO postgres",
+        "GRANT ALL ON SCHEMA public TO public",
     ] {
-        diesel::sql_query(format!("DROP TABLE IF EXISTS {} CASCADE", table))
-            .execute(conn)
-            .unwrap();
+        diesel::sql_query(command).execute(conn).unwrap();
     }
 }
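The optional filter threaded through execute_with_better_error (e.g. Some(" WHERE current_token_datas.last_transaction_version < excluded.last_transaction_version ")) is what keeps the new current_* tables monotonic: on a primary-key conflict the existing row is only overwritten when the incoming batch carries a strictly newer transaction version, so a replayed or out-of-order batch cannot clobber fresher state. A self-contained sketch of that guarded-upsert behavior follows; the throwaway table, its columns, and the values are invented for illustration, while the real statements are built by diesel against the current_* tables.

// Demonstration of the "only overwrite with newer data" upsert used for the
// current_* tables. The demo_current table and its values are hypothetical.
use diesel::{sql_query, Connection, PgConnection, RunQueryDsl};

fn main() -> diesel::QueryResult<()> {
    let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let mut conn = PgConnection::establish(&url).expect("failed to connect to Postgres");

    // Stand-in for a current_* table: a primary key plus a "latest version" column.
    sql_query(
        "CREATE TEMP TABLE demo_current (
            id VARCHAR(64) PRIMARY KEY,
            supply NUMERIC NOT NULL,
            last_transaction_version BIGINT NOT NULL
        )",
    )
    .execute(&mut conn)?;

    // First write inserts the row (version 100, supply 10).
    sql_query(
        "INSERT INTO demo_current VALUES ('abc', 10, 100)
         ON CONFLICT (id) DO UPDATE
         SET supply = EXCLUDED.supply,
             last_transaction_version = EXCLUDED.last_transaction_version
         WHERE demo_current.last_transaction_version < EXCLUDED.last_transaction_version",
    )
    .execute(&mut conn)?;

    // A stale replay (version 90) conflicts, but the WHERE guard skips the update,
    // so the row keeps supply = 10 and last_transaction_version = 100.
    sql_query(
        "INSERT INTO demo_current VALUES ('abc', 7, 90)
         ON CONFLICT (id) DO UPDATE
         SET supply = EXCLUDED.supply,
             last_transaction_version = EXCLUDED.last_transaction_version
         WHERE demo_current.last_transaction_version < EXCLUDED.last_transaction_version",
    )
    .execute(&mut conn)?;

    Ok(())
}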
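Before anything reaches Postgres, the processor already collapses duplicates within a batch: per-transaction current_* rows are folded into HashMaps keyed by their primary key, and since versions are increasing within the batch (as the inline comment notes) later rows simply overwrite earlier ones. The surviving rows are then sorted by primary key before the upsert, presumably so concurrent writers take row locks in a consistent order. Below is a minimal sketch of that extend-then-sort flow using a simplified stand-in struct (its field set is illustrative).

use std::collections::HashMap;

// Simplified stand-in for a current_* row; the fields here are illustrative only.
#[derive(Debug)]
struct CurrentRow {
    token_data_id_hash: String,
    supply: u64,
    last_transaction_version: i64,
}

fn main() {
    // Rows as individual transactions produce them, in increasing version order.
    let per_txn_rows = vec![
        CurrentRow { token_data_id_hash: "b".into(), supply: 1, last_transaction_version: 10 },
        CurrentRow { token_data_id_hash: "a".into(), supply: 5, last_transaction_version: 11 },
        CurrentRow { token_data_id_hash: "b".into(), supply: 2, last_transaction_version: 12 },
    ];

    // Mirror of `all_current_token_datas.extend(...)`: within a batch, the later
    // version overwrites the earlier one for the same primary key.
    let mut latest: HashMap<String, CurrentRow> = HashMap::new();
    latest.extend(per_txn_rows.into_iter().map(|r| (r.token_data_id_hash.clone(), r)));

    // Deterministic ordering before the batch upsert, mirroring the sort_by calls.
    let mut rows: Vec<CurrentRow> = latest.into_values().collect();
    rows.sort_by(|a, b| a.token_data_id_hash.cmp(&b.token_data_id_hash));

    assert_eq!(rows.len(), 2);
    assert_eq!(rows[1].supply, 2); // "b" keeps only its newest value (version 12)
    println!("{rows:#?}");
}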
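Every insert helper still slices its input with get_chunks(items.len(), T::field_count()). A likely motivation is Postgres's cap of 65535 bind parameters per statement: diesel binds one parameter per column per row, so the number of rows per INSERT has to shrink as the column count grows. The helper below is a hypothetical reconstruction of that arithmetic, not the crate's actual get_chunks.

// Hypothetical sketch of the chunking rationale; not the crate's real implementation.
// Postgres's extended protocol allows at most u16::MAX (65535) bind parameters per
// statement, and a multi-row INSERT consumes one parameter per column per row.
fn get_chunks_sketch(num_items: usize, fields_per_item: usize) -> Vec<(usize, usize)> {
    let max_params = u16::MAX as usize;
    let rows_per_chunk = (max_params / fields_per_item.max(1)).max(1);
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < num_items {
        let end = (start + rows_per_chunk).min(num_items);
        chunks.push((start, end)); // half-open range, like &items[start..end]
        start = end;
    }
    chunks
}

fn main() {
    // current_token_datas has 19 columns, so roughly 3449 rows fit in one INSERT.
    for (start, end) in get_chunks_sketch(10_000, 19) {
        println!("would insert rows {start}..{end}");
    }
}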
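Finally, the schema now bounds collection_name/name at VARCHAR(128) and metadata_uri at VARCHAR(512), so over-long on-chain strings presumably get cut down with the new truncate_str before insertion. The helper truncates by character rather than by byte, which means multi-byte UTF-8 names are never split mid-codepoint and the result still fits a character-counted VARCHAR column. It is reproduced below with a small, invented usage example.

// truncate_str as added in util.rs, reproduced with an invented usage example.
pub fn truncate_str(val: &str, max_chars: usize) -> &str {
    match val.char_indices().nth(max_chars) {
        None => val,
        Some((idx, _)) => &val[..idx],
    }
}

fn main() {
    // 200 four-byte glyphs: 800 bytes, but only 200 characters.
    let long_name = "🦀".repeat(200);

    // Cutting to 128 *characters* keeps every glyph intact and fits VARCHAR(128),
    // which Postgres measures in characters rather than bytes.
    let truncated = truncate_str(&long_name, 128);
    assert_eq!(truncated.chars().count(), 128);

    // Strings already within the budget come back unchanged.
    assert_eq!(truncate_str("short", 128), "short");
}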