Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[json rpc] query events based on transaction digest #20437

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5384,6 +5384,31 @@ impl TransactionKeyValueStoreTrait for AuthorityState {
.map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
.collect())
}

#[instrument(skip(self))]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
if digests.is_empty() {
return Ok(vec![]);
}
let events_digests: Vec<_> = self
.get_transaction_cache_reader()
.multi_get_executed_effects(digests)
.into_iter()
.flat_map(|t| t.map(|t| t.events_digest().cloned()))
.collect();
let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
let mut events = self
.get_transaction_cache_reader()
.multi_get_events(&non_empty_events)
.into_iter();
Ok(events_digests
.into_iter()
.flat_map(|ev| ev.map(|_| events.next()?))
.collect())
}
}

#[cfg(msim)]
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-json-rpc/src/coin_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {
use sui_types::base_types::{ObjectID, SequenceNumber, SuiAddress};
use sui_types::coin::TreasuryCap;
use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEventsDigest};
use sui_types::effects::TransactionEffects;
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::error::{SuiError, SuiResult};
use sui_types::gas_coin::GAS;
use sui_types::id::UID;
Expand Down Expand Up @@ -479,6 +479,8 @@ mod tests {
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;

async fn multi_get_events_by_tx_digests(&self,digests: &[TransactionDigest]) -> SuiResult<Vec<Option<TransactionEvents>>>;
}
}

Expand Down
202 changes: 76 additions & 126 deletions crates/sui-json-rpc/src/read_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use sui_storage::key_value_store::TransactionKeyValueStore;
use sui_types::base_types::{ObjectID, SequenceNumber, TransactionDigest};
use sui_types::collection_types::VecMap;
use sui_types::crypto::AggregateAuthoritySignature;
use sui_types::digests::TransactionEventsDigest;
use sui_types::display::DisplayVersionUpdatedEvent;
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
use sui_types::error::{SuiError, SuiObjectResponseError};
Expand Down Expand Up @@ -320,93 +319,57 @@ impl ReadApi {

if opts.show_events {
trace!("getting events");
let events_digests_list = temp_response
.values()
.filter_map(|cache_entry| match &cache_entry.effects {
Some(eff) => eff.events_digest().cloned(),
None => None,
})
.collect::<Vec<TransactionEventsDigest>>();
// filter out empty events digest, as they do not have to be read from the DB
let empty_events_digest = TransactionEvents::default().digest();
let events_digests_list = events_digests_list
.into_iter()
.filter(|d| d != &empty_events_digest)
.collect::<Vec<_>>();

let mut events_digest_to_events = if events_digests_list.is_empty() {
HashMap::new()
} else {
// fetch events from the DB with retry, retry each 0.5s for 3s
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(3)),
multiplier: 1.0,
..ExponentialBackoff::default()
};
let events = retry(backoff, || async {
match self
.transaction_kv_store
.multi_get_events(&events_digests_list)
.await
{
// Only return Ok when all the queried transaction events are found, otherwise retry
// until timeout, then return Err.
Ok(events) if !events.contains(&None) => Ok(events),
Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
"Events not found, transaction execution may be incomplete.".into(),
))),
Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
"Failed to call multi_get_events: {e:?}"
)))),
}
})
.await
.map_err(|e| {
Error::UnexpectedError(format!(
"Retrieving events with retry failed for events digests {events_digests_list:?}: {e:?}"
))
})?
.into_iter();

