Skip to content

Commit

Permalink
balance changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Giems committed Jul 31, 2024
1 parent b6a70ad commit bf18685
Show file tree
Hide file tree
Showing 11 changed files with 560 additions and 35 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ sui-execution = { path = "sui-execution" }
# move-bytecode-verifier = { path = "external-crates/move/move-bytecode-verifier" }


odin = { git = "ssh://[email protected]/nightly-labs/alexandria.git", branch = "sui-indexer-nats-update", package = "odin" }
odin = { git = "ssh://[email protected]/nightly-labs/alexandria.git", rev = "ad438b1", package = "odin" }

# suiop dependencies
docker-api = "0.12.2"
Expand Down
175 changes: 161 additions & 14 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

use crate::handlers::committer::start_tx_checkpoint_commit_task;
use crate::handlers::tx_processor::IndexingPackageBuffer;
use crate::handlers::CustomCheckpointDataToCommit;
use crate::models::display::StoredDisplay;
use async_trait::async_trait;
use itertools::Itertools;
use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout};
use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
use odin::sui_ws::{CoinCreated, CoinMutated, CoinObjectUpdateStatus};
use odin::sui_ws::{SuiWsApiMsg, TokenBalanceUpdate, TokenUpdate};
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use sui_package_resolver::{PackageStore, PackageStoreWithLruCache, Resolver};
Expand All @@ -20,17 +23,16 @@ use sui_types::dynamic_field::DynamicFieldType;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
};
use sui_types::nats_queue::NatsQueueSender;
use sui_types::nats_queue::{NatsQueueSender, WsPayload};
use sui_types::object::Object;
use tokio_util::sync::CancellationToken;

use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

use diesel::r2d2::R2D2Connection;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use sui_data_ingestion_core::Worker;
use sui_json_rpc_types::SuiMoveValue;
use sui_json_rpc_types::{ObjectStatus, SuiMoveValue};
use sui_types::base_types::SequenceNumber;
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
use sui_types::event::SystemEpochInfoEvent;
Expand All @@ -50,8 +52,8 @@ use crate::db::ConnectionPool;
use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver};
use crate::store::{IndexerStore, PgIndexerStore};
use crate::types::{
IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject,
IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
CustomIndexedTransaction, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo,
IndexedEvent, IndexedObject, IndexedPackage, IndexerResult, TransactionKind, TxIndex,
};

use super::tx_processor::EpochEndIndexingObjectStore;
Expand Down Expand Up @@ -151,7 +153,15 @@ where
self.package_resolver.clone(),
)
.await?;
self.indexed_checkpoint_sender.send(checkpoint_data).await?;

// Send ws updates via nats
let nats_ws_payload = generate_ws_updates_from_checkpoint_data(&checkpoint_data);
self.nats_queue.sender.send(nats_ws_payload).await?;

// Convert custom checkpoint data into original type
self.indexed_checkpoint_sender
.send(checkpoint_data.into())
.await?;
Ok(())
}

Expand Down Expand Up @@ -280,7 +290,7 @@ where
metrics: Arc<IndexerMetrics>,
packages: Vec<IndexedPackage>,
package_resolver: Arc<Resolver<impl PackageStore>>,
) -> Result<CheckpointDataToCommit, IndexerError> {
) -> Result<CustomCheckpointDataToCommit, IndexerError> {
let checkpoint_seq = data.checkpoint_summary.sequence_number;
info!(checkpoint_seq, "Indexing checkpoint data blob");

Expand Down Expand Up @@ -336,7 +346,7 @@ where
checkpoint.sequence_number, time_now_ms, checkpoint.timestamp_ms
);

