Skip to content

Commit

Permalink
notifications init
Browse files Browse the repository at this point in the history
  • Loading branch information
Giems committed Aug 19, 2024
1 parent 260be09 commit 9e494cb
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 61 deletions.
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ hashbrown = "0.12"
hdrhistogram = "7.5.1"
hex = "0.4.3"
hex-literal = "0.3.4"
highlight = "all"
highlight = "0.0.3"
http = "0.2.8"
http-body = "0.4.5"
humantime = "2.1.0"
Expand Down Expand Up @@ -716,7 +716,7 @@ semver = "1.0.16"
spinners = "4.1.0"
include_dir = "0.7.3"

odin = { git = "ssh://[email protected]/nightly-labs/alexandria.git", rev = "ce64446", package = "odin" }
odin = { git = "ssh://[email protected]/nightly-labs/alexandria.git", rev = "063e6ff", package = "odin" }

[patch.crates-io]
quinn-proto = { git = "https://github.com/quinn-rs/quinn.git", rev = "f0fa66f871b80b9d2d7075d76967c649aecc0b77" }
221 changes: 173 additions & 48 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ use crate::handlers::tx_processor::IndexingPackageBuffer;
use crate::handlers::CustomCheckpointDataToCommit;
use crate::models::display::StoredDisplay;
use async_trait::async_trait;
use chrono::Utc;
use itertools::Itertools;
use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
use odin::structs::sui_notifications::{
CoinReceived, CoinSent, CoinSwap, NftBurned, NftMinted, NftReceived, NftSent,
SuiIndexerNotification,
};
use odin::sui_ws::{
AccountObjectsUpdate, CoinCreated, CoinMutated, CoinObjectUpdateStatus, ObjectChangeUpdate,
ObjectUpdateStatus,
};
use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate};
use std::collections::{BTreeMap, HashMap};
Expand All @@ -25,7 +31,7 @@ use sui_types::dynamic_field::DynamicFieldType;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
};
use sui_types::nats_queue::{NatsQueueSender, WsPayload};
use sui_types::nats_queue::{NatsQueueSender, NotificationPayload, WsPayload};
use sui_types::object::Object;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -157,8 +163,13 @@ where
.await?;

// Send ws updates via nats
let nats_ws_payload = generate_ws_updates_from_checkpoint_data(&checkpoint_data);
self.nats_queue.sender.send(nats_ws_payload).await?;
let (nats_ws_payload, notifications_payload) =
generate_updates_from_checkpoint_data(&checkpoint_data);
self.nats_queue.ws_sender.send(nats_ws_payload).await?;
self.nats_queue
.notifications_sender
.send(notifications_payload)
.await?;

// Convert custom checkpoint data into original type
self.indexed_checkpoint_sender
Expand Down Expand Up @@ -953,102 +964,216 @@ fn try_create_dynamic_field_info(
}))
}

pub fn generate_ws_updates_from_checkpoint_data(
pub fn generate_updates_from_checkpoint_data(
checkpoint_data: &CustomCheckpointDataToCommit,
) -> WsPayload {
) -> (WsPayload, NotificationPayload) {
let block_number = checkpoint_data.checkpoint.sequence_number;

let mut notifications: BTreeMap<u64, Vec<SuiIndexerNotification>> = BTreeMap::new();
let mut ws_balance_changes: HashMap<String, TokenBalanceUpdate> = HashMap::new();
let mut account_object_changes: HashMap<String, AccountObjectsUpdate> = HashMap::new(); // account address -> account object changes
let mut account_object_changes: HashMap<String, AccountObjectsUpdate> = HashMap::new();
let mut objects_changes: Vec<ObjectChangeUpdate> = Vec::new();

// transaction balance changes
for transaction in checkpoint_data.transactions.iter() {
for change in transaction.balance_change.iter() {
let user_address = match &change.owner.get_owner_address() {
for transaction in &checkpoint_data.transactions {
let transaction_id = transaction.tx_sequence_number;
let mut transaction_coin_changes: HashMap<String, HashMap<String, i128>> = HashMap::new();

// Process balance changes
for change in &transaction.balance_change {
let user_address = match change.owner.get_owner_address() {
Ok(address) => address.to_string(),
Err(_) => continue,
};
let coin_type = change.coin_type.to_canonical_string(true);

// Prepare ws update
// Update transaction_coin_changes for notifications
let total_change = transaction_coin_changes
.entry(user_address.clone())
.or_insert_with(HashMap::new)
.entry(coin_type.clone())
.or_insert(0);
*total_change += change.amount;

// Update ws_balance_changes for WS updates
let ws_update = ws_balance_changes
.entry(user_address.clone())
.or_insert(TokenBalanceUpdate {
sui_address: user_address.clone(),
sui_address: user_address,
sequence_number: block_number,
changed_balances: HashMap::new(),
timestamp_ms: chrono::Utc::now().timestamp_millis() as u64,
timestamp_ms: Utc::now().timestamp_millis() as u64,
})
.changed_balances
.entry(coin_type.clone())
.or_insert(TokenUpdate {
coin_type,
coin_type: coin_type.clone(),
object_changes: HashMap::new(),
});

match change.status {
// New coin is created
ObjectStatus::Created => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Created(CoinCreated {
ws_update.object_changes.insert(
change.object_id.clone(),
CoinObjectUpdateStatus::Created(CoinCreated {
amount: change.amount,
}));
}),
);
}
ObjectStatus::Mutated => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Mutated(CoinMutated {
ws_update.object_changes.insert(
change.object_id.clone(),
CoinObjectUpdateStatus::Mutated(CoinMutated {
change: change.amount,
}));
}),
);
}
ObjectStatus::Deleted => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Deleted);
.insert(change.object_id.clone(), CoinObjectUpdateStatus::Deleted);
}
}
}

