Skip to content

Commit

Permalink
optimize prices indexer & api
Browse files Browse the repository at this point in the history
  • Loading branch information
canonbrother committed Nov 26, 2024
1 parent cbf76d4 commit dc495bf
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 72 deletions.
64 changes: 26 additions & 38 deletions lib/ain-ocean/src/api/prices.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{str::FromStr, sync::Arc};
use std::{collections::HashSet, str::FromStr, sync::Arc};

use ain_dftx::{Currency, Token, Weightage, COIN};
use ain_macros::ocean_endpoint;
Expand All @@ -8,11 +8,9 @@ use axum::{
Extension, Router,
};
use bitcoin::{hashes::Hash, Txid};
use indexmap::IndexSet;
use rust_decimal::{prelude::ToPrimitive, Decimal};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use snafu::OptionExt;

use super::{
common::parse_token_currency,
Expand All @@ -22,7 +20,7 @@ use super::{
AppContext,
};
use crate::{
error::{ApiError, Error, OtherSnafu},
error::{ApiError, Error},
model::{
BlockContext, OracleIntervalSeconds, OraclePriceActive,
OraclePriceAggregatedIntervalAggregated, PriceTicker,
Expand Down Expand Up @@ -121,39 +119,28 @@ async fn list_prices(
Query(query): Query<PaginationQuery>,
Extension(ctx): Extension<Arc<AppContext>>,
) -> Result<ApiPagedResponse<PriceTickerResponse>> {
let sorted_ids = ctx
let mut set: HashSet<(Token, Currency)> = HashSet::new();

let prices = ctx
.services
.price_ticker
.by_key
.by_id
.list(None, SortOrder::Descending)?
.map(|item| {
let (_, id) = item?;
Ok(id)
})
.collect::<Result<Vec<_>>>()?;

// use IndexSet to rm dup without changing order
let mut sorted_ids_set = IndexSet::new();
for id in sorted_ids {
sorted_ids_set.insert(id);
}

let prices = sorted_ids_set
.into_iter()
.take(query.size)
.map(|id| {
let price_ticker = ctx
.services
.price_ticker
.by_id
.get(&id)?
.context(OtherSnafu {
msg: "Missing price ticker index",
})?;

Ok(PriceTickerResponse::from((id, price_ticker)))
.flat_map(|item| {
let ((_, _, token, currency), v) = item?;
let has_key = set.contains(&(token.clone(), currency.clone()));
if !has_key {
set.insert((token.clone(), currency.clone()));
Ok::<Option<PriceTickerResponse>, Error>(Some(PriceTickerResponse::from((
(token, currency),
v,
))))
} else {
Ok(None)
}
})
.collect::<Result<Vec<_>>>()?;
.flatten()
.collect::<Vec<_>>();

Ok(ApiPagedResponse::of(prices, query.size, |price| {
price.sort.to_string()
Expand All @@ -167,11 +154,12 @@ async fn get_price(
) -> Result<Response<Option<PriceTickerResponse>>> {
let (token, currency) = parse_token_currency(&key)?;

let price_ticker = ctx
.services
.price_ticker
.by_id
.get(&(token.clone(), currency.clone()))?;
let price_ticker = ctx.services.price_ticker.by_id.get(&(
[0xffu8; 4],
[0xffu8; 4],
token.clone(),
currency.clone(),
))?;

let Some(price_ticker) = price_ticker else {
return Ok(Response::new(None));
Expand Down
18 changes: 15 additions & 3 deletions lib/ain-ocean/src/api/stats/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use std::{collections::HashMap, str::FromStr, sync::Arc};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
};

use ain_dftx::{Currency, Token};
use cached::proc_macro::cached;
use defichain_rpc::{
defichain_rpc_json::token::TokenPagination, json::account::AccountAmount, AccountRPC, Client,
Expand All @@ -22,7 +27,7 @@ use crate::{
stats::get_block_reward_distribution,
AppContext,
},
error::{DecimalConversionSnafu, OtherSnafu},
error::{DecimalConversionSnafu, Error, OtherSnafu},
model::MasternodeStatsData,
storage::{RepositoryOps, SortOrder},
Result,
Expand Down Expand Up @@ -100,12 +105,19 @@ pub async fn get_count(ctx: &Arc<AppContext>) -> Result<Count> {
.get_latest()?
.map_or(0, |mn| mn.stats.count);

let mut set: HashSet<(Token, Currency)> = HashSet::new();
let prices = ctx
.services
.price_ticker
.by_id
.list(None, SortOrder::Descending)?
.collect::<Vec<_>>();
.flat_map(|item| {
let ((_, _, token, currency), _) = item?;
set.insert((token, currency));
Ok::<HashSet<(Token, Currency)>, Error>(set.clone())
})
.next()
.unwrap_or(set);

Ok(Count {
blocks: 0,
Expand Down
41 changes: 30 additions & 11 deletions lib/ain-ocean/src/indexer/loan_token.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{str::FromStr, sync::Arc};
use std::{collections::HashSet, str::FromStr, sync::Arc};

use ain_dftx::{loans::SetLoanToken, Currency, Token};
use log::trace;
use rust_decimal::{prelude::Zero, Decimal};
use rust_decimal_macros::dec;

use crate::{
error::Error,
indexer::{Context, Index, Result},
model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated},
network::Network,
Expand Down Expand Up @@ -86,15 +87,21 @@ pub fn index_active_price(services: &Arc<Services>, block: &BlockContext) -> Res
_ => 120,
};
if block.height % block_interval == 0 {
let price_tickers = services
let mut set: HashSet<(Token, Currency)> = HashSet::new();
let pairs = services
.price_ticker
.by_id
.list(None, SortOrder::Descending)?
.flatten()
.collect::<Vec<_>>();

for (ticker_id, _) in price_tickers {
perform_active_price_tick(services, ticker_id, block)?;
.flat_map(|item| {
let ((_, _, token, currency), _) = item?;
set.insert((token, currency));
Ok::<HashSet<(Token, Currency)>, Error>(set.clone())
})
.next()
.unwrap_or(set);

for (token, currency) in pairs {
perform_active_price_tick(services, (token, currency), block)?;
}
}
Ok(())
Expand Down Expand Up @@ -136,14 +143,26 @@ pub fn invalidate_active_price(services: &Arc<Services>, block: &BlockContext) -
_ => 120,
};
if block.height % block_interval == 0 {
let price_tickers = services
let mut set: HashSet<(Token, Currency)> = HashSet::new();
let pairs = services
.price_ticker
.by_id
.list(None, SortOrder::Descending)?
.flatten()
.collect::<Vec<_>>();
.flat_map(|item| {
let ((_, _, token, currency), _) = item?;
set.insert((token, currency));
Ok::<HashSet<(Token, Currency)>, Error>(set.clone())
})
.next()
.unwrap_or(set);

// convert to vector to reverse the hashset is required
let mut vec = Vec::new();
for pair in pairs {
vec.insert(0, pair);
}

for ((token, currency), _) in price_tickers.into_iter().rev() {
for (token, currency) in vec {
services.oracle_price_active.by_id.delete(&(
token,
currency,
Expand Down
8 changes: 3 additions & 5 deletions lib/ain-ocean/src/indexer/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ fn index_set_oracle_data(
pairs: &HashSet<(Token, Currency)>,
) -> Result<()> {
let oracle_repo = &services.oracle_price_aggregated;
let ticker_repo = &services.price_ticker;

for pair in pairs {
let price_aggregated = map_price_aggregated(services, context, pair)?;
Expand All @@ -380,15 +379,14 @@ fn index_set_oracle_data(
);
oracle_repo.by_id.put(&id, &price_aggregated)?;

let key = (
let id = (
price_aggregated.aggregated.oracles.total.to_be_bytes(),
price_aggregated.block.height.to_be_bytes(),
token,
currency,
);
ticker_repo.by_key.put(&key, pair)?;
ticker_repo.by_id.put(
&pair.clone(),
services.price_ticker.by_id.put(
&id,
&PriceTicker {
price: price_aggregated,
},
Expand Down
2 changes: 0 additions & 2 deletions lib/ain-ocean/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ pub struct OracleHistoryService {

pub struct PriceTickerService {
by_id: PriceTicker,
by_key: PriceTickerKey,
}

pub struct ScriptActivityService {
Expand Down Expand Up @@ -196,7 +195,6 @@ impl Services {
},
price_ticker: PriceTickerService {
by_id: PriceTicker::new(Arc::clone(&store)),
by_key: PriceTickerKey::new(Arc::clone(&store)),
},
script_activity: ScriptActivityService {
by_id: ScriptActivity::new(Arc::clone(&store)),
Expand Down
3 changes: 1 addition & 2 deletions lib/ain-ocean/src/model/price_ticker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use serde::{Deserialize, Serialize};

use super::oracle_price_aggregated::OraclePriceAggregated;

pub type PriceTickerId = (String, String); //token-currency
pub type PriceTickerKey = ([u8; 4], [u8; 4], String, String); // total-height-token-currency
pub type PriceTickerId = ([u8; 4], [u8; 4], String, String); // total-height-token-currency

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
Expand Down
12 changes: 1 addition & 11 deletions lib/ain-ocean/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,6 @@ define_table! {
}
}

define_table! {
#[derive(Debug)]
pub struct PriceTickerKey {
key_type = model::PriceTickerKey,
value_type = model::PriceTickerId,
},
SecondaryIndex = PriceTicker
}

define_table! {
#[derive(Debug)]
pub struct RawBlock {
Expand Down Expand Up @@ -517,7 +508,7 @@ define_table! {
}
}

pub const COLUMN_NAMES: [&str; 28] = [
pub const COLUMN_NAMES: [&str; 27] = [
Block::NAME,
BlockByHeight::NAME,
MasternodeStats::NAME,
Expand All @@ -534,7 +525,6 @@ pub const COLUMN_NAMES: [&str; 28] = [
PoolSwapAggregatedKey::NAME,
PoolSwap::NAME,
PriceTicker::NAME,
PriceTickerKey::NAME,
RawBlock::NAME,
ScriptActivity::NAME,
ScriptAggregation::NAME,
Expand Down

0 comments on commit dc495bf

Please sign in to comment.