Skip to content

Commit

Permalink
[mvr] Introduce mvr-mode on production indexer (#20271)
Browse files Browse the repository at this point in the history
## Description 

Introduce an `mvr-mode` flag. When true, it enables pruning of `objects_history`
and makes the committer write only a subset of tables to the database. MVR mode
remains compatible with the production indexer schema.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
wlmyng authored Nov 15, 2024
1 parent 5271252 commit 1e2f4b3
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 34 deletions.
1 change: 1 addition & 0 deletions crates/sui-indexer/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl BenchmarkableIndexer for BenchmarkIndexer {
None,
cancel,
Some(committed_checkpoints_tx),
false, /* mvr_mode */
)
.await
});
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl BackFillConfig {
const DEFAULT_CHUNK_SIZE: usize = 1000;
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Clone, Debug)]
pub enum Command {
Indexer {
Expand All @@ -199,6 +200,11 @@ pub enum Command {
pruning_options: PruningOptions,
#[command(flatten)]
upload_options: UploadOptions,
/// If true, the indexer will run in MVR mode. It will only index data to
/// `objects_snapshot`, `objects_history`, `packages`, `checkpoints`, and `epochs` to
/// support MVR queries.
#[clap(long, default_value_t = false)]
mvr_mode: bool,
},
JsonRpcService(JsonRpcConfig),
ResetDatabase {
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub async fn new_handlers(
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
start_checkpoint_opt: Option<CheckpointSequenceNumber>,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
mvr_mode: bool,
) -> Result<(CheckpointHandler, u64), IndexerError> {
let start_checkpoint = match start_checkpoint_opt {
Some(start_checkpoint) => start_checkpoint,
Expand Down Expand Up @@ -87,6 +88,7 @@ pub async fn new_handlers(
committed_checkpoints_tx,
start_checkpoint,
end_checkpoint_opt,
mvr_mode
));
Ok((
CheckpointHandler::new(state, metrics, indexed_checkpoint_sender),
Expand Down
66 changes: 40 additions & 26 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub async fn start_tx_checkpoint_commit_task<S>(
mut committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
mvr_mode: bool,
) -> IndexerResult<()>
where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand Down Expand Up @@ -70,6 +71,7 @@ where
epoch,
&metrics,
&mut committed_checkpoints_tx,
mvr_mode,
)
.await;
batch = vec![];
Expand All @@ -91,7 +93,15 @@ where
}
}
if !batch.is_empty() {
commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await;
commit_checkpoints(
&state,
batch,
None,
&metrics,
&mut committed_checkpoints_tx,
mvr_mode,
)
.await;
batch = vec![];
}

Expand Down Expand Up @@ -120,6 +130,7 @@ async fn commit_checkpoints<S>(
epoch: Option<EpochToCommit>,
metrics: &IndexerMetrics,
committed_checkpoints_tx: &mut Option<watch::Sender<Option<IndexerProgress>>>,
mvr_mode: bool,
) where
S: IndexerStore + Clone + Sync + Send + 'static,
{
Expand Down Expand Up @@ -148,15 +159,18 @@ async fn commit_checkpoints<S>(
packages,
epoch: _,
} = indexed_checkpoint;
checkpoint_batch.push(checkpoint);
tx_batch.push(transactions);
events_batch.push(events);
tx_indices_batch.push(tx_indices);
event_indices_batch.push(event_indices);
display_updates_batch.extend(display_updates.into_iter());
object_changes_batch.push(object_changes);
// In MVR mode, persist only `objects_history`, `packages`, `checkpoints`, and `epochs`
if !mvr_mode {
tx_batch.push(transactions);
events_batch.push(events);
tx_indices_batch.push(tx_indices);
event_indices_batch.push(event_indices);
display_updates_batch.extend(display_updates.into_iter());
object_changes_batch.push(object_changes);
object_versions_batch.push(object_versions);
}
object_history_changes_batch.push(object_history_changes);
object_versions_batch.push(object_versions);
checkpoint_batch.push(checkpoint);
packages_batch.push(packages);
}

Expand Down Expand Up @@ -190,23 +204,23 @@ async fn commit_checkpoints<S>(

{
let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
let mut persist_tasks = vec![
state.persist_transactions(tx_batch),
state.persist_tx_indices(tx_indices_batch),
state.persist_events(events_batch),
state.persist_event_indices(event_indices_batch),
state.persist_displays(display_updates_batch),
state.persist_packages(packages_batch),
// TODO: There are a few ways we could make the following more memory efficient.
// 1. persist_objects and persist_object_history both call another function to make the final
// committed object list. We could call it early and share the result.
// 2. We could avoid clone by using Arc.
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_history_changes_batch.clone()),
state.persist_full_objects_history(object_history_changes_batch.clone()),
state.persist_objects_version(object_versions_batch.clone()),
state.persist_raw_checkpoints(raw_checkpoints_batch),
];
let mut persist_tasks = Vec::new();
// Packages and object history are persisted in every mode; the tables in the `!mvr_mode`
// branch below are skipped when MVR mode is on.
persist_tasks.push(state.persist_packages(packages_batch));
persist_tasks.push(state.persist_object_history(object_history_changes_batch.clone()));
if !mvr_mode {
persist_tasks.push(state.persist_transactions(tx_batch));
persist_tasks.push(state.persist_tx_indices(tx_indices_batch));
persist_tasks.push(state.persist_events(events_batch));
persist_tasks.push(state.persist_event_indices(event_indices_batch));
persist_tasks.push(state.persist_displays(display_updates_batch));
persist_tasks.push(state.persist_objects(object_changes_batch.clone()));
persist_tasks
.push(state.persist_full_objects_history(object_history_changes_batch.clone()));
persist_tasks.push(state.persist_objects_version(object_versions_batch.clone()));
persist_tasks.push(state.persist_raw_checkpoints(raw_checkpoints_batch));
}

if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
}
Expand Down
14 changes: 12 additions & 2 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::Result;
use prometheus::Registry;
use tokio::sync::{oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{info, warn};

use async_trait::async_trait;
use futures::future::try_join_all;
Expand Down Expand Up @@ -38,9 +38,10 @@ impl Indexer {
store: PgIndexerStore,
metrics: IndexerMetrics,
snapshot_config: SnapshotLagConfig,
retention_config: Option<RetentionConfig>,
mut retention_config: Option<RetentionConfig>,
cancel: CancellationToken,
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
mvr_mode: bool,
) -> Result<(), IndexerError> {
info!(
"Sui Indexer Writer (version {:?}) started...",
Expand All @@ -67,6 +68,14 @@ impl Indexer {
)
.await?;

if mvr_mode {
warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. The other tables have a 2000 epoch retention.");
retention_config = Some(RetentionConfig {
epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs.
overrides: Default::default(),
});
}

if let Some(retention_config) = retention_config {
let pruner = Pruner::new(store.clone(), retention_config, metrics.clone())?;
let cancel_clone = cancel.clone();
Expand All @@ -93,6 +102,7 @@ impl Indexer {
committed_checkpoints_tx,
config.start_checkpoint,
config.end_checkpoint,
mvr_mode,
)
.await?;
// Ingestion task watermarks are snapshotted once on indexer startup based on the
Expand Down
15 changes: 13 additions & 2 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use clap::Parser;
use sui_indexer::backfill::backfill_runner::BackfillRunner;
use sui_indexer::benchmark::run_indexer_benchmark;
use sui_indexer::config::{Command, UploadOptions};
use sui_indexer::config::{Command, RetentionConfig, UploadOptions};
use sui_indexer::database::ConnectionPool;
use sui_indexer::db::setup_postgres::clear_database;
use sui_indexer::db::{
Expand Down Expand Up @@ -46,10 +46,20 @@ async fn main() -> anyhow::Result<()> {
snapshot_config,
pruning_options,
upload_options,
mvr_mode,
} => {
// Make sure to run all migrations on startup, and also serve as a compatibility check.
run_migrations(pool.dedicated_connection().await?).await?;
let retention_config = pruning_options.load_from_file();

let retention_config = if mvr_mode {
warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. The other tables have a 2000 epoch retention.");
Some(RetentionConfig {
epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs.
overrides: Default::default(),
})
} else {
pruning_options.load_from_file()
};
if retention_config.is_some() {
check_prunable_tables_valid(&mut pool.get().await?).await?;
}
Expand All @@ -64,6 +74,7 @@ async fn main() -> anyhow::Result<()> {
retention_config,
CancellationToken::new(),
None,
mvr_mode,
)
.await?;
}
Expand Down
56 changes: 52 additions & 4 deletions crates/sui-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,36 @@ pub async fn start_indexer_writer_for_testing(
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
CancellationToken,
) {
start_indexer_writer_for_testing_with_mvr_mode(
db_url,
snapshot_config,
retention_config,
data_ingestion_path,
cancel,
start_checkpoint,
end_checkpoint,
false,
)
.await
}

/// Separate entrypoint for instantiating an indexer with or without MVR mode enabled. This is
/// relevant only for MVR; in general, prefer the production indexer available through
/// `start_indexer_writer_for_testing`.
pub async fn start_indexer_writer_for_testing_with_mvr_mode(
db_url: String,
snapshot_config: Option<SnapshotLagConfig>,
retention_config: Option<RetentionConfig>,
data_ingestion_path: Option<PathBuf>,
cancel: Option<CancellationToken>,
start_checkpoint: Option<u64>,
end_checkpoint: Option<u64>,
mvr_mode: bool,
) -> (
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
CancellationToken,
) {
let token = cancel.unwrap_or_default();
let snapshot_config = snapshot_config.unwrap_or(SnapshotLagConfig {
Expand Down Expand Up @@ -135,6 +165,7 @@ pub async fn start_indexer_writer_for_testing(
retention_config,
token_clone,
None,
mvr_mode,
)
.await
})
Expand Down Expand Up @@ -238,6 +269,22 @@ pub async fn set_up(
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
TempDb,
) {
set_up_on_mvr_mode(sim, data_ingestion_path, false).await
}

/// Set up a test indexer fetching from a REST endpoint served by the given Simulacrum. With MVR
/// mode enabled, this indexer writes only to a subset of tables - `objects_snapshot`,
/// `objects_history`, `checkpoints`, `epochs`, and `packages`.
pub async fn set_up_on_mvr_mode(
sim: Arc<Simulacrum>,
data_ingestion_path: PathBuf,
mvr_mode: bool,
) -> (
JoinHandle<()>,
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
TempDb,
) {
let database = TempDb::new().unwrap();
let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
Expand All @@ -250,14 +297,15 @@ pub async fn set_up(
.await;
});
// Starts indexer
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing_with_mvr_mode(
database.database().url().as_str().to_owned(),
None,
None,
Some(data_ingestion_path),
None, /* cancel */
None, /* start_checkpoint */
None, /* end_checkpoint */
None, /* cancel */
None, /* start_checkpoint */
None, /* end_checkpoint */
mvr_mode, /* mvr_mode */
)
.await;
(server_handle, pg_store, pg_handle, database)
Expand Down
Loading

0 comments on commit 1e2f4b3

Please sign in to comment.