From b48bd804c528f77ec0318c34c2ce64ce65d44b2e Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Tue, 8 Oct 2024 15:45:17 -0400 Subject: [PATCH] indexer fix: chunk to avoid PG parameter limit --- Cargo.lock | 1 + crates/sui-indexer/Cargo.toml | 1 + .../sui-indexer/src/store/pg_indexer_store.rs | 253 +++++++++++------- crates/sui-indexer/src/types.rs | 57 +++- crates/sui-indexer/tests/ingestion_tests.rs | 39 +++ 5 files changed, 253 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67e3fec556ebd..8d9c7a81f62b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13677,6 +13677,7 @@ dependencies = [ "ntest", "object_store", "prometheus", + "rand 0.8.5", "rayon", "regex", "serde", diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index 3442cf46fa20d..fa4490741b163 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow.workspace = true +rand = "0.8.5" async-trait.workspace = true axum.workspace = true backoff.workspace = true diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index bc2b0131ff1b0..7989b8e0116d2 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -910,48 +910,73 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { - diesel::insert_into(event_emit_package::table) - .values(&event_emit_packages) - .on_conflict_do_nothing() - .execute(conn) - .await?; - - diesel::insert_into(event_emit_module::table) - .values(&event_emit_modules) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_emit_packages_chunk in + event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_emit_package::table) + .values(event_emit_packages_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - 
diesel::insert_into(event_senders::table) - .values(&event_senders) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_emit_modules_chunk in + event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_emit_module::table) + .values(event_emit_modules_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_package::table) - .values(&event_struct_packages) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(event_senders::table) + .values(event_senders_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_module::table) - .values(&event_struct_modules) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_packages_chunk in + event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_package::table) + .values(event_struct_packages_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_name::table) - .values(&event_struct_names) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_modules_chunk in + event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_module::table) + .values(event_struct_modules_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_instantiation::table) - .values(&event_struct_instantiations) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_names_chunk in + event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_name::table) + .values(event_struct_names_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } + for event_struct_instantiations_chunk in + 
event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_instantiation::table) + .values(event_struct_instantiations_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } Ok(()) } .scope_boxed() @@ -1040,71 +1065,99 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { - diesel::insert_into(tx_affected_addresses::table) - .values(&affected_addresses) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for affected_addresses_chunk in + affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_affected_addresses::table) + .values(affected_addresses_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_affected_objects::table) - .values(&affected_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for affected_objects_chunk in + affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_affected_objects::table) + .values(affected_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_senders::table) - .values(&senders) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for senders_chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_senders::table) + .values(senders_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_recipients::table) - .values(&recipients) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for recipients_chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_recipients::table) + .values(recipients_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_input_objects::table) - .values(&input_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for input_objects_chunk in 
input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_input_objects::table) + .values(input_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_changed_objects::table) - .values(&changed_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for changed_objects_chunk in + changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_changed_objects::table) + .values(changed_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_pkg::table) - .values(&pkgs) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_pkg::table) + .values(pkgs_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_mod::table) - .values(&mods) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_mod::table) + .values(mods_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_fun::table) - .values(&funs) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_fun::table) + .values(funs_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_digests::table) - .values(&digests) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_digests::table) + .values(digests_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_kinds::table) - .values(&kinds) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + 
diesel::insert_into(tx_kinds::table) + .values(kinds_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } Ok(()) } @@ -1878,9 +1931,12 @@ impl IndexerStore for PgIndexerStore { "Failed to persist all event_indices chunks: {:?}", e )) - })?; - let elapsed = guard.stop_and_record(); - info!(elapsed, "Persisted {} event_indices chunks", len); + }) + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} event_indices chunks", len); + }) + .tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?; Ok(()) } @@ -1908,9 +1964,12 @@ impl IndexerStore for PgIndexerStore { "Failed to persist all tx_indices chunks: {:?}", e )) - })?; - let elapsed = guard.stop_and_record(); - info!(elapsed, "Persisted {} tx_indices chunks", len); + }) + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} tx_indices chunks", len); + }) + .tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?; Ok(()) } diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index 33bca14214125..5c2c042dde803 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -1,8 +1,8 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use crate::errors::IndexerError; use move_core_types::language_storage::StructTag; +use rand::Rng; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sui_json_rpc_types::{ @@ -25,6 +25,8 @@ use sui_types::sui_serde::SuiStructTag; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; use sui_types::transaction::SenderSignedData; +use crate::errors::IndexerError; + pub type IndexerResult = Result; #[derive(Debug, Default)] @@ -254,6 +256,24 @@ pub struct EventIndex { pub type_instantiation: String, } +// for ingestion test +impl EventIndex { + pub fn random() -> Self { + let mut rng = rand::thread_rng(); + EventIndex { + tx_sequence_number: rng.gen(), + event_sequence_number: rng.gen(), + sender: SuiAddress::random_for_testing_only(), + emit_package: ObjectID::random(), + emit_module: rng.gen::().to_string(), + type_package: ObjectID::random(), + type_module: rng.gen::().to_string(), + type_name: rng.gen::().to_string(), + type_instantiation: rng.gen::().to_string(), + } + } +} + impl EventIndex { pub fn from_event( tx_sequence_number: u64, @@ -414,6 +434,41 @@ pub struct TxIndex { pub move_calls: Vec<(ObjectID, String, String)>, } +impl TxIndex { + pub fn random() -> Self { + let mut rng = rand::thread_rng(); + TxIndex { + tx_sequence_number: rng.gen(), + tx_kind: if rng.gen_bool(0.5) { + TransactionKind::SystemTransaction + } else { + TransactionKind::ProgrammableTransaction + }, + transaction_digest: TransactionDigest::random(), + checkpoint_sequence_number: rng.gen(), + input_objects: (0..1000).map(|_| ObjectID::random()).collect(), + changed_objects: (0..1000).map(|_| ObjectID::random()).collect(), + affected_objects: (0..1000).map(|_| ObjectID::random()).collect(), + payers: (0..rng.gen_range(0..100)) + .map(|_| SuiAddress::random_for_testing_only()) + .collect(), + sender: SuiAddress::random_for_testing_only(), + recipients: (0..rng.gen_range(0..1000)) + .map(|_| 
SuiAddress::random_for_testing_only()) + .collect(), + move_calls: (0..rng.gen_range(0..1000)) + .map(|_| { + ( + ObjectID::random(), + rng.gen::().to_string(), + rng.gen::().to_string(), + ) + }) + .collect(), + } + } +} + // ObjectChange is not bcs deserializable, IndexedObjectChange is. #[serde_as] #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] diff --git a/crates/sui-indexer/tests/ingestion_tests.rs b/crates/sui-indexer/tests/ingestion_tests.rs index 57eaaa7286d5a..c0a69a0862263 100644 --- a/crates/sui-indexer/tests/ingestion_tests.rs +++ b/crates/sui-indexer/tests/ingestion_tests.rs @@ -11,7 +11,10 @@ use sui_indexer::models::{ objects::StoredObject, objects::StoredObjectSnapshot, transactions::StoredTransaction, }; use sui_indexer::schema::{objects, objects_snapshot, transactions}; +use sui_indexer::store::indexer_store::IndexerStore; use sui_indexer::test_utils::{set_up, wait_for_checkpoint, wait_for_objects_snapshot}; +use sui_indexer::types::EventIndex; +use sui_indexer::types::TxIndex; use sui_types::base_types::SuiAddress; use sui_types::effects::TransactionEffectsAPI; use sui_types::gas_coin::GasCoin; @@ -174,3 +177,39 @@ pub async fn test_objects_snapshot() -> Result<(), IndexerError> { assert_eq!(snapshot_object.owner_id, Some(gas_owner_id.to_vec())); Ok(()) } + +// test insert large batch of tx_indices +#[tokio::test] +pub async fn test_insert_large_batch_tx_indices() -> Result<(), IndexerError> { + let tempdir = tempdir().unwrap(); + let mut sim = Simulacrum::new(); + let data_ingestion_path = tempdir.path().to_path_buf(); + sim.set_data_ingestion_path(data_ingestion_path.clone()); + + let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await; + + let mut v = Vec::new(); + for _ in 0..1000 { + v.push(TxIndex::random()); + } + pg_store.persist_tx_indices(v).await?; + Ok(()) +} + +// test insert large batch of event_indices +#[tokio::test] +pub async fn test_insert_large_batch_event_indices() -> Result<(), 
IndexerError> { + let tempdir = tempdir().unwrap(); + let mut sim = Simulacrum::new(); + let data_ingestion_path = tempdir.path().to_path_buf(); + sim.set_data_ingestion_path(data_ingestion_path.clone()); + + let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await; + + let mut v = Vec::new(); + for _ in 0..1000 { + v.push(EventIndex::random()); + } + pg_store.persist_event_indices(v).await?; + Ok(()) +}