diff --git a/lib/ain-ocean/src/api/common.rs b/lib/ain-ocean/src/api/common.rs index 64d79f0d9f..5482bff46d 100644 --- a/lib/ain-ocean/src/api/common.rs +++ b/lib/ain-ocean/src/api/common.rs @@ -11,9 +11,10 @@ use super::query::PaginationQuery; use crate::{ error::{ Error::ToArrayError, InvalidAmountSnafu, InvalidFixedIntervalPriceSnafu, - InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu, + InvalidPriceTickerSortKeySnafu, InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu, }, hex_encoder::as_sha256, + model::PriceTickerId, network::Network, Result, }; @@ -128,6 +129,30 @@ pub fn parse_query_height_txid(item: &str) -> Result<(u32, Txid)> { Ok((height, txid)) } +pub fn parse_price_ticker_sort(item: &str) -> Result { + let mut parts = item.split('-'); + let count_height_token = parts.next().context(InvalidPriceTickerSortKeySnafu { item })?; + let encoded_count = &count_height_token[..8]; + let encoded_height = &count_height_token[8..16]; + let token = &count_height_token[16..]; + let token = token.to_string(); + + let count: [u8; 4] = hex::decode(encoded_count)? + .try_into() + .map_err(|_| ToArrayError)?; + + let height: [u8; 4] = hex::decode(encoded_height)? + .try_into() + .map_err(|_| ToArrayError)?; + + let currency = parts + .next() + .context(InvalidTokenCurrencySnafu { item })? + .to_string(); + + Ok((count, height, token, currency)) +} + #[must_use] pub fn format_number(v: Decimal) -> String { if v == dec!(0) { diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index d320e3b07d..df3e79e47c 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, str::FromStr, sync::Arc}; +use std::{str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; use super::{ - common::parse_token_currency, + common::{parse_token_currency, parse_price_ticker_sort}, oracle::OraclePriceFeedResponse, query::PaginationQuery, response::{ApiPagedResponse, Response}, @@ -119,28 +119,26 @@ async fn list_prices( Query(query): Query, Extension(ctx): Extension>, ) -> Result> { - let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let next = query + .next + .map(|item| { + let id = parse_price_ticker_sort(&item)?; + Ok::<([u8; 4], [u8; 4], Token, Currency), Error>(id) + }) + .transpose()?; let prices = ctx .services .price_ticker .by_id - .list(None, SortOrder::Descending)? - .flat_map(|item| { + .list(next.clone(), SortOrder::Descending)? + .take(query.size + usize::from(next.clone().is_some())) + .skip(usize::from(next.is_some())) + .map(|item| { let ((_, _, token, currency), v) = item?; - let has_key = set.contains(&(token.clone(), currency.clone())); - if !has_key { - set.insert((token.clone(), currency.clone())); - Ok::, Error>(Some(PriceTickerResponse::from(( - (token, currency), - v, - )))) - } else { - Ok(None) - } + Ok(PriceTickerResponse::from(((token, currency), v))) }) - .flatten() - .collect::>(); + .collect::>>()?; Ok(ApiPagedResponse::of(prices, query.size, |price| { price.sort.to_string() diff --git a/lib/ain-ocean/src/error.rs b/lib/ain-ocean/src/error.rs index 2e32aca456..b4d5bda2e9 100644 --- a/lib/ain-ocean/src/error.rs +++ b/lib/ain-ocean/src/error.rs @@ -212,6 +212,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Invalid price ticker sort key: {}", item))] + InvalidPriceTickerSortKey { + item: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Invalid amount format: {}", item))] InvalidAmount { item: String, diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index c9fb315fd3..589313c842 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -359,11 +359,8 @@ fn index_set_oracle_data( context: &Context, pairs: &HashSet<(Token, Currency)>, ) -> Result<()> { - let oracle_repo = &services.oracle_price_aggregated; - for pair in pairs { let price_aggregated = map_price_aggregated(services, context, pair)?; - let Some(price_aggregated) = price_aggregated else { continue; }; @@ -377,15 +374,26 @@ fn index_set_oracle_data( price_aggregated.block.median_time.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), ); - oracle_repo.by_id.put(&id, &price_aggregated)?; - + services.oracle_price_aggregated.by_id.put(&id, &price_aggregated)?; + let price_repo = &services.price_ticker; let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), - token, - currency, + token.clone(), + currency.clone(), ); - services.price_ticker.by_id.put( + let prev_price = price_repo + .by_id + .list(Some(id.clone()), SortOrder::Descending)? + .find(|item| match item { + Ok(((_, _, t, c), _)) => t == &token && c == ¤cy, + _ => true + }) + .transpose()?; + if let Some((k, _)) = prev_price { + price_repo.by_id.delete(&k)? + } + price_repo.by_id.put( &id, &PriceTicker { price: price_aggregated,