diff --git a/crates/sui-indexer/benches/indexer_benchmark.rs b/crates/sui-indexer/benches/indexer_benchmark.rs index edfd53d591cce..87e1f17b4feb2 100644 --- a/crates/sui-indexer/benches/indexer_benchmark.rs +++ b/crates/sui-indexer/benches/indexer_benchmark.rs @@ -77,6 +77,8 @@ fn create_checkpoint(sequence_number: i64) -> TemporaryCheckpointStore { total_storage_rebate: i64::MAX, total_transaction_blocks: 1000, total_transactions: 1000, + total_successful_transaction_blocks: 1000, + total_successful_transactions: 1000, network_total_transactions: 0, timestamp_ms: Utc::now().timestamp_millis(), }, @@ -89,6 +91,7 @@ fn create_checkpoint(sequence_number: i64) -> TemporaryCheckpointStore { deleted_objects: vec![], }], addresses: vec![], + active_addresses: vec![], packages: vec![], input_objects: vec![], move_calls: vec![], @@ -122,6 +125,7 @@ fn create_transaction(sequence_number: i64) -> Transaction { timestamp_ms: Some(Utc::now().timestamp_millis()), transaction_kind: "test".to_string(), transaction_count: 0, + execution_success: true, created: vec![], mutated: vec![], deleted: vec![], diff --git a/crates/sui-indexer/migrations/2022-11-18-195259_transactions/up.sql b/crates/sui-indexer/migrations/2022-11-18-195259_transactions/up.sql index faf49bab29f10..e817ff25b89ae 100644 --- a/crates/sui-indexer/migrations/2022-11-18-195259_transactions/up.sql +++ b/crates/sui-indexer/migrations/2022-11-18-195259_transactions/up.sql @@ -20,6 +20,7 @@ CREATE TABLE transactions ( timestamp_ms BIGINT, transaction_kind TEXT NOT NULL, transaction_count BIGINT NOT NULL, + execution_success BOOLEAN NOT NULL, -- object related created TEXT[] NOT NULL, mutated TEXT[] NOT NULL, @@ -54,3 +55,4 @@ CREATE INDEX transactions_transaction_digest ON transactions (transaction_digest CREATE INDEX transactions_timestamp_ms ON transactions (timestamp_ms); CREATE INDEX transactions_sender ON transactions (sender); CREATE INDEX transactions_checkpoint_sequence_number ON transactions (checkpoint_sequence_number); +CREATE INDEX transactions_execution_success ON transactions (execution_success); diff --git a/crates/sui-indexer/migrations/2022-11-28-204251_addresses/down.sql b/crates/sui-indexer/migrations/2022-11-28-204251_addresses/down.sql index 5eb914b85b5c5..c282d41d65167 100644 --- a/crates/sui-indexer/migrations/2022-11-28-204251_addresses/down.sql +++ b/crates/sui-indexer/migrations/2022-11-28-204251_addresses/down.sql @@ -1 +1,2 @@ DROP TABLE IF EXISTS addresses; +DROP TABLE IF EXISTS active_addresses; diff --git a/crates/sui-indexer/migrations/2022-11-28-204251_addresses/up.sql b/crates/sui-indexer/migrations/2022-11-28-204251_addresses/up.sql index 00a461e96f1bf..83b291f3c85d5 100644 --- a/crates/sui-indexer/migrations/2022-11-28-204251_addresses/up.sql +++ b/crates/sui-indexer/migrations/2022-11-28-204251_addresses/up.sql @@ -2,6 +2,16 @@ CREATE TABLE addresses ( account_address address PRIMARY KEY, first_appearance_tx base58digest NOT NULL, - first_appearance_time BIGINT NOT NULL + first_appearance_time BIGINT NOT NULL, + last_appearance_tx base58digest NOT NULL, + last_appearance_time BIGINT NOT NULL +); + +CREATE TABLE active_addresses +( + account_address address PRIMARY KEY, + first_appearance_tx base58digest NOT NULL, + first_appearance_time BIGINT NOT NULL, + last_appearance_tx base58digest NOT NULL, + last_appearance_time BIGINT NOT NULL ); -CREATE INDEX addresses_account_address ON addresses (account_address); diff --git a/crates/sui-indexer/migrations/2022-12-01-034426_objects/up.sql b/crates/sui-indexer/migrations/2022-12-01-034426_objects/up.sql index 5a2b833af3669..6f84a85b8c32e 100644 --- a/crates/sui-indexer/migrations/2022-12-01-034426_objects/up.sql +++ b/crates/sui-indexer/migrations/2022-12-01-034426_objects/up.sql @@ -40,34 +40,34 @@ CREATE INDEX objects_owner_address ON objects (owner_type, owner_address); CREATE INDEX objects_tx_digest ON objects (previous_transaction); -- NOTE(gegaowp): remove object history so that it will not be created over DB reset / migration run. --- CREATE TABLE objects_history --- ( --- epoch BIGINT NOT NULL, --- checkpoint BIGINT NOT NULL, --- object_id address NOT NULL, --- version BIGINT NOT NULL, --- object_digest base58digest NOT NULL, --- owner_type owner_type NOT NULL, --- owner_address address, --- old_owner_type owner_type, --- old_owner_address address, --- initial_shared_version BIGINT, --- previous_transaction base58digest NOT NULL, --- object_type VARCHAR NOT NULL, --- object_status object_status NOT NULL, --- has_public_transfer BOOLEAN NOT NULL, --- storage_rebate BIGINT NOT NULL, --- bcs bcs_bytes[] NOT NULL, --- CONSTRAINT objects_history_pk PRIMARY KEY (object_id, version, checkpoint) --- ) PARTITION BY RANGE (checkpoint); --- CREATE INDEX objects_history_checkpoint_index ON objects_history (checkpoint); --- CREATE INDEX objects_history_id_version_index ON objects_history (object_id, version); --- CREATE INDEX objects_history_owner_index ON objects_history (owner_type, owner_address); --- CREATE INDEX objects_history_old_owner_index ON objects_history (old_owner_type, old_owner_address); --- -- fast-path partition for the most recent objects before checkpoint, range is half-open. --- -- partition name need to match regex of '.*(_partition_)\d+'. --- CREATE TABLE objects_history_fast_path_partition_0 PARTITION OF objects_history FOR VALUES FROM (-1) TO (0); --- CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE); +CREATE TABLE objects_history +( + epoch BIGINT NOT NULL, + checkpoint BIGINT NOT NULL, + object_id address NOT NULL, + version BIGINT NOT NULL, + object_digest base58digest NOT NULL, + owner_type owner_type NOT NULL, + owner_address address, + old_owner_type owner_type, + old_owner_address address, + initial_shared_version BIGINT, + previous_transaction base58digest NOT NULL, + object_type VARCHAR NOT NULL, + object_status object_status NOT NULL, + has_public_transfer BOOLEAN NOT NULL, + storage_rebate BIGINT NOT NULL, + bcs bcs_bytes[] NOT NULL, + CONSTRAINT objects_history_pk PRIMARY KEY (object_id, version, checkpoint) +) PARTITION BY RANGE (checkpoint); +CREATE INDEX objects_history_checkpoint_index ON objects_history (checkpoint); +CREATE INDEX objects_history_id_version_index ON objects_history (object_id, version); +CREATE INDEX objects_history_owner_index ON objects_history (owner_type, owner_address); +CREATE INDEX objects_history_old_owner_index ON objects_history (old_owner_type, old_owner_address); +-- fast-path partition for the most recent objects before checkpoint, range is half-open. +-- partition name need to match regex of '.*(_partition_)\d+'. +CREATE TABLE objects_history_fast_path_partition_0 PARTITION OF objects_history FOR VALUES FROM (-1) TO (0); +CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE); -- CREATE OR REPLACE FUNCTION objects_modified_func() RETURNS TRIGGER AS -- $body$ diff --git a/crates/sui-indexer/migrations/2023-01-16-233119_checkpoint/up.sql b/crates/sui-indexer/migrations/2023-01-16-233119_checkpoint/up.sql index 58f47942ec7bc..a30b6a7bc20e2 100644 --- a/crates/sui-indexer/migrations/2023-01-16-233119_checkpoint/up.sql +++ b/crates/sui-indexer/migrations/2023-01-16-233119_checkpoint/up.sql @@ -1,22 +1,24 @@ CREATE TABLE checkpoints ( - sequence_number BIGINT PRIMARY KEY, - checkpoint_digest VARCHAR(255) NOT NULL, - epoch BIGINT NOT NULL, - transactions TEXT[] NOT NULL, - previous_checkpoint_digest VARCHAR(255), - end_of_epoch BOOLEAN NOT NULL, + sequence_number BIGINT PRIMARY KEY, + checkpoint_digest VARCHAR(255) NOT NULL, + epoch BIGINT NOT NULL, + transactions TEXT[] NOT NULL, + previous_checkpoint_digest VARCHAR(255), + end_of_epoch BOOLEAN NOT NULL, -- derived from GasCostSummary - total_gas_cost BIGINT NOT NULL, - total_computation_cost BIGINT NOT NULL, - total_storage_cost BIGINT NOT NULL, - total_storage_rebate BIGINT NOT NULL, + total_gas_cost BIGINT NOT NULL, + total_computation_cost BIGINT NOT NULL, + total_storage_cost BIGINT NOT NULL, + total_storage_rebate BIGINT NOT NULL, -- derived from transaction count from genesis - total_transaction_blocks BIGINT NOT NULL, - total_transactions BIGINT NOT NULL, - network_total_transactions BIGINT NOT NULL, + total_transaction_blocks BIGINT NOT NULL, + total_transactions BIGINT NOT NULL, + total_successful_transaction_blocks BIGINT NOT NULL, + total_successful_transactions BIGINT NOT NULL, + network_total_transactions BIGINT NOT NULL, -- number of milliseconds from the Unix epoch - timestamp_ms BIGINT NOT NULL + timestamp_ms BIGINT NOT NULL ); CREATE INDEX checkpoints_epoch ON checkpoints (epoch); diff --git a/crates/sui-indexer/migrations/2023-03-20-041133_epoch/up.sql b/crates/sui-indexer/migrations/2023-03-20-041133_epoch/up.sql index c8da6dd5f628c..c14ede41c7602 100644 --- a/crates/sui-indexer/migrations/2023-03-20-041133_epoch/up.sql +++ b/crates/sui-indexer/migrations/2023-03-20-041133_epoch/up.sql @@ -1,6 +1,7 @@ CREATE MATERIALIZED VIEW epoch_network_metrics as SELECT MAX(tps_30_days) as tps_30_days -FROM (SELECT (((SUM(total_transactions) OVER w) - (FIRST_VALUE(total_transactions) OVER w))::float8 / +FROM (SELECT (((SUM(total_successful_transactions + total_transaction_blocks - total_successful_transaction_blocks) OVER w) - + (FIRST_VALUE(total_successful_transactions + total_transaction_blocks - total_successful_transaction_blocks) OVER w))::float8 / ((MAX(timestamp_ms) OVER w - MIN(timestamp_ms) OVER w)) * 1000) AS tps_30_days FROM checkpoints diff --git a/crates/sui-indexer/migrations/2023-03-20-042226_system_state/up.sql b/crates/sui-indexer/migrations/2023-03-20-042226_system_state/up.sql index abcf4be9887a8..13c5398634028 100644 --- a/crates/sui-indexer/migrations/2023-03-20-042226_system_state/up.sql +++ b/crates/sui-indexer/migrations/2023-03-20-042226_system_state/up.sql @@ -79,7 +79,12 @@ CREATE TABLE at_risk_validators ); CREATE OR REPLACE VIEW network_metrics AS -SELECT (SELECT COALESCE(SUM(transaction_count)::float8 / 10, 0) +SELECT (SELECT COALESCE(SUM( + CASE + WHEN execution_success = true THEN transaction_count + ELSE 1 + END + )::float8 / 10, 0) FROM transactions WHERE timestamp_ms > (SELECT timestamp_ms FROM checkpoints ORDER BY sequence_number DESC LIMIT 1) - 10000) AS current_tps, diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index d144d21b8eee8..d8c292e6b8ab1 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -33,6 +33,7 @@ use sui_types::SUI_SYSTEM_ADDRESS; use crate::errors::IndexerError; use crate::metrics::IndexerMetrics; +use crate::models::addresses::{dedup_from_addresses, dedup_from_and_to_addresses}; use crate::models::checkpoints::Checkpoint; use crate::models::epoch::{DBEpochInfo, SystemEpochInfoEvent}; use crate::models::objects::{DeletedObject, Object, ObjectStatus}; @@ -430,7 +431,8 @@ where transactions, events, object_changes: _, - addresses: _, + addresses, + active_addresses, packages: _, input_objects: _, move_calls: _, @@ -455,23 +457,27 @@ where } }); - // let addresses_handler = self.clone(); - // spawn_monitored_task!(async move { - // let mut address_commit_res = - // addresses_handler.state.persist_addresses(&addresses).await; - // while let Err(e) = address_commit_res { - // warn!( - // "Indexer address commit failed with error: {:?}, retrying after {:?} milli-secs...", - // e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS - // ); - // tokio::time::sleep(std::time::Duration::from_millis( - // DB_COMMIT_RETRY_INTERVAL_IN_MILLIS, - // )) - // .await; - // address_commit_res = - // addresses_handler.state.persist_addresses(&addresses).await; - // } - // }); + let addresses_handler = self.clone(); + spawn_monitored_task!(async move { + let mut address_commit_res = addresses_handler + .state + .persist_addresses(&addresses, &active_addresses) + .await; + while let Err(e) = address_commit_res { + warn!( + "Indexer address commit failed with error: {:?}, retrying after {:?} milli-secs...", + e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS + ); + tokio::time::sleep(std::time::Duration::from_millis( + DB_COMMIT_RETRY_INTERVAL_IN_MILLIS, + )) + .await; + address_commit_res = addresses_handler + .state + .persist_addresses(&addresses, &active_addresses) + .await; + } + }); // MUSTFIX(gegaowp): temp. turn off tx index table commit to reduce short-term storage consumption. // this include recipients, input_objects and move_calls. @@ -558,6 +564,7 @@ where events: _, object_changes: tx_object_changes, addresses: _, + active_addresses: _, packages, input_objects: _, move_calls, @@ -863,13 +870,21 @@ where .flat_map(|tx| tx.get_recipients(checkpoint.epoch, checkpoint.sequence_number)) .collect(); - // // Index addresses - // let addresses = transactions - // .iter() - // .flat_map(|tx| { - // tx.get_addresses(checkpoint.epoch, checkpoint.sequence_number) - // }) - // .collect(); + // Index addresses + // NOTE: dedup is necessary because there are multiple transactions in a checkpoint, + // otherwise error of `ON CONFLICT DO UPDATE command cannot affect row a second time` will be thrown. + let from_and_to_address_data = transactions + .iter() + .flat_map(|tx| { + tx.get_from_and_to_addresses(checkpoint.epoch, checkpoint.sequence_number) + }) + .collect(); + let from_address_data = transactions + .iter() + .map(|tx| tx.get_from_address()) + .collect(); + let addresses = dedup_from_and_to_addresses(from_and_to_address_data); + let active_addresses = dedup_from_addresses(from_address_data); // NOTE: Index epoch when object checkpoint index has reached the same checkpoint, // because epoch info is based on the latest system state object by the current checkpoint. @@ -1005,14 +1020,29 @@ where }; } let total_transactions = db_transactions.iter().map(|t| t.transaction_count).sum(); + let total_successful_transaction_blocks = db_transactions + .iter() + .filter(|t| t.execution_success) + .count(); + let total_successful_transactions = db_transactions + .iter() + .filter(|t| t.execution_success) + .map(|t| t.transaction_count) + .sum(); Ok(( TemporaryCheckpointStore { - checkpoint: Checkpoint::from(checkpoint, total_transactions)?, + checkpoint: Checkpoint::from( + checkpoint, + total_transactions, + total_successful_transactions, + total_successful_transaction_blocks as i64, + )?, transactions: db_transactions, events, object_changes: objects_changes, - addresses: vec![], + addresses, + active_addresses, packages, input_objects, move_calls, diff --git a/crates/sui-indexer/src/models/addresses.rs b/crates/sui-indexer/src/models/addresses.rs index 13b69097177bf..e3e453659a8dc 100644 --- a/crates/sui-indexer/src/models/addresses.rs +++ b/crates/sui-indexer/src/models/addresses.rs @@ -1,9 +1,12 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; + use diesel::prelude::*; -use crate::schema::addresses; +use crate::schema::{active_addresses, addresses}; +use crate::types::AddressData; #[derive(Queryable, Insertable, Debug)] #[diesel(table_name = addresses, primary_key(account_address))] @@ -11,4 +14,72 @@ pub struct Address { pub account_address: String, pub first_appearance_tx: String, pub first_appearance_time: i64, + pub last_appearance_tx: String, + pub last_appearance_time: i64, +} + +#[derive(Queryable, Insertable, Debug)] +#[diesel(table_name = active_addresses, primary_key(account_address))] +pub struct ActiveAddress { + pub account_address: String, + pub first_appearance_tx: String, + pub first_appearance_time: i64, + pub last_appearance_tx: String, + pub last_appearance_time: i64, +} + +pub fn dedup_from_and_to_addresses(addrs: Vec) -> Vec
{ + let addr_map = addrs.into_iter().fold(HashMap::new(), |mut acc, addr| { + let key = addr.account_address.clone(); + let value = Address { + account_address: addr.account_address, + first_appearance_tx: addr.transaction_digest.clone(), + first_appearance_time: addr.timestamp_ms, + last_appearance_tx: addr.transaction_digest, + last_appearance_time: addr.timestamp_ms, + }; + acc.entry(key) + .and_modify(|v: &mut Address| { + if v.first_appearance_time > value.first_appearance_time { + v.first_appearance_time = value.first_appearance_time; + v.first_appearance_tx = value.first_appearance_tx.clone(); + } + if v.last_appearance_time < value.last_appearance_time { + v.last_appearance_time = value.last_appearance_time; + v.last_appearance_tx = value.last_appearance_tx.clone(); + } + }) + .or_insert(value); + acc + }); + addr_map.into_values().collect() +} + +pub fn dedup_from_addresses(from_addrs: Vec) -> Vec { + let active_addr_map = from_addrs + .into_iter() + .fold(HashMap::new(), |mut acc, addr| { + let key = addr.account_address.clone(); + let value = ActiveAddress { + account_address: addr.account_address, + first_appearance_tx: addr.transaction_digest.clone(), + first_appearance_time: addr.timestamp_ms, + last_appearance_tx: addr.transaction_digest, + last_appearance_time: addr.timestamp_ms, + }; + acc.entry(key) + .and_modify(|v: &mut ActiveAddress| { + if v.first_appearance_time > value.first_appearance_time { + v.first_appearance_time = value.first_appearance_time; + v.first_appearance_tx = value.first_appearance_tx.clone(); + } + if v.last_appearance_time < value.last_appearance_time { + v.last_appearance_time = value.last_appearance_time; + v.last_appearance_tx = value.last_appearance_tx.clone(); + } + }) + .or_insert(value); + acc + }); + active_addr_map.into_values().collect() } diff --git a/crates/sui-indexer/src/models/checkpoints.rs b/crates/sui-indexer/src/models/checkpoints.rs index c919f8d69fcb5..8337e921f653d 100644 --- a/crates/sui-indexer/src/models/checkpoints.rs +++ b/crates/sui-indexer/src/models/checkpoints.rs @@ -29,6 +29,8 @@ pub struct Checkpoint { pub total_storage_rebate: i64, pub total_transaction_blocks: i64, pub total_transactions: i64, + pub total_successful_transaction_blocks: i64, + pub total_successful_transactions: i64, pub network_total_transactions: i64, pub timestamp_ms: i64, pub validator_signature: String, @@ -38,6 +40,8 @@ impl Checkpoint { pub fn from( rpc_checkpoint: &RpcCheckpoint, total_transactions: i64, + total_successful_transactions: i64, + total_successful_transaction_blocks: i64, ) -> Result { let total_gas_cost = rpc_checkpoint .epoch_rolling_gas_cost_summary @@ -66,9 +70,11 @@ impl Checkpoint { total_storage_rebate: rpc_checkpoint.epoch_rolling_gas_cost_summary.storage_rebate as i64, total_transaction_blocks: rpc_checkpoint.transactions.len() as i64, + total_transactions, + total_successful_transaction_blocks, + total_successful_transactions, network_total_transactions: rpc_checkpoint.network_total_transactions as i64, timestamp_ms: rpc_checkpoint.timestamp_ms as i64, - total_transactions, validator_signature: rpc_checkpoint.validator_signature.encode_base64(), }) } diff --git a/crates/sui-indexer/src/models/objects.rs b/crates/sui-indexer/src/models/objects.rs index 31a186d934891..970eacc06594d 100644 --- a/crates/sui-indexer/src/models/objects.rs +++ b/crates/sui-indexer/src/models/objects.rs @@ -198,11 +198,6 @@ impl Object { })?; Ok((object_id, (self.version as u64).into(), digest)) } - - // MUSTFIX(gegaowp): trim data to reduce short-term storage consumption. - pub fn trim_data(&mut self) { - self.bcs.clear(); - } } impl TryFrom for sui_types::object::Object { diff --git a/crates/sui-indexer/src/models/transactions.rs b/crates/sui-indexer/src/models/transactions.rs index 50917a67cd192..adbfc50ffefe5 100644 --- a/crates/sui-indexer/src/models/transactions.rs +++ b/crates/sui-indexer/src/models/transactions.rs @@ -23,6 +23,7 @@ pub struct Transaction { pub timestamp_ms: Option, pub transaction_kind: String, pub transaction_count: i64, + pub execution_success: bool, pub created: Vec>, pub mutated: Vec>, pub deleted: Vec>, @@ -126,6 +127,7 @@ impl TryFrom for Transaction { checkpoint_sequence_number: checkpoint.map(|seq| seq as i64), transaction_kind: transaction.data.transaction().name().to_string(), transaction_count: transaction.data.transaction().transaction_count() as i64, + execution_success: effects.status().is_ok(), timestamp_ms: timestamp_ms.map(|ts| ts as i64), created: vec_string_to_vec_opt(created), mutated: vec_string_to_vec_opt(mutated), @@ -153,22 +155,6 @@ impl TryFrom for Transaction { } } -impl Transaction { - // MUSTFIX(gegaowp): trim data to reduce short-term storage consumption. - pub fn trim_data(&mut self) { - self.created.clear(); - self.mutated.clear(); - self.unwrapped.clear(); - self.wrapped.clear(); - self.move_calls.clear(); - self.recipients.clear(); - // trim BCS and JSON data from transaction - self.raw_transaction.clear(); - self.transaction_content.clear(); - self.transaction_effects_content.clear(); - } -} - fn owned_obj_ref_to_obj_id(owned_obj_ref: &OwnedObjectRef) -> String { owned_obj_ref.reference.object_id.to_string() } diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index b321d837da65e..b5b1e1822c1ce 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -16,11 +16,23 @@ pub mod sql_types { pub struct OwnerType; } +diesel::table! { + active_addresses (account_address) { + account_address -> Varchar, + first_appearance_tx -> Varchar, + first_appearance_time -> Int8, + last_appearance_tx -> Varchar, + last_appearance_time -> Int8, + } +} + diesel::table! { addresses (account_address) { account_address -> Varchar, first_appearance_tx -> Varchar, first_appearance_time -> Int8, + last_appearance_tx -> Varchar, + last_appearance_time -> Int8, } } @@ -47,6 +59,8 @@ diesel::table! { total_storage_rebate -> Int8, total_transaction_blocks -> Int8, total_transactions -> Int8, + total_successful_transaction_blocks -> Int8, + total_successful_transactions -> Int8, network_total_transactions -> Int8, timestamp_ms -> Int8, validator_signature -> Text, @@ -227,6 +241,7 @@ diesel::table! { timestamp_ms -> Nullable, transaction_kind -> Text, transaction_count -> Int8, + execution_success -> Bool, created -> Array>, mutated -> Array>, deleted -> Array>, @@ -296,6 +311,7 @@ diesel::table! { } diesel::allow_tables_to_appear_in_same_query!( + active_addresses, addresses, at_risk_validators, checkpoints, diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index ed1661d0eacb7..5006a9d6b7400 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -20,7 +20,7 @@ use sui_types::storage::ObjectStore; use crate::errors::IndexerError; use crate::metrics::IndexerMetrics; -use crate::models::addresses::Address; +use crate::models::addresses::{ActiveAddress, Address}; use crate::models::checkpoints::Checkpoint; use crate::models::epoch::DBEpochInfo; use crate::models::events::Event; @@ -207,7 +207,11 @@ pub trait IndexerStore { object_deletion_latency: Histogram, ) -> Result<(), IndexerError>; async fn persist_events(&self, events: &[Event]) -> Result<(), IndexerError>; - async fn persist_addresses(&self, addresses: &[Address]) -> Result<(), IndexerError>; + async fn persist_addresses( + &self, + addresses: &[Address], + active_addresses: &[ActiveAddress], + ) -> Result<(), IndexerError>; async fn persist_packages(&self, packages: &[Package]) -> Result<(), IndexerError>; // NOTE: these tables are for tx query performance optimization async fn persist_transaction_index_tables( @@ -282,6 +286,7 @@ pub struct TemporaryCheckpointStore { pub events: Vec, pub object_changes: Vec, pub addresses: Vec
, + pub active_addresses: Vec, pub packages: Vec, pub input_objects: Vec, pub move_calls: Vec, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index f73ca8cbe4aab..03b4d92b8595c 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -45,7 +45,7 @@ use sui_types::object::ObjectRead; use crate::errors::{Context, IndexerError}; use crate::metrics::IndexerMetrics; -use crate::models::addresses::Address; +use crate::models::addresses::{ActiveAddress, Address}; use crate::models::checkpoints::Checkpoint; use crate::models::epoch::DBEpochInfo; use crate::models::events::Event; @@ -58,11 +58,11 @@ use crate::models::system_state::DBValidatorSummary; use crate::models::transaction_index::{InputObject, MoveCall, Recipient}; use crate::models::transactions::Transaction; use crate::schema::{ - addresses, checkpoints, checkpoints::dsl as checkpoints_dsl, epochs, epochs::dsl as epochs_dsl, - events, input_objects, input_objects::dsl as input_objects_dsl, move_calls, - move_calls::dsl as move_calls_dsl, objects, objects::dsl as objects_dsl, objects_history, - packages, recipients, recipients::dsl as recipients_dsl, system_states, transactions, - transactions::dsl as transactions_dsl, validators, + active_addresses, addresses, checkpoints, checkpoints::dsl as checkpoints_dsl, epochs, + epochs::dsl as epochs_dsl, events, input_objects, input_objects::dsl as input_objects_dsl, + move_calls, move_calls::dsl as move_calls_dsl, objects, objects::dsl as objects_dsl, + objects_history, packages, recipients, recipients::dsl as recipients_dsl, system_states, + transactions, transactions::dsl as transactions_dsl, validators, }; use crate::store::diesel_marco::{read_only_blocking, transactional_blocking}; use crate::store::indexer_store::TemporaryCheckpointStore; @@ -1149,6 +1149,7 @@ impl IndexerStore for PgIndexerStore { events, object_changes: tx_object_changes, addresses, + active_addresses, packages, input_objects, move_calls, @@ -1199,15 +1200,48 @@ impl IndexerStore for PgIndexerStore { .collect(); persist_transaction_object_changes(conn, mutated_objects, deleted_objects, None, None)?; - // Commit indexed addresses - for addresses_chunk in addresses.chunks(PG_COMMIT_CHUNK_SIZE) { + // TODO(gegaowp): refactor to consolidate commit blocks + // Commit indexed addresses & active addresses + for address_chunk in addresses.chunks(PG_COMMIT_CHUNK_SIZE) { diesel::insert_into(addresses::table) - .values(addresses_chunk) + .values(address_chunk) .on_conflict(addresses::account_address) - .do_nothing() + .do_update() + .set(( + addresses::last_appearance_time + .eq(excluded(addresses::last_appearance_time)), + addresses::last_appearance_tx.eq(excluded(addresses::last_appearance_tx)), + )) .execute(conn) .map_err(IndexerError::from) - .context("Failed writing addresses to PostgresDB")?; + .context( + format!( + "Failed writing addresses to PostgresDB with tx {}", + address_chunk[0].last_appearance_tx + ) + .as_str(), + )?; + } + for active_address_chunk in active_addresses.chunks(PG_COMMIT_CHUNK_SIZE) { + diesel::insert_into(active_addresses::table) + .values(active_address_chunk) + .on_conflict(active_addresses::account_address) + .do_update() + .set(( + active_addresses::last_appearance_time + .eq(excluded(active_addresses::last_appearance_time)), + active_addresses::last_appearance_tx + .eq(excluded(active_addresses::last_appearance_tx)), + )) + .execute(conn) + .map_err(IndexerError::from) + .context( + format!( + "Failed writing active addresses to PostgresDB with tx {}", + active_address_chunk[0].last_appearance_tx + ) + .as_str(), + )?; } // Commit indexed packages @@ -1266,17 +1300,9 @@ impl IndexerStore for PgIndexerStore { checkpoint: &Checkpoint, transactions: &[Transaction], ) -> Result { - let trimmed_transactions = transactions - .iter() - .map(|t| { - let mut t = t.clone(); - t.trim_data(); - t - }) - .collect::>(); transactional_blocking!(&self.blocking_cp, |conn| { // Commit indexed transactions - for transaction_chunk in trimmed_transactions.chunks(PG_COMMIT_CHUNK_SIZE) { + for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE) { diesel::insert_into(transactions::table) .values(transaction_chunk) .on_conflict(transactions::transaction_digest) @@ -1337,26 +1363,10 @@ WHERE e1.epoch = e2.epoch .collect(); let (mutation_count, deletion_count) = (mutated_objects.len(), deleted_objects.len()); - - // MUSTFIX(gegaowp): trim data to reduce short-term storage consumption. - let trimmed_mutated_objects = mutated_objects - .into_iter() - .map(|mut o| { - o.trim_data(); - o - }) - .collect::>(); - let trimmed_deleted_objects = deleted_objects - .into_iter() - .map(|mut o| { - o.trim_data(); - o - }) - .collect::>(); persist_transaction_object_changes( conn, - trimmed_mutated_objects, - trimmed_deleted_objects, + mutated_objects, + deleted_objects, Some(object_mutation_latency), Some(object_deletion_latency), )?; @@ -1388,16 +1398,52 @@ WHERE e1.epoch = e2.epoch Ok(()) } - async fn persist_addresses(&self, addresses: &[Address]) -> Result<(), IndexerError> { + async fn persist_addresses( + &self, + addresses: &[Address], + active_addresses: &[ActiveAddress], + ) -> Result<(), IndexerError> { transactional_blocking!(&self.blocking_cp, |conn| { - for addresses_chunk in addresses.chunks(PG_COMMIT_CHUNK_SIZE) { + for address_chunk in addresses.chunks(PG_COMMIT_CHUNK_SIZE) { diesel::insert_into(addresses::table) - .values(addresses_chunk) + .values(address_chunk) .on_conflict(addresses::account_address) - .do_nothing() + .do_update() + .set(( + addresses::last_appearance_time + .eq(excluded(addresses::last_appearance_time)), + addresses::last_appearance_tx.eq(excluded(addresses::last_appearance_tx)), + )) + .execute(conn) + .map_err(IndexerError::from) + .context( + format!( + "Failed writing addresses to PostgresDB with tx {}", + address_chunk[0].last_appearance_tx + ) + .as_str(), + )?; + } + for active_address_chunk in active_addresses.chunks(PG_COMMIT_CHUNK_SIZE) { + diesel::insert_into(active_addresses::table) + .values(active_address_chunk) + .on_conflict(active_addresses::account_address) + .do_update() + .set(( + active_addresses::last_appearance_time + .eq(excluded(active_addresses::last_appearance_time)), + active_addresses::last_appearance_tx + .eq(excluded(active_addresses::last_appearance_tx)), + )) .execute(conn) .map_err(IndexerError::from) - .context("Failed writing addresses to PostgresDB")?; + .context( + format!( + "Failed writing active addresses to PostgresDB with tx {}", + active_address_chunk[0].last_appearance_tx + ) + .as_str(), + )?; } Ok::<(), IndexerError>(()) })?; diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index cffa814b6c13c..0f54a50d114ee 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -12,7 +12,6 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber; use sui_types::object::Owner; use crate::errors::IndexerError; -use crate::models::addresses::Address; use crate::models::transaction_index::{InputObject, MoveCall, Recipient}; pub struct FastPathTransactionBlockResponse { @@ -183,6 +182,12 @@ impl TryFrom for CheckpointTransactionBlockResponse } } +pub struct AddressData { + pub account_address: String, + pub transaction_digest: String, + pub timestamp_ms: i64, +} + impl CheckpointTransactionBlockResponse { pub fn get_input_objects(&self, epoch: u64) -> Result, IndexerError> { let raw_tx = self.raw_transaction.clone(); @@ -266,7 +271,7 @@ impl CheckpointTransactionBlockResponse { .collect() } - pub fn get_addresses(&self, epoch: u64, checkpoint: u64) -> Vec
{ + pub fn get_from_and_to_addresses(&self, epoch: u64, checkpoint: u64) -> Vec { let mut addresses = self .get_recipients(epoch, checkpoint) .into_iter() @@ -275,12 +280,20 @@ impl CheckpointTransactionBlockResponse { addresses.push(self.transaction.data.sender().to_string()); addresses .into_iter() - .map(|r| Address { + .map(|r| AddressData { account_address: r, - first_appearance_tx: self.digest.to_string(), - first_appearance_time: self.timestamp_ms as i64, + transaction_digest: self.digest.to_string(), + timestamp_ms: self.timestamp_ms as i64, }) - .collect::>() + .collect::>() + } + + pub fn get_from_address(&self) -> AddressData { + AddressData { + account_address: self.transaction.data.sender().to_string(), + transaction_digest: self.digest.to_string(), + timestamp_ms: self.timestamp_ms as i64, + } } }