// construct a hashmap of events digests -> events for fast lookup
let events_map = events_digests_list
.into_iter()
.zip(events)
.collect::<HashMap<_, _>>();
// Double check that all events are `Some` and their digests match the key
for (events_digest, events) in events_map.iter() {
if let Some(events) = events {
if &events.digest() != events_digest {
return Err(Error::UnexpectedError(format!(
"Events digest {events_digest:?} does not match the key {:?}",
events.digest()
)));
}
} else {
return Err(Error::UnexpectedError(format!(
"Events of digest {events_digest:?} is None, but it should not be"
)));
let mut non_empty_digests = vec![];
for cache_entry in temp_response.values() {
if let Some(effects) = &cache_entry.effects {
if effects.events_digest().is_some() {
non_empty_digests.push(cache_entry.digest);
}
}
events_map
}
// fetch events from the DB with retry, retry each 0.5s for 3s
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(3)),
multiplier: 1.0,
..ExponentialBackoff::default()
};
events_digest_to_events.insert(empty_events_digest, Some(TransactionEvents::default()));
let mut events = retry(backoff, || async {
match self
.transaction_kv_store
.multi_get_events_by_tx_digests(&non_empty_digests)
.await
{
// Only return Ok when all the queried transaction events are found, otherwise retry
// until timeout, then return Err.
Ok(events) if !events.contains(&None) => Ok(events),
Ok(_) => Err(backoff::Error::transient(Error::UnexpectedError(
"Events not found, transaction execution may be incomplete.".into(),
))),
Err(e) => Err(backoff::Error::permanent(Error::UnexpectedError(format!(
"Failed to call multi_get_events: {e:?}"
)))),
}
})
.await
.map_err(|e| {
Error::UnexpectedError(format!(
"Retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
))
})?
.into_iter();

