From cd214a3d7e8e9e47ccf31ab8f5315e8c114313be Mon Sep 17 00:00:00 2001 From: wlmyng <127570466+wlmyng@users.noreply.github.com> Date: Fri, 15 Nov 2024 13:21:24 -0800 Subject: [PATCH] [mvr] Introduce mvr-mode on production indexer (#20271) (#20288) ## Description Introduce an `mvr-mode` flag. When enabled, it turns on pruning of `objects_history` and toggles the committer to write only a subset of tables to the db. The mode is compatible with the production indexer schema. ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- crates/sui-indexer/src/benchmark.rs | 1 + crates/sui-indexer/src/config.rs | 6 + .../src/handlers/checkpoint_handler.rs | 2 + crates/sui-indexer/src/handlers/committer.rs | 66 ++++++----- crates/sui-indexer/src/handlers/mod.rs | 2 - crates/sui-indexer/src/handlers/pruner.rs | 4 - crates/sui-indexer/src/indexer.rs | 14 ++- crates/sui-indexer/src/main.rs | 15 ++- crates/sui-indexer/src/test_utils.rs | 56 +++++++++- crates/sui-indexer/tests/ingestion_tests.rs | 104 ++++++++++++++++++ 10 files changed, 230 insertions(+), 40 deletions(-) diff --git a/crates/sui-indexer/src/benchmark.rs b/crates/sui-indexer/src/benchmark.rs index 96df25cba9fa6..b5ef1cfb901e2 100644 --- a/crates/sui-indexer/src/benchmark.rs +++ b/crates/sui-indexer/src/benchmark.rs @@ -117,6 +117,7 @@ impl BenchmarkableIndexer for BenchmarkIndexer { None, cancel, Some(committed_checkpoints_tx), + false, /* mvr_mode */ ) .await }); diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index 6db349aa64747..ca53b3671e2b5 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -188,6 +188,7 @@ impl BackFillConfig { const DEFAULT_CHUNK_SIZE: usize = 1000; } +#[allow(clippy::large_enum_variant)] #[derive(Subcommand, Clone, Debug)] pub enum Command { Indexer { @@ -199,6 +200,11 @@ pub enum Command { pruning_options: PruningOptions, #[command(flatten)] upload_options: UploadOptions, + /// If true, the indexer will run in MVR mode. It will only index data to + /// `objects_snapshot`, `objects_history`, `packages`, `checkpoints`, and `epochs` to + /// support MVR queries.
+ #[clap(long, default_value_t = false)] + mvr_mode: bool, }, JsonRpcService(JsonRpcConfig), ResetDatabase { diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 170bda5ff6108..74c2314604c81 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -54,6 +54,7 @@ pub async fn new_handlers( committed_checkpoints_tx: Option>>, start_checkpoint_opt: Option, end_checkpoint_opt: Option, + mvr_mode: bool, ) -> Result<(CheckpointHandler, u64), IndexerError> { let start_checkpoint = match start_checkpoint_opt { Some(start_checkpoint) => start_checkpoint, @@ -87,6 +88,7 @@ pub async fn new_handlers( committed_checkpoints_tx, start_checkpoint, end_checkpoint_opt, + mvr_mode )); Ok(( CheckpointHandler::new(state, metrics, indexed_checkpoint_sender), diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index b63e8b42a981e..8f9b9321cd655 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -28,6 +28,7 @@ pub async fn start_tx_checkpoint_commit_task( mut committed_checkpoints_tx: Option>>, mut next_checkpoint_sequence_number: CheckpointSequenceNumber, end_checkpoint_opt: Option, + mvr_mode: bool, ) -> IndexerResult<()> where S: IndexerStore + Clone + Sync + Send + 'static, @@ -70,6 +71,7 @@ where epoch, &metrics, &mut committed_checkpoints_tx, + mvr_mode, ) .await; batch = vec![]; @@ -91,7 +93,15 @@ where } } if !batch.is_empty() { - commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await; + commit_checkpoints( + &state, + batch, + None, + &metrics, + &mut committed_checkpoints_tx, + mvr_mode, + ) + .await; batch = vec![]; } @@ -120,6 +130,7 @@ async fn commit_checkpoints( epoch: Option, metrics: &IndexerMetrics, committed_checkpoints_tx: &mut Option>>, + mvr_mode: bool, ) where S: IndexerStore + Clone + Sync + Send + 'static, { @@ -148,15 +159,18 @@ async fn commit_checkpoints( packages, epoch: _, } = indexed_checkpoint; - checkpoint_batch.push(checkpoint); - tx_batch.push(transactions); - events_batch.push(events); - tx_indices_batch.push(tx_indices); - event_indices_batch.push(event_indices); - display_updates_batch.extend(display_updates.into_iter()); - object_changes_batch.push(object_changes); + // In MVR mode, persist only object_history, packages, checkpoints, and epochs + if !mvr_mode { + tx_batch.push(transactions); + events_batch.push(events); + tx_indices_batch.push(tx_indices); + event_indices_batch.push(event_indices); + display_updates_batch.extend(display_updates.into_iter()); + object_changes_batch.push(object_changes); + object_versions_batch.push(object_versions); + } object_history_changes_batch.push(object_history_changes); - object_versions_batch.push(object_versions); + checkpoint_batch.push(checkpoint); packages_batch.push(packages); } @@ -190,23 +204,23 @@ async fn commit_checkpoints( { let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer(); - let mut persist_tasks = vec![ - state.persist_transactions(tx_batch), - state.persist_tx_indices(tx_indices_batch), - state.persist_events(events_batch), - state.persist_event_indices(event_indices_batch), - state.persist_displays(display_updates_batch), - state.persist_packages(packages_batch), - // TODO: There are a few ways we could make the following more memory efficient. - // 1. 
persist_objects and persist_object_history both call another function to make the final - // committed object list. We could call it early and share the result. - // 2. We could avoid clone by using Arc. - state.persist_objects(object_changes_batch.clone()), - state.persist_object_history(object_history_changes_batch.clone()), - state.persist_full_objects_history(object_history_changes_batch.clone()), - state.persist_objects_version(object_versions_batch.clone()), - state.persist_raw_checkpoints(raw_checkpoints_batch), - ]; + let mut persist_tasks = Vec::new(); + // In MVR mode, persist only packages and object history + persist_tasks.push(state.persist_packages(packages_batch)); + persist_tasks.push(state.persist_object_history(object_history_changes_batch.clone())); + if !mvr_mode { + persist_tasks.push(state.persist_transactions(tx_batch)); + persist_tasks.push(state.persist_tx_indices(tx_indices_batch)); + persist_tasks.push(state.persist_events(events_batch)); + persist_tasks.push(state.persist_event_indices(event_indices_batch)); + persist_tasks.push(state.persist_displays(display_updates_batch)); + persist_tasks.push(state.persist_objects(object_changes_batch.clone())); + persist_tasks + .push(state.persist_full_objects_history(object_history_changes_batch.clone())); + persist_tasks.push(state.persist_objects_version(object_versions_batch.clone())); + persist_tasks.push(state.persist_raw_checkpoints(raw_checkpoints_batch)); + } + if let Some(epoch_data) = epoch.clone() { persist_tasks.push(state.persist_epoch(epoch_data)); } diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index 403ee8e22706c..be46d71a2d848 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -288,8 +288,6 @@ pub enum CommitterTables { TxDigests, TxInputObjects, TxKinds, - TxRecipients, - TxSenders, Checkpoints, PrunerCpWatermark, diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs index 85b6faa12f071..228196f83bbc2 100644 --- a/crates/sui-indexer/src/handlers/pruner.rs +++ b/crates/sui-indexer/src/handlers/pruner.rs @@ -65,8 +65,6 @@ pub enum PrunableTable { TxDigests, TxInputObjects, TxKinds, - TxRecipients, - TxSenders, Checkpoints, PrunerCpWatermark, @@ -96,8 +94,6 @@ impl PrunableTable { PrunableTable::TxDigests => tx, PrunableTable::TxInputObjects => tx, PrunableTable::TxKinds => tx, - PrunableTable::TxRecipients => tx, - PrunableTable::TxSenders => tx, PrunableTable::Checkpoints => cp, PrunableTable::PrunerCpWatermark => cp, diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index d1819a90a7416..cde7d7c76c894 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -8,7 +8,7 @@ use anyhow::Result; use prometheus::Registry; use tokio::sync::{oneshot, watch}; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{info, warn}; use async_trait::async_trait; use futures::future::try_join_all; @@ -38,9 +38,10 @@ impl Indexer { store: PgIndexerStore, metrics: IndexerMetrics, snapshot_config: SnapshotLagConfig, - retention_config: Option, + mut retention_config: Option, cancel: CancellationToken, committed_checkpoints_tx: Option>>, + mvr_mode: bool, ) -> Result<(), IndexerError> { info!( "Sui Indexer Writer (version {:?}) started...", @@ -67,6 +68,14 @@ impl Indexer { ) .await?; + if mvr_mode { + warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. 
The other tables have a 2000 epoch retention."); + retention_config = Some(RetentionConfig { + epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs. + overrides: Default::default(), + }); + } + + if let Some(retention_config) = retention_config { let pruner = Pruner::new(store.clone(), retention_config, metrics.clone())?; let cancel_clone = cancel.clone(); @@ -93,6 +102,7 @@ impl Indexer { committed_checkpoints_tx, config.start_checkpoint, config.end_checkpoint, + mvr_mode, ) .await?; // Ingestion task watermarks are snapshotted once on indexer startup based on the diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 85782cff9689e..c6cbe77860611 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -4,7 +4,7 @@ use clap::Parser; use sui_indexer::backfill::backfill_runner::BackfillRunner; use sui_indexer::benchmark::run_indexer_benchmark; -use sui_indexer::config::{Command, UploadOptions}; +use sui_indexer::config::{Command, RetentionConfig, UploadOptions}; use sui_indexer::database::ConnectionPool; use sui_indexer::db::setup_postgres::clear_database; use sui_indexer::db::{ @@ -46,10 +46,20 @@ async fn main() -> anyhow::Result<()> { snapshot_config, pruning_options, upload_options, + mvr_mode, } => { // Make sure to run all migrations on startup, and also serve as a compatibility check. run_migrations(pool.dedicated_connection().await?).await?; - let retention_config = pruning_options.load_from_file(); + + let retention_config = if mvr_mode { + warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. The other tables have a 2000 epoch retention."); + Some(RetentionConfig { + epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs. + overrides: Default::default(), + }) + } else { + pruning_options.load_from_file() + }; if retention_config.is_some() { check_prunable_tables_valid(&mut pool.get().await?).await?; } @@ -64,6 +74,7 @@ async fn main() -> anyhow::Result<()> { retention_config, CancellationToken::new(), None, + mvr_mode, ) .await?; } diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 248183c916c59..ec18aa2348c40 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -80,6 +80,36 @@ pub async fn start_indexer_writer_for_testing( PgIndexerStore, JoinHandle>, CancellationToken, +) { + start_indexer_writer_for_testing_with_mvr_mode( + db_url, + snapshot_config, + retention_config, + data_ingestion_path, + cancel, + start_checkpoint, + end_checkpoint, + false, + ) + .await +} + +/// Separate entrypoint for instantiating an indexer with or without MVR mode enabled. This is +/// only relevant for MVR; in general, use the production entrypoint, +/// `start_indexer_writer_for_testing`.
+pub async fn start_indexer_writer_for_testing_with_mvr_mode( + db_url: String, + snapshot_config: Option, + retention_config: Option, + data_ingestion_path: Option, + cancel: Option, + start_checkpoint: Option, + end_checkpoint: Option, + mvr_mode: bool, +) -> ( + PgIndexerStore, + JoinHandle>, + CancellationToken, ) { let token = cancel.unwrap_or_default(); let snapshot_config = snapshot_config.unwrap_or(SnapshotLagConfig { @@ -135,6 +165,7 @@ pub async fn start_indexer_writer_for_testing( retention_config, token_clone, None, + mvr_mode, ) .await }) @@ -238,6 +269,22 @@ pub async fn set_up( PgIndexerStore, JoinHandle>, TempDb, +) { + set_up_on_mvr_mode(sim, data_ingestion_path, false).await +} + +/// Set up a test indexer fetching from a REST endpoint served by the given Simulacrum. With MVR +/// mode enabled, this indexer writes only to a subset of tables - `objects_snapshot`, +/// `objects_history`, `checkpoints`, `epochs`, and `packages`. +pub async fn set_up_on_mvr_mode( + sim: Arc, + data_ingestion_path: PathBuf, + mvr_mode: bool, +) -> ( + JoinHandle<()>, + PgIndexerStore, + JoinHandle>, + TempDb, ) { let database = TempDb::new().unwrap(); let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port()) @@ -250,14 +297,15 @@ pub async fn set_up( .await; }); // Starts indexer - let (pg_store, pg_handle, _) = start_indexer_writer_for_testing( + let (pg_store, pg_handle, _) = start_indexer_writer_for_testing_with_mvr_mode( database.database().url().as_str().to_owned(), None, None, Some(data_ingestion_path), - None, /* cancel */ - None, /* start_checkpoint */ - None, /* end_checkpoint */ + None, /* cancel */ + None, /* start_checkpoint */ + None, /* end_checkpoint */ + mvr_mode, /* mvr_mode */ ) .await; (server_handle, pg_store, pg_handle, database) diff --git a/crates/sui-indexer/tests/ingestion_tests.rs b/crates/sui-indexer/tests/ingestion_tests.rs index 2b6a31286b27a..a464982ae9da7 100644 --- a/crates/sui-indexer/tests/ingestion_tests.rs +++ b/crates/sui-indexer/tests/ingestion_tests.rs @@ -1,7 +1,9 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 use std::sync::Arc; +use std::time::Duration; +use diesel::dsl::count_star; use diesel::ExpressionMethods; use diesel::QueryDsl; use diesel_async::RunQueryDsl; @@ -12,8 +14,13 @@ use sui_indexer::models::{ checkpoints::StoredCheckpoint, objects::StoredObject, objects::StoredObjectSnapshot, transactions::StoredTransaction, }; +use sui_indexer::schema::epochs; +use sui_indexer::schema::events; +use sui_indexer::schema::full_objects_history; +use sui_indexer::schema::objects_history; use sui_indexer::schema::{checkpoints, objects, objects_snapshot, transactions}; use sui_indexer::store::indexer_store::IndexerStore; +use sui_indexer::test_utils::set_up_on_mvr_mode; use sui_indexer::test_utils::{ set_up, set_up_with_start_and_end_checkpoints, wait_for_checkpoint, wait_for_objects_snapshot, }; @@ -339,3 +346,100 @@ pub async fn test_epoch_boundary() -> Result<(), IndexerError> { assert_eq!(db_checkpoint.epoch, 1); Ok(()) } + +#[tokio::test] +pub async fn test_mvr_mode() -> Result<(), IndexerError> { + let tempdir = tempdir().unwrap(); + let mut sim = Simulacrum::new(); + let data_ingestion_path = tempdir.path().to_path_buf(); + sim.set_data_ingestion_path(data_ingestion_path.clone()); + + // Create 3 checkpoints and epochs of sequence number 0 through 2 inclusive + for _ in 0..=2 { + let transfer_recipient = SuiAddress::random_for_testing_only(); + let (transaction, _) = sim.transfer_txn(transfer_recipient); + let (_, err) = sim.execute_transaction(transaction.clone()).unwrap(); + assert!(err.is_none()); + + // creates checkpoint and advances epoch + sim.advance_epoch(true); + } + + sim.create_checkpoint(); // advance to checkpoint 4 to stabilize indexer + + let (_, pg_store, _, _database) = + set_up_on_mvr_mode(Arc::new(sim), data_ingestion_path, true).await; + wait_for_checkpoint(&pg_store, 4).await?; + let mut connection = pg_store.pool().dedicated_connection().await.unwrap(); + let db_checkpoint: StoredCheckpoint = checkpoints::table + .order(checkpoints::sequence_number.desc()) + .first::<StoredCheckpoint>(&mut connection) + .await + .expect("Failed reading checkpoint from PostgresDB"); + let db_epoch = epochs::table + .order(epochs::epoch.desc()) + .select(epochs::epoch) + .first::<i64>(&mut connection) + .await + .expect("Failed reading epoch from PostgresDB"); + + assert_eq!(db_checkpoint.sequence_number, 4); + assert_eq!(db_checkpoint.epoch, db_epoch); + + // Check that other tables have not been written to + assert_eq!( + 0_i64, + transactions::table + .select(count_star()) + .first::<i64>(&mut connection) + .await + .expect("Failed to count * transactions") + ); + assert_eq!( + 0_i64, + events::table + .select(count_star()) + .first::<i64>(&mut connection) + .await + .expect("Failed to count * events") + ); + assert_eq!( + 0_i64, + full_objects_history::table + .select(count_star()) + .first::<i64>(&mut connection) + .await + .expect("Failed to count * full_objects_history") + ); + + // Check that objects_history is being correctly pruned. At epoch 3, we should only have data + // between 2 and 3 inclusive.
loop { + let history_objects = objects_history::table + .select(objects_history::checkpoint_sequence_number) + .load::<i64>(&mut connection) + .await?; + + let has_invalid_entries = history_objects.iter().any(|&elem| elem < 2); + + if !has_invalid_entries { + // No more invalid entries found, exit the loop + break; + } + + tokio::time::sleep(Duration::from_secs(1)).await; + } + + // After the loop, verify all entries are within expected range + let final_check = objects_history::table + .select(objects_history::checkpoint_sequence_number) + .order_by(objects_history::checkpoint_sequence_number.asc()) + .load::<i64>(&mut connection) + .await?; + + for elem in final_check { + assert!(elem >= 2); + } + + Ok(()) +}
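The committer change is the core of the patch, so here is a minimal, self-contained sketch of the gating pattern it introduces. `Store`, its `persist_*` methods, and the batch types are hypothetical stand-ins rather than the real `IndexerStore` API; only the control flow mirrors `commit_checkpoints` above: tables needed for MVR queries are always persisted, while everything else is skipped when `mvr_mode` is on. It assumes the `futures` and `tokio` crates are available.

```rust
// Sketch of the mvr_mode gating used in commit_checkpoints. The Store type and
// its persist_* methods are hypothetical stand-ins for the real IndexerStore.
use futures::future::{try_join_all, BoxFuture};

struct Store;

impl Store {
    fn persist_packages(&self, _batch: Vec<u8>) -> BoxFuture<'static, Result<(), String>> {
        Box::pin(async { Ok::<(), String>(()) })
    }
    fn persist_object_history(&self, _batch: Vec<u8>) -> BoxFuture<'static, Result<(), String>> {
        Box::pin(async { Ok::<(), String>(()) })
    }
    fn persist_transactions(&self, _batch: Vec<u8>) -> BoxFuture<'static, Result<(), String>> {
        Box::pin(async { Ok::<(), String>(()) })
    }
    fn persist_events(&self, _batch: Vec<u8>) -> BoxFuture<'static, Result<(), String>> {
        Box::pin(async { Ok::<(), String>(()) })
    }
}

async fn commit(store: &Store, mvr_mode: bool) -> Result<(), String> {
    // Tables that back MVR queries are always written...
    let mut persist_tasks = vec![
        store.persist_packages(vec![]),
        store.persist_object_history(vec![]),
    ];
    // ...while everything else (transactions, events, indices, displays, objects, ...)
    // is written only when MVR mode is off.
    if !mvr_mode {
        persist_tasks.push(store.persist_transactions(vec![]));
        persist_tasks.push(store.persist_events(vec![]));
    }
    try_join_all(persist_tasks).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    commit(&Store, /* mvr_mode */ true).await
}
```

The same flag also drives the retention override in `indexer.rs` and `main.rs`, where a fixed `RetentionConfig` keeps most tables for 2000 epochs while `objects_history` falls back to its default 2-epoch retention.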