Skip to content

Commit

Permalink
update payload generation
Browse files Browse the repository at this point in the history
  • Loading branch information
Giems committed Aug 19, 2024
1 parent 84d5e65 commit b6f326a
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 157 deletions.
281 changes: 129 additions & 152 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::handlers::tx_processor::IndexingPackageBuffer;
use crate::handlers::CustomCheckpointDataToCommit;
use crate::models::display::StoredDisplay;
use async_trait::async_trait;
use chrono::Utc;
use itertools::Itertools;
use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
use move_core_types::language_storage::{StructTag, TypeTag};
Expand All @@ -30,7 +31,7 @@ use sui_types::dynamic_field::DynamicFieldType;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
};
use sui_types::nats_queue::{NatsQueueSender, WsPayload};
use sui_types::nats_queue::{NatsQueueSender, NotificationPayload, WsPayload};
use sui_types::object::Object;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -162,8 +163,13 @@ where
.await?;

// Send ws updates via nats
let nats_ws_payload = generate_ws_updates_from_checkpoint_data(&checkpoint_data);
self.nats_queue.sender.send(nats_ws_payload).await?;
let (nats_ws_payload, notifications_payload) =
generate_updates_from_checkpoint_data(&checkpoint_data);
self.nats_queue.ws_sender.send(nats_ws_payload).await?;
self.nats_queue
.notifications_sender
.send(notifications_payload)
.await?;

// Convert custom checkpoint data into original type
self.indexed_checkpoint_sender
Expand Down Expand Up @@ -958,203 +964,159 @@ fn try_create_dynamic_field_info(
}))
}

pub fn generate_ws_updates_from_checkpoint_data(
pub fn generate_updates_from_checkpoint_data(
checkpoint_data: &CustomCheckpointDataToCommit,
) -> WsPayload {
) -> (WsPayload, NotificationPayload) {
let block_number = checkpoint_data.checkpoint.sequence_number;

let mut notifications: BTreeMap<u64, Vec<SuiIndexerNotification>> = BTreeMap::new();
let mut ws_balance_changes: HashMap<String, TokenBalanceUpdate> = HashMap::new();
let mut account_object_changes: HashMap<String, AccountObjectsUpdate> = HashMap::new(); // account address -> account object changes
let mut account_object_changes: HashMap<String, AccountObjectsUpdate> = HashMap::new();
let mut objects_changes: Vec<ObjectChangeUpdate> = Vec::new();

// transaction balance changes
for transaction in checkpoint_data.transactions.iter() {
for change in transaction.balance_change.iter() {
let user_address = match &change.owner.get_owner_address() {
for transaction in &checkpoint_data.transactions {
let transaction_id = transaction.tx_sequence_number;
let mut transaction_coin_changes: HashMap<String, HashMap<String, i128>> = HashMap::new();

// Process balance changes
for change in &transaction.balance_change {
let user_address = match change.owner.get_owner_address() {
Ok(address) => address.to_string(),
Err(_) => continue,
};
let coin_type = change.coin_type.to_canonical_string(true);

// Prepare ws update
// Update transaction_coin_changes for notifications
let total_change = transaction_coin_changes
.entry(user_address.clone())
.or_insert_with(HashMap::new)
.entry(coin_type.clone())
.or_insert(0);
*total_change += change.amount;

// Update ws_balance_changes for WS updates
let ws_update = ws_balance_changes
.entry(user_address.clone())
.or_insert(TokenBalanceUpdate {
sui_address: user_address.clone(),
sui_address: user_address,
sequence_number: block_number,
changed_balances: HashMap::new(),
timestamp_ms: chrono::Utc::now().timestamp_millis() as u64,
timestamp_ms: Utc::now().timestamp_millis() as u64,
})
.changed_balances
.entry(coin_type.clone())
.or_insert(TokenUpdate {
coin_type,
coin_type: coin_type.clone(),
object_changes: HashMap::new(),
});

match change.status {
// New coin is created
ObjectStatus::Created => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Created(CoinCreated {
ws_update.object_changes.insert(
change.object_id.clone(),
CoinObjectUpdateStatus::Created(CoinCreated {
amount: change.amount,
}));
}),
);
}
ObjectStatus::Mutated => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Mutated(CoinMutated {
ws_update.object_changes.insert(
change.object_id.clone(),
CoinObjectUpdateStatus::Mutated(CoinMutated {
change: change.amount,
}));
}),
);
}
ObjectStatus::Deleted => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Deleted);
.insert(change.object_id.clone(), CoinObjectUpdateStatus::Deleted);
}
}
}