// fill cache with the events
for (_, cache_entry) in temp_response.iter_mut() {
let transaction_digest = cache_entry.digest;
if let Some(events_digest) =
cache_entry.effects.as_ref().and_then(|e| e.events_digest())
{
let events = events_digest_to_events
.get(events_digest)
.cloned()
.unwrap_or_else(|| panic!("Expect event digest {events_digest:?} to be found in cache for transaction {transaction_digest}"))
.map(|events| to_sui_transaction_events(self, cache_entry.digest, events));
match events {
Some(Ok(e)) => cache_entry.events = Some(e),
Some(Err(e)) => cache_entry.errors.push(e.to_string()),
None => {
match events.next() {
Some(Some(ev)) => {
cache_entry.events =
Some(to_sui_transaction_events(self, cache_entry.digest, ev)?)
}
None | Some(None) => {
error!("Failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}");
cache_entry.errors.push(format!(
"Failed to fetch events with event digest {events_digest:?}",
Expand Down Expand Up @@ -836,30 +799,26 @@ impl ReadApiServer for ReadApi {
}

if opts.show_events && temp_response.effects.is_some() {
// safe to unwrap because we have checked is_some
if let Some(event_digest) = temp_response.effects.as_ref().unwrap().events_digest()
{
let transaction_kv_store = self.transaction_kv_store.clone();
let event_digest = *event_digest;
let events = spawn_monitored_task!(async move {
transaction_kv_store
.get_events(event_digest)
.await
.map_err(|e| {
error!("Failed to call get transaction events for events digest: {event_digest:?} with error {e:?}");
Error::from(e)
})
})
let transaction_kv_store = self.transaction_kv_store.clone();
let events = spawn_monitored_task!(async move {
transaction_kv_store
.multi_get_events_by_tx_digests(&[digest])
.await
.map_err(Error::from)??;
match to_sui_transaction_events(self, digest, events) {
.map_err(|e| {
error!("Failed to call get transaction events for transaction: {digest:?} with error {e:?}");
Error::from(e)
})
})
.await
.map_err(Error::from)??
.pop()
.flatten();
match events {
None => temp_response.events = Some(SuiTransactionBlockEvents::default()),
Some(events) => match to_sui_transaction_events(self, digest, events) {
Ok(e) => temp_response.events = Some(e),
Err(e) => temp_response.errors.push(e.to_string()),
};
} else {
// events field will be Some if and only if `show_events` is true and
// there is no error in converting fetching events
temp_response.events = Some(SuiTransactionBlockEvents::default());
},
}
}

Expand Down Expand Up @@ -941,38 +900,29 @@ impl ReadApiServer for ReadApi {
let transaction_kv_store = self.transaction_kv_store.clone();
spawn_monitored_task!(async move{
let store = state.load_epoch_store_one_call_per_task();
let effect = transaction_kv_store
.get_fx_by_tx_digest(transaction_digest)
.await
.map_err(Error::from)?;
let events = if let Some(event_digest) = effect.events_digest() {
transaction_kv_store
.get_events(*event_digest)
let events = transaction_kv_store
.multi_get_events_by_tx_digests(&[transaction_digest])
.await
.map_err(
|e| {
error!("Failed to get transaction events for event digest {event_digest:?} with error: {e:?}");
error!("Failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
Error::StateReadError(e.into())
})?
.data
.into_iter()
.enumerate()
.map(|(seq, e)| {
let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
SuiEvent::try_from(
e,
*effect.transaction_digest(),
seq as u64,
None,
layout,
)
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::SuiError)?
} else {
vec![]
};
Ok(events)
.pop()
.flatten();
Ok(match events {
Some(events) => events
.data
.into_iter()
.enumerate()
.map(|(seq, e)| {
let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
SuiEvent::try_from(e, transaction_digest, seq as u64, None, layout)
})
.collect::<Result<Vec<_>, _>>()
.map_err(Error::SuiError)?,
None => vec![],
})
}).await.map_err(Error::from)?
})
}
Expand Down
24 changes: 24 additions & 0 deletions crates/sui-storage/src/http_key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub enum Key {
CheckpointSummaryByDigest(CheckpointDigest),
TxToCheckpoint(TransactionDigest),
ObjectKey(ObjectID, VersionNumber),
EventsByTxDigest(TransactionDigest),
}

impl Key {
Expand All @@ -99,6 +100,7 @@ impl Key {
Key::CheckpointSummaryByDigest(_) => "cs",
Key::TxToCheckpoint(_) => "tx2c",
Key::ObjectKey(_, _) => "ob",
Key::EventsByTxDigest(_) => "evtx",
}
}

Expand All @@ -117,6 +119,7 @@ impl Key {
Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
Key::TxToCheckpoint(digest) => encode_digest(digest),
Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
Key::EventsByTxDigest(digest) => encode_digest(digest),
}
}

Expand Down Expand Up @@ -569,4 +572,25 @@ impl TransactionKeyValueStoreTrait for HttpKVStore {

Ok(results)
}

#[instrument(level = "trace", skip_all)]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
let keys = digests
.iter()
.map(|digest| Key::EventsByTxDigest(*digest))
.collect::<Vec<_>>();
Ok(self
.multi_fetch(keys)
.await
.iter()
.zip(digests.iter())
.map(map_fetch)
.map(|maybe_bytes| {
maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
})
.collect::<Vec<_>>())
}
}
30 changes: 30 additions & 0 deletions crates/sui-storage/src/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ impl TransactionKeyValueStore {
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
self.inner.multi_get_transaction_checkpoint(digests).await
}

pub async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
self.inner.multi_get_events_by_tx_digests(digests).await
}
}

/// Immutable key/value store trait for storing/retrieving transactions, effects, and events.
Expand Down Expand Up @@ -454,6 +461,11 @@ pub trait TransactionKeyValueStoreTrait {
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>>;

async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>>;
}

/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any key for which the
Expand Down Expand Up @@ -630,6 +642,24 @@ impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {

Ok(res)
}

#[instrument(level = "trace", skip_all)]
async fn multi_get_events_by_tx_digests(
&self,
digests: &[TransactionDigest],
) -> SuiResult<Vec<Option<TransactionEvents>>> {
let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
let (fallback, indices) = find_fallback(&res, digests);
if fallback.is_empty() {
return Ok(res);
}
let secondary_res = self
.fallback
.multi_get_events_by_tx_digests(&fallback)
.await?;
merge_res(&mut res, secondary_res, &indices);
Ok(res)
}
}

fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
Expand Down
Loading
Loading