diff --git a/Cargo.lock b/Cargo.lock index 188711a11d150..27ca9bbcbf530 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13594,6 +13594,7 @@ dependencies = [ "mysten-metrics", "prometheus", "serde", + "serde_json", "serde_yaml 0.8.26", "sui-config", "sui-data-ingestion-core", diff --git a/crates/sui-deepbook-indexer/Cargo.toml b/crates/sui-deepbook-indexer/Cargo.toml index 035ad9e8fc739..24908cc5aa05f 100644 --- a/crates/sui-deepbook-indexer/Cargo.toml +++ b/crates/sui-deepbook-indexer/Cargo.toml @@ -33,6 +33,7 @@ sui-indexer-builder.workspace = true tempfile.workspace = true axum.workspace = true bigdecimal = { version = "0.4.5" } +serde_json = { version = "1.0", features = ["preserve_order"] } [dev-dependencies] hex-literal = "0.3.4" diff --git a/crates/sui-deepbook-indexer/src/server.rs b/crates/sui-deepbook-indexer/src/server.rs index 8e7cc411d24f6..8e7bc27c58a49 100644 --- a/crates/sui-deepbook-indexer/src/server.rs +++ b/crates/sui-deepbook-indexer/src/server.rs @@ -8,7 +8,6 @@ use crate::{ sui_deepbook_indexer::PgDeepbookPersistent, }; use axum::{ - debug_handler, extract::{Path, Query, State}, http::StatusCode, routing::get, @@ -19,16 +18,35 @@ use diesel::BoolExpressionMethods; use diesel::QueryDsl; use diesel::{ExpressionMethods, SelectableHelper}; use diesel_async::RunQueryDsl; +use serde_json::Value; use std::time::{SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, net::SocketAddr}; use tokio::{net::TcpListener, task::JoinHandle}; +use std::str::FromStr; +use sui_json_rpc_types::{SuiObjectData, SuiObjectDataOptions, SuiObjectResponse}; +use sui_sdk::SuiClientBuilder; +use sui_types::{ + base_types::{ObjectID, ObjectRef, SuiAddress}, + programmable_transaction_builder::ProgrammableTransactionBuilder, + transaction::{Argument, CallArg, Command, ObjectArg, ProgrammableMoveCall, TransactionKind}, + type_input::TypeInput, + TypeTag, +}; + +pub const SUI_MAINNET_URL: &str = "https://fullnode.mainnet.sui.io:443"; pub const GET_POOLS_PATH: &str = "/get_pools"; pub const GET_HISTORICAL_VOLUME_BY_BALANCE_MANAGER_ID_WITH_INTERVAL: &str = "/get_historical_volume_by_balance_manager_id_with_interval/:pool_ids/:balance_manager_id"; pub const GET_HISTORICAL_VOLUME_BY_BALANCE_MANAGER_ID: &str = "/get_historical_volume_by_balance_manager_id/:pool_ids/:balance_manager_id"; -pub const GET_HISTORICAL_VOLUME_PATH: &str = "/get_historical_volume/:pool_ids"; +pub const HISTORICAL_VOLUME_PATH: &str = "/historical_volume/:pool_names"; +pub const ALL_HISTORICAL_VOLUME_PATH: &str = "/all_historical_volume"; +pub const LEVEL2_PATH: &str = "/orderbook/:pool_name"; +pub const LEVEL2_MODULE: &str = "pool"; +pub const LEVEL2_FUNCTION: &str = "get_level2_ticks_from_mid"; +pub const DEEPBOOK_PACKAGE_ID: &str = + "0x2c8d603bc51326b8c13cef9dd07031a408a48dddb541963357661df5d3204809"; pub fn run_server(socket_address: SocketAddr, state: PgDeepbookPersistent) -> JoinHandle<()> { tokio::spawn(async move { @@ -41,7 +59,8 @@ pub(crate) fn make_router(state: PgDeepbookPersistent) -> Router { Router::new() .route("/", get(health_check)) .route(GET_POOLS_PATH, get(get_pools)) - .route(GET_HISTORICAL_VOLUME_PATH, get(get_historical_volume)) + .route(HISTORICAL_VOLUME_PATH, get(historical_volume)) + .route(ALL_HISTORICAL_VOLUME_PATH, get(all_historical_volume)) .route( GET_HISTORICAL_VOLUME_BY_BALANCE_MANAGER_ID_WITH_INTERVAL, get(get_historical_volume_by_balance_manager_id_with_interval), @@ -50,6 +69,7 @@ pub(crate) fn make_router(state: PgDeepbookPersistent) -> Router { GET_HISTORICAL_VOLUME_BY_BALANCE_MANAGER_ID, get(get_historical_volume_by_balance_manager_id), ) + .route(LEVEL2_PATH, get(orderbook)) .with_state(state) } @@ -78,7 +98,6 @@ async fn health_check() -> StatusCode { } /// Get all pools stored in database -#[debug_handler] async fn get_pools( State(state): State, ) -> Result>, DeepBookError> { @@ -91,16 +110,32 @@ async fn get_pools( Ok(Json(results)) } -async fn get_historical_volume( - Path(pool_ids): Path, +async fn historical_volume( + Path(pool_names): Path, Query(params): Query>, State(state): State, ) -> Result>, DeepBookError> { - let connection = &mut state.pool.get().await?; - - let pool_ids_list: Vec = pool_ids.split(',').map(|s| s.to_string()).collect(); + // Fetch all pools to map names to IDs + let pools: Json> = get_pools(State(state.clone())).await?; + let pool_name_to_id: HashMap = pools + .0 + .into_iter() + .map(|pool| (pool.pool_name, pool.pool_id)) + .collect(); + + // Map provided pool names to pool IDs + let pool_ids_list: Vec = pool_names + .split(',') + .filter_map(|name| pool_name_to_id.get(name).cloned()) + .collect(); + + if pool_ids_list.is_empty() { + return Err(DeepBookError::InternalError( + "No valid pool names provided".to_string(), + )); + } - // Get start_time and end_time from query parameters (in seconds) + // Parse start_time and end_time from query parameters (in seconds) and convert to milliseconds let end_time = params .get("end_time") .and_then(|v| v.parse::().ok()) @@ -129,6 +164,8 @@ async fn get_historical_volume( sql::("quote_quantity") }; + // Query the database for the historical volume + let connection = &mut state.pool.get().await?; let results: Vec<(String, i64)> = schema::order_fills::table .select((schema::order_fills::pool_id, column_to_query)) .filter(schema::order_fills::pool_id.eq_any(pool_ids_list)) @@ -136,15 +173,38 @@ async fn get_historical_volume( .load(connection) .await?; - // Aggregate volume by pool + // Aggregate volume by pool ID and map back to pool names let mut volume_by_pool = HashMap::new(); for (pool_id, volume) in results { - *volume_by_pool.entry(pool_id).or_insert(0) += volume as u64; + if let Some(pool_name) = pool_name_to_id + .iter() + .find(|(_, id)| **id == pool_id) + .map(|(name, _)| name) + { + *volume_by_pool.entry(pool_name.clone()).or_insert(0) += volume as u64; + } } Ok(Json(volume_by_pool)) } +/// Get all historical volume for all pools +async fn all_historical_volume( + Query(params): Query>, + State(state): State, +) -> Result>, DeepBookError> { + let pools: Json> = get_pools(State(state.clone())).await?; + + let pool_names: String = pools + .0 + .into_iter() + .map(|pool| pool.pool_name) + .collect::>() + .join(","); + + historical_volume(Path(pool_names), Query(params), State(state)).await +} + async fn get_historical_volume_by_balance_manager_id( Path((pool_ids, balance_manager_id)): Path<(String, String)>, Query(params): Query>, @@ -310,3 +370,248 @@ async fn get_historical_volume_by_balance_manager_id_with_interval( Ok(Json(metrics_by_interval)) } + +/// Level2 data for all pools +async fn orderbook( + Path(pool_name): Path, + Query(params): Query>, + State(state): State, +) -> Result>, DeepBookError> { + let depth = params + .get("depth") + .map(|v| v.parse::()) + .transpose() + .map_err(|_| { + DeepBookError::InternalError("Depth must be a non-negative integer".to_string()) + })? + .map(|depth| if depth == 0 { 200 } else { depth }); + + if let Some(depth) = depth { + if depth == 1 { + return Err(DeepBookError::InternalError( + "Depth cannot be 1. Use a value greater than 1 or 0 for the entire orderbook" + .to_string(), + )); + } + } + + let level = params + .get("level") + .map(|v| v.parse::()) + .transpose() + .map_err(|_| { + DeepBookError::InternalError("Level must be an integer between 1 and 2".to_string()) + })?; + + if let Some(level) = level { + if !(1..=2).contains(&level) { + return Err(DeepBookError::InternalError( + "Level must be 1 or 2".to_string(), + )); + } + } + + let ticks_from_mid = match (depth, level) { + (Some(_), Some(1)) => 1u64, // Depth + Level 1 → Best bid and ask + (Some(depth), Some(2)) | (Some(depth), None) => depth / 2, // Depth + Level 2 → Use depth + (None, Some(1)) => 1u64, // Only Level 1 → Best bid and ask + (None, Some(2)) | (None, None) => 100u64, // Level 2 or default → 100 ticks + _ => 100u64, // Fallback to default + }; + + // Fetch the pool data from the `pools` table + let connection = &mut state.pool.get().await?; + let pool_data = schema::pools::table + .filter(schema::pools::pool_name.eq(pool_name.clone())) + .select(( + schema::pools::pool_id, + schema::pools::base_asset_id, + schema::pools::base_asset_decimals, + schema::pools::quote_asset_id, + schema::pools::quote_asset_decimals, + )) + .first::<(String, String, i16, String, i16)>(connection) + .await?; + + let (pool_id, base_asset_id, base_decimals, quote_asset_id, quote_decimals) = pool_data; + let base_decimals = base_decimals as u8; + let quote_decimals = quote_decimals as u8; + + let pool_address = ObjectID::from_hex_literal(&pool_id)?; + + let sui_client = SuiClientBuilder::default().build(SUI_MAINNET_URL).await?; + let mut ptb = ProgrammableTransactionBuilder::new(); + + let pool_object: SuiObjectResponse = sui_client + .read_api() + .get_object_with_options(pool_address, SuiObjectDataOptions::full_content()) + .await?; + let pool_data: &SuiObjectData = + pool_object + .data + .as_ref() + .ok_or(DeepBookError::InternalError(format!( + "Missing data in pool object response for '{}'", + pool_name + )))?; + let pool_object_ref: ObjectRef = (pool_data.object_id, pool_data.version, pool_data.digest); + + let pool_input = CallArg::Object(ObjectArg::ImmOrOwnedObject(pool_object_ref)); + ptb.input(pool_input)?; + + let input_argument = CallArg::Pure(bcs::to_bytes(&ticks_from_mid).map_err(|_| { + DeepBookError::InternalError("Failed to serialize ticks_from_mid".to_string()) + })?); + ptb.input(input_argument)?; + + let sui_clock_object_id = ObjectID::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000006", + )?; + let sui_clock_object: SuiObjectResponse = sui_client + .read_api() + .get_object_with_options(sui_clock_object_id, SuiObjectDataOptions::full_content()) + .await?; + let clock_data: &SuiObjectData = + sui_clock_object + .data + .as_ref() + .ok_or(DeepBookError::InternalError( + "Missing data in clock object response".to_string(), + ))?; + + let sui_clock_object_ref: ObjectRef = + (clock_data.object_id, clock_data.version, clock_data.digest); + + let clock_input = CallArg::Object(ObjectArg::ImmOrOwnedObject(sui_clock_object_ref)); + ptb.input(clock_input)?; + + let base_coin_type = parse_type_input(&base_asset_id)?; + let quote_coin_type = parse_type_input("e_asset_id)?; + + let package = ObjectID::from_hex_literal(DEEPBOOK_PACKAGE_ID) + .map_err(|e| DeepBookError::InternalError(format!("Invalid pool ID: {}", e)))?; + let module = LEVEL2_MODULE.to_string(); + let function = LEVEL2_FUNCTION.to_string(); + + ptb.command(Command::MoveCall(Box::new(ProgrammableMoveCall { + package, + module, + function, + type_arguments: vec![base_coin_type, quote_coin_type], + arguments: vec![Argument::Input(0), Argument::Input(1), Argument::Input(2)], + }))); + + let builder = ptb.finish(); + let tx = TransactionKind::ProgrammableTransaction(builder); + + let result = sui_client + .read_api() + .dev_inspect_transaction_block(SuiAddress::default(), tx, None, None, None) + .await?; + + let mut binding = result.results.ok_or(DeepBookError::InternalError( + "No results from dev_inspect_transaction_block".to_string(), + ))?; + let bid_prices = &binding + .first_mut() + .ok_or(DeepBookError::InternalError( + "No return values for bid prices".to_string(), + ))? + .return_values + .first_mut() + .ok_or(DeepBookError::InternalError( + "No bid price data found".to_string(), + ))? + .0; + let bid_parsed_prices: Vec = bcs::from_bytes(bid_prices).map_err(|_| { + DeepBookError::InternalError("Failed to deserialize bid prices".to_string()) + })?; + let bid_quantities = &binding + .first_mut() + .ok_or(DeepBookError::InternalError( + "No return values for bid quantities".to_string(), + ))? + .return_values + .get(1) + .ok_or(DeepBookError::InternalError( + "No bid quantity data found".to_string(), + ))? + .0; + let bid_parsed_quantities: Vec = bcs::from_bytes(bid_quantities).map_err(|_| { + DeepBookError::InternalError("Failed to deserialize bid quantities".to_string()) + })?; + + let ask_prices = &binding + .first_mut() + .ok_or(DeepBookError::InternalError( + "No return values for ask prices".to_string(), + ))? + .return_values + .get(2) + .ok_or(DeepBookError::InternalError( + "No ask price data found".to_string(), + ))? + .0; + let ask_parsed_prices: Vec = bcs::from_bytes(ask_prices).map_err(|_| { + DeepBookError::InternalError("Failed to deserialize ask prices".to_string()) + })?; + let ask_quantities = &binding + .first_mut() + .ok_or(DeepBookError::InternalError( + "No return values for ask quantities".to_string(), + ))? + .return_values + .get(3) + .ok_or(DeepBookError::InternalError( + "No ask quantity data found".to_string(), + ))? + .0; + let ask_parsed_quantities: Vec = bcs::from_bytes(ask_quantities).map_err(|_| { + DeepBookError::InternalError("Failed to deserialize ask quantities".to_string()) + })?; + + let mut result = HashMap::new(); + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|_| DeepBookError::InternalError("System time error".to_string()))? + .as_millis() as i64; + result.insert("timestamp".to_string(), Value::from(timestamp.to_string())); + + let bids: Vec = bid_parsed_prices + .into_iter() + .zip(bid_parsed_quantities.into_iter()) + .take(ticks_from_mid as usize) + .map(|(price, quantity)| { + let price_factor = 10u64.pow((9 - base_decimals + quote_decimals).into()); + let quantity_factor = 10u64.pow((base_decimals).into()); + Value::Array(vec![ + Value::from((price as f64 / price_factor as f64).to_string()), + Value::from((quantity as f64 / quantity_factor as f64).to_string()), + ]) + }) + .collect(); + result.insert("bids".to_string(), Value::Array(bids)); + + let asks: Vec = ask_parsed_prices + .into_iter() + .zip(ask_parsed_quantities.into_iter()) + .take(ticks_from_mid as usize) + .map(|(price, quantity)| { + let price_factor = 10u64.pow((9 - base_decimals + quote_decimals).into()); + let quantity_factor = 10u64.pow((base_decimals).into()); + Value::Array(vec![ + Value::from((price as f64 / price_factor as f64).to_string()), + Value::from((quantity as f64 / quantity_factor as f64).to_string()), + ]) + }) + .collect(); + result.insert("asks".to_string(), Value::Array(asks)); + + Ok(Json(result)) +} + +fn parse_type_input(type_str: &str) -> Result { + let type_tag = TypeTag::from_str(type_str)?; + Ok(TypeInput::from(type_tag)) +}