// Generate notifications
for (sui_address, coin_changes) in &transaction_coin_changes {
if coin_changes.len() >= 3 {
// Potential swap scenario
let mut sui_change = 0i128;
let mut base_coin: Option<(String, i128)> = None;
let mut quote_coin: Option<(String, i128)> = None;

for (coin_type, amount) in coin_changes {
if coin_type == "0x2::sui::SUI" {
sui_change = *amount;
} else if *amount < 0 && base_coin.is_none() {
base_coin = Some((coin_type.clone(), -amount));
} else if *amount > 0 && quote_coin.is_none() {
quote_coin = Some((coin_type.clone(), *amount));
}
}

if sui_change < 0 && base_coin.is_some() && quote_coin.is_some() {
let (base_coin_type, base_amount) = base_coin.unwrap();
let (quote_coin_type, quote_amount) = quote_coin.unwrap();
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSwap(CoinSwap {
sui_address: sui_address.clone(),
base_coin_type,
base_amount,
quote_coin_type,
quote_amount,
}));
continue;
}
}

// If not a swap, generate individual send/receive notifications
for (coin_type, amount) in coin_changes {
if coin_type == "0x2::sui::SUI" {
continue;
}
if *amount < 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinSent(CoinSent {
sender_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: -amount,
}));
} else if *amount > 0 {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::CoinReceived(CoinReceived {
receiver_address: sui_address.clone(),
coin_type: coin_type.clone(),
amount: *amount,
}));
}
}
}

for (change_owner, object_change) in transaction.custom_object_changes.iter() {
// Process custom object changes
for (change_owner, object_change) in &transaction.custom_object_changes {
if let Some(owner) = change_owner {
let owner_address = owner.to_string();
let nft_id = object_change.object_id.to_string();

// Update account_object_changes for WS updates
account_object_changes
.entry(owner.to_string())
.entry(owner_address.clone())
.or_insert(AccountObjectsUpdate {
sui_address: owner.to_string(),
sui_address: owner_address.clone(),
sequence_number: block_number,
object_changes: HashMap::new(),
timestamp_ms: chrono::Utc::now().timestamp_millis() as u64,
timestamp_ms: Utc::now().timestamp_millis() as u64,
})
.object_changes
.entry(object_change.object_id.clone())
.or_insert(object_change.clone());
.insert(object_change.object_id.clone(), object_change.clone());

objects_changes.push(object_change.clone());

// Generate notifications for NFT changes
match &object_change.status {
ObjectUpdateStatus::Created => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftMinted(NftMinted {
sui_address: owner_address,
nft_id,
}));
}
ObjectUpdateStatus::Received(received) => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftReceived(NftReceived {
receiver_address: received.receiver_address.clone(),
nft_id,
}));
}
ObjectUpdateStatus::Deleted => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftBurned(NftBurned {
sui_address: owner_address,
nft_id,
}));
}
ObjectUpdateStatus::Sent(sent) => {
notifications
.entry(transaction_id)
.or_insert_with(Vec::new)
.push(SuiIndexerNotification::NftSent(NftSent {
sender_address: sent.sender_address.clone(),
nft_id,
}));
}
_ => {}
}
}

objects_changes.push(object_change.clone());
}
}

let updates: Vec<SuiWsApiMsg> = ws_balance_changes
// Prepare WS updates
let ws_updates: Vec<SuiWsApiMsg> = ws_balance_changes
.into_values()
.map(|v| SuiWsApiMsg::TokenBalanceUpdate(v))
.map(SuiWsApiMsg::TokenBalanceUpdate)
.chain(
account_object_changes
.into_values()
.map(|v| SuiWsApiMsg::AccountObjectsUpdate(v)),
)
.chain(
objects_changes
.into_iter()
.map(|v| SuiWsApiMsg::ObjectUpdate(v)),
.map(SuiWsApiMsg::AccountObjectsUpdate),
)
.chain(objects_changes.into_iter().map(SuiWsApiMsg::ObjectUpdate))
.collect();

return (checkpoint_data.checkpoint.sequence_number, updates);
return (
(checkpoint_data.checkpoint.sequence_number, ws_updates),
(checkpoint_data.checkpoint.sequence_number, notifications),
);
}
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::sync::Arc;

use clap::Parser;
use odin::{get_odin, ConnectOptions, Odin};
use odin::{ConnectOptions, Odin};
use sui_types::nats_queue::nats_queue;
use tracing::info;

Expand Down
Loading

0 comments on commit 9e494cb

Please sign in to comment.