diff --git a/Cargo.lock b/Cargo.lock index b6e8732c7cb85..68693a35b9713 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10421,6 +10421,7 @@ dependencies = [ "move-core-types", "move-package", "mysten-metrics", + "odin", "once_cell", "prometheus", "serde", diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 8c2ba76853c5d..98dc4b70d2baa 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -10,7 +10,9 @@ use itertools::Itertools; use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout}; use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; -use odin::sui_ws::{CoinCreated, CoinMutated, CoinObjectUpdateStatus}; +use odin::sui_ws::{ + AccountObjectsUpdate, CoinCreated, CoinMutated, CoinObjectUpdateStatus, ObjectChangeUpdate, +}; use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate}; use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Mutex}; @@ -443,24 +445,16 @@ where // Add out custom maps let status_map = get_object_status_map(&fx); - let input_objects_to_owner = input_objects - .iter() - .map(|input_object| (input_object.id(), input_object.owner().clone())) - .collect::>(); - let output_objects_to_owner = output_objects - .iter() - .map(|output_object| (output_object.id(), output_object.owner().clone())) - .collect::>(); - let (balance_change, object_changes) = + let (balance_change, object_changes, custom_object_changes) = TxChangesProcessor::new(&objects, metrics.clone()) .custom_get_changes( tx, &fx, &tx_digest, status_map, - &input_objects_to_owner, - &output_objects_to_owner, + &input_objects, + &output_objects, ) .await?; @@ -473,6 +467,7 @@ where effects: fx.clone(), object_changes, balance_change, + custom_object_changes, events, transaction_kind, successful_tx_num: if fx.status().is_ok() { @@ -964,6 +959,8 @@ pub fn generate_ws_updates_from_checkpoint_data( let block_number = checkpoint_data.checkpoint.sequence_number; let mut ws_balance_changes: HashMap = HashMap::new(); + let mut account_object_changes: HashMap = HashMap::new(); // account address -> account object changes + let mut objects_changes: Vec = Vec::new(); // transaction balance changes for transaction in checkpoint_data.transactions.iter() { @@ -1018,13 +1015,40 @@ pub fn generate_ws_updates_from_checkpoint_data( } } } + + for (change_owner, object_change) in transaction.custom_object_changes.iter() { + if let Some(owner) = change_owner { + account_object_changes + .entry(owner.to_string()) + .or_insert(AccountObjectsUpdate { + sui_address: owner.to_string(), + sequence_number: block_number, + object_changes: HashMap::new(), + timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, + }) + .object_changes + .entry(object_change.object_id.clone()) + .or_insert(object_change.clone()); + } + + objects_changes.push(object_change.clone()); + } } - return ( - checkpoint_data.checkpoint.sequence_number, - ws_balance_changes - .into_values() - .map(|v| SuiWsApiMsg::TokenBalanceUpdate(v)) - .collect(), - ); + let updates: Vec = ws_balance_changes + .into_values() + .map(|v| SuiWsApiMsg::TokenBalanceUpdate(v)) + .chain( + account_object_changes + .into_values() + .map(|v| SuiWsApiMsg::AccountObjectsUpdate(v)), + ) + .chain( + objects_changes + .into_iter() + .map(|v| SuiWsApiMsg::ObjectUpdate(v)), + ) + .collect(); + + return (checkpoint_data.checkpoint.sequence_number, updates); } diff --git a/crates/sui-indexer/src/handlers/tx_processor.rs b/crates/sui-indexer/src/handlers/tx_processor.rs index 64f1278caf454..8bb36521182f7 100644 --- a/crates/sui-indexer/src/handlers/tx_processor.rs +++ b/crates/sui-indexer/src/handlers/tx_processor.rs @@ -7,9 +7,14 @@ use async_trait::async_trait; use mysten_metrics::monitored_scope; use mysten_metrics::spawn_monitored_task; +use odin::sui_ws::ObjectChangeUpdate; +use sui_json_rpc::custom_get_object_changes; use sui_json_rpc::get_balance_changes_with_status_from_effect; use sui_json_rpc_types::ObjectStatus; +use sui_package_resolver::PackageStore; +use sui_package_resolver::Resolver; use sui_rest_api::CheckpointData; +use sui_types::base_types::SuiAddress; use sui_types::object::Owner; use tokio::sync::watch; @@ -221,27 +226,45 @@ impl TxChangesProcessor { effects: &TransactionEffects, tx_digest: &TransactionDigest, status_map: HashMap, - input_objects_to_owner: &HashMap, - output_objects_to_owner: &HashMap, + input_objects: &Vec, + output_objects: &Vec, ) -> IndexerResult<( Vec, Vec, + Vec<(Option, ObjectChangeUpdate)>, )> { let _timer = self .metrics .indexing_tx_object_changes_latency .start_timer(); - let object_change: Vec<_> = get_object_changes( - self, - tx.sender(), - effects.modified_at_versions(), - effects.all_changed_objects(), - effects.all_removed_objects(), - ) - .await? - .into_iter() - .map(IndexedObjectChange::from) - .collect(); + let (original_object_change, custom_object_change): (Vec<_>, Vec<_>) = + custom_get_object_changes( + self, + tx.sender(), + effects.modified_at_versions(), + effects.all_changed_objects(), + effects.all_removed_objects(), + &input_objects, + &output_objects, + ) + .await?; + + let indexed_objects_changes = original_object_change + .into_iter() + .map(IndexedObjectChange::from) + .collect::>(); + + ///////////////////////////////////////////////// Moved here from the parent function index_transactions + let input_objects_to_owner = input_objects + .iter() + .map(|input_object| (input_object.id(), input_object.owner().clone())) + .collect::>(); + let output_objects_to_owner = output_objects + .iter() + .map(|output_object| (output_object.id(), output_object.owner().clone())) + .collect::>(); + ///////////////////////////////////////////////// + let balance_change = get_balance_changes_with_status_from_effect( self, effects, @@ -253,11 +276,15 @@ impl TxChangesProcessor { }), None, status_map, - input_objects_to_owner, - output_objects_to_owner, + &input_objects_to_owner, + &output_objects_to_owner, ) .await?; - Ok((balance_change, object_change)) + Ok(( + balance_change, + indexed_objects_changes, + custom_object_change, + )) } } diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index 5f83fcbd6057e..7dd8750a2fe91 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -3,6 +3,7 @@ use crate::errors::IndexerError; use move_core_types::language_storage::StructTag; +use odin::sui_ws::ObjectChangeUpdate; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sui_json_rpc_types::{ @@ -363,6 +364,7 @@ pub struct CustomIndexedTransaction { pub timestamp_ms: u64, pub object_changes: Vec, pub balance_change: Vec, + pub custom_object_changes: Vec<(Option, ObjectChangeUpdate)>, pub events: Vec, pub transaction_kind: TransactionKind, pub successful_tx_num: u64, diff --git a/crates/sui-json-rpc/Cargo.toml b/crates/sui-json-rpc/Cargo.toml index e03790cecfaa4..de2f6f9f9e957 100644 --- a/crates/sui-json-rpc/Cargo.toml +++ b/crates/sui-json-rpc/Cargo.toml @@ -7,6 +7,8 @@ publish = false edition = "2021" [dependencies] +odin = { workspace = true, version = "0.1.0" } + arc-swap.workspace = true chrono.workspace = true fastcrypto.workspace = true diff --git a/crates/sui-json-rpc/src/object_changes.rs b/crates/sui-json-rpc/src/object_changes.rs index c3fbdf26ad67b..5627c47a82068 100644 --- a/crates/sui-json-rpc/src/object_changes.rs +++ b/crates/sui-json-rpc/src/object_changes.rs @@ -1,13 +1,15 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::collections::BTreeMap; - +use move_core_types::language_storage::StructTag; +use odin::sui_ws::{ObjectChangeUpdate, ObjectUpdateStatus, Received, Sent}; +use std::collections::{BTreeMap, HashMap}; use sui_json_rpc_types::ObjectChange; use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, SuiAddress}; use sui_types::effects::ObjectRemoveKind; -use sui_types::object::Owner; +use sui_types::object::{Object, Owner}; use sui_types::storage::WriteKind; +use tracing::warn; use crate::ObjectProvider; @@ -90,3 +92,346 @@ pub async fn get_object_changes, E>( Ok(object_changes) } + +pub async fn custom_get_object_changes, E>( + object_provider: &P, + sender: SuiAddress, + modified_at_versions: Vec<(ObjectID, SequenceNumber)>, + all_changed_objects: Vec<(ObjectRef, Owner, WriteKind)>, + all_removed_objects: Vec<(ObjectRef, ObjectRemoveKind)>, + input_objects: &Vec, + output_objects: &Vec, +) -> Result<(Vec, Vec<(Option, ObjectChangeUpdate)>), E> { + let mut object_changes = vec![]; + let mut custom_object_changes: Vec<(Option, ObjectChangeUpdate)> = vec![]; + + // Input objects ownership map + let input_ownership_map = input_objects + .iter() + .map(|o| { + ( + o.id().to_string(), + match o.owner.get_owner_address() { + Ok(owner) => Some(owner.to_string()), + Err(_) => None, + }, + ) + }) + .collect::>(); + // Output objects ownership map is needed to check if object ownership changed + let output_ownership_map = output_objects + .iter() + .map(|o| { + ( + o.id().to_string(), + match o.owner.get_owner_address() { + Ok(owner) => Some(owner.to_string()), + Err(_) => None, + }, + ) + }) + .collect::>(); + + let modify_at_version = modified_at_versions.into_iter().collect::>(); + + for ((object_id, version, digest), owner, kind) in all_changed_objects { + let o = object_provider.get_object(&object_id, &version).await?; + if let Some(type_) = o.type_() { + let object_type: StructTag = type_.clone().into(); + + let address_owner = match o.owner.get_owner_address() { + Ok(owner) => Some(owner.to_string()), + Err(_) => None, + }; + + let data = match o.data.try_as_move() { + Some(data) => Some(data.clone().into_contents()), + None => None, + }; + + match kind { + WriteKind::Mutate => { + object_changes.push(ObjectChange::Mutated { + sender, + owner, + object_type: object_type.clone(), + object_id, + version, + // modify_at_version should always be available for mutated object + previous_version: modify_at_version + .get(&object_id) + .cloned() + .unwrap_or_default(), + digest, + }); + + // Check if ownership change occurred, object should have existed in transaction input + let object_owner = match input_ownership_map.get(&object_id.to_string()) { + Some(old_owner) => old_owner.clone(), + None => { + // Should never happen + warn!( + "Object ownership change occurred but object not found in input objects, object_id: {}", object_id + ); + continue; + } + }; + + // Check change + match (object_owner, address_owner) { + (Some(old_owner), Some(new_owner)) => { + // Check if owner changed + if old_owner != new_owner { + // Receiver + custom_object_changes.push(( + Some(new_owner.clone()), + ObjectChangeUpdate { + object_id: object_id.to_string(), + object_type_tag: Some( + object_type.to_canonical_string(true), + ), + object_version: Some(version.into()), + object_bcs: data, + object_metadata: None, + status: ObjectUpdateStatus::Received(Received { + sender_address: old_owner.to_string(), + receiver_address: new_owner, + }), + }, + )); + } + } + (Some(_), None) | (None, None) => { + custom_object_changes.push(( + None, + ObjectChangeUpdate { + object_id: object_id.to_string(), + object_type_tag: Some(object_type.to_canonical_string(true)), + object_version: Some(version.into()), + object_bcs: data, + object_metadata: None, + status: ObjectUpdateStatus::Mutated, + }, + )); + } + (None, Some(new_owner)) => { + // Object did not have owner before, now has owner + custom_object_changes.push(( + Some(new_owner), + ObjectChangeUpdate { + object_id: object_id.to_string(), + object_type_tag: Some(object_type.to_canonical_string(true)), + object_version: Some(version.into()), + object_bcs: data, + object_metadata: None, + status: ObjectUpdateStatus::Mutated, + }, + )); + } + } + } + WriteKind::Create => { + object_changes.push(ObjectChange::Created { + sender, + owner, + object_type: object_type.clone(), + object_id, + version, + digest, + }); + + // Check if object had previous owner + let object_existed = input_ownership_map.get(&object_id.to_string()); + + // 1. (Some(), Some()) Object existed and still has owner + // 2. (Some(), None) Object existed but no longer has owner + // 3. (None, Some()) Object did not exist and now does and has an owner + // 4. (None, None) Object did not exist and but now it does and it does not have an owner + match (object_existed, address_owner) { + // (Option, Option) + (Some(old_owner), Some(new_owner)) => { + // Check if object had owner before and now has a different owner + match old_owner { + Some(old_owner) => { + // object existed and had owner before, check if owner changed + if old_owner != &new_owner { + // Receiver + custom_object_changes.push(( + Some(new_owner.clone()), + ObjectChangeUpdate { + object_id: object_id.to_string(), + object_type_tag: Some( + object_type.to_canonical_string(true), + ), + object_version: Some(version.into()), + object_bcs: data, + object_metadata: None, + status: ObjectUpdateStatus::Received(Received { + sender_address: old_owner.to_string(), + receiver_address: new_owner, + }), + }, + )); + } + } + None => { + // Object did not have owner before, now has owner + custom_object_changes.push(( + Some(new_owner), + ObjectChangeUpdate { + object_id: object_id.to_string(), + object_type_tag: Some( + object_type.to_canonical_string(true), + ), + object_version: Some(version.into()), + object_bcs: data, + object_metadata: None, + status: ObjectUpdateStatus::Mutated, + }, + )); + } + } + } + (Some(_), None) | (None, None) => { + custom_object_changes.push(( + None, + ObjectChangeUpdate { + object_id: object_id.to_string(), + object_type_tag: Some(object_type.to_canonical_string(true)), + object_version: Some(version.into()), + object_bcs: data, + object_metadata: None, + status: ObjectUpdateStatus::Created, + }, + )); + } + // Object did not exist before, now it does and has an owner + (None, Some(new_owner)) => { + custom_object_changes.push(( + Some(new_owner), + ObjectChangeUpdate { + object_id: object_id.to_string(), + object_type_tag: Some(object_type.to_canonical_string(true)), + object_version: Some(version.into()), + object_bcs: data, + object_metadata: None, + status: ObjectUpdateStatus::Created, + }, + )); + } + } + } + _ => {} + } + } else if let Some(p) = o.data.try_as_package() { + if kind == WriteKind::Create { + object_changes.push(ObjectChange::Published { + package_id: p.id(), + version: p.version(), + digest, + modules: p.serialized_module_map().keys().cloned().collect(), + }) + } + }; + } + + for ((id, version, _), kind) in all_removed_objects { + let o = object_provider + .find_object_lt_or_eq_version(&id, &version) + .await?; + if let Some(o) = o { + if let Some(type_) = o.type_() { + let object_type: StructTag = type_.clone().into(); + match kind { + ObjectRemoveKind::Delete => object_changes.push(ObjectChange::Deleted { + sender, + object_type: object_type.clone(), + object_id: id, + version, + }), + ObjectRemoveKind::Wrap => object_changes.push(ObjectChange::Wrapped { + sender, + object_type: object_type.clone(), + object_id: id, + version, + }), + } + + let data = match o.data.try_as_move() { + Some(data) => data.clone().into_contents(), + None => continue, + }; + + let address_owner = match o.owner.get_owner_address() { + Ok(owner) => Some(owner.to_string()), + Err(_) => None, + }; + + // Check if object has changed ownership + match output_ownership_map.get(&id.to_string()) { + // object still exists aka has changed ownership + Some(new_owner) => { + if let Some(new_owner) = new_owner { + // Sender + + // just in case get old owner from out map + let old_owner = match input_ownership_map.get(&id.to_string()) { + Some(old_owner) => old_owner, + None => { + // Should never happen + warn!( + "Object ownership change occurred but object not found in input objects, object_id: {}", id + ); + continue; + } + }; + custom_object_changes.push(( + old_owner.clone(), + ObjectChangeUpdate { + object_id: id.to_string(), + object_type_tag: Some(object_type.to_canonical_string(true)), + object_version: Some(version.into()), + object_bcs: Some(data), + object_metadata: None, + status: ObjectUpdateStatus::Sent(Sent { + sender_address: new_owner.clone(), + receiver_address: new_owner.clone(), + }), + }, + )); + } else { + // Object was transferred exists but no longer has owner + custom_object_changes.push(( + None, + ObjectChangeUpdate { + object_id: id.to_string(), + object_type_tag: Some(object_type.to_canonical_string(true)), + object_version: Some(version.into()), + object_bcs: Some(data), + object_metadata: None, + status: ObjectUpdateStatus::Deleted, + }, + )); + } + } + None => { + // Object was simply deleted + custom_object_changes.push(( + address_owner, + ObjectChangeUpdate { + object_id: id.to_string(), + object_type_tag: Some(object_type.to_canonical_string(true)), + object_version: Some(version.into()), + object_bcs: Some(data), + object_metadata: None, + status: ObjectUpdateStatus::Deleted, + }, + )); + } + } + } + }; + } + + Ok((object_changes, custom_object_changes)) +}