diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 47e0d44f32..54a157bc2a 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -8,11 +8,9 @@ use axum::{ Extension, Router, }; use bitcoin::{hashes::Hash, Txid}; -use indexmap::IndexSet; use rust_decimal::{prelude::ToPrimitive, Decimal}; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; -use snafu::OptionExt; use super::{ common::parse_token_currency, @@ -22,7 +20,7 @@ use super::{ AppContext, }; use crate::{ - error::{ApiError, Error, OtherSnafu}, + error::{ApiError, Error}, model::{ BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceAggregatedIntervalAggregated, PriceTicker, @@ -121,39 +119,28 @@ async fn list_prices( Query(query): Query, Extension(ctx): Extension>, ) -> Result> { - let sorted_ids = ctx + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + + let prices = ctx .services .price_ticker - .by_key + .by_id .list(None, SortOrder::Descending)? - .map(|item| { - let (_, id) = item?; - Ok(id) - }) - .collect::>>()?; - - // use IndexSet to rm dup without changing order - let mut sorted_ids_set = IndexSet::new(); - for id in sorted_ids { - sorted_ids_set.insert(id); - } - - let prices = sorted_ids_set - .into_iter() - .take(query.size) - .map(|id| { - let price_ticker = ctx - .services - .price_ticker - .by_id - .get(&id)? - .context(OtherSnafu { - msg: "Missing price ticker index", - })?; - - Ok(PriceTickerResponse::from((id, price_ticker))) + .flat_map(|item| { + let ((_, _, token, currency), v) = item?; + let has_key = set.contains(&(token.clone(), currency.clone())); + if !has_key { + set.insert((token.clone(), currency.clone())); + Ok::, Error>(Some(PriceTickerResponse::from(( + (token, currency), + v, + )))) + } else { + Ok(None) + } }) - .collect::>>()?; + .flatten() + .collect::>(); Ok(ApiPagedResponse::of(prices, query.size, |price| { price.sort.to_string() @@ -167,11 +154,12 @@ async fn get_price( ) -> Result>> { let (token, currency) = parse_token_currency(&key)?; - let price_ticker = ctx - .services - .price_ticker - .by_id - .get(&(token.clone(), currency.clone()))?; + let price_ticker = ctx.services.price_ticker.by_id.get(&( + [0xffu8; 4], + [0xffu8; 4], + token.clone(), + currency.clone(), + ))?; let Some(price_ticker) = price_ticker else { return Ok(Response::new(None)); diff --git a/lib/ain-ocean/src/api/stats/cache.rs b/lib/ain-ocean/src/api/stats/cache.rs index c66fdf9e28..de29ea27cf 100644 --- a/lib/ain-ocean/src/api/stats/cache.rs +++ b/lib/ain-ocean/src/api/stats/cache.rs @@ -1,5 +1,10 @@ -use std::{collections::HashMap, str::FromStr, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, + sync::Arc, +}; +use ain_dftx::{Currency, Token}; use cached::proc_macro::cached; use defichain_rpc::{ defichain_rpc_json::token::TokenPagination, json::account::AccountAmount, AccountRPC, Client, @@ -22,7 +27,7 @@ use crate::{ stats::get_block_reward_distribution, AppContext, }, - error::{DecimalConversionSnafu, OtherSnafu}, + error::{DecimalConversionSnafu, Error, OtherSnafu}, model::MasternodeStatsData, storage::{RepositoryOps, SortOrder}, Result, @@ -100,12 +105,19 @@ pub async fn get_count(ctx: &Arc) -> Result { .get_latest()? .map_or(0, |mn| mn.stats.count); + let mut set: HashSet<(Token, Currency)> = HashSet::new(); let prices = ctx .services .price_ticker .by_id .list(None, SortOrder::Descending)? - .collect::>(); + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); Ok(Count { blocks: 0, diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index cc35ca42ef..d05d2ecd9a 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use ain_dftx::{loans::SetLoanToken, Currency, Token}; use log::trace; @@ -6,6 +6,7 @@ use rust_decimal::{prelude::Zero, Decimal}; use rust_decimal_macros::dec; use crate::{ + error::Error, indexer::{Context, Index, Result}, model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated}, network::Network, @@ -86,15 +87,21 @@ pub fn index_active_price(services: &Arc, block: &BlockContext) -> Res _ => 120, }; if block.height % block_interval == 0 { - let price_tickers = services + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flatten() - .collect::>(); - - for (ticker_id, _) in price_tickers { - perform_active_price_tick(services, ticker_id, block)?; + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); + + for (token, currency) in pairs { + perform_active_price_tick(services, (token, currency), block)?; } } Ok(()) @@ -136,14 +143,26 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - _ => 120, }; if block.height % block_interval == 0 { - let price_tickers = services + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flatten() - .collect::>(); + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); + + // convert to vector to reverse the hashset is required + let mut vec = Vec::new(); + for pair in pairs { + vec.insert(0, pair); + } - for ((token, currency), _) in price_tickers.into_iter().rev() { + for (token, currency) in vec { services.oracle_price_active.by_id.delete(&( token, currency, diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 6ca9d02aeb..c9fb315fd3 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -360,7 +360,6 @@ fn index_set_oracle_data( pairs: &HashSet<(Token, Currency)>, ) -> Result<()> { let oracle_repo = &services.oracle_price_aggregated; - let ticker_repo = &services.price_ticker; for pair in pairs { let price_aggregated = map_price_aggregated(services, context, pair)?; @@ -380,15 +379,14 @@ fn index_set_oracle_data( ); oracle_repo.by_id.put(&id, &price_aggregated)?; - let key = ( + let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), token, currency, ); - ticker_repo.by_key.put(&key, pair)?; - ticker_repo.by_id.put( - &pair.clone(), + services.price_ticker.by_id.put( + &id, &PriceTicker { price: price_aggregated, }, diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index c72b6a9f67..bde84b9312 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -93,7 +93,6 @@ pub struct OracleHistoryService { pub struct PriceTickerService { by_id: PriceTicker, - by_key: PriceTickerKey, } pub struct ScriptActivityService { @@ -196,7 +195,6 @@ impl Services { }, price_ticker: PriceTickerService { by_id: PriceTicker::new(Arc::clone(&store)), - by_key: PriceTickerKey::new(Arc::clone(&store)), }, script_activity: ScriptActivityService { by_id: ScriptActivity::new(Arc::clone(&store)), diff --git a/lib/ain-ocean/src/model/price_ticker.rs b/lib/ain-ocean/src/model/price_ticker.rs index 777e693649..158fb96157 100644 --- a/lib/ain-ocean/src/model/price_ticker.rs +++ b/lib/ain-ocean/src/model/price_ticker.rs @@ -2,8 +2,7 @@ use serde::{Deserialize, Serialize}; use super::oracle_price_aggregated::OraclePriceAggregated; -pub type PriceTickerId = (String, String); //token-currency -pub type PriceTickerKey = ([u8; 4], [u8; 4], String, String); // total-height-token-currency +pub type PriceTickerId = ([u8; 4], [u8; 4], String, String); // total-height-token-currency #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 92ef6cfc9b..1f373022c4 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -372,15 +372,6 @@ define_table! { } } -define_table! { - #[derive(Debug)] - pub struct PriceTickerKey { - key_type = model::PriceTickerKey, - value_type = model::PriceTickerId, - }, - SecondaryIndex = PriceTicker -} - define_table! { #[derive(Debug)] pub struct RawBlock { @@ -517,7 +508,7 @@ define_table! { } } -pub const COLUMN_NAMES: [&str; 28] = [ +pub const COLUMN_NAMES: [&str; 27] = [ Block::NAME, BlockByHeight::NAME, MasternodeStats::NAME, @@ -534,7 +525,6 @@ pub const COLUMN_NAMES: [&str; 28] = [ PoolSwapAggregatedKey::NAME, PoolSwap::NAME, PriceTicker::NAME, - PriceTickerKey::NAME, RawBlock::NAME, ScriptActivity::NAME, ScriptAggregation::NAME,