From 6272cf499b288a58adc00247c25f21a7ff05c1d8 Mon Sep 17 00:00:00 2001 From: Larry Liu Date: Wed, 23 Oct 2024 13:36:23 -0700 Subject: [PATCH] indexer grpc fullnode now consider both indexer ledger version + table info version to have strong consistency. --- .../indexer-grpc-fullnode/src/stream_coordinator.rs | 12 +++++++++++- storage/indexer/src/indexer_reader.rs | 7 +++++++ types/src/indexer/indexer_db_reader.rs | 1 + 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs index ce190dc2d2e4b1..45c2523e18fb04 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs @@ -500,7 +500,17 @@ impl IndexerStreamCoordinator { pub fn set_highest_known_version(&mut self) -> anyhow::Result<()> { let info = self.context.get_latest_ledger_info_wrapped()?; - self.highest_known_version = info.ledger_version.0; + let latest_table_info_version = self + .context + .indexer_reader + .as_ref() + .expect("Table info reader not set") + .get_latest_table_info_ledger_version()? + .expect("Table info ledger version not set"); + + self.highest_known_version = + std::cmp::min(info.ledger_version.0, latest_table_info_version); + Ok(()) } diff --git a/storage/indexer/src/indexer_reader.rs b/storage/indexer/src/indexer_reader.rs index 535d043d50119a..f82b1ba6aad8a0 100644 --- a/storage/indexer/src/indexer_reader.rs +++ b/storage/indexer/src/indexer_reader.rs @@ -58,6 +58,13 @@ impl IndexerReader for IndexerReaders { anyhow::bail!("DB indexer reader is not available") } + fn get_latest_table_info_ledger_version(&self) -> anyhow::Result> { + if let Some(table_info_reader) = &self.table_info_reader { + return Ok(Some(table_info_reader.next_version())); + } + anyhow::bail!("Table info reader is not available") + } + fn get_events( &self, event_key: &EventKey, diff --git a/types/src/indexer/indexer_db_reader.rs b/types/src/indexer/indexer_db_reader.rs index 164b927de37dc8..0ab962c826374e 100644 --- a/types/src/indexer/indexer_db_reader.rs +++ b/types/src/indexer/indexer_db_reader.rs @@ -60,6 +60,7 @@ pub trait IndexerReader: Send + Sync { ) -> Result> + '_>>; fn get_latest_internal_indexer_ledger_version(&self) -> Result>; + fn get_latest_table_info_ledger_version(&self) -> Result>; #[cfg(any(test, feature = "fuzzing"))] fn wait_for_internal_indexer(&self, version: Version) -> Result<()> {