From e2df3ec4d3dd0bd99c584763f94b4663087d4b73 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:27:51 -0500 Subject: [PATCH] [json rpc] query events based on transaction digest --- crates/sui-core/src/authority.rs | 25 +++ crates/sui-json-rpc/src/coin_api.rs | 4 +- crates/sui-json-rpc/src/read_api.rs | 202 +++++++----------- .../sui-storage/src/http_key_value_store.rs | 24 +++ crates/sui-storage/src/key_value_store.rs | 30 +++ crates/sui-storage/tests/key_value_tests.rs | 7 + 6 files changed, 165 insertions(+), 127 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 9c1d442298cf2..d10bed7cffb16 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -5384,6 +5384,31 @@ impl TransactionKeyValueStoreTrait for AuthorityState { .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint)) .collect()) } + + #[instrument(skip(self))] + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + if digests.is_empty() { + return Ok(vec![]); + } + let events_digests: Vec<_> = self + .get_transaction_cache_reader() + .multi_get_executed_effects(digests) + .into_iter() + .flat_map(|t| t.map(|t| t.events_digest().cloned())) + .collect(); + let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect(); + let mut events = self + .get_transaction_cache_reader() + .multi_get_events(&non_empty_events) + .into_iter(); + Ok(events_digests + .into_iter() + .flat_map(|ev| ev.map(|_| events.next()?)) + .collect()) + } } #[cfg(msim)] diff --git a/crates/sui-json-rpc/src/coin_api.rs b/crates/sui-json-rpc/src/coin_api.rs index 43a880f525513..a6b3eeafa8660 100644 --- a/crates/sui-json-rpc/src/coin_api.rs +++ b/crates/sui-json-rpc/src/coin_api.rs @@ -436,7 +436,7 @@ mod tests { use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress}; use sui_types::coin::TreasuryCap; use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEventsDigest}; - use sui_types::effects::TransactionEffects; + use sui_types::effects::{TransactionEffects, TransactionEvents}; use sui_types::error::{SuiError, SuiResult}; use sui_types::gas_coin::GAS; use sui_types::id::UID; @@ -479,6 +479,8 @@ mod tests { &self, digests: &[TransactionDigest], ) -> SuiResult>>; + + async fn multi_get_events_by_tx_digests(&self,digests: &[TransactionDigest]) -> SuiResult>>; } } diff --git a/crates/sui-json-rpc/src/read_api.rs b/crates/sui-json-rpc/src/read_api.rs index 69921be79a625..3074e3b4ed451 100644 --- a/crates/sui-json-rpc/src/read_api.rs +++ b/crates/sui-json-rpc/src/read_api.rs @@ -40,7 +40,6 @@ use sui_storage::key_value_store::TransactionKeyValueStore; use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest}; use sui_types::collection_types::VecMap; use sui_types::crypto::AggregateAuthoritySignature; -use sui_types::digests::TransactionEventsDigest; use sui_types::display::DisplayVersionUpdatedEvent; use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents}; use sui_types::error::{SuiError, SuiObjectResponseError}; @@ -320,77 +319,44 @@ impl ReadApi { if opts.show_events { trace!("getting events"); - let events_digests_list = temp_response - .values() - .filter_map(|cache_entry| match &cache_entry.effects { - Some(eff) => eff.events_digest().cloned(), - None => None, - }) - .collect::>(); - // filter out empty events digest, as they do not have to be read from the DB - let empty_events_digest = TransactionEvents::default().digest(); - let events_digests_list = events_digests_list - .into_iter() - .filter(|d| d != &empty_events_digest) - .collect::>(); - - let mut events_digest_to_events = if events_digests_list.is_empty() { - HashMap::new() - } else { - // fetch events from the DB with retry, retry each 0.5s for 3s - let backoff = ExponentialBackoff { - max_elapsed_time: Some(Duration::from_secs(3)), - multiplier: 1.0, - ..ExponentialBackoff::default() - }; - let events = retry(backoff, || async { - match self - .transaction_kv_store - .multi_get_events(&events_digests_list) - .await - { - // Only return Ok when all the queried transaction events are found, otherwise retry - // until timeout, then return Err. - Ok(events) if !events.contains(&None) => Ok(events), - Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError( - "Events not found, transaction execution may be incomplete.".into(), - ))), - Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!( - "Failed to call multi_get_events: {e:?}" - )))), - } - }) - .await - .map_err(|e| { - Error::UnexpectedError(format!( - "Retrieving events with retry failed for events digests {events_digests_list:?}: {e:?}" - )) - })? - .into_iter(); - - // construct a hashmap of events digests -> events for fast lookup - let events_map = events_digests_list - .into_iter() - .zip(events) - .collect::>(); - // Double check that all events are `Some` and their digests match the key - for (events_digest, events) in events_map.iter() { - if let Some(events) = events { - if &events.digest() != events_digest { - return Err(Error::UnexpectedError(format!( - "Events digest {events_digest:?} does not match the key {:?}", - events.digest() - ))); - } - } else { - return Err(Error::UnexpectedError(format!( - "Events of digest {events_digest:?} is None, but it should not be" - ))); + let mut non_empty_digests = vec![]; + for cache_entry in temp_response.values() { + if let Some(effects) = &cache_entry.effects { + if effects.events_digest().is_some() { + non_empty_digests.push(cache_entry.digest); } } - events_map + } + // fetch events from the DB with retry, retry each 0.5s for 3s + let backoff = ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(3)), + multiplier: 1.0, + ..ExponentialBackoff::default() }; - events_digest_to_events.insert(empty_events_digest, Some(TransactionEvents::default())); + let mut events = retry(backoff, || async { + match self + .transaction_kv_store + .multi_get_events_by_tx_digests(&non_empty_digests) + .await + { + // Only return Ok when all the queried transaction events are found, otherwise retry + // until timeout, then return Err. + Ok(events) if !events.contains(&None) => Ok(events), + Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError( + "Events not found, transaction execution may be incomplete.".into(), + ))), + Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!( + "Failed to call multi_get_events: {e:?}" + )))), + } + }) + .await + .map_err(|e| { + Error::UnexpectedError(format!( + "Retrieving events with retry failed for transaction digests {digests:?}: {e:?}" + )) + })? + .into_iter(); // fill cache with the events for (_, cache_entry) in temp_response.iter_mut() { @@ -398,15 +364,12 @@ impl ReadApi { if let Some(events_digest) = cache_entry.effects.as_ref().and_then(|e| e.events_digest()) { - let events = events_digest_to_events - .get(events_digest) - .cloned() - .unwrap_or_else(|| panic!("Expect event digest {events_digest:?} to be found in cache for transaction {transaction_digest}")) - .map(|events| to_sui_transaction_events(self, cache_entry.digest, events)); - match events { - Some(Ok(e)) => cache_entry.events = Some(e), - Some(Err(e)) => cache_entry.errors.push(e.to_string()), - None => { + match events.next() { + Some(Some(ev)) => { + cache_entry.events = + Some(to_sui_transaction_events(self, cache_entry.digest, ev)?) + } + None | Some(None) => { error!("Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"); cache_entry.errors.push(format!( "Failed to fetch events with event digest {events_digest:?}", @@ -836,30 +799,26 @@ impl ReadApiServer for ReadApi { } if opts.show_events && temp_response.effects.is_some() { - // safe to unwrap because we have checked is_some - if let Some(event_digest) = temp_response.effects.as_ref().unwrap().events_digest() - { - let transaction_kv_store = self.transaction_kv_store.clone(); - let event_digest = *event_digest; - let events = spawn_monitored_task!(async move { - transaction_kv_store - .get_events(event_digest) - .await - .map_err(|e| { - error!("Failed to call get transaction events for events digest: {event_digest:?} with error {e:?}"); - Error::from(e) - }) - }) + let transaction_kv_store = self.transaction_kv_store.clone(); + let events = spawn_monitored_task!(async move { + transaction_kv_store + .multi_get_events_by_tx_digests(&[digest]) .await - .map_err(Error::from)??; - match to_sui_transaction_events(self, digest, events) { + .map_err(|e| { + error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}"); + Error::from(e) + }) + }) + .await + .map_err(Error::from)?? + .pop() + .flatten(); + match events { + None => temp_response.events = Some(SuiTransactionBlockEvents::default()), + Some(events) => match to_sui_transaction_events(self, digest, events) { Ok(e) => temp_response.events = Some(e), Err(e) => temp_response.errors.push(e.to_string()), - }; - } else { - // events field will be Some if and only if `show_events` is true and - // there is no error in converting fetching events - temp_response.events = Some(SuiTransactionBlockEvents::default()); + }, } } @@ -941,38 +900,29 @@ impl ReadApiServer for ReadApi { let transaction_kv_store = self.transaction_kv_store.clone(); spawn_monitored_task!(async move{ let store = state.load_epoch_store_one_call_per_task(); - let effect = transaction_kv_store - .get_fx_by_tx_digest(transaction_digest) - .await - .map_err(Error::from)?; - let events = if let Some(event_digest) = effect.events_digest() { - transaction_kv_store - .get_events(*event_digest) + let events = transaction_kv_store + .multi_get_events_by_tx_digests(&[transaction_digest]) .await .map_err( |e| { - error!("Failed to get transaction events for event digest {event_digest:?} with error: {e:?}"); + error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}"); Error::StateReadError(e.into()) })? - .data - .into_iter() - .enumerate() - .map(|(seq, e)| { - let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?; - SuiEvent::try_from( - e, - *effect.transaction_digest(), - seq as u64, - None, - layout, - ) - }) - .collect::, _>>() - .map_err(Error::SuiError)? - } else { - vec![] - }; - Ok(events) + .pop() + .flatten(); + Ok(match events { + Some(events) => events + .data + .into_iter() + .enumerate() + .map(|(seq, e)| { + let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?; + SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout) + }) + .collect::, _>>() + .map_err(Error::SuiError)?, + None => vec![], + }) }).await.map_err(Error::from)? }) } diff --git a/crates/sui-storage/src/http_key_value_store.rs b/crates/sui-storage/src/http_key_value_store.rs index 5fc9dcc3128ba..df3a0200fb362 100644 --- a/crates/sui-storage/src/http_key_value_store.rs +++ b/crates/sui-storage/src/http_key_value_store.rs @@ -84,6 +84,7 @@ pub enum Key { CheckpointSummaryByDigest(CheckpointDigest), TxToCheckpoint(TransactionDigest), ObjectKey(ObjectID, VersionNumber), + EventsByTxDigest(TransactionDigest), } impl Key { @@ -99,6 +100,7 @@ impl Key { Key::CheckpointSummaryByDigest(_) => "cs", Key::TxToCheckpoint(_) => "tx2c", Key::ObjectKey(_, _) => "ob", + Key::EventsByTxDigest(_) => "evtx", } } @@ -117,6 +119,7 @@ impl Key { Key::CheckpointSummaryByDigest(digest) => encode_digest(digest), Key::TxToCheckpoint(digest) => encode_digest(digest), Key::ObjectKey(object_id, version) => encode_object_key(object_id, version), + Key::EventsByTxDigest(digest) => encode_digest(digest), } } @@ -569,4 +572,25 @@ impl TransactionKeyValueStoreTrait for HttpKVStore { Ok(results) } + + #[instrument(level = "trace", skip_all)] + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + let keys = digests + .iter() + .map(|digest| Key::EventsByTxDigest(*digest)) + .collect::>(); + Ok(self + .multi_fetch(keys) + .await + .iter() + .zip(digests.iter()) + .map(map_fetch) + .map(|maybe_bytes| { + maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes)) + }) + .collect::>()) + } } diff --git a/crates/sui-storage/src/key_value_store.rs b/crates/sui-storage/src/key_value_store.rs index bf9b62843426b..62fe68b0ead8e 100644 --- a/crates/sui-storage/src/key_value_store.rs +++ b/crates/sui-storage/src/key_value_store.rs @@ -416,6 +416,13 @@ impl TransactionKeyValueStore { ) -> SuiResult>> { self.inner.multi_get_transaction_checkpoint(digests).await } + + pub async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + self.inner.multi_get_events_by_tx_digests(digests).await + } } /// Immutable key/value store trait for storing/retrieving transactions, effects, and events. @@ -454,6 +461,11 @@ pub trait TransactionKeyValueStoreTrait { &self, digests: &[TransactionDigest], ) -> SuiResult>>; + + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>>; } /// A TransactionKeyValueStoreTrait that falls back to a secondary store for any key for which the @@ -630,6 +642,24 @@ impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore { Ok(res) } + + #[instrument(level = "trace", skip_all)] + async fn multi_get_events_by_tx_digests( + &self, + digests: &[TransactionDigest], + ) -> SuiResult>> { + let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?; + let (fallback, indices) = find_fallback(&res, digests); + if fallback.is_empty() { + return Ok(res); + } + let secondary_res = self + .fallback + .multi_get_events_by_tx_digests(&fallback) + .await?; + merge_res(&mut res, secondary_res, &indices); + Ok(res) + } } fn find_fallback(values: &[Option], keys: &[K]) -> (Vec, Vec) { diff --git a/crates/sui-storage/tests/key_value_tests.rs b/crates/sui-storage/tests/key_value_tests.rs index 09e550283c113..ad253b8086ad3 100644 --- a/crates/sui-storage/tests/key_value_tests.rs +++ b/crates/sui-storage/tests/key_value_tests.rs @@ -244,6 +244,13 @@ impl TransactionKeyValueStoreTrait for MockTxStore { .map(|digest| self.tx_to_checkpoint.get(digest).cloned()) .collect()) } + + async fn multi_get_events_by_tx_digests( + &self, + _: &[TransactionDigest], + ) -> SuiResult>> { + Ok(vec![]) + } } #[tokio::test]