diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index d7b214f3154927..5e81eca6f06d1c 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -6,6 +6,7 @@ use crate::handlers::tx_processor::IndexingPackageBuffer; use crate::handlers::CustomCheckpointDataToCommit; use crate::models::display::StoredDisplay; use async_trait::async_trait; +use chrono::Utc; use itertools::Itertools; use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout}; use move_core_types::language_storage::{StructTag, TypeTag}; @@ -30,7 +31,7 @@ use sui_types::dynamic_field::DynamicFieldType; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, }; -use sui_types::nats_queue::{NatsQueueSender, WsPayload}; +use sui_types::nats_queue::{NatsQueueSender, NotificationPayload, WsPayload}; use sui_types::object::Object; use tokio::sync::watch; use tokio_util::sync::CancellationToken; @@ -162,8 +163,13 @@ where .await?; // Send ws updates via nats - let nats_ws_payload = generate_ws_updates_from_checkpoint_data(&checkpoint_data); - self.nats_queue.sender.send(nats_ws_payload).await?; + let (nats_ws_payload, notifications_payload) = + generate_updates_from_checkpoint_data(&checkpoint_data); + self.nats_queue.ws_sender.send(nats_ws_payload).await?; + self.nats_queue + .notifications_sender + .send(notifications_payload) + .await?; // Convert custom checkpoint data into original type self.indexed_checkpoint_sender @@ -958,203 +964,159 @@ fn try_create_dynamic_field_info( })) } -pub fn generate_ws_updates_from_checkpoint_data( +pub fn generate_updates_from_checkpoint_data( checkpoint_data: &CustomCheckpointDataToCommit, -) -> WsPayload { +) -> (WsPayload, NotificationPayload) { let block_number = checkpoint_data.checkpoint.sequence_number; - + let mut notifications: BTreeMap> = BTreeMap::new(); let mut ws_balance_changes: HashMap = HashMap::new(); - let mut account_object_changes: HashMap = HashMap::new(); // account address -> account object changes + let mut account_object_changes: HashMap = HashMap::new(); let mut objects_changes: Vec = Vec::new(); - // transaction balance changes - for transaction in checkpoint_data.transactions.iter() { - for change in transaction.balance_change.iter() { - let user_address = match &change.owner.get_owner_address() { + for transaction in &checkpoint_data.transactions { + let transaction_id = transaction.tx_sequence_number; + let mut transaction_coin_changes: HashMap> = HashMap::new(); + + // Process balance changes + for change in &transaction.balance_change { + let user_address = match change.owner.get_owner_address() { Ok(address) => address.to_string(), Err(_) => continue, }; let coin_type = change.coin_type.to_canonical_string(true); - // Prepare ws update + // Update transaction_coin_changes for notifications + let total_change = transaction_coin_changes + .entry(user_address.clone()) + .or_insert_with(HashMap::new) + .entry(coin_type.clone()) + .or_insert(0); + *total_change += change.amount; + + // Update ws_balance_changes for WS updates let ws_update = ws_balance_changes .entry(user_address.clone()) .or_insert(TokenBalanceUpdate { - sui_address: user_address.clone(), + sui_address: user_address, sequence_number: block_number, changed_balances: HashMap::new(), - timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, + timestamp_ms: Utc::now().timestamp_millis() as u64, }) .changed_balances .entry(coin_type.clone()) .or_insert(TokenUpdate { - coin_type, + coin_type: coin_type.clone(), object_changes: HashMap::new(), }); + match change.status { - // New coin is created ObjectStatus::Created => { - // Update ws update - ws_update - .object_changes - .entry(change.object_id.clone()) - .or_insert(CoinObjectUpdateStatus::Created(CoinCreated { + ws_update.object_changes.insert( + change.object_id.clone(), + CoinObjectUpdateStatus::Created(CoinCreated { amount: change.amount, - })); + }), + ); } ObjectStatus::Mutated => { - // Update ws update - ws_update - .object_changes - .entry(change.object_id.clone()) - .or_insert(CoinObjectUpdateStatus::Mutated(CoinMutated { + ws_update.object_changes.insert( + change.object_id.clone(), + CoinObjectUpdateStatus::Mutated(CoinMutated { change: change.amount, - })); + }), + ); } ObjectStatus::Deleted => { - // Update ws update ws_update .object_changes - .entry(change.object_id.clone()) - .or_insert(CoinObjectUpdateStatus::Deleted); + .insert(change.object_id.clone(), CoinObjectUpdateStatus::Deleted); } } } - for (change_owner, object_change) in transaction.custom_object_changes.iter() { - if let Some(owner) = change_owner { - account_object_changes - .entry(owner.to_string()) - .or_insert(AccountObjectsUpdate { - sui_address: owner.to_string(), - sequence_number: block_number, - object_changes: HashMap::new(), - timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, - }) - .object_changes - .entry(object_change.object_id.clone()) - .or_insert(object_change.clone()); - } - - objects_changes.push(object_change.clone()); - } - } - - let updates: Vec = ws_balance_changes - .into_values() - .map(|v| SuiWsApiMsg::TokenBalanceUpdate(v)) - .chain( - account_object_changes - .into_values() - .map(|v| SuiWsApiMsg::AccountObjectsUpdate(v)), - ) - .chain( - objects_changes - .into_iter() - .map(|v| SuiWsApiMsg::ObjectUpdate(v)), - ) - .collect(); - - return (checkpoint_data.checkpoint.sequence_number, updates); -} - -// for now just clone the method from above and modify it to return SuiIndexerNotification -pub fn generate_notifications_updates_from_checkpoint_data( - checkpoint_data: &CustomCheckpointDataToCommit, -) -> (u64, BTreeMap>) { - let mut notifications: BTreeMap> = BTreeMap::new(); - - for transaction in &checkpoint_data.transactions { - let mut transaction_coin_changes: HashMap> = HashMap::new(); // user_address -> coin_type -> total_change - let transaction_id = transaction.tx_sequence_number; - - // Process balance changes - for change in transaction.balance_change.iter() { - let user_address = match change.owner.get_owner_address() { - Ok(address) => address.to_string(), - Err(_) => continue, - }; - let coin_type = change.coin_type.to_canonical_string(true); - - let total_change = transaction_coin_changes - .entry(user_address) - .or_insert_with(HashMap::new) - .entry(coin_type) - .or_insert(0); - - *total_change += change.amount; - } - // Generate notifications - for (sui_address, coin_changes) in transaction_coin_changes.iter() { - if coin_changes.len() < 3 { - continue; // Not enough changes for a swap - } - - let mut sui_change = 0i128; - let mut base_coin: Option<(String, i128)> = None; - let mut quote_coin: Option<(String, i128)> = None; + for (sui_address, coin_changes) in &transaction_coin_changes { + if coin_changes.len() >= 3 { + // Potential swap scenario + let mut sui_change = 0i128; + let mut base_coin: Option<(String, i128)> = None; + let mut quote_coin: Option<(String, i128)> = None; + + for (coin_type, amount) in coin_changes { + if coin_type == "0x2::sui::SUI" { + sui_change = *amount; + } else if *amount < 0 && base_coin.is_none() { + base_coin = Some((coin_type.clone(), -amount)); + } else if *amount > 0 && quote_coin.is_none() { + quote_coin = Some((coin_type.clone(), *amount)); + } + } - for (coin_type, amount) in coin_changes.iter() { - if coin_type == "0x2::sui::SUI" { - sui_change = *amount; - } else if *amount < 0 && base_coin.is_none() { - base_coin = Some((coin_type.clone(), -amount)); // Store positive amount - } else if *amount > 0 && quote_coin.is_none() { - quote_coin = Some((coin_type.clone(), *amount)); + if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() { + let (base_coin_type, base_amount) = base_coin.unwrap(); + let (quote_coin_type, quote_amount) = quote_coin.unwrap(); + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSwap(CoinSwap { + sui_address: sui_address.clone(), + base_coin_type, + base_amount, + quote_coin_type, + quote_amount, + })); + continue; } } - // Check if we have a valid swap scenario - if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() { - let (base_coin_type, base_amount) = base_coin.unwrap(); - let (quote_coin_type, quote_amount) = quote_coin.unwrap(); - - notifications - .entry(transaction_id) - .or_insert_with(Vec::new) - .push(SuiIndexerNotification::CoinSwap(CoinSwap { - sui_address: sui_address.clone(), - base_coin_type, - base_amount, - quote_coin_type, - quote_amount, - })); - } else { - // If not a swap, generate individual send/receive notifications - for (coin_type, amount) in coin_changes.iter() { - if coin_type == "0x2::sui::SUI" { - continue; // Skip SUI changes - } - if *amount < 0 { - notifications - .entry(transaction_id) - .or_insert_with(Vec::new) - .push(SuiIndexerNotification::CoinSent(CoinSent { - sender_address: sui_address.clone(), - coin_type: coin_type.clone(), - amount: -amount, // Convert to positive - })); - } else if *amount > 0 { - notifications - .entry(transaction_id) - .or_insert_with(Vec::new) - .push(SuiIndexerNotification::CoinReceived(CoinReceived { - receiver_address: sui_address.clone(), - coin_type: coin_type.clone(), - amount: *amount, - })); - } + // If not a swap, generate individual send/receive notifications + for (coin_type, amount) in coin_changes { + if coin_type == "0x2::sui::SUI" { + continue; + } + if *amount < 0 { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinSent(CoinSent { + sender_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: -amount, + })); + } else if *amount > 0 { + notifications + .entry(transaction_id) + .or_insert_with(Vec::new) + .push(SuiIndexerNotification::CoinReceived(CoinReceived { + receiver_address: sui_address.clone(), + coin_type: coin_type.clone(), + amount: *amount, + })); } } } - // Process custom object changes (NFT part remains unchanged) + // Process custom object changes for (change_owner, object_change) in &transaction.custom_object_changes { if let Some(owner) = change_owner { let owner_address = owner.to_string(); let nft_id = object_change.object_id.to_string(); + // Update account_object_changes for WS updates + account_object_changes + .entry(owner_address.clone()) + .or_insert(AccountObjectsUpdate { + sui_address: owner_address.clone(), + sequence_number: block_number, + object_changes: HashMap::new(), + timestamp_ms: Utc::now().timestamp_millis() as u64, + }) + .object_changes + .insert(object_change.object_id.clone(), object_change.clone()); + + objects_changes.push(object_change.clone()); + + // Generate notifications for NFT changes match &object_change.status { ObjectUpdateStatus::Created => { notifications @@ -1198,5 +1160,20 @@ pub fn generate_notifications_updates_from_checkpoint_data( } } - (checkpoint_data.checkpoint.sequence_number, notifications) + // Prepare WS updates + let ws_updates: Vec = ws_balance_changes + .into_values() + .map(SuiWsApiMsg::TokenBalanceUpdate) + .chain( + account_object_changes + .into_values() + .map(SuiWsApiMsg::AccountObjectsUpdate), + ) + .chain(objects_changes.into_iter().map(SuiWsApiMsg::ObjectUpdate)) + .collect(); + + return ( + (checkpoint_data.checkpoint.sequence_number, ws_updates), + (checkpoint_data.checkpoint.sequence_number, notifications), + ); } diff --git a/crates/sui-types/src/nats_queue.rs b/crates/sui-types/src/nats_queue.rs index 5ed6928887f67d..ad406090bd0b2b 100644 --- a/crates/sui-types/src/nats_queue.rs +++ b/crates/sui-types/src/nats_queue.rs @@ -15,8 +15,8 @@ pub type NotificationPayload = (u64, BTreeMap>) pub struct NatsQueueSender { pub init_checkpoint: u64, - pub sender: Arc>, - pub receiver: Arc>>, + pub ws_sender: Arc>, + pub ws_receiver: Arc>>, pub notifications_sender: Arc>, pub notifications_receiver: Arc>>, odin: Arc, @@ -30,8 +30,8 @@ pub fn nats_queue(odin: Arc) -> NatsQueueSender { NatsQueueSender { init_checkpoint: u64::MAX, - sender: Arc::new(tx), - receiver: Arc::new(Mutex::new(rx)), + ws_sender: Arc::new(tx), + ws_receiver: Arc::new(Mutex::new(rx)), notifications_sender: Arc::new(tx_notifications), notifications_receiver: Arc::new(Mutex::new(rx_notifications)), odin, @@ -42,7 +42,7 @@ impl NatsQueueSender { pub async fn run(&mut self) { // Spawn task that will order the messages let odin = self.odin.clone(); - let receiver = self.receiver.clone(); + let receiver = self.ws_receiver.clone(); let init_checkpoint = self.init_checkpoint; // Task for ws updates