diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs index ce190dc2d2e4b1..45c2523e18fb04 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs @@ -500,7 +500,17 @@ impl IndexerStreamCoordinator { pub fn set_highest_known_version(&mut self) -> anyhow::Result<()> { let info = self.context.get_latest_ledger_info_wrapped()?; - self.highest_known_version = info.ledger_version.0; + let latest_table_info_version = self + .context + .indexer_reader + .as_ref() + .expect("Table info reader not set") + .get_latest_table_info_ledger_version()? + .expect("Table info ledger version not set"); + + self.highest_known_version = + std::cmp::min(info.ledger_version.0, latest_table_info_version); + Ok(()) } diff --git a/storage/indexer/src/indexer_reader.rs b/storage/indexer/src/indexer_reader.rs index 535d043d50119a..f82b1ba6aad8a0 100644 --- a/storage/indexer/src/indexer_reader.rs +++ b/storage/indexer/src/indexer_reader.rs @@ -58,6 +58,13 @@ impl IndexerReader for IndexerReaders { anyhow::bail!("DB indexer reader is not available") } + fn get_latest_table_info_ledger_version(&self) -> anyhow::Result> { + if let Some(table_info_reader) = &self.table_info_reader { + return Ok(Some(table_info_reader.next_version())); + } + anyhow::bail!("Table info reader is not available") + } + fn get_events( &self, event_key: &EventKey, diff --git a/types/src/indexer/indexer_db_reader.rs b/types/src/indexer/indexer_db_reader.rs index 164b927de37dc8..0ab962c826374e 100644 --- a/types/src/indexer/indexer_db_reader.rs +++ b/types/src/indexer/indexer_db_reader.rs @@ -60,6 +60,7 @@ pub trait IndexerReader: Send + Sync { ) -> Result> + '_>>; fn get_latest_internal_indexer_ledger_version(&self) -> Result>; + fn get_latest_table_info_ledger_version(&self) -> Result>; #[cfg(any(test, feature = "fuzzing"))] fn wait_for_internal_indexer(&self, version: Version) -> Result<()> {