Skip to content

Commit

Permalink
indexer fix: chunk to avoid PG parameter limit (#19754)
Browse files Browse the repository at this point in the history
## Description 

- Fixed a bug that caused the mainnet indexer to stop (also reported in
#19542). Specifically, when a tx has many input objects / affected objects /
recipients / affected addresses etc., the expanded query exceeds the PG
parameter limit of 65535; the inserts are now chunked to stay under that
limit.
- Also added ingestion tests for big tx indices & event indices, and
better error tracing.


## Test plan 

added ingestion tests for tx and event indices

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp authored Oct 8, 2024
1 parent 037f13e commit a7863b7
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 98 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"

[dependencies]
anyhow.workspace = true
rand = "0.8.5"
async-trait.workspace = true
axum.workspace = true
backoff.workspace = true
Expand Down
253 changes: 156 additions & 97 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,48 +926,73 @@ impl PgIndexerStore {

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(event_emit_package::table)
.values(&event_emit_packages)
.on_conflict_do_nothing()
.execute(conn)
.await?;

diesel::insert_into(event_emit_module::table)
.values(&event_emit_modules)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_emit_packages_chunk in
event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_emit_package::table)
.values(event_emit_packages_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_senders::table)
.values(&event_senders)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_emit_modules_chunk in
event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_emit_module::table)
.values(event_emit_modules_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_package::table)
.values(&event_struct_packages)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(event_senders::table)
.values(event_senders_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_module::table)
.values(&event_struct_modules)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_struct_packages_chunk in
event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_package::table)
.values(event_struct_packages_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_name::table)
.values(&event_struct_names)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_struct_modules_chunk in
event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_module::table)
.values(event_struct_modules_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_instantiation::table)
.values(&event_struct_instantiations)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_struct_names_chunk in
event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_name::table)
.values(event_struct_names_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

for event_struct_instantiations_chunk in
event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_instantiation::table)
.values(event_struct_instantiations_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}
Ok(())
}
.scope_boxed()
Expand Down Expand Up @@ -1056,71 +1081,99 @@ impl PgIndexerStore {

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(tx_affected_addresses::table)
.values(&affected_addresses)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for affected_addresses_chunk in
affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(tx_affected_addresses::table)
.values(affected_addresses_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_affected_objects::table)
.values(&affected_objects)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for affected_objects_chunk in
affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(tx_affected_objects::table)
.values(affected_objects_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_senders::table)
.values(&senders)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for senders_chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_senders::table)
.values(senders_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_recipients::table)
.values(&recipients)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for recipients_chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_recipients::table)
.values(recipients_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_input_objects::table)
.values(&input_objects)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_input_objects::table)
.values(input_objects_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_changed_objects::table)
.values(&changed_objects)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for changed_objects_chunk in
changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(tx_changed_objects::table)
.values(changed_objects_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_calls_pkg::table)
.values(&pkgs)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_calls_pkg::table)
.values(pkgs_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_calls_mod::table)
.values(&mods)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_calls_mod::table)
.values(mods_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_calls_fun::table)
.values(&funs)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_calls_fun::table)
.values(funs_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_digests::table)
.values(&digests)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_digests::table)
.values(digests_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_kinds::table)
.values(&kinds)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_kinds::table)
.values(kinds_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

Ok(())
}
Expand Down Expand Up @@ -1902,9 +1955,12 @@ impl IndexerStore for PgIndexerStore {
"Failed to persist all event_indices chunks: {:?}",
e
))
})?;
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} event_indices chunks", len);
})
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} event_indices chunks", len);
})
.tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?;
Ok(())
}

Expand Down Expand Up @@ -1932,9 +1988,12 @@ impl IndexerStore for PgIndexerStore {
"Failed to persist all tx_indices chunks: {:?}",
e
))
})?;
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} tx_indices chunks", len);
})
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} tx_indices chunks", len);
})
.tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?;
Ok(())
}

