Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer fix: chunk to avoid PG parameter limit #19754

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"

[dependencies]
anyhow.workspace = true
rand = "0.8.5"
async-trait.workspace = true
axum.workspace = true
backoff.workspace = true
Expand Down
253 changes: 156 additions & 97 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,48 +910,73 @@ impl PgIndexerStore {

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(event_emit_package::table)
.values(&event_emit_packages)
.on_conflict_do_nothing()
.execute(conn)
.await?;

diesel::insert_into(event_emit_module::table)
.values(&event_emit_modules)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_emit_packages_chunk in
event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anticipating a question about PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: we do not want to make this an env var / CLI arg, because it is a protection that ensures commits can go through. We do not want operators to override it, as that might cause issues.

{
diesel::insert_into(event_emit_package::table)
.values(event_emit_packages_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_senders::table)
.values(&event_senders)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_emit_modules_chunk in
event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_emit_module::table)
.values(event_emit_modules_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_package::table)
.values(&event_struct_packages)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(event_senders::table)
.values(event_senders_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_module::table)
.values(&event_struct_modules)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_struct_packages_chunk in
event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_package::table)
.values(event_struct_packages_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_name::table)
.values(&event_struct_names)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_struct_modules_chunk in
event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_module::table)
.values(event_struct_modules_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(event_struct_instantiation::table)
.values(&event_struct_instantiations)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for event_struct_names_chunk in
event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_name::table)
.values(event_struct_names_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

for event_struct_instantiations_chunk in
event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(event_struct_instantiation::table)
.values(event_struct_instantiations_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}
Ok(())
}
.scope_boxed()
Expand Down Expand Up @@ -1040,71 +1065,99 @@ impl PgIndexerStore {

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(tx_affected_addresses::table)
.values(&affected_addresses)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for affected_addresses_chunk in
affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(tx_affected_addresses::table)
.values(affected_addresses_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_affected_objects::table)
.values(&affected_objects)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for affected_objects_chunk in
affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(tx_affected_objects::table)
.values(affected_objects_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_senders::table)
.values(&senders)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for senders_chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_senders::table)
.values(senders_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_recipients::table)
.values(&recipients)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for recipients_chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_recipients::table)
.values(recipients_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_input_objects::table)
.values(&input_objects)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_input_objects::table)
.values(input_objects_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_changed_objects::table)
.values(&changed_objects)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for changed_objects_chunk in
changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(tx_changed_objects::table)
.values(changed_objects_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_calls_pkg::table)
.values(&pkgs)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_calls_pkg::table)
.values(pkgs_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_calls_mod::table)
.values(&mods)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_calls_mod::table)
.values(mods_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_calls_fun::table)
.values(&funs)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_calls_fun::table)
.values(funs_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_digests::table)
.values(&digests)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_digests::table)
.values(digests_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

diesel::insert_into(tx_kinds::table)
.values(&kinds)
.on_conflict_do_nothing()
.execute(conn)
.await?;
for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
diesel::insert_into(tx_kinds::table)
.values(kinds_chunk)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}

Ok(())
}
Expand Down Expand Up @@ -1878,9 +1931,12 @@ impl IndexerStore for PgIndexerStore {
"Failed to persist all event_indices chunks: {:?}",
e
))
})?;
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} event_indices chunks", len);
})
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} event_indices chunks", len);
})
.tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?;
Ok(())
}

Expand Down Expand Up @@ -1908,9 +1964,12 @@ impl IndexerStore for PgIndexerStore {
"Failed to persist all tx_indices chunks: {:?}",
e
))
})?;
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} tx_indices chunks", len);
})
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} tx_indices chunks", len);
})
.tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?;
Ok(())
}

Expand Down
57 changes: 56 additions & 1 deletion crates/sui-indexer/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::errors::IndexerError;
use move_core_types::language_storage::StructTag;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use sui_json_rpc_types::{
Expand All @@ -25,6 +25,8 @@ use sui_types::sui_serde::SuiStructTag;
use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
use sui_types::transaction::SenderSignedData;

use crate::errors::IndexerError;

pub type IndexerResult<T> = Result<T, IndexerError>;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -254,6 +256,24 @@ pub struct EventIndex {
pub type_instantiation: String,
}

// for ingestion test
impl EventIndex {
    /// Builds an `EventIndex` populated entirely with random values.
    /// Intended for ingestion testing only — never for production indexing.
    pub fn random() -> Self {
        let mut rng = rand::thread_rng();
        // Draw every random field up front so the struct literal stays flat.
        let tx_sequence_number = rng.gen();
        let event_sequence_number = rng.gen();
        let sender = SuiAddress::random_for_testing_only();
        let emit_package = ObjectID::random();
        let emit_module = rng.gen::<u64>().to_string();
        let type_package = ObjectID::random();
        let type_module = rng.gen::<u64>().to_string();
        let type_name = rng.gen::<u64>().to_string();
        let type_instantiation = rng.gen::<u64>().to_string();
        EventIndex {
            tx_sequence_number,
            event_sequence_number,
            sender,
            emit_package,
            emit_module,
            type_package,
            type_module,
            type_name,
            type_instantiation,
        }
    }
}

impl EventIndex {
pub fn from_event(
tx_sequence_number: u64,
Expand Down Expand Up @@ -414,6 +434,41 @@ pub struct TxIndex {
pub move_calls: Vec<(ObjectID, String, String)>,
}

impl TxIndex {
pub fn random() -> Self {
let mut rng = rand::thread_rng();
TxIndex {
tx_sequence_number: rng.gen(),
tx_kind: if rng.gen_bool(0.5) {
TransactionKind::SystemTransaction
} else {
TransactionKind::ProgrammableTransaction
},
transaction_digest: TransactionDigest::random(),
checkpoint_sequence_number: rng.gen(),
input_objects: (0..1000).map(|_| ObjectID::random()).collect(),
changed_objects: (0..1000).map(|_| ObjectID::random()).collect(),
affected_objects: (0..1000).map(|_| ObjectID::random()).collect(),
payers: (0..rng.gen_range(0..100))
.map(|_| SuiAddress::random_for_testing_only())
.collect(),
sender: SuiAddress::random_for_testing_only(),
recipients: (0..rng.gen_range(0..1000))
.map(|_| SuiAddress::random_for_testing_only())
.collect(),
move_calls: (0..rng.gen_range(0..1000))
.map(|_| {
(
ObjectID::random(),
rng.gen::<u64>().to_string(),
rng.gen::<u64>().to_string(),
)
})
.collect(),
}
}
}

// ObjectChange is not bcs deserializable, IndexedObjectChange is.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
Expand Down
Loading
Loading