[cherry-pick] indexer: handle failed tx block in TPS (#11922) (#11926)
Picked several indexer changes onto 1.1:
#11848 
#11864 
#11922
gegaowp authored May 11, 2023
1 parent f2c6bd3 commit 3efe985
Showing 17 changed files with 340 additions and 147 deletions.
4 changes: 4 additions & 0 deletions crates/sui-indexer/benches/indexer_benchmark.rs
@@ -77,6 +77,8 @@ fn create_checkpoint(sequence_number: i64) -> TemporaryCheckpointStore {
total_storage_rebate: i64::MAX,
total_transaction_blocks: 1000,
total_transactions: 1000,
total_successful_transaction_blocks: 1000,
total_successful_transactions: 1000,
network_total_transactions: 0,
timestamp_ms: Utc::now().timestamp_millis(),
},
@@ -89,6 +91,7 @@ fn create_checkpoint(sequence_number: i64) -> TemporaryCheckpointStore {
deleted_objects: vec![],
}],
addresses: vec![],
active_addresses: vec![],
packages: vec![],
input_objects: vec![],
move_calls: vec![],
@@ -122,6 +125,7 @@ fn create_transaction(sequence_number: i64) -> Transaction {
timestamp_ms: Some(Utc::now().timestamp_millis()),
transaction_kind: "test".to_string(),
transaction_count: 0,
execution_success: true,
created: vec![],
mutated: vec![],
deleted: vec![],
@@ -20,6 +20,7 @@ CREATE TABLE transactions (
timestamp_ms BIGINT,
transaction_kind TEXT NOT NULL,
transaction_count BIGINT NOT NULL,
execution_success BOOLEAN NOT NULL,
-- object related
created TEXT[] NOT NULL,
mutated TEXT[] NOT NULL,
@@ -54,3 +55,4 @@ CREATE INDEX transactions_transaction_digest ON transactions (transaction_digest
CREATE INDEX transactions_timestamp_ms ON transactions (timestamp_ms);
CREATE INDEX transactions_sender ON transactions (sender);
CREATE INDEX transactions_checkpoint_sequence_number ON transactions (checkpoint_sequence_number);
CREATE INDEX transactions_execution_success ON transactions (execution_success);
@@ -1 +1,2 @@
DROP TABLE IF EXISTS addresses;
DROP TABLE IF EXISTS active_addresses;
14 changes: 12 additions & 2 deletions crates/sui-indexer/migrations/2022-11-28-204251_addresses/up.sql
@@ -2,6 +2,16 @@ CREATE TABLE addresses
(
account_address address PRIMARY KEY,
first_appearance_tx base58digest NOT NULL,
first_appearance_time BIGINT NOT NULL
first_appearance_time BIGINT NOT NULL,
last_appearance_tx base58digest NOT NULL,
last_appearance_time BIGINT NOT NULL
);

CREATE TABLE active_addresses
(
account_address address PRIMARY KEY,
first_appearance_tx base58digest NOT NULL,
first_appearance_time BIGINT NOT NULL,
last_appearance_tx base58digest NOT NULL,
last_appearance_time BIGINT NOT NULL
);
CREATE INDEX addresses_account_address ON addresses (account_address);
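Note that the new active_addresses table has the same shape as the extended addresses table; judging from the handler changes later in this commit, addresses is populated from both senders and recipients, while active_addresses is populated from transaction senders only.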
56 changes: 28 additions & 28 deletions crates/sui-indexer/migrations/2022-12-01-034426_objects/up.sql
@@ -40,34 +40,34 @@ CREATE INDEX objects_owner_address ON objects (owner_type, owner_address);
CREATE INDEX objects_tx_digest ON objects (previous_transaction);

-- NOTE(gegaowp): remove object history so that it will not be created over DB reset / migration run.
-- CREATE TABLE objects_history
-- (
-- epoch BIGINT NOT NULL,
-- checkpoint BIGINT NOT NULL,
-- object_id address NOT NULL,
-- version BIGINT NOT NULL,
-- object_digest base58digest NOT NULL,
-- owner_type owner_type NOT NULL,
-- owner_address address,
-- old_owner_type owner_type,
-- old_owner_address address,
-- initial_shared_version BIGINT,
-- previous_transaction base58digest NOT NULL,
-- object_type VARCHAR NOT NULL,
-- object_status object_status NOT NULL,
-- has_public_transfer BOOLEAN NOT NULL,
-- storage_rebate BIGINT NOT NULL,
-- bcs bcs_bytes[] NOT NULL,
-- CONSTRAINT objects_history_pk PRIMARY KEY (object_id, version, checkpoint)
-- ) PARTITION BY RANGE (checkpoint);
-- CREATE INDEX objects_history_checkpoint_index ON objects_history (checkpoint);
-- CREATE INDEX objects_history_id_version_index ON objects_history (object_id, version);
-- CREATE INDEX objects_history_owner_index ON objects_history (owner_type, owner_address);
-- CREATE INDEX objects_history_old_owner_index ON objects_history (old_owner_type, old_owner_address);
-- -- fast-path partition for the most recent objects before checkpoint, range is half-open.
-- -- partition name need to match regex of '.*(_partition_)\d+'.
-- CREATE TABLE objects_history_fast_path_partition_0 PARTITION OF objects_history FOR VALUES FROM (-1) TO (0);
-- CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE);
CREATE TABLE objects_history
(
epoch BIGINT NOT NULL,
checkpoint BIGINT NOT NULL,
object_id address NOT NULL,
version BIGINT NOT NULL,
object_digest base58digest NOT NULL,
owner_type owner_type NOT NULL,
owner_address address,
old_owner_type owner_type,
old_owner_address address,
initial_shared_version BIGINT,
previous_transaction base58digest NOT NULL,
object_type VARCHAR NOT NULL,
object_status object_status NOT NULL,
has_public_transfer BOOLEAN NOT NULL,
storage_rebate BIGINT NOT NULL,
bcs bcs_bytes[] NOT NULL,
CONSTRAINT objects_history_pk PRIMARY KEY (object_id, version, checkpoint)
) PARTITION BY RANGE (checkpoint);
CREATE INDEX objects_history_checkpoint_index ON objects_history (checkpoint);
CREATE INDEX objects_history_id_version_index ON objects_history (object_id, version);
CREATE INDEX objects_history_owner_index ON objects_history (owner_type, owner_address);
CREATE INDEX objects_history_old_owner_index ON objects_history (old_owner_type, old_owner_address);
-- fast-path partition for the most recent objects before checkpoint, range is half-open.
-- partition name need to match regex of '.*(_partition_)\d+'.
CREATE TABLE objects_history_fast_path_partition_0 PARTITION OF objects_history FOR VALUES FROM (-1) TO (0);
CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE);

-- CREATE OR REPLACE FUNCTION objects_modified_func() RETURNS TRIGGER AS
-- $body$
30 changes: 16 additions & 14 deletions crates/sui-indexer/migrations/2023-01-16-233119_checkpoint/up.sql
@@ -1,22 +1,24 @@
CREATE TABLE checkpoints
(
sequence_number BIGINT PRIMARY KEY,
checkpoint_digest VARCHAR(255) NOT NULL,
epoch BIGINT NOT NULL,
transactions TEXT[] NOT NULL,
previous_checkpoint_digest VARCHAR(255),
end_of_epoch BOOLEAN NOT NULL,
sequence_number BIGINT PRIMARY KEY,
checkpoint_digest VARCHAR(255) NOT NULL,
epoch BIGINT NOT NULL,
transactions TEXT[] NOT NULL,
previous_checkpoint_digest VARCHAR(255),
end_of_epoch BOOLEAN NOT NULL,
-- derived from GasCostSummary
total_gas_cost BIGINT NOT NULL,
total_computation_cost BIGINT NOT NULL,
total_storage_cost BIGINT NOT NULL,
total_storage_rebate BIGINT NOT NULL,
total_gas_cost BIGINT NOT NULL,
total_computation_cost BIGINT NOT NULL,
total_storage_cost BIGINT NOT NULL,
total_storage_rebate BIGINT NOT NULL,
-- derived from transaction count from genesis
total_transaction_blocks BIGINT NOT NULL,
total_transactions BIGINT NOT NULL,
network_total_transactions BIGINT NOT NULL,
total_transaction_blocks BIGINT NOT NULL,
total_transactions BIGINT NOT NULL,
total_successful_transaction_blocks BIGINT NOT NULL,
total_successful_transactions BIGINT NOT NULL,
network_total_transactions BIGINT NOT NULL,
-- number of milliseconds from the Unix epoch
timestamp_ms BIGINT NOT NULL
timestamp_ms BIGINT NOT NULL
);

CREATE INDEX checkpoints_epoch ON checkpoints (epoch);
3 changes: 2 additions & 1 deletion crates/sui-indexer/migrations/2023-03-20-041133_epoch/up.sql
@@ -1,6 +1,7 @@
CREATE MATERIALIZED VIEW epoch_network_metrics as
SELECT MAX(tps_30_days) as tps_30_days
FROM (SELECT (((SUM(total_transactions) OVER w) - (FIRST_VALUE(total_transactions) OVER w))::float8 /
FROM (SELECT (((SUM(total_successful_transactions + total_transaction_blocks - total_successful_transaction_blocks) OVER w) -
(FIRST_VALUE(total_successful_transactions + total_transaction_blocks - total_successful_transaction_blocks) OVER w))::float8 /
((MAX(timestamp_ms) OVER w - MIN(timestamp_ms) OVER w)) *
1000) AS tps_30_days
FROM checkpoints
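The view now bases 30-day TPS on an effective per-checkpoint count: every successful transaction counts individually, and each failed transaction block counts as one, since total_transaction_blocks - total_successful_transaction_blocks is exactly the number of failed blocks. A minimal Rust sketch of that count and of the windowed rate, using a hypothetical CheckpointTotals struct (not the indexer's actual model) with fields named after the columns above:

```rust
/// Hypothetical per-checkpoint totals mirroring the columns used by the view.
struct CheckpointTotals {
    total_transaction_blocks: i64,
    total_successful_transaction_blocks: i64,
    total_successful_transactions: i64,
    timestamp_ms: i64,
}

/// Effective transaction count: successful transactions plus one per failed block,
/// i.e. total_successful_transactions + (total_transaction_blocks - total_successful_transaction_blocks).
fn effective_tx_count(c: &CheckpointTotals) -> i64 {
    c.total_successful_transactions
        + (c.total_transaction_blocks - c.total_successful_transaction_blocks)
}

/// TPS over an ordered window of checkpoints, analogous to the SQL window expression:
/// (SUM(effective) - FIRST_VALUE(effective)) / (MAX(ts) - MIN(ts)) * 1000.
fn tps(window: &[CheckpointTotals]) -> Option<f64> {
    let (first, last) = (window.first()?, window.last()?);
    let elapsed_ms = last.timestamp_ms - first.timestamp_ms;
    if elapsed_ms <= 0 {
        return None;
    }
    let txs: i64 = window.iter().skip(1).map(effective_tx_count).sum();
    Some(txs as f64 / elapsed_ms as f64 * 1000.0)
}

fn main() {
    let window = vec![
        CheckpointTotals {
            total_transaction_blocks: 10,
            total_successful_transaction_blocks: 10,
            total_successful_transactions: 40,
            timestamp_ms: 0,
        },
        CheckpointTotals {
            total_transaction_blocks: 10,
            total_successful_transaction_blocks: 8,
            total_successful_transactions: 30,
            timestamp_ms: 10_000,
        },
    ];
    // 30 successful transactions + 2 failed blocks = 32 over 10 seconds => 3.2 TPS.
    println!("{:?}", tps(&window));
}
```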
@@ -79,7 +79,12 @@ CREATE TABLE at_risk_validators
);

CREATE OR REPLACE VIEW network_metrics AS
SELECT (SELECT COALESCE(SUM(transaction_count)::float8 / 10, 0)
SELECT (SELECT COALESCE(SUM(
CASE
WHEN execution_success = true THEN transaction_count
ELSE 1
END
)::float8 / 10, 0)
FROM transactions
WHERE timestamp_ms >
(SELECT timestamp_ms FROM checkpoints ORDER BY sequence_number DESC LIMIT 1) - 10000) AS current_tps,
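The same counting convention applies to the 10-second current_tps window above: a successful transaction block contributes its full transaction_count, while a failed block contributes one. A one-function Rust sketch of that per-row contribution, mirroring the CASE expression (hypothetical free function, not part of the indexer):

```rust
/// Contribution of one transaction-block row to the TPS numerator,
/// mirroring `CASE WHEN execution_success = true THEN transaction_count ELSE 1 END`.
fn tps_contribution(execution_success: bool, transaction_count: i64) -> i64 {
    if execution_success {
        transaction_count
    } else {
        1
    }
}

fn main() {
    assert_eq!(tps_contribution(true, 7), 7); // successful block: all its transactions count
    assert_eq!(tps_contribution(false, 7), 1); // failed block: counts as a single transaction
}
```

Summing these contributions over the rows from the last 10 seconds and dividing by 10 yields the current TPS, as in the view.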
84 changes: 57 additions & 27 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
@@ -33,6 +33,7 @@ use sui_types::SUI_SYSTEM_ADDRESS;

use crate::errors::IndexerError;
use crate::metrics::IndexerMetrics;
use crate::models::addresses::{dedup_from_addresses, dedup_from_and_to_addresses};
use crate::models::checkpoints::Checkpoint;
use crate::models::epoch::{DBEpochInfo, SystemEpochInfoEvent};
use crate::models::objects::{DeletedObject, Object, ObjectStatus};
@@ -430,7 +431,8 @@ where
transactions,
events,
object_changes: _,
addresses: _,
addresses,
active_addresses,
packages: _,
input_objects: _,
move_calls: _,
@@ -455,23 +457,27 @@ where
}
});

// let addresses_handler = self.clone();
// spawn_monitored_task!(async move {
// let mut address_commit_res =
// addresses_handler.state.persist_addresses(&addresses).await;
// while let Err(e) = address_commit_res {
// warn!(
// "Indexer address commit failed with error: {:?}, retrying after {:?} milli-secs...",
// e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS
// );
// tokio::time::sleep(std::time::Duration::from_millis(
// DB_COMMIT_RETRY_INTERVAL_IN_MILLIS,
// ))
// .await;
// address_commit_res =
// addresses_handler.state.persist_addresses(&addresses).await;
// }
// });
let addresses_handler = self.clone();
spawn_monitored_task!(async move {
let mut address_commit_res = addresses_handler
.state
.persist_addresses(&addresses, &active_addresses)
.await;
while let Err(e) = address_commit_res {
warn!(
"Indexer address commit failed with error: {:?}, retrying after {:?} milli-secs...",
e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS
);
tokio::time::sleep(std::time::Duration::from_millis(
DB_COMMIT_RETRY_INTERVAL_IN_MILLIS,
))
.await;
address_commit_res = addresses_handler
.state
.persist_addresses(&addresses, &active_addresses)
.await;
}
});

// MUSTFIX(gegaowp): temp. turn off tx index table commit to reduce short-term storage consumption.
// this include recipients, input_objects and move_calls.
@@ -558,6 +564,7 @@ where
events: _,
object_changes: tx_object_changes,
addresses: _,
active_addresses: _,
packages,
input_objects: _,
move_calls,
@@ -863,13 +870,21 @@ where
.flat_map(|tx| tx.get_recipients(checkpoint.epoch, checkpoint.sequence_number))
.collect();

// // Index addresses
// let addresses = transactions
// .iter()
// .flat_map(|tx| {
// tx.get_addresses(checkpoint.epoch, checkpoint.sequence_number)
// })
// .collect();
// Index addresses
// NOTE: dedup is necessary because there are multiple transactions in a checkpoint,
// otherwise error of `ON CONFLICT DO UPDATE command cannot affect row a second time` will be thrown.
let from_and_to_address_data = transactions
.iter()
.flat_map(|tx| {
tx.get_from_and_to_addresses(checkpoint.epoch, checkpoint.sequence_number)
})
.collect();
let from_address_data = transactions
.iter()
.map(|tx| tx.get_from_address())
.collect();
let addresses = dedup_from_and_to_addresses(from_and_to_address_data);
let active_addresses = dedup_from_addresses(from_address_data);

// NOTE: Index epoch when object checkpoint index has reached the same checkpoint,
// because epoch info is based on the latest system state object by the current checkpoint.
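The bodies of dedup_from_and_to_addresses and dedup_from_addresses are not part of this diff, but the note above explains why they are needed: a checkpoint can contain several transactions touching the same address, and a single INSERT ... ON CONFLICT DO UPDATE statement cannot update the same row twice. A minimal sketch of the kind of merge these helpers presumably perform, keeping the earliest first appearance and the latest last appearance per address (hypothetical Address struct, not the indexer's actual model in crates/sui-indexer/src/models/addresses.rs):

```rust
use std::collections::HashMap;

/// Hypothetical address record with fields named after the columns in the migration.
#[derive(Debug, Clone)]
struct Address {
    account_address: String,
    first_appearance_tx: String,
    first_appearance_time: i64,
    last_appearance_tx: String,
    last_appearance_time: i64,
}

/// Collapse duplicate addresses from one checkpoint into a single row each,
/// so a later ON CONFLICT upsert never has to touch the same row twice.
fn dedup_addresses(rows: Vec<Address>) -> Vec<Address> {
    let mut by_address: HashMap<String, Address> = HashMap::new();
    for row in rows {
        match by_address.get_mut(&row.account_address) {
            Some(existing) => {
                if row.first_appearance_time < existing.first_appearance_time {
                    existing.first_appearance_time = row.first_appearance_time;
                    existing.first_appearance_tx = row.first_appearance_tx.clone();
                }
                if row.last_appearance_time > existing.last_appearance_time {
                    existing.last_appearance_time = row.last_appearance_time;
                    existing.last_appearance_tx = row.last_appearance_tx.clone();
                }
            }
            None => {
                by_address.insert(row.account_address.clone(), row);
            }
        }
    }
    by_address.into_values().collect()
}

fn main() {
    let rows = vec![
        Address {
            account_address: "0xabc".to_string(),
            first_appearance_tx: "tx1".to_string(),
            first_appearance_time: 100,
            last_appearance_tx: "tx1".to_string(),
            last_appearance_time: 100,
        },
        Address {
            account_address: "0xabc".to_string(),
            first_appearance_tx: "tx2".to_string(),
            first_appearance_time: 200,
            last_appearance_tx: "tx2".to_string(),
            last_appearance_time: 200,
        },
    ];
    // Two rows for the same address collapse into one with first = tx1, last = tx2.
    println!("{:?}", dedup_addresses(rows));
}
```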
@@ -1005,14 +1020,29 @@ where
};
}
let total_transactions = db_transactions.iter().map(|t| t.transaction_count).sum();
let total_successful_transaction_blocks = db_transactions
.iter()
.filter(|t| t.execution_success)
.count();
let total_successful_transactions = db_transactions
.iter()
.filter(|t| t.execution_success)
.map(|t| t.transaction_count)
.sum();

Ok((
TemporaryCheckpointStore {
checkpoint: Checkpoint::from(checkpoint, total_transactions)?,
checkpoint: Checkpoint::from(
checkpoint,
total_transactions,
total_successful_transactions,
total_successful_transaction_blocks as i64,
)?,
transactions: db_transactions,
events,
object_changes: objects_changes,
addresses: vec![],
addresses,
active_addresses,
packages,
input_objects,
move_calls,
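For a concrete example of the aggregation above: a checkpoint with three transaction blocks, two successful (4 and 6 transactions) and one failed (5 transactions), yields total_transactions = 15, total_successful_transaction_blocks = 2, and total_successful_transactions = 10. A minimal sketch with a hypothetical record type (not the indexer's actual transaction model):

```rust
/// Hypothetical stand-in for the two fields of the indexer transaction model used here.
struct TxBlock {
    transaction_count: i64,
    execution_success: bool,
}

fn main() {
    let db_transactions = vec![
        TxBlock { transaction_count: 4, execution_success: true },
        TxBlock { transaction_count: 6, execution_success: true },
        TxBlock { transaction_count: 5, execution_success: false },
    ];

    // Same aggregation as in the handler above.
    let total_transactions: i64 = db_transactions.iter().map(|t| t.transaction_count).sum();
    let total_successful_transaction_blocks = db_transactions
        .iter()
        .filter(|t| t.execution_success)
        .count();
    let total_successful_transactions: i64 = db_transactions
        .iter()
        .filter(|t| t.execution_success)
        .map(|t| t.transaction_count)
        .sum();

    assert_eq!(total_transactions, 15);
    assert_eq!(total_successful_transaction_blocks, 2);
    assert_eq!(total_successful_transactions, 10);
}
```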