Expand Down
57 changes: 56 additions & 1 deletion crates/sui-indexer/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::errors::IndexerError;
use move_core_types::language_storage::StructTag;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use sui_json_rpc_types::{
Expand All @@ -25,6 +25,8 @@ use sui_types::sui_serde::SuiStructTag;
use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
use sui_types::transaction::SenderSignedData;

use crate::errors::IndexerError;

pub type IndexerResult<T> = Result<T, IndexerError>;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -254,6 +256,24 @@ pub struct EventIndex {
pub type_instantiation: String,
}

// Test-only constructor, used by the ingestion tests.
impl EventIndex {
    /// Returns an `EventIndex` in which every field holds random data.
    ///
    /// The sequence numbers come from `rand::thread_rng()`, the sender and
    /// package ids from their own random constructors, and the module /
    /// type strings are random `u64` values rendered as decimal text.
    pub fn random() -> Self {
        let mut rng = rand::thread_rng();

        // Values are drawn one at a time, in the same order as the fields.
        let tx_sequence_number = rng.gen();
        let event_sequence_number = rng.gen();
        let sender = SuiAddress::random_for_testing_only();
        let emit_package = ObjectID::random();
        let emit_module = rng.gen::<u64>().to_string();
        let type_package = ObjectID::random();
        let type_module = rng.gen::<u64>().to_string();
        let type_name = rng.gen::<u64>().to_string();
        let type_instantiation = rng.gen::<u64>().to_string();

        Self {
            tx_sequence_number,
            event_sequence_number,
            sender,
            emit_package,
            emit_module,
            type_package,
            type_module,
            type_name,
            type_instantiation,
        }
    }
}

impl EventIndex {
pub fn from_event(
tx_sequence_number: u64,
Expand Down Expand Up @@ -414,6 +434,41 @@ pub struct TxIndex {
pub move_calls: Vec<(ObjectID, String, String)>,
}

impl TxIndex {
    /// Returns a `TxIndex` populated with random data.
    ///
    /// The three object lists (`input_objects`, `changed_objects`,
    /// `affected_objects`) always contain exactly 1000 ids. `payers` holds a
    /// random number of addresses in `0..100`; `recipients` and `move_calls`
    /// each hold a random number of entries in `0..1000`. The transaction
    /// kind is a coin flip between a system and a programmable transaction.
    pub fn random() -> Self {
        let mut rng = rand::thread_rng();

        let tx_sequence_number = rng.gen();
        let tx_kind = match rng.gen_bool(0.5) {
            true => TransactionKind::SystemTransaction,
            false => TransactionKind::ProgrammableTransaction,
        };
        let transaction_digest = TransactionDigest::random();
        let checkpoint_sequence_number = rng.gen();

        // Fixed-size object lists.
        let input_objects = std::iter::repeat_with(ObjectID::random)
            .take(1000)
            .collect();
        let changed_objects = std::iter::repeat_with(ObjectID::random)
            .take(1000)
            .collect();
        let affected_objects = std::iter::repeat_with(ObjectID::random)
            .take(1000)
            .collect();

        // Randomly sized address lists; each length is drawn before its
        // elements are generated.
        let payer_count = rng.gen_range(0..100);
        let payers = std::iter::repeat_with(SuiAddress::random_for_testing_only)
            .take(payer_count)
            .collect();
        let sender = SuiAddress::random_for_testing_only();
        let recipient_count = rng.gen_range(0..1000);
        let recipients = std::iter::repeat_with(SuiAddress::random_for_testing_only)
            .take(recipient_count)
            .collect();

        // Each move call is (package id, module, function); the two names
        // are random `u64`s rendered as strings.
        let move_call_count = rng.gen_range(0..1000);
        let move_calls = std::iter::repeat_with(|| {
            let package = ObjectID::random();
            let module = rng.gen::<u64>().to_string();
            let function = rng.gen::<u64>().to_string();
            (package, module, function)
        })
        .take(move_call_count)
        .collect();

        Self {
            tx_sequence_number,
            tx_kind,
            transaction_digest,
            checkpoint_sequence_number,
            input_objects,
            changed_objects,
            affected_objects,
            payers,
            sender,
            recipients,
            move_calls,
        }
    }
}

// ObjectChange is not bcs deserializable, IndexedObjectChange is.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
Expand Down
Loading

0 comments on commit a7863b7

Please sign in to comment.