Skip to content

Commit

Permalink
[mvr] Introduce mvr-mode on production indexer (#20271) (#20287)
Browse files Browse the repository at this point in the history
## Description 

Introduce the `mvr-mode` flag. When set to true, it enables pruning of
`objects_history` and makes the committer write only a subset of tables
to the database. It is compatible with the production indexer schema.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:

## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
wlmyng authored Nov 15, 2024
1 parent a5d8b0d commit b023ef8
Showing 10 changed files with 230 additions and 40 deletions.
1 change: 1 addition & 0 deletions crates/sui-indexer/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -117,6 +117,7 @@ impl BenchmarkableIndexer for BenchmarkIndexer {
None,
cancel,
Some(committed_checkpoints_tx),
false, /* mvr_mode */
)
.await
});
6 changes: 6 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -188,6 +188,7 @@ impl BackFillConfig {
const DEFAULT_CHUNK_SIZE: usize = 1000;
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Clone, Debug)]
pub enum Command {
Indexer {
@@ -199,6 +200,11 @@ pub enum Command {
pruning_options: PruningOptions,
#[command(flatten)]
upload_options: UploadOptions,
/// If true, the indexer will run in MVR mode. It will only index data to
/// `objects_snapshot`, `objects_history`, `packages`, `checkpoints`, and `epochs` to
/// support MVR queries.
#[clap(long, default_value_t = false)]
mvr_mode: bool,
},
JsonRpcService(JsonRpcConfig),
ResetDatabase {
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ pub async fn new_handlers(
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
start_checkpoint_opt: Option<CheckpointSequenceNumber>,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
mvr_mode: bool,
) -> Result<(CheckpointHandler, u64), IndexerError> {
let start_checkpoint = match start_checkpoint_opt {
Some(start_checkpoint) => start_checkpoint,
@@ -87,6 +88,7 @@ pub async fn new_handlers(
committed_checkpoints_tx,
start_checkpoint,
end_checkpoint_opt,
mvr_mode
));
Ok((
CheckpointHandler::new(state, metrics, indexed_checkpoint_sender),
66 changes: 40 additions & 26 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ pub async fn start_tx_checkpoint_commit_task<S>(
mut committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
mvr_mode: bool,
) -> IndexerResult<()>
where
S: IndexerStore + Clone + Sync + Send + 'static,
@@ -70,6 +71,7 @@ where
epoch,
&metrics,
&mut committed_checkpoints_tx,
mvr_mode,
)
.await;
batch = vec![];
@@ -91,7 +93,15 @@ where
}
}
if !batch.is_empty() {
commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await;
commit_checkpoints(
&state,
batch,
None,
&metrics,
&mut committed_checkpoints_tx,
mvr_mode,
)
.await;
batch = vec![];
}

@@ -120,6 +130,7 @@ async fn commit_checkpoints<S>(
epoch: Option<EpochToCommit>,
metrics: &IndexerMetrics,
committed_checkpoints_tx: &mut Option<watch::Sender<Option<IndexerProgress>>>,
mvr_mode: bool,
) where
S: IndexerStore + Clone + Sync + Send + 'static,
{
@@ -148,15 +159,18 @@ async fn commit_checkpoints<S>(
packages,
epoch: _,
} = indexed_checkpoint;
checkpoint_batch.push(checkpoint);
tx_batch.push(transactions);
events_batch.push(events);
tx_indices_batch.push(tx_indices);
event_indices_batch.push(event_indices);
display_updates_batch.extend(display_updates.into_iter());
object_changes_batch.push(object_changes);
// In MVR mode, persist only object_history, packages, checkpoints, and epochs
if !mvr_mode {
tx_batch.push(transactions);
events_batch.push(events);
tx_indices_batch.push(tx_indices);
event_indices_batch.push(event_indices);
display_updates_batch.extend(display_updates.into_iter());
object_changes_batch.push(object_changes);
object_versions_batch.push(object_versions);
}
object_history_changes_batch.push(object_history_changes);
object_versions_batch.push(object_versions);
checkpoint_batch.push(checkpoint);
packages_batch.push(packages);
}

@@ -190,23 +204,23 @@ async fn commit_checkpoints<S>(

{
let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
let mut persist_tasks = vec![
state.persist_transactions(tx_batch),
state.persist_tx_indices(tx_indices_batch),
state.persist_events(events_batch),
state.persist_event_indices(event_indices_batch),
state.persist_displays(display_updates_batch),
state.persist_packages(packages_batch),
// TODO: There are a few ways we could make the following more memory efficient.
// 1. persist_objects and persist_object_history both call another function to make the final
// committed object list. We could call it early and share the result.
// 2. We could avoid clone by using Arc.
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_history_changes_batch.clone()),
state.persist_full_objects_history(object_history_changes_batch.clone()),
state.persist_objects_version(object_versions_batch.clone()),
state.persist_raw_checkpoints(raw_checkpoints_batch),
];
let mut persist_tasks = Vec::new();
// In MVR mode, persist only packages and object history
persist_tasks.push(state.persist_packages(packages_batch));
persist_tasks.push(state.persist_object_history(object_history_changes_batch.clone()));
if !mvr_mode {
persist_tasks.push(state.persist_transactions(tx_batch));
persist_tasks.push(state.persist_tx_indices(tx_indices_batch));
persist_tasks.push(state.persist_events(events_batch));
persist_tasks.push(state.persist_event_indices(event_indices_batch));
persist_tasks.push(state.persist_displays(display_updates_batch));
persist_tasks.push(state.persist_objects(object_changes_batch.clone()));
persist_tasks
.push(state.persist_full_objects_history(object_history_changes_batch.clone()));
persist_tasks.push(state.persist_objects_version(object_versions_batch.clone()));
persist_tasks.push(state.persist_raw_checkpoints(raw_checkpoints_batch));
}

if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
}
2 changes: 0 additions & 2 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -288,8 +288,6 @@ pub enum CommitterTables {
TxDigests,
TxInputObjects,
TxKinds,
TxRecipients,
TxSenders,

Checkpoints,
PrunerCpWatermark,
4 changes: 0 additions & 4 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
@@ -65,8 +65,6 @@ pub enum PrunableTable {
TxDigests,
TxInputObjects,
TxKinds,
TxRecipients,
TxSenders,

Checkpoints,
PrunerCpWatermark,
@@ -96,8 +94,6 @@ impl PrunableTable {
PrunableTable::TxDigests => tx,
PrunableTable::TxInputObjects => tx,
PrunableTable::TxKinds => tx,
PrunableTable::TxRecipients => tx,
PrunableTable::TxSenders => tx,

PrunableTable::Checkpoints => cp,
PrunableTable::PrunerCpWatermark => cp,
14 changes: 12 additions & 2 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use anyhow::Result;
use prometheus::Registry;
use tokio::sync::{oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{info, warn};

use async_trait::async_trait;
use futures::future::try_join_all;
@@ -38,9 +38,10 @@ impl Indexer {
store: PgIndexerStore,
metrics: IndexerMetrics,
snapshot_config: SnapshotLagConfig,
retention_config: Option<RetentionConfig>,
mut retention_config: Option<RetentionConfig>,
cancel: CancellationToken,
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
mvr_mode: bool,
) -> Result<(), IndexerError> {
info!(
"Sui Indexer Writer (version {:?}) started...",
@@ -67,6 +68,14 @@ impl Indexer {
)
.await?;

if mvr_mode {
warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. The other tables have a 2000 epoch retention.");
retention_config = Some(RetentionConfig {
epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs.
overrides: Default::default(),
});
}

if let Some(retention_config) = retention_config {
let pruner = Pruner::new(store.clone(), retention_config, metrics.clone())?;
let cancel_clone = cancel.clone();
@@ -93,6 +102,7 @@ impl Indexer {
committed_checkpoints_tx,
config.start_checkpoint,
config.end_checkpoint,
mvr_mode,
)
.await?;
// Ingestion task watermarks are snapshotted once on indexer startup based on the
15 changes: 13 additions & 2 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
use clap::Parser;
use sui_indexer::backfill::backfill_runner::BackfillRunner;
use sui_indexer::benchmark::run_indexer_benchmark;
use sui_indexer::config::{Command, UploadOptions};
use sui_indexer::config::{Command, RetentionConfig, UploadOptions};
use sui_indexer::database::ConnectionPool;
use sui_indexer::db::setup_postgres::clear_database;
use sui_indexer::db::{
@@ -46,10 +46,20 @@ async fn main() -> anyhow::Result<()> {
snapshot_config,
pruning_options,
upload_options,
mvr_mode,
} => {
// Make sure to run all migrations on startup, and also serve as a compatibility check.
run_migrations(pool.dedicated_connection().await?).await?;
let retention_config = pruning_options.load_from_file();

let retention_config = if mvr_mode {
warn!("Indexer in MVR mode is configured to prune `objects_history` to 2 epochs. The other tables have a 2000 epoch retention.");
Some(RetentionConfig {
epochs_to_keep: 2000, // epochs, roughly 5+ years. We really just care about pruning `objects_history` per the default 2 epochs.
overrides: Default::default(),
})
} else {
pruning_options.load_from_file()
};
if retention_config.is_some() {
check_prunable_tables_valid(&mut pool.get().await?).await?;
}
@@ -64,6 +74,7 @@ async fn main() -> anyhow::Result<()> {
retention_config,
CancellationToken::new(),
None,
mvr_mode,
)
.await?;
}
56 changes: 52 additions & 4 deletions crates/sui-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -81,6 +81,36 @@ pub async fn start_indexer_writer_for_testing(
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
CancellationToken,
) {
start_indexer_writer_for_testing_with_mvr_mode(
db_url,
snapshot_config,
retention_config,
data_ingestion_path,
cancel,
start_checkpoint,
end_checkpoint,
false,
)
.await
}

/// Separate entrypoint for instantiating an indexer with or without MVR mode enabled. This is
/// relevant only for MVR; in general, prefer the production indexer available through
/// `start_indexer_writer_for_testing`.
pub async fn start_indexer_writer_for_testing_with_mvr_mode(
db_url: String,
snapshot_config: Option<SnapshotLagConfig>,
retention_config: Option<RetentionConfig>,
data_ingestion_path: Option<PathBuf>,
cancel: Option<CancellationToken>,
start_checkpoint: Option<u64>,
end_checkpoint: Option<u64>,
mvr_mode: bool,
) -> (
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
CancellationToken,
) {
let token = cancel.unwrap_or_default();
let snapshot_config = snapshot_config.unwrap_or(SnapshotLagConfig {
@@ -136,6 +166,7 @@ pub async fn start_indexer_writer_for_testing(
retention_config,
token_clone,
None,
mvr_mode,
)
.await
})
@@ -239,6 +270,22 @@ pub async fn set_up(
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
TempDb,
) {
set_up_on_mvr_mode(sim, data_ingestion_path, false).await
}

/// Set up a test indexer fetching from a REST endpoint served by the given Simulacrum. With MVR
/// mode enabled, this indexer writes only to a subset of tables - `objects_snapshot`,
/// `objects_history`, `checkpoints`, `epochs`, and `packages`.
pub async fn set_up_on_mvr_mode(
sim: Arc<Simulacrum>,
data_ingestion_path: PathBuf,
mvr_mode: bool,
) -> (
JoinHandle<()>,
PgIndexerStore,
JoinHandle<Result<(), IndexerError>>,
TempDb,
) {
let database = TempDb::new().unwrap();
let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
@@ -251,14 +298,15 @@ pub async fn set_up(
.await;
});
// Starts indexer
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing_with_mvr_mode(
database.database().url().as_str().to_owned(),
None,
None,
Some(data_ingestion_path),
None, /* cancel */
None, /* start_checkpoint */
None, /* end_checkpoint */
None, /* cancel */
None, /* start_checkpoint */
None, /* end_checkpoint */
mvr_mode, /* mvr_mode */
)
.await;
(server_handle, pg_store, pg_handle, database)
Loading

0 comments on commit b023ef8

Please sign in to comment.