Ok(CheckpointDataToCommit {
Ok(CustomCheckpointDataToCommit {
checkpoint,
transactions: db_transactions,
events: db_events,
Expand All @@ -355,7 +365,7 @@ where
checkpoint_contents: &CheckpointContents,
metrics: &IndexerMetrics,
) -> IndexerResult<(
Vec<IndexedTransaction>,
Vec<CustomIndexedTransaction>,
Vec<IndexedEvent>,
Vec<TxIndex>,
BTreeMap<String, StoredDisplay>,
Expand All @@ -380,14 +390,14 @@ where
let mut db_displays = BTreeMap::new();
let mut db_indices = Vec::new();

for tx in transactions {
for checkpoint_tx in transactions {
let CheckpointTransaction {
transaction: sender_signed_data,
effects: fx,
events,
input_objects,
output_objects,
} = tx;
} = checkpoint_tx;
// Unwrap safe - we checked they have equal length above
let (tx_digest, tx_sequence_number) = tx_seq_num_iter.next().unwrap();
if tx_digest != *sender_signed_data.digest() {
Expand Down Expand Up @@ -431,12 +441,30 @@ where
.chain(output_objects.iter())
.collect::<Vec<_>>();

// Add out custom maps
let status_map = get_object_status_map(&fx);
let input_objects_to_owner = input_objects
.iter()
.map(|input_object| (input_object.id(), input_object.owner().clone()))
.collect::<HashMap<ObjectID, Owner>>();
let output_objects_to_owner = output_objects
.iter()
.map(|output_object| (output_object.id(), output_object.owner().clone()))
.collect::<HashMap<ObjectID, Owner>>();

let (balance_change, object_changes) =
TxChangesProcessor::new(&objects, metrics.clone())
.get_changes(tx, &fx, &tx_digest)
.custom_get_changes(
tx,
&fx,
&tx_digest,
status_map,
&input_objects_to_owner,
&output_objects_to_owner,
)
.await?;

let db_txn = IndexedTransaction {
let db_txn = CustomIndexedTransaction {
tx_sequence_number,
tx_digest,
checkpoint_sequence_number: *checkpoint_summary.sequence_number(),
Expand Down Expand Up @@ -720,6 +748,54 @@ where
}
}

pub fn get_object_status_map(effects: &TransactionEffects) -> HashMap<ObjectID, ObjectStatus> {
let mut object_to_status: HashMap<ObjectID, ObjectStatus> = HashMap::new();
// Fill object_to_status objects
{
// Fill mutated objects
effects
.mutated()
.into_iter()
.map(|((id, _, _), _)| (id, ObjectStatus::Mutated))
.collect::<HashMap<ObjectID, ObjectStatus>>()
.iter()
.for_each(|(k, v)| {
object_to_status.insert(*k, v.clone());
});
// Fill deleted objects
effects
.all_tombstones()
.into_iter()
.map(|(id, _)| (id, ObjectStatus::Deleted))
.collect::<HashMap<ObjectID, ObjectStatus>>()
.iter()
.for_each(|(k, v)| {
object_to_status.insert(*k, v.clone());
});
// Fill created objects
effects
.created()
.into_iter()
.map(|((id, _, _), _)| (id, ObjectStatus::Created))
.collect::<HashMap<ObjectID, ObjectStatus>>()
.iter()
.for_each(|(k, v)| {
object_to_status.insert(*k, v.clone());
});
// Fill created objects
effects
.unwrapped()
.into_iter()
.map(|((id, _, _), _)| (id, ObjectStatus::Created))
.collect::<HashMap<ObjectID, ObjectStatus>>()
.iter()
.for_each(|(k, v)| {
object_to_status.insert(*k, v.clone());
});
}
object_to_status
}

async fn get_move_struct_layout_map(
objects: &[Object],
package_resolver: Arc<Resolver<impl PackageStore>>,
Expand Down Expand Up @@ -881,3 +957,74 @@ fn try_create_dynamic_field_info(
},
}))
}

pub fn generate_ws_updates_from_checkpoint_data(
checkpoint_data: &CustomCheckpointDataToCommit,
) -> WsPayload {
let block_number = checkpoint_data.checkpoint.sequence_number;

let mut ws_balance_changes: HashMap<String, TokenBalanceUpdate> = HashMap::new();

// transaction balance changes
for transaction in checkpoint_data.transactions.iter() {
for change in transaction.balance_change.iter() {
let user_address = match &change.owner.get_owner_address() {
Ok(address) => address.to_string(),
Err(_) => continue,
};
let coin_type = change.coin_type.to_canonical_string(true);

// Prepare ws update
let ws_update = ws_balance_changes
.entry(user_address.clone())
.or_insert(TokenBalanceUpdate {
sui_address: user_address.clone(),
sequence_number: block_number,
changed_balances: HashMap::new(),
timestamp_ms: chrono::Utc::now().timestamp_millis() as u64,
})
.changed_balances
.entry(coin_type.clone())
.or_insert(TokenUpdate {
coin_type,
object_changes: HashMap::new(),
});
match change.status {
// New coin is created
ObjectStatus::Created => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Created(CoinCreated {
amount: change.amount,
}));
}
ObjectStatus::Mutated => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Mutated(CoinMutated {
change: change.amount,
}));
}
ObjectStatus::Deleted => {
// Update ws update
ws_update
.object_changes
.entry(change.object_id.clone())
.or_insert(CoinObjectUpdateStatus::Deleted);
}
}
}
}

return (
checkpoint_data.checkpoint.sequence_number,
ws_balance_changes
.into_values()
.map(|v| SuiWsApiMsg::TokenBalanceUpdate(v))
.collect(),
);
}
33 changes: 31 additions & 2 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::collections::BTreeMap;
use crate::{
models::display::StoredDisplay,
types::{
IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject,
IndexedPackage, IndexedTransaction, TxIndex,
CustomIndexedTransaction, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo,
IndexedEvent, IndexedObject, IndexedPackage, IndexedTransaction, TxIndex,
},
};

Expand All @@ -29,6 +29,35 @@ pub struct CheckpointDataToCommit {
pub epoch: Option<EpochToCommit>,
}

#[derive(Debug)]
pub struct CustomCheckpointDataToCommit {
pub checkpoint: IndexedCheckpoint,
pub transactions: Vec<CustomIndexedTransaction>,
pub events: Vec<IndexedEvent>,
pub tx_indices: Vec<TxIndex>,
pub display_updates: BTreeMap<String, StoredDisplay>,
pub object_changes: TransactionObjectChangesToCommit,
pub object_history_changes: TransactionObjectChangesToCommit,
pub packages: Vec<IndexedPackage>,
pub epoch: Option<EpochToCommit>,
}

impl From<CustomCheckpointDataToCommit> for CheckpointDataToCommit {
fn from(data: CustomCheckpointDataToCommit) -> Self {
Self {
checkpoint: data.checkpoint,
transactions: data.transactions.into_iter().map(Into::into).collect(),
events: data.events,
tx_indices: data.tx_indices,
display_updates: data.display_updates,
object_changes: data.object_changes,
object_history_changes: data.object_history_changes,
packages: data.packages,
epoch: data.epoch,
}
}
}

#[derive(Clone, Debug)]
pub struct TransactionObjectChangesToCommit {
pub changed_objects: Vec<IndexedObject>,
Expand Down
Loading

0 comments on commit bf18685

Please sign in to comment.