Skip to content

Commit

Permalink
Merge pull request #5 from nightly-labs/objects-changes-ws
Browse files Browse the repository at this point in the history
objects update
  • Loading branch information
Giems authored Aug 1, 2024
2 parents 63aef7a + f538720 commit be35593
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 43 additions & 19 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use itertools::Itertools;
use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
use odin::sui_ws::{CoinCreated, CoinMutated, CoinObjectUpdateStatus};
use odin::sui_ws::{
AccountObjectsUpdate, CoinCreated, CoinMutated, CoinObjectUpdateStatus, ObjectChangeUpdate,
};
use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -443,24 +445,16 @@ where

// Add out custom maps
let status_map = get_object_status_map(&fx);
let input_objects_to_owner = input_objects
.iter()
.map(|input_object| (input_object.id(), input_object.owner().clone()))
.collect::<HashMap<ObjectID, Owner>>();
let output_objects_to_owner = output_objects
.iter()
.map(|output_object| (output_object.id(), output_object.owner().clone()))
.collect::<HashMap<ObjectID, Owner>>();

let (balance_change, object_changes) =
let (balance_change, object_changes, custom_object_changes) =
TxChangesProcessor::new(&objects, metrics.clone())
.custom_get_changes(
tx,
&fx,
&tx_digest,
status_map,
&input_objects_to_owner,
&output_objects_to_owner,
&input_objects,
&output_objects,
)
.await?;

Expand All @@ -473,6 +467,7 @@ where
effects: fx.clone(),
object_changes,
balance_change,
custom_object_changes,
events,
transaction_kind,
successful_tx_num: if fx.status().is_ok() {
Expand Down Expand Up @@ -964,6 +959,8 @@ pub fn generate_ws_updates_from_checkpoint_data(
let block_number = checkpoint_data.checkpoint.sequence_number;

let mut ws_balance_changes: HashMap<String, TokenBalanceUpdate> = HashMap::new();
let mut account_object_changes: HashMap<String, AccountObjectsUpdate> = HashMap::new(); // account address -> account object changes
let mut objects_changes: Vec<ObjectChangeUpdate> = Vec::new();

// transaction balance changes
for transaction in checkpoint_data.transactions.iter() {
Expand Down Expand Up @@ -1018,13 +1015,40 @@ pub fn generate_ws_updates_from_checkpoint_data(
}
}
}

for (change_owner, object_change) in transaction.custom_object_changes.iter() {
if let Some(owner) = change_owner {
account_object_changes
.entry(owner.to_string())
.or_insert(AccountObjectsUpdate {
sui_address: owner.to_string(),
sequence_number: block_number,
object_changes: HashMap::new(),
timestamp_ms: chrono::Utc::now().timestamp_millis() as u64,
})
.object_changes
.entry(object_change.object_id.clone())
.or_insert(object_change.clone());
}

objects_changes.push(object_change.clone());
}
}

return (
checkpoint_data.checkpoint.sequence_number,
ws_balance_changes
.into_values()
.map(|v| SuiWsApiMsg::TokenBalanceUpdate(v))
.collect(),
);
let updates: Vec<SuiWsApiMsg> = ws_balance_changes
.into_values()
.map(|v| SuiWsApiMsg::TokenBalanceUpdate(v))
.chain(
account_object_changes
.into_values()
.map(|v| SuiWsApiMsg::AccountObjectsUpdate(v)),
)
.chain(
objects_changes
.into_iter()
.map(|v| SuiWsApiMsg::ObjectUpdate(v)),
)
.collect();

return (checkpoint_data.checkpoint.sequence_number, updates);
}
59 changes: 43 additions & 16 deletions crates/sui-indexer/src/handlers/tx_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
use async_trait::async_trait;
use mysten_metrics::monitored_scope;
use mysten_metrics::spawn_monitored_task;
use odin::sui_ws::ObjectChangeUpdate;
use sui_json_rpc::custom_get_object_changes;
use sui_json_rpc::get_balance_changes_with_status_from_effect;
use sui_json_rpc_types::ObjectStatus;
use sui_package_resolver::PackageStore;
use sui_package_resolver::Resolver;
use sui_rest_api::CheckpointData;
use sui_types::base_types::SuiAddress;
use sui_types::object::Owner;
use tokio::sync::watch;

Expand Down Expand Up @@ -221,27 +226,45 @@ impl TxChangesProcessor {
effects: &TransactionEffects,
tx_digest: &TransactionDigest,
status_map: HashMap<ObjectID, ObjectStatus>,
input_objects_to_owner: &HashMap<ObjectID, Owner>,
output_objects_to_owner: &HashMap<ObjectID, Owner>,
input_objects: &Vec<Object>,
output_objects: &Vec<Object>,
) -> IndexerResult<(
Vec<sui_json_rpc_types::BalanceChangeWithStatus>,
Vec<IndexedObjectChange>,
Vec<(Option<String>, ObjectChangeUpdate)>,
)> {
let _timer = self
.metrics
.indexing_tx_object_changes_latency
.start_timer();
let object_change: Vec<_> = get_object_changes(
self,
tx.sender(),
effects.modified_at_versions(),
effects.all_changed_objects(),
effects.all_removed_objects(),
)
.await?
.into_iter()
.map(IndexedObjectChange::from)
.collect();
let (original_object_change, custom_object_change): (Vec<_>, Vec<_>) =
custom_get_object_changes(
self,
tx.sender(),
effects.modified_at_versions(),
effects.all_changed_objects(),
effects.all_removed_objects(),
&input_objects,
&output_objects,
)
.await?;

let indexed_objects_changes = original_object_change
.into_iter()
.map(IndexedObjectChange::from)
.collect::<Vec<_>>();

///////////////////////////////////////////////// Moved here from the parent function index_transactions
let input_objects_to_owner = input_objects
.iter()
.map(|input_object| (input_object.id(), input_object.owner().clone()))
.collect::<HashMap<ObjectID, Owner>>();
let output_objects_to_owner = output_objects
.iter()
.map(|output_object| (output_object.id(), output_object.owner().clone()))
.collect::<HashMap<ObjectID, Owner>>();
/////////////////////////////////////////////////

let balance_change = get_balance_changes_with_status_from_effect(
self,
effects,
Expand All @@ -253,11 +276,15 @@ impl TxChangesProcessor {
}),
None,
status_map,
input_objects_to_owner,
output_objects_to_owner,
&input_objects_to_owner,
&output_objects_to_owner,
)
.await?;
Ok((balance_change, object_change))
Ok((
balance_change,
indexed_objects_changes,
custom_object_change,
))
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::errors::IndexerError;
use move_core_types::language_storage::StructTag;
use odin::sui_ws::ObjectChangeUpdate;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use sui_json_rpc_types::{
Expand Down Expand Up @@ -363,6 +364,7 @@ pub struct CustomIndexedTransaction {
pub timestamp_ms: u64,
pub object_changes: Vec<IndexedObjectChange>,
pub balance_change: Vec<sui_json_rpc_types::BalanceChangeWithStatus>,
pub custom_object_changes: Vec<(Option<String>, ObjectChangeUpdate)>,
pub events: Vec<sui_types::event::Event>,
pub transaction_kind: TransactionKind,
pub successful_tx_num: u64,
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-json-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ publish = false
edition = "2021"

[dependencies]
odin = { workspace = true, version = "0.1.0" }

arc-swap.workspace = true
chrono.workspace = true
fastcrypto.workspace = true
Expand Down
Loading

0 comments on commit be35593

Please sign in to comment.