for (change_owner, object_change) in transaction.custom_object_changes.iter() {
if let Some(owner) = change_owner {
account_object_changes
.entry(owner.to_string())
.or_insert(AccountObjectsUpdate {
sui_address: owner.to_string(),
sequence_number: block_number,
object_changes: HashMap::new(),
timestamp_ms: chrono::Utc::now().timestamp_millis() as u64,
})
.object_changes
.entry(object_change.object_id.clone())
.or_insert(object_change.clone());
}

objects_changes.push(object_change.clone());
}
}

let updates: Vec<SuiWsApiMsg> = ws_balance_changes
.into_values()
.map(|v| SuiWsApiMsg::TokenBalanceUpdate(v))
.chain(
account_object_changes
.into_values()
.map(|v| SuiWsApiMsg::AccountObjectsUpdate(v)),
)
.chain(
objects_changes
.into_iter()
.map(|v| SuiWsApiMsg::ObjectUpdate(v)),
)
.collect();

return (checkpoint_data.checkpoint.sequence_number, updates);
}

// for now just clone the method from above and modify it to return SuiIndexerNotification
pub fn generate_notifications_updates_from_checkpoint_data(
checkpoint_data: &CustomCheckpointDataToCommit,
) -> (u64, BTreeMap<u64, Vec<SuiIndexerNotification>>) {
let mut notifications: BTreeMap<u64, Vec<SuiIndexerNotification>> = BTreeMap::new();

for transaction in &checkpoint_data.transactions {
let mut transaction_coin_changes: HashMap<String, HashMap<String, i128>> = HashMap::new(); // user_address -> coin_type -> total_change
let transaction_id = transaction.tx_sequence_number;

// Process balance changes
for change in transaction.balance_change.iter() {
let user_address = match change.owner.get_owner_address() {
Ok(address) => address.to_string(),
Err(_) => continue,
};
let coin_type = change.coin_type.to_canonical_string(true);

let total_change = transaction_coin_changes
.entry(user_address)
.or_insert_with(HashMap::new)
.entry(coin_type)
.or_insert(0);

*total_change += change.amount;
}

// Generate notifications
for (sui_address, coin_changes) in transaction_coin_changes.iter() {
if coin_changes.len() < 3 {
continue; // Not enough changes for a swap
}

let mut sui_change = 0i128;
let mut base_coin: Option<(String, i128)> = None;
let mut quote_coin: Option<(String, i128)> = None;
for (sui_address, coin_changes) in &transaction_coin_changes {
if coin_changes.len() >= 3 {
// Potential swap scenario
let mut sui_change = 0i128;
let mut base_coin: Option<(String, i128)> = None;
let mut quote_coin: Option<(String, i128)> = None;

for (coin_type, amount) in coin_changes {
if coin_type == "0x2::sui::SUI" {
sui_change = *amount;
} else if *amount < 0 && base_coin.is_none() {
base_coin = Some((coin_type.clone(), -amount));
} else if *amount > 0 && quote_coin.is_none() {
quote_coin = Some((coin_type.clone(), *amount));
}
}

for (coin_type, amount) in coin_changes.iter() {
if coin_type == "0x2::sui::SUI" {
sui_change = *amount;
} else if *amount < 0 && base_coin.is_none() {
base_coin = Some((coin_type.clone(), -amount)); // Store positive amount
} else if *amount > 0 && quote_coin.is_none() {
quote_coin = Some((coin_type.clone(), *amount));
if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() {
let (base_coin_type, base_amount) = base_coin.unwrap();
let (quote_coin_type, quote_amount) = quote_coin.unwrap();
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSwap(CoinSwap {
sui_address: sui_address.clone(),
base_coin_type,
base_amount,
quote_coin_type,
quote_amount,
}));
continue;
}
}

// Check if we have a valid swap scenario
if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() {
let (base_coin_type, base_amount) = base_coin.unwrap();
let (quote_coin_type, quote_amount) = quote_coin.unwrap();

notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSwap(CoinSwap {
sui_address: sui_address.clone(),
base_coin_type,
base_amount,
quote_coin_type,
quote_amount,
}));
} else {
// If not a swap, generate individual send/receive notifications
for (coin_type, amount) in coin_changes.iter() {
if coin_type == "0x2::sui::SUI" {
continue; // Skip SUI changes
}
if *amount < 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSent(CoinSent {
sender_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: -amount, // Convert to positive
}));
} else if *amount > 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinReceived(CoinReceived {
receiver_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: *amount,
}));
}
// If not a swap, generate individual send/receive notifications
for (coin_type, amount) in coin_changes {
if coin_type == "0x2::sui::SUI" {
continue;
}
if *amount < 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSent(CoinSent {
sender_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: -amount,
}));
} else if *amount > 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinReceived(CoinReceived {
receiver_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: *amount,
}));
}
}
}

