diff --git a/crates/sui-indexer/src/benchmark.rs b/crates/sui-indexer/src/benchmark.rs index 96df25cba9fa6..b5ef1cfb901e2 100644 --- a/crates/sui-indexer/src/benchmark.rs +++ b/crates/sui-indexer/src/benchmark.rs @@ -117,6 +117,7 @@ impl BenchmarkableIndexer for BenchmarkIndexer { None, cancel, Some(committed_checkpoints_tx), + false, /* mvr_mode */ ) .await }); diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index 6db349aa64747..ca53b3671e2b5 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -188,6 +188,7 @@ impl BackFillConfig { const DEFAULT_CHUNK_SIZE: usize = 1000; } +#[allow(clippy::large_enum_variant)] #[derive(Subcommand, Clone, Debug)] pub enum Command { Indexer { @@ -199,6 +200,11 @@ pub enum Command { pruning_options: PruningOptions, #[command(flatten)] upload_options: UploadOptions, + /// If true, the indexer will run in MVR mode. It will only index data to + /// `objects_snapshot`, `objects_history`, `packages`, `checkpoints`, and `epochs` to + /// support MVR queries. + #[clap(long, default_value_t = false)] + mvr_mode: bool, }, JsonRpcService(JsonRpcConfig), ResetDatabase { diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 170bda5ff6108..74c2314604c81 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -54,6 +54,7 @@ pub async fn new_handlers( committed_checkpoints_tx: Option>>, start_checkpoint_opt: Option, end_checkpoint_opt: Option, + mvr_mode: bool, ) -> Result<(CheckpointHandler, u64), IndexerError> { let start_checkpoint = match start_checkpoint_opt { Some(start_checkpoint) => start_checkpoint, @@ -87,6 +88,7 @@ pub async fn new_handlers( committed_checkpoints_tx, start_checkpoint, end_checkpoint_opt, + mvr_mode )); Ok(( CheckpointHandler::new(state, metrics, indexed_checkpoint_sender), diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index b63e8b42a981e..8f9b9321cd655 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -28,6 +28,7 @@ pub async fn start_tx_checkpoint_commit_task( mut committed_checkpoints_tx: Option>>, mut next_checkpoint_sequence_number: CheckpointSequenceNumber, end_checkpoint_opt: Option, + mvr_mode: bool, ) -> IndexerResult<()> where S: IndexerStore + Clone + Sync + Send + 'static, @@ -70,6 +71,7 @@ where epoch, &metrics, &mut committed_checkpoints_tx, + mvr_mode, ) .await; batch = vec![]; @@ -91,7 +93,15 @@ where } } if !batch.is_empty() { - commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await; + commit_checkpoints( + &state, + batch, + None, + &metrics, + &mut committed_checkpoints_tx, + mvr_mode, + ) + .await; batch = vec![]; } @@ -120,6 +130,7 @@ async fn commit_checkpoints( epoch: Option, metrics: &IndexerMetrics, committed_checkpoints_tx: &mut Option>>, + mvr_mode: bool, ) where S: IndexerStore + Clone + Sync + Send + 'static, { @@ -148,15 +159,18 @@ async fn commit_checkpoints( packages, epoch: _, } = indexed_checkpoint; - checkpoint_batch.push(checkpoint); - tx_batch.push(transactions); - events_batch.push(events); - tx_indices_batch.push(tx_indices); - event_indices_batch.push(event_indices); - display_updates_batch.extend(display_updates.into_iter()); - object_changes_batch.push(object_changes); + // In MVR mode, persist only object_history, packages, checkpoints, and epochs + if !mvr_mode { + tx_batch.push(transactions); + events_batch.push(events); + tx_indices_batch.push(tx_indices); + event_indices_batch.push(event_indices); + display_updates_batch.extend(display_updates.into_iter()); + object_changes_batch.push(object_changes); + object_versions_batch.push(object_versions); + } object_history_changes_batch.push(object_history_changes); - object_versions_batch.push(object_versions); + checkpoint_batch.push(checkpoint); packages_batch.push(packages); } @@ -190,23 +204,23 @@ async fn commit_checkpoints( { let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer(); - let mut persist_tasks = vec![ - state.persist_transactions(tx_batch), - state.persist_tx_indices(tx_indices_batch), - state.persist_events(events_batch), - state.persist_event_indices(event_indices_batch), - state.persist_displays(display_updates_batch), - state.persist_packages(packages_batch), - // TODO: There are a few ways we could make the following more memory efficient. - // 1. persist_objects and persist_object_history both call another function to make the final - // committed object list. We could call it early and share the result. - // 2. We could avoid clone by using Arc. - state.persist_objects(object_changes_batch.clone()), - state.persist_object_history(object_history_changes_batch.clone()), - state.persist_full_objects_history(object_history_changes_batch.clone()), - state.persist_objects_version(object_versions_batch.clone()), - state.persist_raw_checkpoints(raw_checkpoints_batch), - ]; + let mut persist_tasks = Vec::new(); + // In MVR mode, persist only packages and object history + persist_tasks.push(state.persist_packages(packages_batch)); + persist_tasks.push(state.persist_object_history(object_history_changes_batch.clone())); + if !mvr_mode { + persist_tasks.push(state.persist_transactions(tx_batch)); + persist_tasks.push(state.persist_tx_indices(tx_indices_batch)); + persist_tasks.push(state.persist_events(events_batch)); + persist_tasks.push(state.persist_event_indices(event_indices_batch)); + persist_tasks.push(state.persist_displays(display_updates_batch)); + persist_tasks.push(state.persist_objects(object_changes_batch.clone())); + persist_tasks + .push(state.persist_full_objects_history(object_history_changes_batch.clone())); + persist_tasks.push(state.persist_objects_version(object_versions_batch.clone())); + persist_tasks.push(state.persist_raw_checkpoints(raw_checkpoints_batch)); + } + if let Some(epoch_data) = epoch.clone() { persist_tasks.push(state.persist_epoch(epoch_data)); } diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index d1819a90a7416..cde7d7c76c894 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -8,7 +8,7 @@ use anyhow::Result; use prometheus::Registry; use tokio::sync::{oneshot, watch}; use tokio_util::sync::CancellationToken; -use tracing::info; +use tracing::{info, warn}; use async_trait::async_trait; use futures::future::try_join_all; @@ -38,9 +38,10 @@ impl Indexer { store: PgIndexerStore, metrics: IndexerMetrics, snapshot_config: SnapshotLagConfig, - retention_config: Option, + mut retention_config: Option, cancel: CancellationToken, committed_checkpoints_tx: Option>>, + mvr_mode: bool, ) -> Result<(), IndexerError> { info!( "Sui Indexer Writer (version {:?}) started...", @@ -67,6 +68,14 @@ impl Indexer { ) .await?; + if mvr_mode { + warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. The other tables have a 2000 epoch retention."); + retention_config = Some(RetentionConfig { + epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs. + overrides: Default::default(), + }); + } + if let Some(retention_config) = retention_config { let pruner = Pruner::new(store.clone(), retention_config, metrics.clone())?; let cancel_clone = cancel.clone(); @@ -93,6 +102,7 @@ impl Indexer { committed_checkpoints_tx, config.start_checkpoint, config.end_checkpoint, + mvr_mode, ) .await?; // Ingestion task watermarks are snapshotted once on indexer startup based on the diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 85782cff9689e..c6cbe77860611 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -4,7 +4,7 @@ use clap::Parser; use sui_indexer::backfill::backfill_runner::BackfillRunner; use sui_indexer::benchmark::run_indexer_benchmark; -use sui_indexer::config::{Command, UploadOptions}; +use sui_indexer::config::{Command, RetentionConfig, UploadOptions}; use sui_indexer::database::ConnectionPool; use sui_indexer::db::setup_postgres::clear_database; use sui_indexer::db::{ @@ -46,10 +46,20 @@ async fn main() -> anyhow::Result<()> { snapshot_config, pruning_options, upload_options, + mvr_mode, } => { // Make sure to run all migrations on startup, and also serve as a compatibility check. run_migrations(pool.dedicated_connection().await?).await?; - let retention_config = pruning_options.load_from_file(); + + let retention_config = if mvr_mode { + warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. The other tables have a 2000 epoch retention."); + Some(RetentionConfig { + epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs. + overrides: Default::default(), + }) + } else { + pruning_options.load_from_file() + }; if retention_config.is_some() { check_prunable_tables_valid(&mut pool.get().await?).await?; } @@ -64,6 +74,7 @@ async fn main() -> anyhow::Result<()> { retention_config, CancellationToken::new(), None, + mvr_mode, ) .await?; } diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 248183c916c59..ec18aa2348c40 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -80,6 +80,36 @@ pub async fn start_indexer_writer_for_testing( PgIndexerStore, JoinHandle>, CancellationToken, +) { + start_indexer_writer_for_testing_with_mvr_mode( + db_url, + snapshot_config, + retention_config, + data_ingestion_path, + cancel, + start_checkpoint, + end_checkpoint, + false, + ) + .await +} + +/// Separate entrypoint for instantiating an indexer with or without MVR mode enabled. Relevant only +/// for MVR, the production indexer available through start_indexer_writer_for_testing should be +/// generally used. +pub async fn start_indexer_writer_for_testing_with_mvr_mode( + db_url: String, + snapshot_config: Option, + retention_config: Option, + data_ingestion_path: Option, + cancel: Option, + start_checkpoint: Option, + end_checkpoint: Option, + mvr_mode: bool, +) -> ( + PgIndexerStore, + JoinHandle>, + CancellationToken, ) { let token = cancel.unwrap_or_default(); let snapshot_config = snapshot_config.unwrap_or(SnapshotLagConfig { @@ -135,6 +165,7 @@ pub async fn start_indexer_writer_for_testing( retention_config, token_clone, None, + mvr_mode, ) .await }) @@ -238,6 +269,22 @@ pub async fn set_up( PgIndexerStore, JoinHandle>, TempDb, +) { + set_up_on_mvr_mode(sim, data_ingestion_path, false).await +} + +/// Set up a test indexer fetching from a REST endpoint served by the given Simulacrum. With MVR +/// mode enabled, this indexer writes only to a subset of tables - `objects_snapshot`, +/// `objects_history`, `checkpoints`, `epochs`, and `packages`. +pub async fn set_up_on_mvr_mode( + sim: Arc, + data_ingestion_path: PathBuf, + mvr_mode: bool, +) -> ( + JoinHandle<()>, + PgIndexerStore, + JoinHandle>, + TempDb, ) { let database = TempDb::new().unwrap(); let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port()) @@ -250,14 +297,15 @@ pub async fn set_up( .await; }); // Starts indexer - let (pg_store, pg_handle, _) = start_indexer_writer_for_testing( + let (pg_store, pg_handle, _) = start_indexer_writer_for_testing_with_mvr_mode( database.database().url().as_str().to_owned(), None, None, Some(data_ingestion_path), - None, /* cancel */ - None, /* start_checkpoint */ - None, /* end_checkpoint */ + None, /* cancel */ + None, /* start_checkpoint */ + None, /* end_checkpoint */ + mvr_mode, /* mvr_mode */ ) .await; (server_handle, pg_store, pg_handle, database) diff --git a/crates/sui-indexer/tests/ingestion_tests.rs b/crates/sui-indexer/tests/ingestion_tests.rs index 2b6a31286b27a..a464982ae9da7 100644 --- a/crates/sui-indexer/tests/ingestion_tests.rs +++ b/crates/sui-indexer/tests/ingestion_tests.rs @@ -1,7 +1,9 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use std::sync::Arc; +use std::time::Duration; +use diesel::dsl::count_star; use diesel::ExpressionMethods; use diesel::QueryDsl; use diesel_async::RunQueryDsl; @@ -12,8 +14,13 @@ use sui_indexer::models::{ checkpoints::StoredCheckpoint, objects::StoredObject, objects::StoredObjectSnapshot, transactions::StoredTransaction, }; +use sui_indexer::schema::epochs; +use sui_indexer::schema::events; +use sui_indexer::schema::full_objects_history; +use sui_indexer::schema::objects_history; use sui_indexer::schema::{checkpoints, objects, objects_snapshot, transactions}; use sui_indexer::store::indexer_store::IndexerStore; +use sui_indexer::test_utils::set_up_on_mvr_mode; use sui_indexer::test_utils::{ set_up, set_up_with_start_and_end_checkpoints, wait_for_checkpoint, wait_for_objects_snapshot, }; @@ -339,3 +346,100 @@ pub async fn test_epoch_boundary() -> Result<(), IndexerError> { assert_eq!(db_checkpoint.epoch, 1); Ok(()) } + +#[tokio::test] +pub async fn test_mvr_mode() -> Result<(), IndexerError> { + let tempdir = tempdir().unwrap(); + let mut sim = Simulacrum::new(); + let data_ingestion_path = tempdir.path().to_path_buf(); + sim.set_data_ingestion_path(data_ingestion_path.clone()); + + // Create 3 checkpoints and epochs of sequence number 0 through 2 inclusive + for _ in 0..=2 { + let transfer_recipient = SuiAddress::random_for_testing_only(); + let (transaction, _) = sim.transfer_txn(transfer_recipient); + let (_, err) = sim.execute_transaction(transaction.clone()).unwrap(); + assert!(err.is_none()); + + // creates checkpoint and advances epoch + sim.advance_epoch(true); + } + + sim.create_checkpoint(); // advance to checkpoint 4 to stabilize indexer + + let (_, pg_store, _, _database) = + set_up_on_mvr_mode(Arc::new(sim), data_ingestion_path, true).await; + wait_for_checkpoint(&pg_store, 4).await?; + let mut connection = pg_store.pool().dedicated_connection().await.unwrap(); + let db_checkpoint: StoredCheckpoint = checkpoints::table + .order(checkpoints::sequence_number.desc()) + .first::(&mut connection) + .await + .expect("Failed reading checkpoint from PostgresDB"); + let db_epoch = epochs::table + .order(epochs::epoch.desc()) + .select(epochs::epoch) + .first::(&mut connection) + .await + .expect("Failed reading epoch from PostgresDB"); + + assert_eq!(db_checkpoint.sequence_number, 4); + assert_eq!(db_checkpoint.epoch, db_epoch); + + // Check that other tables have not been written to + assert_eq!( + 0_i64, + transactions::table + .select(count_star()) + .first::(&mut connection) + .await + .expect("Failed to count * transactions") + ); + assert_eq!( + 0_i64, + events::table + .select(count_star()) + .first::(&mut connection) + .await + .expect("Failed to count * transactions") + ); + assert_eq!( + 0_i64, + full_objects_history::table + .select(count_star()) + .first::(&mut connection) + .await + .expect("Failed to count * transactions") + ); + + // Check that objects_history is being correctly pruned. At epoch 3, we should only have data + // between 2 and 3 inclusive. + loop { + let history_objects = objects_history::table + .select(objects_history::checkpoint_sequence_number) + .load::(&mut connection) + .await?; + + let has_invalid_entries = history_objects.iter().any(|&elem| elem < 2); + + if !has_invalid_entries { + // No more invalid entries found, exit the loop + break; + } + + tokio::time::sleep(Duration::from_secs(1)).await; + } + + // After the loop, verify all entries are within expected range + let final_check = objects_history::table + .select(objects_history::checkpoint_sequence_number) + .order_by(objects_history::checkpoint_sequence_number.asc()) + .load::(&mut connection) + .await?; + + for elem in final_check { + assert!(elem >= 2); + } + + Ok(()) +}