From 997a654253d5f4ecadcd6f537561cada42999708 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Wed, 27 Nov 2024 04:15:49 +0800 Subject: [PATCH] Ocean: fix active price (#3126) * fix get_feed_active api resp * fix OraclePriceAggregatedId & OraclePriceFeedId sort key * fix index_block_start and index_block_end * clippy * fmt * add missing pagination * [oracle] convert u32,i64 to be bytes * ScriptAggregationId height u32 -> bytes * storage: endianness - mn, poolswap * storage: endianness - OracleHistory * ScriptUnspentKey uzize to be * fmt_rs * ApiRpcResponse * optimize prices indexer & api * missing: script activity key usize to BE * fmt_rs * use filter_map to perform set --- lib/ain-ocean/src/api/address.rs | 23 ++- lib/ain-ocean/src/api/loan.rs | 15 +- lib/ain-ocean/src/api/oracle.rs | 6 +- lib/ain-ocean/src/api/prices.rs | 164 +++++++++++------- lib/ain-ocean/src/api/response.rs | 14 ++ lib/ain-ocean/src/api/rpc.rs | 6 +- lib/ain-ocean/src/api/stats/cache.rs | 14 +- lib/ain-ocean/src/indexer/loan_token.rs | 77 ++++---- lib/ain-ocean/src/indexer/mod.rs | 108 ++++++------ lib/ain-ocean/src/indexer/oracle.rs | 44 +++-- lib/ain-ocean/src/indexer/poolswap.rs | 107 ++++++------ lib/ain-ocean/src/lib.rs | 2 - .../src/model/oracle_price_active.rs | 2 +- .../src/model/oracle_price_aggregated.rs | 2 +- .../model/oracle_price_aggregated_interval.rs | 2 +- lib/ain-ocean/src/model/oracle_price_feed.rs | 2 +- lib/ain-ocean/src/model/price_ticker.rs | 3 +- lib/ain-ocean/src/model/script_activity.rs | 2 +- lib/ain-ocean/src/model/script_aggregation.rs | 2 +- lib/ain-ocean/src/model/script_unspent.rs | 4 +- lib/ain-ocean/src/storage/mod.rs | 121 +++++++++++-- 21 files changed, 465 insertions(+), 255 deletions(-) diff --git a/lib/ain-ocean/src/api/address.rs b/lib/ain-ocean/src/api/address.rs index 6e98e80c46e..be6ee17749e 100644 --- a/lib/ain-ocean/src/api/address.rs +++ b/lib/ain-ocean/src/api/address.rs @@ -173,7 +173,7 @@ fn get_latest_aggregation( .services .script_aggregation .by_id - .list(Some((hid, u32::MAX)), SortOrder::Descending)? + .list(Some((hid, [0xffu8; 4])), SortOrder::Descending)? .take(1) .take_while(|item| match item { Ok(((v, _), _)) => v == &hid, @@ -342,8 +342,8 @@ async fn list_transactions( _ => ScriptActivityTypeHex::Vout, }; let txid = Txid::from_str(txid)?; - let n = n.parse::()?; - Ok::<([u8; 4], ScriptActivityTypeHex, Txid, usize), Error>(( + let n = n.parse::()?.to_be_bytes(); + Ok::<([u8; 4], ScriptActivityTypeHex, Txid, [u8; 8]), Error>(( height, vin_vout_type, txid, @@ -352,10 +352,10 @@ async fn list_transactions( }) .transpose()? .unwrap_or(( - [u8::MAX, u8::MAX, u8::MAX, u8::MAX], + [0xffu8; 4], ScriptActivityTypeHex::Vout, Txid::from_byte_array([0xffu8; 32]), - usize::MAX, + [0xffu8; 8], )); let res = ctx @@ -458,15 +458,14 @@ async fn list_transaction_unspent( msg: format!("Invalid height: {}", height), })?; let txid = Txid::from_str(txid)?; - let n = n.parse::()?; - Ok::<([u8; 4], Txid, usize), Error>((height, txid, n)) + let decoded_n = hex::decode(n)?; + let n = decoded_n.try_into().map_err(|_| Error::Other { + msg: format!("Invalid txno: {}", n), + })?; + Ok::<([u8; 4], Txid, [u8; 8]), Error>((height, txid, n)) }) .transpose()? - .unwrap_or(( - [0u8, 0u8, 0u8, 0u8], - Txid::from_byte_array([0x00u8; 32]), - usize::default(), - )); + .unwrap_or(([0u8; 4], Txid::from_byte_array([0x00u8; 32]), [0u8; 8])); let res = ctx .services diff --git a/lib/ain-ocean/src/api/loan.rs b/lib/ain-ocean/src/api/loan.rs index 85731fc68b4..fb67b4b63f8 100644 --- a/lib/ain-ocean/src/api/loan.rs +++ b/lib/ain-ocean/src/api/loan.rs @@ -138,7 +138,14 @@ fn get_active_price( .services .oracle_price_active .by_id - .list(Some((token, currency, u32::MAX)), SortOrder::Descending)? + .list( + Some((token.clone(), currency.clone(), [0xffu8; 4])), + SortOrder::Descending, + )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == token && k.1 == currency, + _ => true, + }) .next() .map(|item| { let (_, v) = item?; @@ -264,7 +271,7 @@ async fn list_loan_token( .services .oracle_price_active .by_id - .list(Some((token, currency, u32::MAX)), SortOrder::Descending)? + .list(Some((token, currency, [0xffu8; 4])), SortOrder::Descending)? .next() .map(|item| { let (_, v) = item?; @@ -671,7 +678,7 @@ async fn map_liquidation_batches( let id = ( Txid::from_str(vault_id)?, batch.index.to_be_bytes(), - [0xffu8, 0xffu8, 0xffu8, 0xffu8], + [0xffu8; 4], Txid::from_byte_array([0xffu8; 32]), ); let bids = repo @@ -733,7 +740,7 @@ async fn map_token_amounts( .oracle_price_active .by_id .list( - Some((token_info.symbol.clone(), "USD".to_string(), u32::MAX)), + Some((token_info.symbol.clone(), "USD".to_string(), [0xffu8; 4])), SortOrder::Descending, )? .take_while(|item| match item { diff --git a/lib/ain-ocean/src/api/oracle.rs b/lib/ain-ocean/src/api/oracle.rs index f1f42dd06ce..de0bf825dc4 100644 --- a/lib/ain-ocean/src/api/oracle.rs +++ b/lib/ain-ocean/src/api/oracle.rs @@ -103,15 +103,15 @@ async fn get_feed( .list(None, SortOrder::Descending)? .paginate(&query) .flatten() - .filter(|((token, currency, oracle_id, _), _)| { + .filter(|((token, currency, oracle_id, _, _), _)| { key.0.eq(token) && key.1.eq(currency) && key.2.eq(oracle_id) }) - .map(|((token, currency, oracle_id, txid), feed)| { + .map(|((token, currency, oracle_id, height, txid), feed)| { let amount = Decimal::from(feed.amount) / Decimal::from(COIN); OraclePriceFeedResponse { id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid), key: format!("{}-{}-{}", token, currency, oracle_id), - sort: hex::encode(feed.block.height.to_string() + &txid.to_string()), + sort: hex::encode(u32::from_be_bytes(height).to_string() + &txid.to_string()), token, currency, oracle_id, diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 11fc582d301..d320e3b07d4 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -8,11 +8,9 @@ use axum::{ Extension, Router, }; use bitcoin::{hashes::Hash, Txid}; -use indexmap::IndexSet; use rust_decimal::{prelude::ToPrimitive, Decimal}; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; -use snafu::OptionExt; use super::{ common::parse_token_currency, @@ -22,9 +20,9 @@ use super::{ AppContext, }; use crate::{ - error::{ApiError, OtherSnafu}, + error::{ApiError, Error}, model::{ - BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceActiveNext, + BlockContext, OracleIntervalSeconds, OraclePriceActive, OraclePriceAggregatedIntervalAggregated, PriceTicker, }, storage::{RepositoryOps, SortOrder}, @@ -121,39 +119,28 @@ async fn list_prices( Query(query): Query, Extension(ctx): Extension>, ) -> Result> { - let sorted_ids = ctx + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + + let prices = ctx .services .price_ticker - .by_key + .by_id .list(None, SortOrder::Descending)? - .map(|item| { - let (_, id) = item?; - Ok(id) - }) - .collect::>>()?; - - // use IndexSet to rm dup without changing order - let mut sorted_ids_set = IndexSet::new(); - for id in sorted_ids { - sorted_ids_set.insert(id); - } - - let prices = sorted_ids_set - .into_iter() - .take(query.size) - .map(|id| { - let price_ticker = ctx - .services - .price_ticker - .by_id - .get(&id)? - .context(OtherSnafu { - msg: "Missing price ticker index", - })?; - - Ok(PriceTickerResponse::from((id, price_ticker))) + .flat_map(|item| { + let ((_, _, token, currency), v) = item?; + let has_key = set.contains(&(token.clone(), currency.clone())); + if !has_key { + set.insert((token.clone(), currency.clone())); + Ok::, Error>(Some(PriceTickerResponse::from(( + (token, currency), + v, + )))) + } else { + Ok(None) + } }) - .collect::>>()?; + .flatten() + .collect::>(); Ok(ApiPagedResponse::of(prices, query.size, |price| { price.sort.to_string() @@ -171,9 +158,14 @@ async fn get_price( .services .price_ticker .by_id - .get(&(token.clone(), currency.clone()))?; - - let Some(price_ticker) = price_ticker else { + .list( + Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())), + SortOrder::Descending, + )? + .next() + .transpose()?; + + let Some((_, price_ticker)) = price_ticker else { return Ok(Response::new(None)); }; @@ -189,9 +181,21 @@ async fn get_feed( Extension(ctx): Extension>, ) -> Result> { let (token, currency) = parse_token_currency(&key)?; + let next = query + .next + .map(|q| { + let median_time = &q[..16]; + let height = &q[16..]; + + let median_time = median_time.parse::()?.to_be_bytes(); + let height = height.parse::()?.to_be_bytes(); + Ok::<([u8; 8], [u8; 4]), Error>((median_time, height)) + }) + .transpose()? + .unwrap_or(([0xffu8; 8], [0xffu8; 4])); let repo = &ctx.services.oracle_price_aggregated; - let id = (token.to_string(), currency.to_string(), u32::MAX); + let id = (token.clone(), currency.clone(), next.0, next.1); let oracle_aggregated = repo .by_id .list(Some(id), SortOrder::Descending)? @@ -203,7 +207,7 @@ async fn get_feed( .map(|item| { let (k, v) = item?; let res = OraclePriceAggregatedResponse { - id: format!("{}-{}-{}", k.0, k.1, k.2), + id: format!("{}-{}-{}", k.0, k.1, i64::from_be_bytes(k.2)), key: format!("{}-{}", k.0, k.1), sort: format!( "{}{}", @@ -233,14 +237,22 @@ async fn get_feed( )) } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OraclePriceActiveNextResponse { + pub amount: String, // convert to logical amount + pub weightage: Decimal, + pub oracles: OraclePriceActiveNextOraclesResponse, +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct OraclePriceActiveResponse { pub id: String, // token-currency-height pub key: String, // token-currency pub sort: String, // height - pub active: Option, - pub next: Option, + pub active: Option, + pub next: Option, pub is_live: bool, pub block: BlockContext, } @@ -251,8 +263,22 @@ impl OraclePriceActiveResponse { id: format!("{}-{}-{}", token, currency, v.block.height), key: format!("{}-{}", token, currency), sort: hex::encode(v.block.height.to_be_bytes()).to_string(), - active: v.active, - next: v.next, + active: v.active.map(|active| OraclePriceActiveNextResponse { + amount: format!("{:.8}", active.amount / Decimal::from(COIN)), + weightage: active.weightage, + oracles: OraclePriceActiveNextOraclesResponse { + active: active.oracles.active.to_i32().unwrap_or_default(), + total: active.oracles.total, + }, + }), + next: v.next.map(|next| OraclePriceActiveNextResponse { + amount: format!("{:.8}", next.amount / Decimal::from(COIN)), + weightage: next.weightage, + oracles: OraclePriceActiveNextOraclesResponse { + active: next.oracles.active.to_i32().unwrap_or_default(), + total: next.oracles.total, + }, + }), is_live: v.is_live, block: v.block, } @@ -267,7 +293,16 @@ async fn get_feed_active( ) -> Result> { let (token, currency) = parse_token_currency(&key)?; - let id = (token.clone(), currency.clone(), u32::MAX); + let next = query + .next + .map(|q| { + let height = q.parse::()?.to_be_bytes(); + Ok::<[u8; 4], Error>(height) + }) + .transpose()? + .unwrap_or([0xffu8; 4]); + + let id = (token.clone(), currency.clone(), next); let price_active = ctx .services .oracle_price_active @@ -333,12 +368,17 @@ async fn get_feed_with_interval( 86400 => OracleIntervalSeconds::OneDay, _ => return Err(From::from("Invalid oracle interval")), }; - let id = ( - token.clone(), - currency.clone(), - interval_type.clone(), - u32::MAX, - ); + + let next = query + .next + .map(|q| { + let height = q.parse::()?.to_be_bytes(); + Ok::<[u8; 4], Error>(height) + }) + .transpose()? + .unwrap_or([0xffu8; 4]); + + let id = (token.clone(), currency.clone(), interval_type.clone(), next); let items = ctx .services @@ -358,9 +398,10 @@ async fn get_feed_with_interval( let mut prices = Vec::new(); for (id, item) in items { let start = item.block.median_time - (item.block.median_time % interval); + let height = u32::from_be_bytes(id.3); let price = OraclePriceAggregatedIntervalResponse { - id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, id.3), + id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, height), key: format!("{}-{}-{:?}", id.0, id.1, id.2), sort: format!( "{}{}", @@ -412,11 +453,16 @@ async fn list_price_oracles( ) -> Result> { let (token, currency) = parse_token_currency(&key)?; - let id = ( - token.clone(), - currency.clone(), - Txid::from_byte_array([0xffu8; 32]), - ); + let next = query + .next + .map(|q| { + let oracle_id = Txid::from_str(&q)?; + Ok::(oracle_id) + }) + .transpose()? + .unwrap_or(Txid::from_byte_array([0xffu8; 32])); + + let id = (token.clone(), currency.clone(), next); let token_currencies = ctx .services .oracle_token_currency @@ -441,6 +487,7 @@ async fn list_price_oracles( token.clone(), currency.clone(), oracle_id, + [0xffu8; 4], Txid::from_byte_array([0xffu8; 32]), )), SortOrder::Descending, @@ -464,11 +511,12 @@ async fn list_price_oracles( let token = id.0; let currency = id.1; let oracle_id = id.2; - let txid = id.3; + let height = u32::from_be_bytes(id.3); + let txid = id.4; OraclePriceFeedResponse { id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid), key: format!("{}-{}-{}", token, currency, oracle_id), - sort: hex::encode(f.block.height.to_string() + &txid.to_string()), + sort: hex::encode(height.to_string() + &txid.to_string()), token: token.clone(), currency: currency.clone(), oracle_id, diff --git a/lib/ain-ocean/src/api/response.rs b/lib/ain-ocean/src/api/response.rs index ead721a2781..a8726210108 100644 --- a/lib/ain-ocean/src/api/response.rs +++ b/lib/ain-ocean/src/api/response.rs @@ -12,6 +12,20 @@ impl Response { } } +#[derive(Debug, Serialize)] +pub struct ApiRpcResponse { + result: T, + // TODO: map error and id from rpc + // error: T, + // id: T, +} + +impl ApiRpcResponse { + pub fn new(result: T) -> Self { + Self { result } + } +} + /// ApiPagedResponse indicates that this response of data array slice is part of a sorted list of items. /// Items are part of a larger sorted list and the slice indicates a window within the large sorted list. /// Each ApiPagedResponse holds the data array and the "token" for next part of the slice. diff --git a/lib/ain-ocean/src/api/rpc.rs b/lib/ain-ocean/src/api/rpc.rs index 809f0853ec1..0bebb6836a7 100644 --- a/lib/ain-ocean/src/api/rpc.rs +++ b/lib/ain-ocean/src/api/rpc.rs @@ -5,7 +5,7 @@ use axum::{routing::post, Extension, Json, Router}; use defichain_rpc::RpcApi; use serde::{Deserialize, Serialize}; -use super::{response::Response, AppContext}; +use super::{response::ApiRpcResponse, AppContext}; use crate::{ error::{ApiError, Error}, Result, @@ -54,12 +54,12 @@ fn method_whitelist(method: &str) -> Result<()> { async fn rpc( Extension(ctx): Extension>, Json(body): Json, -) -> Result> { +) -> Result> { method_whitelist(&body.method)?; let res: serde_json::Value = ctx.client.call(&body.method, &body.params).await?; - Ok(Response::new(res)) + Ok(ApiRpcResponse::new(res)) } pub fn router(ctx: Arc) -> Router { diff --git a/lib/ain-ocean/src/api/stats/cache.rs b/lib/ain-ocean/src/api/stats/cache.rs index c66fdf9e284..cdc60a00b8e 100644 --- a/lib/ain-ocean/src/api/stats/cache.rs +++ b/lib/ain-ocean/src/api/stats/cache.rs @@ -1,5 +1,10 @@ -use std::{collections::HashMap, str::FromStr, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, + sync::Arc, +}; +use ain_dftx::{Currency, Token}; use cached::proc_macro::cached; use defichain_rpc::{ defichain_rpc_json::token::TokenPagination, json::account::AccountAmount, AccountRPC, Client, @@ -22,7 +27,7 @@ use crate::{ stats::get_block_reward_distribution, AppContext, }, - error::{DecimalConversionSnafu, OtherSnafu}, + error::{DecimalConversionSnafu, Error, OtherSnafu}, model::MasternodeStatsData, storage::{RepositoryOps, SortOrder}, Result, @@ -105,7 +110,10 @@ pub async fn get_count(ctx: &Arc) -> Result { .price_ticker .by_id .list(None, SortOrder::Descending)? - .collect::>(); + .filter_map(|item| { + item.ok().map(|((_, _, token, currency), _)| (token, currency)) + }) + .collect::>(); Ok(Count { blocks: 0, diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 3ae78b1e210..d05d2ecd9a1 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use ain_dftx::{loans::SetLoanToken, Currency, Token}; use log::trace; @@ -6,7 +6,8 @@ use rust_decimal::{prelude::Zero, Decimal}; use rust_decimal_macros::dec; use crate::{ - indexer::{Context, Index, IndexBlockEnd, Result}, + error::Error, + indexer::{Context, Index, Result}, model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated}, network::Network, storage::{RepositoryOps, SortOrder}, @@ -25,23 +26,13 @@ impl Index for SetLoanToken { let ticker_id = ( self.currency_pair.token.clone(), self.currency_pair.currency.clone(), - context.block.height, + context.block.height.to_be_bytes(), ); services.oracle_price_active.by_id.delete(&ticker_id)?; Ok(()) } } -impl IndexBlockEnd for SetLoanToken { - fn index_block_end(self, services: &Arc, block: &BlockContext) -> Result<()> { - index_active_price(services, block) - } - - fn invalidate_block_end(self, services: &Arc, block: &BlockContext) -> Result<()> { - invalidate_active_price(services, block) - } -} - fn is_aggregate_valid(aggregate: &OraclePriceAggregated, block: &BlockContext) -> bool { if (aggregate.block.time - block.time).abs() >= 3600 { return false; @@ -96,15 +87,21 @@ pub fn index_active_price(services: &Arc, block: &BlockContext) -> Res _ => 120, }; if block.height % block_interval == 0 { - let price_tickers = services + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flatten() - .collect::>(); - - for (ticker_id, _) in price_tickers { - perform_active_price_tick(services, ticker_id, block)?; + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); + + for (token, currency) in pairs { + perform_active_price_tick(services, (token, currency), block)?; } } Ok(()) @@ -146,18 +143,31 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - _ => 120, }; if block.height % block_interval == 0 { - let price_tickers = services + let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flatten() - .collect::>(); - - for ((token, currency), _) in price_tickers.into_iter().rev() { - services - .oracle_price_active - .by_id - .delete(&(token, currency, block.height))?; + .flat_map(|item| { + let ((_, _, token, currency), _) = item?; + set.insert((token, currency)); + Ok::, Error>(set.clone()) + }) + .next() + .unwrap_or(set); + + // convert to vector to reverse the hashset is required + let mut vec = Vec::new(); + for pair in pairs { + vec.insert(0, pair); + } + + for (token, currency) in vec { + services.oracle_price_active.by_id.delete(&( + token, + currency, + block.height.to_be_bytes(), + ))?; } } @@ -169,7 +179,12 @@ pub fn perform_active_price_tick( ticker_id: (Token, Currency), block: &BlockContext, ) -> Result<()> { - let id = (ticker_id.0, ticker_id.1, u32::MAX); + let id = ( + ticker_id.0.clone(), + ticker_id.1.clone(), + [0xffu8; 8], + [0xffu8; 4], + ); let prev = services .oracle_price_aggregated @@ -182,6 +197,7 @@ pub fn perform_active_price_tick( return Ok(()); }; + let id = (ticker_id.0, ticker_id.1, [0xffu8; 4]); let repo = &services.oracle_price_active; let prev = repo .by_id @@ -197,7 +213,8 @@ pub fn perform_active_price_tick( let active_price = map_active_price(block, aggregated_price, prev_price); - repo.by_id.put(&(id.0, id.1, block.height), &active_price)?; + repo.by_id + .put(&(id.0, id.1, block.height.to_be_bytes()), &active_price)?; Ok(()) } diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index eea99999f9b..a572323dcf1 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -17,8 +17,10 @@ use std::{ use ain_dftx::{deserialize, is_skipped_tx, DfTx, Stack}; use defichain_rpc::json::blockchain::{Block, Transaction, Vin, VinStandard, Vout}; use helper::check_if_evm_tx; +use loan_token::{index_active_price, invalidate_active_price}; use log::trace; pub use poolswap::PoolSwapAggregatedInterval; +use poolswap::{index_pool_swap_aggregated, invalidate_pool_swap_aggregated}; use crate::{ error::{Error, IndexAction}, @@ -41,18 +43,6 @@ pub trait Index { fn invalidate(&self, services: &Arc, ctx: &Context) -> Result<()>; } -pub trait IndexBlockStart: Index { - fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()>; - - fn invalidate_block_start(self, services: &Arc, block: &BlockContext) -> Result<()>; -} - -pub trait IndexBlockEnd: Index { - fn index_block_end(self, services: &Arc, block: &BlockContext) -> Result<()>; - - fn invalidate_block_end(self, services: &Arc, block: &BlockContext) -> Result<()>; -} - #[derive(Debug)] pub struct Context { block: BlockContext, @@ -138,7 +128,7 @@ fn index_script_activity_vin( block.height.to_be_bytes(), ScriptActivityTypeHex::Vin, vin.txid, - vin.vout, + vin.vout.to_be_bytes(), ); services.script_activity.by_id.put(&id, &script_activity)?; @@ -170,7 +160,11 @@ fn index_script_unspent_vin( vin: &VinStandard, ctx: &Context, ) -> Result<()> { - let key = (ctx.block.height.to_be_bytes(), vin.txid, vin.vout); + let key = ( + ctx.block.height.to_be_bytes(), + vin.txid, + vin.vout.to_be_bytes(), + ); let id = services.script_unspent.by_key.get(&key)?; if let Some(id) = id { services.script_unspent.by_id.delete(&id)?; @@ -212,7 +206,7 @@ fn index_script_activity_vout(services: &Arc, vout: &Vout, ctx: &Conte block.height.to_be_bytes(), ScriptActivityTypeHex::Vout, tx.txid, - vout.n, + vout.n.to_be_bytes(), ); services.script_activity.by_id.put(&id, &script_activity)?; Ok(()) @@ -265,8 +259,13 @@ fn index_script_unspent_vout(services: &Arc, vout: &Vout, ctx: &Contex }, }; - let id = (hid, block.height.to_be_bytes(), tx.txid, vout.n); - let key = (block.height.to_be_bytes(), tx.txid, vout.n); + let id = ( + hid, + block.height.to_be_bytes(), + tx.txid, + vout.n.to_be_bytes(), + ); + let key = (block.height.to_be_bytes(), tx.txid, vout.n.to_be_bytes()); services.script_unspent.by_key.put(&key, &id)?; services.script_unspent.by_id.put(&id, &script_unspent)?; Ok(()) @@ -327,7 +326,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> let repo = &services.script_aggregation; let latest = repo .by_id - .list(Some((aggregation.hid, u32::MAX)), SortOrder::Descending)? + .list(Some((aggregation.hid, [0xffu8; 4])), SortOrder::Descending)? .take(1) .take_while(|item| match item { Ok(((hid, _), _)) => &aggregation.hid == hid, @@ -351,8 +350,10 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> aggregation.statistic.tx_in_count + aggregation.statistic.tx_out_count; aggregation.amount.unspent = aggregation.amount.tx_in - aggregation.amount.tx_out; - repo.by_id - .put(&(aggregation.hid, ctx.block.height), &aggregation)?; + repo.by_id.put( + &(aggregation.hid, ctx.block.height.to_be_bytes()), + &aggregation, + )?; record.insert(aggregation.hid, aggregation); } @@ -414,7 +415,7 @@ fn invalidate_script(services: &Arc, ctx: &Context, txs: &[Transaction services .script_aggregation .by_id - .delete(&(hid, block.height))? + .delete(&(hid, block.height.to_be_bytes()))? } Ok(()) @@ -468,12 +469,12 @@ fn invalidate_script_unspent_vin( hid, transaction.block.height.to_be_bytes(), transaction.txid, - vout.n, + vout.n.to_be_bytes(), ); let key = ( transaction.block.height.to_be_bytes(), transaction.txid, - vout.n, + vout.n.to_be_bytes(), ); services.script_unspent.by_key.put(&key, &id)?; @@ -493,7 +494,7 @@ fn invalidate_script_activity_vin( height.to_be_bytes(), ScriptActivityTypeHex::Vin, vin.txid, - vin.vout, + vin.vout.to_be_bytes(), ); services.script_activity.by_id.delete(&id)?; @@ -506,8 +507,17 @@ fn invalidate_script_unspent_vout( vout: &Vout, ) -> Result<()> { let hid = as_sha256(&vout.script_pub_key.hex); - let id = (hid, ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n); - let key = (ctx.block.height.to_be_bytes(), ctx.tx.txid, vout.n); + let id = ( + hid, + ctx.block.height.to_be_bytes(), + ctx.tx.txid, + vout.n.to_be_bytes(), + ); + let key = ( + ctx.block.height.to_be_bytes(), + ctx.tx.txid, + vout.n.to_be_bytes(), + ); services.script_unspent.by_id.delete(&id)?; services.script_unspent.by_key.delete(&key)?; @@ -524,7 +534,7 @@ fn invalidate_script_activity_vout( ctx.block.height.to_be_bytes(), ScriptActivityTypeHex::Vout, ctx.tx.txid, - vout.n, + vout.n.to_be_bytes(), ); services.script_activity.by_id.delete(&id)?; Ok(()) @@ -538,6 +548,22 @@ pub fn get_block_height(services: &Arc) -> Result { .map_or(0, |block| block.height)) } +pub fn index_block_start(services: &Arc, block: &BlockContext) -> Result<()> { + index_pool_swap_aggregated(services, block) +} + +pub fn invalidate_block_start(services: &Arc, block: &BlockContext) -> Result<()> { + invalidate_pool_swap_aggregated(services, block) +} + +pub fn index_block_end(services: &Arc, block: &BlockContext) -> Result<()> { + index_active_price(services, block) +} + +pub fn invalidate_block_end(services: &Arc, block: &BlockContext) -> Result<()> { + invalidate_active_price(services, block) +} + pub fn index_block(services: &Arc, block: Block) -> Result<()> { trace!("[index_block] Indexing block..."); let start = Instant::now(); @@ -586,12 +612,7 @@ pub fn index_block(services: &Arc, block: Block) -> Resul } } - // index_block_start - for (dftx, _) in &dftxs { - if let DfTx::PoolSwap(data) = dftx.clone() { - data.index_block_start(services, &block_ctx)? - } - } + index_block_start(services, &block_ctx)?; // index_dftx for (dftx, ctx) in &dftxs { @@ -615,12 +636,7 @@ pub fn index_block(services: &Arc, block: Block) -> Resul log_elapsed(start, "Indexed dftx"); } - // index_block_end - for (dftx, _) in dftxs { - if let DfTx::SetLoanToken(data) = dftx { - data.index_block_end(services, &block_ctx)? - } - } + index_block_end(services, &block_ctx)?; let block_mapper = BlockMapper { hash: block_hash, @@ -699,12 +715,7 @@ pub fn invalidate_block(services: &Arc, block: Block) -> } } - // invalidate_block_end - for (dftx, _) in &dftxs { - if let DfTx::SetLoanToken(data) = dftx.clone() { - data.invalidate_block_end(services, &block_ctx)? - } - } + invalidate_block_end(services, &block_ctx)?; // invalidate_dftx for (dftx, ctx) in &dftxs { @@ -727,12 +738,7 @@ pub fn invalidate_block(services: &Arc, block: Block) -> log_elapsed(start, "Invalidate dftx"); } - // invalidate_block_start - for (dftx, _) in &dftxs { - if let DfTx::PoolSwap(data) = dftx.clone() { - data.invalidate_block_start(services, &block_ctx)? - } - } + invalidate_block_start(services, &block_ctx)?; // invalidate_block services.block.by_height.delete(&block.height)?; diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 461fc8c97ec..c9fb315fd3e 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -301,7 +301,10 @@ fn map_price_aggregated( let feed = services .oracle_price_feed .by_id - .list(Some((id.0, id.1, id.2, base_id)), SortOrder::Descending)? + .list( + Some((id.0, id.1, id.2, [0xffu8; 4], base_id)), + SortOrder::Descending, + )? .next() .transpose()?; @@ -357,7 +360,6 @@ fn index_set_oracle_data( pairs: &HashSet<(Token, Currency)>, ) -> Result<()> { let oracle_repo = &services.oracle_price_aggregated; - let ticker_repo = &services.price_ticker; for pair in pairs { let price_aggregated = map_price_aggregated(services, context, pair)?; @@ -372,19 +374,19 @@ fn index_set_oracle_data( let id = ( token.clone(), currency.clone(), - price_aggregated.block.height, + price_aggregated.block.median_time.to_be_bytes(), + price_aggregated.block.height.to_be_bytes(), ); oracle_repo.by_id.put(&id, &price_aggregated)?; - let key = ( - price_aggregated.aggregated.oracles.total, - price_aggregated.block.height, + let id = ( + price_aggregated.aggregated.oracles.total.to_be_bytes(), + price_aggregated.block.height.to_be_bytes(), token, currency, ); - ticker_repo.by_key.put(&key, pair)?; - ticker_repo.by_id.put( - &pair.clone(), + services.price_ticker.by_id.put( + &id, &PriceTicker { price: price_aggregated, }, @@ -402,7 +404,8 @@ fn index_set_oracle_data_interval( let aggregated = services.oracle_price_aggregated.by_id.get(&( token.clone(), currency.clone(), - context.block.height, + context.block.median_time.to_be_bytes(), + context.block.height.to_be_bytes(), ))?; let Some(aggregated) = aggregated else { @@ -447,8 +450,13 @@ impl Index for SetOracleData { let feeds = map_price_feeds(self, context); - for ((token, currency, _, _), _) in feeds.iter().rev() { - let id = (token.clone(), currency.clone(), context.block.height); + for ((token, currency, _, _, _), _) in feeds.iter().rev() { + let id = ( + token.clone(), + currency.clone(), + context.block.median_time.to_be_bytes(), + context.block.height.to_be_bytes(), + ); let aggregated = oracle_repo.by_id.get(&id)?; @@ -485,6 +493,7 @@ fn map_price_feeds( token_price.token.clone(), token_amount.currency.clone(), data.oracle_id, + ctx.block.height.to_be_bytes(), ctx.tx.txid, ); @@ -507,7 +516,7 @@ fn start_new_bucket( aggregated: &OraclePriceAggregated, interval: OracleIntervalSeconds, ) -> Result<()> { - let id = (token, currency, interval, block.height); + let id = (token, currency, interval, block.height.to_be_bytes()); services.oracle_price_aggregated_interval.by_id.put( &id, &OraclePriceAggregatedInterval { @@ -539,7 +548,12 @@ pub fn index_interval_mapper( let previous = repo .by_id .list( - Some((token.clone(), currency.clone(), interval.clone(), u32::MAX)), + Some(( + token.clone(), + currency.clone(), + interval.clone(), + [0xffu8; 4], + )), SortOrder::Descending, )? .take_while(|item| match item { @@ -580,7 +594,7 @@ pub fn invalidate_oracle_interval( token.to_string(), currency.to_string(), interval.clone(), - u32::MAX, + [0xffu8; 4], )), SortOrder::Descending, )? diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index bc6bc8ad058..e9ed5595284 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -9,7 +9,7 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use snafu::OptionExt; -use super::{Context, IndexBlockStart}; +use super::Context; use crate::{ error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, NotFoundKind}, indexer::{tx_result, Index, Result}, @@ -182,67 +182,68 @@ fn create_new_bucket( Ok(()) } -impl IndexBlockStart for PoolSwap { - fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); - pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - - for interval in AGGREGATED_INTERVALS { - for pool_pair in &pool_pairs { - let repo = &services.pool_swap_aggregated; - - let prev = repo - .by_key - .list( - Some((pool_pair.id, interval, i64::MAX)), - SortOrder::Descending, - )? - .take_while(|item| match item { - Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval, - _ => true, - }) - .next() - .transpose()?; - - let bucket = block.median_time - (block.median_time % interval as i64); - - let Some((_, prev_id)) = prev else { - create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; - continue; - }; - - let Some(prev) = repo.by_id.get(&prev_id)? else { - create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; - continue; - }; +pub fn index_pool_swap_aggregated(services: &Arc, block: &BlockContext) -> Result<()> { + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - if prev.bucket >= bucket { - break; - } + for interval in AGGREGATED_INTERVALS { + for pool_pair in &pool_pairs { + let repo = &services.pool_swap_aggregated; + + let prev = repo + .by_key + .list( + Some((pool_pair.id, interval, i64::MAX)), + SortOrder::Descending, + )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval, + _ => true, + }) + .next() + .transpose()?; + + let bucket = block.median_time - (block.median_time % interval as i64); + + let Some((_, prev_id)) = prev else { + create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + continue; + }; + let Some(prev) = repo.by_id.get(&prev_id)? else { create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + continue; + }; + + if prev.bucket >= bucket { + break; } - } - Ok(()) + create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + } } - fn invalidate_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); - pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - - for interval in AGGREGATED_INTERVALS.into_iter().rev() { - for pool_pair in pool_pairs.iter().rev() { - let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); - services - .pool_swap_aggregated - .by_id - .delete(&pool_swap_aggregated_id)?; - } - } + Ok(()) +} - Ok(()) +pub fn invalidate_pool_swap_aggregated( + services: &Arc, + block: &BlockContext, +) -> Result<()> { + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); + + for interval in AGGREGATED_INTERVALS.into_iter().rev() { + for pool_pair in pool_pairs.iter().rev() { + let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); + services + .pool_swap_aggregated + .by_id + .delete(&pool_swap_aggregated_id)?; + } } + + Ok(()) } impl Index for PoolSwap { diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index c72b6a9f67b..bde84b93123 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -93,7 +93,6 @@ pub struct OracleHistoryService { pub struct PriceTickerService { by_id: PriceTicker, - by_key: PriceTickerKey, } pub struct ScriptActivityService { @@ -196,7 +195,6 @@ impl Services { }, price_ticker: PriceTickerService { by_id: PriceTicker::new(Arc::clone(&store)), - by_key: PriceTickerKey::new(Arc::clone(&store)), }, script_activity: ScriptActivityService { by_id: ScriptActivity::new(Arc::clone(&store)), diff --git a/lib/ain-ocean/src/model/oracle_price_active.rs b/lib/ain-ocean/src/model/oracle_price_active.rs index 3b9740f08e9..4f53828420e 100644 --- a/lib/ain-ocean/src/model/oracle_price_active.rs +++ b/lib/ain-ocean/src/model/oracle_price_active.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceActiveId = (String, String, u32); //token-currency-height +pub type OraclePriceActiveId = (String, String, [u8; 4]); //token-currency-height #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated.rs b/lib/ain-ocean/src/model/oracle_price_aggregated.rs index 0ee554ed8f8..1fdcf30128a 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use super::{BlockContext, OraclePriceActiveNext}; -pub type OraclePriceAggregatedId = (String, String, u32); //token-currency-height +pub type OraclePriceAggregatedId = (String, String, [u8; 8], [u8; 4]); //token-currency-mediantime-height #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs index 6b5bb65d5be..443e4e7e5aa 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs @@ -3,7 +3,7 @@ use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, u32); //token-currency-interval-height +pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, [u8; 4]); //token-currency-interval-height pub const FIFTEEN_MINUTES: isize = 15 * 60; pub const ONE_HOUR: isize = 60 * 60; diff --git a/lib/ain-ocean/src/model/oracle_price_feed.rs b/lib/ain-ocean/src/model/oracle_price_feed.rs index 0afcc656147..40cb98b5362 100644 --- a/lib/ain-ocean/src/model/oracle_price_feed.rs +++ b/lib/ain-ocean/src/model/oracle_price_feed.rs @@ -2,7 +2,7 @@ use bitcoin::Txid; use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceFeedId = (String, String, Txid, Txid); // token-currency-oracle_id-txid +pub type OraclePriceFeedId = (String, String, Txid, [u8; 4], Txid); // token-currency-oracle_id-height-txid #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/price_ticker.rs b/lib/ain-ocean/src/model/price_ticker.rs index 54457622fdf..158fb961575 100644 --- a/lib/ain-ocean/src/model/price_ticker.rs +++ b/lib/ain-ocean/src/model/price_ticker.rs @@ -2,8 +2,7 @@ use serde::{Deserialize, Serialize}; use super::oracle_price_aggregated::OraclePriceAggregated; -pub type PriceTickerId = (String, String); //token-currency -pub type PriceTickerKey = (i32, u32, String, String); // total-height-token-currency +pub type PriceTickerId = ([u8; 4], [u8; 4], String, String); // total-height-token-currency #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/model/script_activity.rs b/lib/ain-ocean/src/model/script_activity.rs index daa40cf8f20..5e1fab5649b 100644 --- a/lib/ain-ocean/src/model/script_activity.rs +++ b/lib/ain-ocean/src/model/script_activity.rs @@ -34,7 +34,7 @@ impl fmt::Display for ScriptActivityTypeHex { } } -pub type ScriptActivityId = ([u8; 32], [u8; 4], ScriptActivityTypeHex, Txid, usize); // (hid, block.height, type_hex, txid, index) +pub type ScriptActivityId = ([u8; 32], [u8; 4], ScriptActivityTypeHex, Txid, [u8; 8]); // (hid, block.height, type_hex, txid, index) #[derive(Debug, Serialize, Deserialize)] pub struct ScriptActivity { diff --git a/lib/ain-ocean/src/model/script_aggregation.rs b/lib/ain-ocean/src/model/script_aggregation.rs index 7623c1b47b8..8e1f882c7bb 100644 --- a/lib/ain-ocean/src/model/script_aggregation.rs +++ b/lib/ain-ocean/src/model/script_aggregation.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type ScriptAggregationId = ([u8; 32], u32); // (hid, block.height) +pub type ScriptAggregationId = ([u8; 32], [u8; 4]); // (hid, block.height) #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ScriptAggregation { diff --git a/lib/ain-ocean/src/model/script_unspent.rs b/lib/ain-ocean/src/model/script_unspent.rs index c0887519768..0bf270cd8b3 100644 --- a/lib/ain-ocean/src/model/script_unspent.rs +++ b/lib/ain-ocean/src/model/script_unspent.rs @@ -3,8 +3,8 @@ use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type ScriptUnspentId = ([u8; 32], [u8; 4], Txid, usize); // hid + block.height + txid + vout_index -pub type ScriptUnspentKey = ([u8; 4], Txid, usize); // block.height + txid + vout_index, ps: key is required in index_script_unspent_vin +pub type ScriptUnspentId = ([u8; 32], [u8; 4], Txid, [u8; 8]); // hid + block.height + txid + vout_index +pub type ScriptUnspentKey = ([u8; 4], Txid, [u8; 8]); // block.height + txid + vout_index, ps: key is required in index_script_unspent_vin #[derive(Debug, Serialize, Deserialize)] pub struct ScriptUnspent { diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 054f25d5996..1f373022c47 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -94,6 +94,28 @@ define_table! { pub struct MasternodeByHeight { key_type = (u32, Txid), value_type = u8, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (height, txid) = index; + let mut vec = height.to_be_bytes().to_vec(); + vec.extend_from_slice(txid.as_byte_array().to_vec().as_ref()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 36 { + return Err(DBError::WrongKeyLength); + } + let mut height_array = [0u8; 4]; + height_array.copy_from_slice(&raw_key[..4]); + let mut txid_array = [0u8; 32]; + txid_array.copy_from_slice(&raw_key[4..]); + + let height = u32::from_be_bytes(height_array); + let txid = Txid::from_byte_array(txid_array); + Ok((height, txid)) + } + }, } } @@ -153,6 +175,28 @@ define_table! { pub struct OracleHistory { key_type = model::OracleHistoryId, value_type = model::Oracle, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (txid, height) = index; // txid, u32 + let mut vec = txid.as_byte_array().to_vec(); + vec.extend_from_slice(&height.to_be_bytes()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 36 { + return Err(DBError::WrongKeyLength); + } + let mut txid_array = [0u8; 32]; + txid_array.copy_from_slice(&raw_key[..32]); + let mut height_array = [0u8; 4]; + height_array.copy_from_slice(&raw_key[32..]); + + let txid = Txid::from_byte_array(txid_array); + let height = u32::from_be_bytes(height_array); + Ok((txid, height)) + } + }, } } @@ -201,6 +245,40 @@ define_table! { pub struct PoolSwap { key_type = model::PoolSwapKey, value_type = model::PoolSwap, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (pool_id, height, txno) = index; // u32, u32, usize + let mut vec = Vec::with_capacity(16); + vec.extend_from_slice(&pool_id.to_be_bytes()); + vec.extend_from_slice(&height.to_be_bytes()); + vec.extend_from_slice(&txno.to_be_bytes()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 16 { + return Err(DBError::WrongKeyLength); + } + let pool_id = u32::from_be_bytes( + raw_key[0..4] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let height = u32::from_be_bytes( + raw_key[4..8] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let txno = usize::from_be_bytes( + raw_key[8..] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + + Ok((pool_id, height, txno)) + } + + }, }, InitialKeyProvider = |pk: u32| (pk, u32::MAX, usize::MAX) } @@ -210,6 +288,37 @@ define_table! { pub struct PoolSwapAggregated { key_type = model::PoolSwapAggregatedId, value_type = model::PoolSwapAggregated, + custom_key = { + fn key(index: &Self::Index) -> DBResult> { + let (pool_id, interval, hash) = index; // u32, u32, hash + let mut vec = Vec::with_capacity(40); + vec.extend_from_slice(&pool_id.to_be_bytes()); + vec.extend_from_slice(&interval.to_be_bytes()); + vec.extend_from_slice(hash.as_byte_array().to_vec().as_ref()); + Ok(vec) + } + + fn get_key(raw_key: Box<[u8]>) -> DBResult { + if raw_key.len() != 40 { + return Err(DBError::WrongKeyLength); + } + let pool_id = u32::from_be_bytes( + raw_key[0..4] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let interval = u32::from_be_bytes( + raw_key[4..8] + .try_into() + .map_err(|_| DBError::WrongKeyLength)?, + ); + let mut hash_array = [0u8; 32]; + hash_array.copy_from_slice(&raw_key[..32]); + let hash = BlockHash::from_byte_array(hash_array); + + Ok((pool_id, interval, hash)) + } + }, } } @@ -263,15 +372,6 @@ define_table! { } } -define_table! { - #[derive(Debug)] - pub struct PriceTickerKey { - key_type = model::PriceTickerKey, - value_type = model::PriceTickerId, - }, - SecondaryIndex = PriceTicker -} - define_table! { #[derive(Debug)] pub struct RawBlock { @@ -408,7 +508,7 @@ define_table! { } } -pub const COLUMN_NAMES: [&str; 28] = [ +pub const COLUMN_NAMES: [&str; 27] = [ Block::NAME, BlockByHeight::NAME, MasternodeStats::NAME, @@ -425,7 +525,6 @@ pub const COLUMN_NAMES: [&str; 28] = [ PoolSwapAggregatedKey::NAME, PoolSwap::NAME, PriceTicker::NAME, - PriceTickerKey::NAME, RawBlock::NAME, ScriptActivity::NAME, ScriptAggregation::NAME,