// Process custom object changes (NFT part remains unchanged)
// Process custom object changes
for (change_owner, object_change) in &transaction.custom_object_changes {
if let Some(owner) = change_owner {
let owner_address = owner.to_string();
let nft_id = object_change.object_id.to_string();

// Update account_object_changes for WS updates
account_object_changes
.entry(owner_address.clone())
.or_insert(AccountObjectsUpdate {
sui_address: owner_address.clone(),
sequence_number: block_number,
object_changes: HashMap::new(),
timestamp_ms: Utc::now().timestamp_millis() as u64,
})
.object_changes
.insert(object_change.object_id.clone(), object_change.clone());

objects_changes.push(object_change.clone());

// Generate notifications for NFT changes
match &object_change.status {
ObjectUpdateStatus::Created => {
notifications
Expand Down Expand Up @@ -1198,5 +1160,20 @@ pub fn generate_notifications_updates_from_checkpoint_data(
}
}

(checkpoint_data.checkpoint.sequence_number, notifications)
// Prepare WS updates
let ws_updates: Vec<SuiWsApiMsg> = ws_balance_changes
.into_values()
.map(SuiWsApiMsg::TokenBalanceUpdate)
.chain(
account_object_changes
.into_values()
.map(SuiWsApiMsg::AccountObjectsUpdate),
)
.chain(objects_changes.into_iter().map(SuiWsApiMsg::ObjectUpdate))
.collect();

return (
(checkpoint_data.checkpoint.sequence_number, ws_updates),
(checkpoint_data.checkpoint.sequence_number, notifications),
);
}
10 changes: 5 additions & 5 deletions crates/sui-types/src/nats_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ pub type NotificationPayload = (u64, BTreeMap<u64, Vec<SuiIndexerNotification>>)

pub struct NatsQueueSender {
pub init_checkpoint: u64,
pub sender: Arc<Sender<WsPayload>>,
pub receiver: Arc<Mutex<Receiver<WsPayload>>>,
pub ws_sender: Arc<Sender<WsPayload>>,
pub ws_receiver: Arc<Mutex<Receiver<WsPayload>>>,
pub notifications_sender: Arc<Sender<NotificationPayload>>,
pub notifications_receiver: Arc<Mutex<Receiver<NotificationPayload>>>,
odin: Arc<Odin>,
Expand All @@ -30,8 +30,8 @@ pub fn nats_queue(odin: Arc<Odin>) -> NatsQueueSender {

NatsQueueSender {
init_checkpoint: u64::MAX,
sender: Arc::new(tx),
receiver: Arc::new(Mutex::new(rx)),
ws_sender: Arc::new(tx),
ws_receiver: Arc::new(Mutex::new(rx)),
notifications_sender: Arc::new(tx_notifications),
notifications_receiver: Arc::new(Mutex::new(rx_notifications)),
odin,
Expand All @@ -42,7 +42,7 @@ impl NatsQueueSender {
pub async fn run(&mut self) {
// Spawn task that will order the messages
let odin = self.odin.clone();
let receiver = self.receiver.clone();
let receiver = self.ws_receiver.clone();
let init_checkpoint = self.init_checkpoint;

// Task for ws updates
Expand Down

0 comments on commit b6f326a

Please sign in to comment.