From 89f14158b66ed6c3b2e5a9a03c31b4aaec3a3e7c Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Mon, 28 Oct 2024 20:55:58 +0000 Subject: [PATCH 01/12] Ocean tries to catchup on startup if below tip height --- lib/ain-ocean/src/api/pool_pair/mod.rs | 4 +- lib/ain-ocean/src/api/pool_pair/service.rs | 16 ++- lib/ain-ocean/src/indexer/mod.rs | 14 ++- lib/ain-ocean/src/indexer/oracle.rs | 30 ++--- lib/ain-ocean/src/indexer/poolswap.rs | 102 ++++++++++++--- lib/ain-ocean/src/lib.rs | 5 +- lib/ain-ocean/src/model/poolswap.rs | 2 +- lib/ain-rs-exports/src/lib.rs | 1 + lib/ain-rs-exports/src/ocean.rs | 5 + src/Makefile.am | 2 + src/init.cpp | 27 ++-- src/ocean.cpp | 139 +++++++++++++++++++++ src/ocean.h | 9 ++ 13 files changed, 293 insertions(+), 63 deletions(-) create mode 100644 src/ocean.cpp create mode 100644 src/ocean.h diff --git a/lib/ain-ocean/src/api/pool_pair/mod.rs b/lib/ain-ocean/src/api/pool_pair/mod.rs index a7795f46961..045e035cf19 100644 --- a/lib/ain-ocean/src/api/pool_pair/mod.rs +++ b/lib/ain-ocean/src/api/pool_pair/mod.rs @@ -434,9 +434,9 @@ async fn list_pool_swaps_verbose( _ => true, }) .map(|item| async { - let (_, swap) = item?; + let (key, swap) = item?; let from = find_swap_from(&ctx, &swap).await?; - let to = find_swap_to(&ctx, &swap).await?; + let to = find_swap_to(&ctx, &key, &swap).await?; let swap_type = check_swap_type(&ctx, &swap).await?; diff --git a/lib/ain-ocean/src/api/pool_pair/service.rs b/lib/ain-ocean/src/api/pool_pair/service.rs index 9042eaaa701..c12d3780a45 100644 --- a/lib/ain-ocean/src/api/pool_pair/service.rs +++ b/lib/ain-ocean/src/api/pool_pair/service.rs @@ -21,7 +21,7 @@ use crate::{ NotFoundKind, OtherSnafu, }, indexer::PoolSwapAggregatedInterval, - model::{PoolSwap, PoolSwapAggregatedAggregated}, + model::{PoolSwap, PoolSwapAggregatedAggregated, PoolSwapKey}, storage::{RepositoryOps, SecondaryIndex, SortOrder}, Result, }; @@ -673,6 +673,7 @@ pub async fn find_swap_from( pub async fn find_swap_to( ctx: &Arc, + swap_key: &PoolSwapKey, swap: &PoolSwap, ) -> Result> { let PoolSwap { @@ -689,9 +690,20 @@ pub async fn find_swap_to( let display_symbol = parse_display_symbol(&to_token); + // TODO Index to_amount if missing + if to_amount.is_none() { + let amount = 0; + let swap = PoolSwap { + to_amount: Some(amount), + ..swap.clone() + }; + ctx.services.pool.by_id.put(swap_key, &swap)?; + } + Ok(Some(PoolSwapFromToData { address: to_address, - amount: Decimal::new(to_amount.to_owned(), 8).to_string(), + // amount: Decimal::new(to_amount.to_owned(), 8).to_string(), // Need fallback + amount: Decimal::new(to_amount.to_owned().unwrap_or_default(), 8).to_string(), symbol: to_token.symbol, display_symbol, })) diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index b991f22c5e7..ab05e0b2aea 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -60,7 +60,7 @@ fn get_bucket(block: &Block, interval: i64) -> i64 { } fn index_block_start(services: &Arc, block: &Block) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + let mut pool_pairs = services.pool_pair_cache.get(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); for interval in AGGREGATED_INTERVALS { @@ -116,7 +116,7 @@ fn index_block_start(services: &Arc, block: &Block) -> Re } fn invalidate_block_start(services: &Arc, block: &Block) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + let mut pool_pairs = services.pool_pair_cache.get(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); for interval in AGGREGATED_INTERVALS { @@ -601,6 +601,14 @@ fn invalidate_block_end(services: &Arc, block: &BlockContext) -> Resul Ok(()) } +pub fn get_block_height(services: &Arc) -> Result { + Ok(services + .block + .by_height + .get_highest()? + .map_or(0, |block| block.height)) +} + pub fn index_block(services: &Arc, block: Block) -> Result<()> { trace!("[index_block] Indexing block..."); let start = Instant::now(); @@ -658,6 +666,7 @@ pub fn index_block(services: &Arc, block: Block) -> Resul DfTx::SetLoanToken(data) => data.index(services, &ctx)?, DfTx::CompositeSwap(data) => data.index(services, &ctx)?, DfTx::PlaceAuctionBid(data) => data.index(services, &ctx)?, + DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(), _ => (), } log_elapsed(start, "Indexed dftx"); @@ -755,6 +764,7 @@ pub fn invalidate_block(services: &Arc, block: Block) -> DfTx::SetLoanToken(data) => data.invalidate(services, &ctx)?, DfTx::CompositeSwap(data) => data.invalidate(services, &ctx)?, DfTx::PlaceAuctionBid(data) => data.invalidate(services, &ctx)?, + DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(), _ => (), } log_elapsed(start, "Invalidate dftx"); diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 931bf38a289..c06aeeb0906 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -264,10 +264,7 @@ fn map_price_aggregated( )), SortOrder::Descending, )? - .take_while(|item| match item { - Ok((k, _)) => k.0 == token.clone() && k.1 == currency.clone(), - _ => true, - }) + .take_while(|item| matches!(item, Ok((k, _)) if &k.0 == token && &k.1 == currency)) .flatten() .collect::>(); @@ -361,8 +358,8 @@ fn index_set_oracle_data( let key = ( price_aggregated.aggregated.oracles.total, price_aggregated.block.height, - token.clone(), - currency.clone(), + token, + currency, ); ticker_repo.by_key.put(&key, pair)?; ticker_repo.by_id.put( @@ -534,22 +531,19 @@ pub fn index_interval_mapper( SortOrder::Descending, )? .take(1) - .flatten() - .collect::>(); + .next() + .transpose()?; - if previous.is_empty() { + let Some((_, id)) = previous else { return start_new_bucket(services, block, token, currency, aggregated, interval); - } - - for (_, id) in previous { - let aggregated_interval = repo.by_id.get(&id)?; - if let Some(aggregated_interval) = aggregated_interval { - if block.median_time - aggregated.block.median_time > interval.clone() as i64 { - return start_new_bucket(services, block, token, currency, aggregated, interval); - } + }; - forward_aggregate(services, (id, &aggregated_interval), aggregated)?; + if let Some(aggregated_interval) = repo.by_id.get(&id)? { + if block.median_time - aggregated.block.median_time > interval.clone() as i64 { + return start_new_bucket(services, block, token, currency, aggregated, interval); } + + forward_aggregate(services, (id, &aggregated_interval), aggregated)?; } Ok(()) diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index 844e28b3581..19e29af568f 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -1,15 +1,17 @@ use std::{str::FromStr, sync::Arc}; +use ain_cpp_imports::PoolPairCreationHeight; use ain_dftx::{pool::*, COIN}; use bitcoin::Txid; use log::trace; +use parking_lot::RwLock; use rust_decimal::Decimal; use rust_decimal_macros::dec; use snafu::OptionExt; use super::Context; use crate::{ - error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu}, + error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, NotFoundKind}, indexer::{tx_result, Index, Result}, model::{self, PoolSwapResult, TxResult}, storage::{RepositoryOps, SortOrder}, @@ -149,7 +151,7 @@ fn invalidate_swap_aggregated( impl Index for PoolSwap { fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - trace!("[Poolswap] Indexing..."); + trace!("[Poolswap] Indexing {self:?}..."); let txid = ctx.tx.txid; let idx = ctx.tx_idx; let from = self.from_script; @@ -158,17 +160,28 @@ impl Index for PoolSwap { let from_amount = self.from_amount; let to_token_id = self.to_token_id.0; - let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) = - services.result.get(&txid)? - else { - // TODO: Commenting out for now, fallback should only be introduced for supporting back CLI indexing - return Err("Missing swap result".into()); - // let pair = find_pair(from_token_id, to_token_id); - // if pair.is_none() { - // return Err(format_err!("Pool not found by {from_token_id}-{to_token_id} or {to_token_id}-{from_token_id}").into()); - // } - // let pair = pair.unwrap(); - // (None, pair.id) + let (to_amount, pool_id) = match services.result.get(&txid)? { + Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => { + (Some(to_amount), pool_id) + } + _ => { + let poolpairs = services.pool_pair_cache.get(); + + let pool_id = poolpairs + .into_iter() + .find(|pp| { + (pp.id_token_a == self.from_token_id.0 as u32 + && pp.id_token_b == self.to_token_id.0 as u32) + || (pp.id_token_a == self.to_token_id.0 as u32 + && pp.id_token_b == self.from_token_id.0 as u32) + }) + .map(|pp| pp.id) + .ok_or(Error::NotFound { + kind: NotFoundKind::PoolPair, + })?; + + (None, pool_id) + } }; let swap: model::PoolSwap = model::PoolSwap { @@ -221,17 +234,31 @@ impl Index for PoolSwap { impl Index for CompositeSwap { fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - trace!("[CompositeSwap] Indexing..."); + trace!("[CompositeSwap] Indexing {self:?}..."); let txid = ctx.tx.txid; let from_token_id = self.pool_swap.from_token_id.0; let from_amount = self.pool_swap.from_amount; let to_token_id = self.pool_swap.to_token_id.0; - let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) = - services.result.get(&txid)? - else { - trace!("Missing swap result for {}", txid.to_string()); - return Err("Missing swap result".into()); + let (to_amount, pool_id) = match services.result.get(&txid)? { + Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => { + (Some(to_amount), Some(pool_id)) + } + _ => { + let poolpairs = services.pool_pair_cache.get(); + + let pool_id = poolpairs + .into_iter() + .find(|pp| { + (pp.id_token_a == self.pool_swap.from_token_id.0 as u32 + && pp.id_token_b == self.pool_swap.to_token_id.0 as u32) + || (pp.id_token_a == self.pool_swap.to_token_id.0 as u32 + && pp.id_token_b == self.pool_swap.from_token_id.0 as u32) + }) + .map(|pp| pp.id); + + (None, pool_id) + } }; let from = self.pool_swap.from_script; @@ -240,6 +267,9 @@ impl Index for CompositeSwap { let pool_ids = if pools.is_empty() { // the pool_id from finals wap is the only swap while pools is empty + let pool_id = pool_id.ok_or(Error::NotFound { + kind: NotFoundKind::PoolPair, + })?; Vec::from([pool_id]) } else { pools.iter().map(|pool| pool.id.0 as u32).collect() @@ -286,3 +316,37 @@ impl Index for CompositeSwap { tx_result::invalidate(services, &ctx.tx.txid) } } + +#[derive(Default)] +pub struct PoolPairCache { + cache: RwLock>>, +} + +impl PoolPairCache { + pub fn new() -> Self { + Self { + cache: RwLock::new(None), + } + } + + pub fn get(&self) -> Vec { + { + let guard = self.cache.read(); + if let Some(poolpairs) = guard.as_ref() { + return poolpairs.clone(); + } + } + + let poolpairs = ain_cpp_imports::get_pool_pairs(); + + let mut guard = self.cache.write(); + *guard = Some(poolpairs.clone()); + + poolpairs + } + + pub fn invalidate(&self) { + let mut guard = self.cache.write(); + *guard = None; + } +} diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index 4ab49ddcccf..986488d1a21 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -8,8 +8,9 @@ use std::{path::PathBuf, sync::Arc}; pub use api::ocean_router; use error::Error; +use indexer::poolswap::PoolPairCache; pub use indexer::{ - index_block, invalidate_block, + get_block_height, index_block, invalidate_block, oracle::invalidate_oracle_interval, transaction::{index_transaction, invalidate_transaction}, tx_result, @@ -143,6 +144,7 @@ pub struct Services { pub script_unspent: ScriptUnspentService, pub token_graph: Arc>>, pub store: Arc, + pub pool_pair_cache: PoolPairCache, } impl Services { @@ -218,6 +220,7 @@ impl Services { }, token_graph: Arc::new(Mutex::new(UnGraphMap::new())), store: Arc::clone(&store), + pool_pair_cache: PoolPairCache::new(), } } } diff --git a/lib/ain-ocean/src/model/poolswap.rs b/lib/ain-ocean/src/model/poolswap.rs index 051deb7da71..6ad5af8dbe5 100644 --- a/lib/ain-ocean/src/model/poolswap.rs +++ b/lib/ain-ocean/src/model/poolswap.rs @@ -13,7 +13,7 @@ pub struct PoolSwap { pub pool_id: u32, pub from_amount: i64, pub from_token_id: u64, - pub to_amount: i64, + pub to_amount: Option, pub to_token_id: u64, pub from: ScriptBuf, pub to: ScriptBuf, diff --git a/lib/ain-rs-exports/src/lib.rs b/lib/ain-rs-exports/src/lib.rs index 484b6dea17c..347f7b9a216 100644 --- a/lib/ain-rs-exports/src/lib.rs +++ b/lib/ain-rs-exports/src/lib.rs @@ -346,6 +346,7 @@ pub mod ffi { fn evm_try_flush_db(result: &mut CrossBoundaryResult); + fn ocean_get_block_height(result: &mut CrossBoundaryResult) -> u32; fn ocean_index_block(result: &mut CrossBoundaryResult, block_str: String); fn ocean_invalidate_block(result: &mut CrossBoundaryResult, block: String); diff --git a/lib/ain-rs-exports/src/ocean.rs b/lib/ain-rs-exports/src/ocean.rs index d86efa870c8..2519e64f3b4 100644 --- a/lib/ain-rs-exports/src/ocean.rs +++ b/lib/ain-rs-exports/src/ocean.rs @@ -7,6 +7,11 @@ use crate::{ prelude::{cross_boundary_error_return, cross_boundary_success_return}, }; +#[ffi_fallible] +pub fn ocean_get_block_height() -> Result { + ain_ocean::get_block_height(&ain_ocean::SERVICES) +} + #[ffi_fallible] pub fn ocean_index_block(block_str: String) -> Result<()> { let block: Block = serde_json::from_str(&block_str)?; diff --git a/src/Makefile.am b/src/Makefile.am index 385585c02dc..04b884b5c59 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -149,6 +149,7 @@ DEFI_CORE_H = \ index/blockfilterindex.h \ index/txindex.h \ indirectmap.h \ + ocean.h \ init.h \ interfaces/chain.h \ interfaces/handler.h \ @@ -410,6 +411,7 @@ libdefi_server_a_SOURCES = \ index/blockfilterindex.cpp \ index/txindex.cpp \ interfaces/chain.cpp \ + ocean.cpp \ init.cpp \ dbwrapper.cpp \ ffi/ffiexports.cpp \ diff --git a/src/init.cpp b/src/init.cpp index 1def0f90e89..1435d4ac3c5 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -67,6 +67,7 @@ #include #include #include +#include #include #include @@ -1749,21 +1750,6 @@ void SetupInterrupts() { fInterrupt = SetupInterruptArg("-interrupt-block", fInterruptBlockHash, fInterruptBlockHeight); } -bool OceanIndex (const UniValue b) { - CrossBoundaryResult result; - ocean_index_block(result, b.write()); - if (!result.ok) { - LogPrintf("Error indexing genesis block: %s\n", result.reason); - ocean_invalidate_block(result, b.write()); - if (!result.ok) { - LogPrintf("Error invalidating genesis block: %s\n", result.reason); - return false; - } - OceanIndex(b); - } - return true; -}; - bool AppInitMain(InitInterfaces& interfaces) { const CChainParams& chainparams = Params(); @@ -2518,7 +2504,7 @@ bool AppInitMain(InitInterfaces& interfaces) } std::string error; - + if (!pwallet->GetNewDestination(OutputType::BECH32, "", dest, error)) { return InitError("Wallet not able to get new destination for mocknet"); } @@ -2579,14 +2565,19 @@ bool AppInitMain(InitInterfaces& interfaces) const UniValue b = blockToJSON(*pcustomcsview, block, tip, pblockindex, true, 2); - if (bool isIndexed = OceanIndex(b); !isIndexed) { + if (bool isIndexed = OceanIndex(b, 0); !isIndexed) { return false; } LogPrintf("WARNING: -expr-oceanarchive flag is turned on. This feature is not yet stable. Please do not use in production unless you're aware of the risks\n"); } - // ********************************************************* Step 16: start minter thread + // ********************************************************* Step 16: start ocean catchup + if (!CatchupOceanIndexer()) { + return false; + } + + // ********************************************************* Step 17: start minter thread if(gArgs.GetBoolArg("-gen", DEFAULT_GENERATE)) { if (!pos::StartStakingThreads(threadGroup)) { return false; diff --git a/src/ocean.cpp b/src/ocean.cpp new file mode 100644 index 00000000000..62a5d236db0 --- /dev/null +++ b/src/ocean.cpp @@ -0,0 +1,139 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +bool OceanIndex(const UniValue b, uint32_t blockHeight) { + CrossBoundaryResult result; + ocean_index_block(result, b.write()); + if (!result.ok) { + LogPrintf("Error indexing ocean block %d: %s\n", result.reason, blockHeight); + ocean_invalidate_block(result, b.write()); + if (!result.ok) { + LogPrintf("Error invalidating ocean %d block: %s\n", result.reason, blockHeight); + } + return false; + } + return true; +}; + +bool CatchupOceanIndexer() { + if (!gArgs.GetBoolArg("-oceanarchive", DEFAULT_OCEAN_INDEXER_ENABLED) && + !gArgs.GetBoolArg("-expr-oceanarchive", DEFAULT_OCEAN_INDEXER_ENABLED)) { + return true; + } + + CrossBoundaryResult result; + + auto oceanBlockHeight = ocean_get_block_height(result); + if (!result.ok) { + LogPrintf("Error getting Ocean block height: %s\n", result.reason); + return false; + } + + CBlockIndex *tip = nullptr; + { + LOCK(cs_main); + tip = ::ChainActive().Tip(); + if (!tip) { + LogPrintf("Error: Cannot get chain tip\n"); + return false; + } + } + const uint32_t tipHeight = tip->nHeight; + + if (tipHeight == oceanBlockHeight) { + return true; + } + + LogPrintf("Starting Ocean index catchup...\n"); + + uint32_t currentHeight = oceanBlockHeight; + + LogPrintf("Ocean catchup: Current height=%u, Target height=%u\n", currentHeight, tipHeight); + + uint32_t remainingBlocks = tipHeight - currentHeight; + const uint32_t startHeight = oceanBlockHeight; + int lastProgress = -1; + const auto startTime = std::chrono::steady_clock::now(); + + CBlockIndex *pindex = nullptr; + while (currentHeight < tipHeight) { + if (ShutdownRequested()) { + LogPrintf("Shutdown requested, exiting ocean catchup...\n"); + return false; + } + + { + LOCK(cs_main); + pindex = ::ChainActive()[currentHeight]; + if (!pindex) { + LogPrintf("Error: Cannot find block at height %u\n", currentHeight); + return false; + } + } + + CBlock block; + if (!ReadBlockFromDisk(block, pindex, Params().GetConsensus())) { + LogPrintf("Error: Failed to read block %s from disk\n", pindex->GetBlockHash().ToString()); + return false; + } + + const UniValue b = blockToJSON(*pcustomcsview, block, tip, pindex, true, 2); + + if (bool isIndexed = OceanIndex(b, currentHeight); !isIndexed) { + return false; + } + + currentHeight++; + + uint32_t blocksProcessed = currentHeight - startHeight; + int currentProgress = static_cast((static_cast(currentHeight * 100) / tipHeight)); + + if (currentProgress > lastProgress || currentHeight % 10000 == 0) { + auto currentTime = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(currentTime - startTime).count(); + + double blocksPerSecond = elapsed > 0 ? static_cast(blocksProcessed) / elapsed : 0; + + remainingBlocks = tipHeight - currentHeight; + int estimatedSecondsLeft = blocksPerSecond > 0 ? static_cast(remainingBlocks / blocksPerSecond) : 0; + + LogPrintf( + "Ocean indexing progress: %d%% (%u/%u blocks) - %.2f blocks/s - " + "ETA: %d:%02d:%02d\n", + currentProgress, + currentHeight, + tipHeight, + blocksPerSecond, + estimatedSecondsLeft / 3600, + (estimatedSecondsLeft % 3600) / 60, + estimatedSecondsLeft % 60); + + lastProgress = currentProgress; + } + } + + auto totalTime = + std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); + + LogPrintf("Ocean indexes caught up to tip. Total time: %d:%02d:%02d\n", + totalTime / 3600, + (totalTime % 3600) / 60, + totalTime % 60); + + return true; +} diff --git a/src/ocean.h b/src/ocean.h new file mode 100644 index 00000000000..5a92763a79b --- /dev/null +++ b/src/ocean.h @@ -0,0 +1,9 @@ +#ifndef DEFI_OCEAN_H +#define DEFI_OCEAN_H + +#include + +bool CatchupOceanIndexer(); +bool OceanIndex (const UniValue b, uint32_t blockHeight); + +#endif // DEFI_OCEAN_H From 07552f73f0c5b23e6fe8804028f47829f14c2f97 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Tue, 29 Oct 2024 14:10:28 +0000 Subject: [PATCH 02/12] Keep hold of lock --- src/ocean.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/ocean.cpp b/src/ocean.cpp index 62a5d236db0..44a976592dd 100644 --- a/src/ocean.cpp +++ b/src/ocean.cpp @@ -71,19 +71,20 @@ bool CatchupOceanIndexer() { const auto startTime = std::chrono::steady_clock::now(); CBlockIndex *pindex = nullptr; - while (currentHeight < tipHeight) { + + // Lock for the whole catchup duration + LOCK(cs_main); + + while (currentHeight <= tipHeight) { if (ShutdownRequested()) { LogPrintf("Shutdown requested, exiting ocean catchup...\n"); return false; } - { - LOCK(cs_main); - pindex = ::ChainActive()[currentHeight]; - if (!pindex) { - LogPrintf("Error: Cannot find block at height %u\n", currentHeight); - return false; - } + pindex = ::ChainActive()[currentHeight]; + if (!pindex) { + LogPrintf("Error: Cannot find block at height %u\n", currentHeight); + return false; } CBlock block; From 13b48f6549e9fd66f6119fc25dac5e84ae2ca9c4 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Tue, 29 Oct 2024 14:14:21 +0000 Subject: [PATCH 03/12] Fix RemoveOracle indexing --- lib/ain-ocean/src/indexer/oracle.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index a01f08cdf14..1919323d2f7 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -109,14 +109,14 @@ impl Index for RemoveOracle { let oracle_id = ctx.tx.txid; services.oracle.by_id.delete(&oracle_id)?; - let (_, previous) = get_previous_oracle(services, oracle_id)?; - - for price_feed in &previous.price_feeds { - services.oracle_token_currency.by_id.delete(&( - price_feed.token.to_owned(), - price_feed.currency.to_owned(), - oracle_id, - ))?; + if let Ok((_, previous)) = get_previous_oracle(services, oracle_id) { + for price_feed in &previous.price_feeds { + services.oracle_token_currency.by_id.delete(&( + price_feed.token.to_owned(), + price_feed.currency.to_owned(), + oracle_id, + ))?; + } } Ok(()) From c6dcaefec8c88c8365eb3f1c49939d230ebf6961 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Tue, 29 Oct 2024 14:37:28 +0000 Subject: [PATCH 04/12] Small perf improvements --- lib/ain-ocean/src/indexer/poolswap.rs | 2 +- src/rpc/blockchain.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index 3e424d0aaf2..d371ab74e19 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -184,7 +184,7 @@ fn create_new_bucket( impl IndexBlockStart for PoolSwap { fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + let mut pool_pairs = services.pool_pair_cache.get(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); for interval in AGGREGATED_INTERVALS { diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index f14c56c0fef..9c4501cc3df 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -357,7 +357,7 @@ UniValue blockToJSON(CCustomCSView &view, const CBlock& block, const CBlockIndex // Serialize passed information without accessing chain state of the active chain! AssertLockNotHeld(cs_main); // For performance reasons const auto consensus = Params().GetConsensus(); - const auto isEvmEnabledForBlock = IsEVMEnabled(view); + const auto isEvmEnabledForBlock = version > 2 && IsEVMEnabled(view); auto txsToUniValue = [&](const CBlock& block, bool txDetails, int version) { UniValue txs(UniValue::VARR); From 29a423c3e76afcd82b85ffeea6222a562173a6a7 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Tue, 29 Oct 2024 15:38:13 +0000 Subject: [PATCH 05/12] Revert "Keep hold of lock" This reverts commit 07552f73f0c5b23e6fe8804028f47829f14c2f97. --- src/ocean.cpp | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/ocean.cpp b/src/ocean.cpp index 44a976592dd..62a5d236db0 100644 --- a/src/ocean.cpp +++ b/src/ocean.cpp @@ -71,20 +71,19 @@ bool CatchupOceanIndexer() { const auto startTime = std::chrono::steady_clock::now(); CBlockIndex *pindex = nullptr; - - // Lock for the whole catchup duration - LOCK(cs_main); - - while (currentHeight <= tipHeight) { + while (currentHeight < tipHeight) { if (ShutdownRequested()) { LogPrintf("Shutdown requested, exiting ocean catchup...\n"); return false; } - pindex = ::ChainActive()[currentHeight]; - if (!pindex) { - LogPrintf("Error: Cannot find block at height %u\n", currentHeight); - return false; + { + LOCK(cs_main); + pindex = ::ChainActive()[currentHeight]; + if (!pindex) { + LogPrintf("Error: Cannot find block at height %u\n", currentHeight); + return false; + } } CBlock block; From f3775bd31ff7df66abd1a2108592a9ff83ad9135 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:15:04 +0000 Subject: [PATCH 06/12] Create base txid out of loop --- lib/ain-ocean/src/indexer/oracle.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 1919323d2f7..ca6f6520031 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -270,6 +270,7 @@ fn map_price_aggregated( let mut aggregated_count = Decimal::zero(); let mut aggregated_weightage = Decimal::zero(); + let base_id = Txid::from_byte_array([0xffu8; 32]); let oracles_len = oracles.len(); for (id, oracle) in oracles { if oracle.weightage == 0 { @@ -280,10 +281,7 @@ fn map_price_aggregated( let feed = services .oracle_price_feed .by_id - .list( - Some((id.0, id.1, id.2, Txid::from_byte_array([0xffu8; 32]))), - SortOrder::Descending, - )? + .list(Some((id.0, id.1, id.2, base_id)), SortOrder::Descending)? .next() .transpose()?; From ae292d46d16a9b8369a381e9245f6ff0ff51aabb Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Tue, 29 Oct 2024 20:25:15 +0000 Subject: [PATCH 07/12] Fix printf args order --- src/ocean.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocean.cpp b/src/ocean.cpp index 62a5d236db0..db09975097a 100644 --- a/src/ocean.cpp +++ b/src/ocean.cpp @@ -20,10 +20,10 @@ bool OceanIndex(const UniValue b, uint32_t blockHeight) { CrossBoundaryResult result; ocean_index_block(result, b.write()); if (!result.ok) { - LogPrintf("Error indexing ocean block %d: %s\n", result.reason, blockHeight); + LogPrintf("Error indexing ocean block %d: %s\n", blockHeight, result.reason); ocean_invalidate_block(result, b.write()); if (!result.ok) { - LogPrintf("Error invalidating ocean %d block: %s\n", result.reason, blockHeight); + LogPrintf("Error invalidating ocean %d block: %s\n", blockHeight, result.reason); } return false; } From c1996139cdec844478bcc0958aa30d63eac50808 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Wed, 30 Oct 2024 21:17:03 +0000 Subject: [PATCH 08/12] Don't hold ocean invalidation --- src/validation.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/validation.cpp b/src/validation.cpp index 5e91a19aa16..8da5d9cfd37 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -3776,7 +3776,7 @@ bool CChainState::DisconnectTip(CValidationState &state, if (gArgs.GetBoolArg("-oceanarchive", DEFAULT_OCEAN_INDEXER_ENABLED) || gArgs.GetBoolArg("-expr-oceanarchive", DEFAULT_OCEAN_INDEXER_ENABLED)) { const UniValue b = blockToJSON(mnview, block, pindexDelete, pindexDelete, true, 2); - XResultThrowOnErr(ocean_invalidate_block(result, b.write())); + XResultStatusLogged(ocean_invalidate_block(result, b.write())); } bool flushed = view.Flush() && mnview.Flush(); From c8041969a78b5153bdce083d4bf3db388a4dc325 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Wed, 30 Oct 2024 21:42:29 +0000 Subject: [PATCH 09/12] Revert "Revert "Keep hold of lock"" This reverts commit 29a423c3e76afcd82b85ffeea6222a562173a6a7. --- src/ocean.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/ocean.cpp b/src/ocean.cpp index db09975097a..b04ce348d5a 100644 --- a/src/ocean.cpp +++ b/src/ocean.cpp @@ -71,19 +71,20 @@ bool CatchupOceanIndexer() { const auto startTime = std::chrono::steady_clock::now(); CBlockIndex *pindex = nullptr; - while (currentHeight < tipHeight) { + + // Lock for the whole catchup duration + LOCK(cs_main); + + while (currentHeight <= tipHeight) { if (ShutdownRequested()) { LogPrintf("Shutdown requested, exiting ocean catchup...\n"); return false; } - { - LOCK(cs_main); - pindex = ::ChainActive()[currentHeight]; - if (!pindex) { - LogPrintf("Error: Cannot find block at height %u\n", currentHeight); - return false; - } + pindex = ::ChainActive()[currentHeight]; + if (!pindex) { + LogPrintf("Error: Cannot find block at height %u\n", currentHeight); + return false; } CBlock block; From e161cc4cfc3629ed72050a2136bdeae10fd0ce78 Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Thu, 31 Oct 2024 11:16:16 +0000 Subject: [PATCH 10/12] Bump setgovheight dftx buffer size --- lib/ain-dftx/src/types/common.rs | 2 +- lib/ain-dftx/tests/data/setgovernanceheight.txt | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/ain-dftx/src/types/common.rs b/lib/ain-dftx/src/types/common.rs index 26db0bf23d0..154f8835a84 100644 --- a/lib/ain-dftx/src/types/common.rs +++ b/lib/ain-dftx/src/types/common.rs @@ -98,7 +98,7 @@ impl Decodable for RawBytes { fn consensus_decode( reader: &mut R, ) -> Result { - let mut buf = [0u8; 512]; + let mut buf = [0u8; 4096]; let v = reader.read(&mut buf)?; Ok(Self(buf[..v].to_vec())) } diff --git a/lib/ain-dftx/tests/data/setgovernanceheight.txt b/lib/ain-dftx/tests/data/setgovernanceheight.txt index 5010f7c3958..11a4fae733b 100644 --- a/lib/ain-dftx/tests/data/setgovernanceheight.txt +++ b/lib/ain-dftx/tests/data/setgovernanceheight.txt @@ -6,3 +6,5 @@ 6a4d1b01446654786a144c505f4c4f414e5f544f4b454e5f53504c4954531c1180f0fa020000000012a0f64a0000000000190ce0360000000000205c4f13000000000021cc2a1d000000000023581416000000000024f834280000000000268c6e64000000000027587b3b00000000002850430d000000000029b8af0600000000002a54361600000000002bd4010c00000000002c18b30700000000002ddc9e0c00000000002ed82a0c00000000003550620c000000000036fcb921000000000037948d2e00000000003898401800000000003d98d71c00000000003e00ae1500000000003f80920b000000000040d4a81e000000000045ac930b000000000046305f0c000000000047785019000000000048c4d510000000000070f21a00 // {"ATTRIBUTES": {"v0/poolpairs/17/token_a_fee_pct": "0.005"},"startHeight": 1896000} 6a2b446654786a0a4154545249425554455301000000007011000000610100000020a107000000000040ee1c +// TX: 944634a0ebaecfef099460015e2e4288fbf84cd7c8d5bb8716a50bb1bd1164d9, block: 4163636 +6a4d580a446654786a0a415454524942555445538e000000007021000000620100000020a1070000000000000000007021000000640800000000000000007023000000620100000020a1070000000000000000007023000000640800000000000000007024000000620100000020a1070000000000000000007024000000640800000000000000007026000000620100000020a1070000000000000000007026000000640800000000000000007027000000620100000020a1070000000000000000007027000000640800000000000000007028000000620100000020a1070000000000000000007028000000640800000000000000007029000000620100000020a107000000000000000000702900000064080000000000000000702a000000620100000020a107000000000000000000702a00000064080000000000000000702b000000620100000020a107000000000000000000702b00000064080000000000000000702c000000620100000020a107000000000000000000702c00000064080000000000000000702d000000620100000020a107000000000000000000702d00000064080000000000000000702e000000620100000020a107000000000000000000702e000000640800000000000000007035000000620100000020a1070000000000000000007035000000640800000000000000007037000000620100000020a1070000000000000000007037000000640800000000000000007038000000620100000020a107000000000000000000703800000064080000000000000000703d000000620100000020a107000000000000000000703d00000064080000000000000000703e000000620100000020a107000000000000000000703e00000064080000000000000000703f000000620100000020a107000000000000000000703f000000640800000000000000007040000000620100000020a1070000000000000000007040000000640800000000000000007045000000620100000020a1070000000000000000007045000000640800000000000000007046000000620100000020a1070000000000000000007046000000640800000000000000007047000000620100000020a1070000000000000000007047000000640800000000000000007048000000620100000020a107000000000000000000704800000064080000000000000000704d000000620100000020a107000000000000000000704d00000064080000000000000000704e000000620100000020a107000000000000000000704e00000064080000000000000000704f000000620100000020a107000000000000000000704f000000640800000000000000007050000000620100000020a1070000000000000000007050000000640800000000000000007055000000620100000020a1070000000000000000007055000000640800000000000000007056000000620100000020a1070000000000000000007056000000640800000000000000007057000000620100000020a1070000000000000000007057000000640800000000000000007058000000620100000020a107000000000000000000705800000064080000000000000000705a000000620100000020a107000000000000000000705a00000064080000000000000000705f000000620100000020a107000000000000000000705f000000640800000000000000007060000000620100000020a1070000000000000000007060000000640800000000000000007061000000620100000020a1070000000000000000007061000000640800000000000000007062000000620100000020a1070000000000000000007062000000640800000000000000007064000000620100000020a1070000000000000000007064000000640800000000000000007068000000620100000020a107000000000000000000706800000064080000000000000000706d000000620100000020a107000000000000000000706d00000064080000000000000000706e000000620100000020a107000000000000000000706e00000064080000000000000000706f000000620100000020a107000000000000000000706f000000640800000000000000007070000000620100000020a1070000000000000000007070000000640800000000000000007072000000620100000020a1070000000000000000007072000000640800000000000000007077000000620100000020a1070000000000000000007077000000640800000000000000007078000000620100000020a1070000000000000000007078000000640800000000000000007079000000620100000020a107000000000000000000707900000064080000000000000000707a000000620100000020a107000000000000000000707a0000006408000000000000000070cc000000620100000020a10700000000000000000070cc0000006408000000000000000070cd000000620100000020a10700000000000000000070cd0000006408000000000000000070ce000000620100000020a10700000000000000000070ce0000006408000000000000000070cf000000620100000020a10700000000000000000070cf0000006408000000000000000070d4000000620100000020a10700000000000000000070d40000006408000000000000000070d5000000620100000020a10700000000000000000070d50000006408000000000000000070d6000000620100000020a10700000000000000000070d60000006408000000000000000070d7000000620100000020a10700000000000000000070d70000006408000000000000000070ef000000620100000020a10700000000000000000070ef0000006408000000000000000070fe000000620100000020a10700000000000000000070fe0000006408000000000000000070ff000000620100000020a10700000000000000000070ff000000640800000000000000007000010000620100000020a1070000000000000000007000010000640800000000000000007001010000620100000020a1070000000000000000007001010000640800000000000000007002010000620100000020a1070000000000000000007002010000640800000000000000007003010000620100000020a1070000000000000000007003010000640800000000000000007004010000620100000020a1070000000000000000007004010000640800000000000000007005010000620100000020a1070000000000000000007005010000640800000000000000007006010000620100000020a1070000000000000000007006010000640800000000000000007007010000620100000020a1070000000000000000007007010000640800000000000000007008010000620100000020a1070000000000000000007008010000640800000000000000007009010000620100000020a107000000000000000000700901000064080000000000000000700a010000620100000020a107000000000000000000700a01000064080000000000000000700b010000620100000020a107000000000000000000700b010000640800000000000000007014010000620100000020a1070000000000000000007014010000640800000000b48e3f00 From 1b527c32fb36469155dbc8e860290ac4fb72a46f Mon Sep 17 00:00:00 2001 From: canonbrother Date: Mon, 4 Nov 2024 23:20:05 +0800 Subject: [PATCH 11/12] Ocean: fix rm/update oracle index (#3107) * fix * update ocean ci * revert to direct ffi:get_pp --- .github/workflows/tests-ocean.yml | 2 +- lib/ain-ocean/src/indexer/mod.rs | 10 ++-- lib/ain-ocean/src/indexer/oracle.rs | 67 ++++++++++++++++----------- lib/ain-ocean/src/indexer/poolswap.rs | 2 +- 4 files changed, 47 insertions(+), 34 deletions(-) diff --git a/.github/workflows/tests-ocean.yml b/.github/workflows/tests-ocean.yml index 7f8f4ee1261..5de85cbc520 100644 --- a/.github/workflows/tests-ocean.yml +++ b/.github/workflows/tests-ocean.yml @@ -5,7 +5,7 @@ on: pull_request: branches: - master - - ocean-refinements # TODO(): remove before merge to master + - ocean-catchup-on-startup concurrency: group: ${{ github.workflow }}-${{ github.ref || github.run_id }} diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index d20a19ab2e9..3299e63e7bb 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -298,7 +298,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> return Err(Error::NotFoundIndex { action: IndexAction::Index, - r#type: "Index script TransactionVout".to_string(), + r#type: "Script TransactionVout".to_string(), id: format!("{}-{}", vin.txid, vin.vout), }); }; @@ -386,8 +386,8 @@ fn invalidate_script(services: &Arc, ctx: &Context, txs: &[Transaction }; return Err(Error::NotFoundIndex { - action: IndexAction::Index, - r#type: "Index script TransactionVout".to_string(), + action: IndexAction::Invalidate, + r#type: "Script TransactionVout".to_string(), id: format!("{}-{}", vin.txid, vin.vout), }); }; @@ -428,7 +428,7 @@ fn invalidate_script_unspent_vin( let Some(transaction) = services.transaction.by_id.get(&vin.txid)? else { return Err(Error::NotFoundIndex { action: IndexAction::Invalidate, - r#type: "Transaction".to_string(), + r#type: "ScriptUnspentVin Transaction".to_string(), id: vin.txid.to_string(), }); }; @@ -436,7 +436,7 @@ fn invalidate_script_unspent_vin( let Some(vout) = services.transaction.vout_by_id.get(&(vin.txid, vin.vout))? else { return Err(Error::NotFoundIndex { action: IndexAction::Invalidate, - r#type: "TransactionVout".to_string(), + r#type: "ScriptUnspentVin TransactionVout".to_string(), id: format!("{}{}", vin.txid, vin.vout), }); }; diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index ca6f6520031..48a92f8dc23 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -12,7 +12,8 @@ use snafu::OptionExt; use crate::{ error::{ - ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, ToPrimitiveSnafu, + ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, NotFoundIndexSnafu, + ToPrimitiveSnafu, }, indexer::{Context, Index, Result}, model::{ @@ -105,27 +106,37 @@ impl Index for AppointOracle { } impl Index for RemoveOracle { - fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - let oracle_id = ctx.tx.txid; + fn index(self, services: &Arc, _ctx: &Context) -> Result<()> { + let oracle_id = self.oracle_id; services.oracle.by_id.delete(&oracle_id)?; - if let Ok((_, previous)) = get_previous_oracle(services, oracle_id) { - for price_feed in &previous.price_feeds { - services.oracle_token_currency.by_id.delete(&( - price_feed.token.to_owned(), - price_feed.currency.to_owned(), - oracle_id, - ))?; - } + let (_, previous) = get_previous_oracle(services, oracle_id)? + .context(NotFoundIndexSnafu { + action: IndexAction::Index, + r#type: "RemoveOracle".to_string(), + id: oracle_id.to_string(), + })?; + + for PriceFeed { token, currency } in &previous.price_feeds { + services.oracle_token_currency.by_id.delete(&( + token.to_owned(), + currency.to_owned(), + oracle_id, + ))?; } Ok(()) } - fn invalidate(&self, services: &Arc, context: &Context) -> Result<()> { + fn invalidate(&self, services: &Arc, _ctx: &Context) -> Result<()> { trace!("[RemoveOracle] Invalidating..."); - let oracle_id = context.tx.txid; - let (_, previous) = get_previous_oracle(services, oracle_id)?; + let oracle_id = self.oracle_id; + let (_, previous) = + get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { + action: IndexAction::Invalidate, + r#type: "RemoveOracle".to_string(), + id: oracle_id.to_string(), + })?; let oracle = Oracle { owner_address: previous.owner_address, @@ -154,7 +165,7 @@ impl Index for RemoveOracle { impl Index for UpdateOracle { fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - let oracle_id = ctx.tx.txid; + let oracle_id = self.oracle_id; let price_feeds = self .price_feeds .iter() @@ -176,7 +187,12 @@ impl Index for UpdateOracle { .by_id .put(&(oracle_id, ctx.block.height), &oracle)?; - let (_, previous) = get_previous_oracle(services, oracle_id)?; + let (_, previous) = + get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { + action: IndexAction::Index, + r#type: "UpdateOracle".to_string(), + id: oracle_id.to_string(), + })?; for price_feed in &previous.price_feeds { services.oracle_token_currency.by_id.delete(&( price_feed.token.to_owned(), @@ -201,7 +217,7 @@ impl Index for UpdateOracle { fn invalidate(&self, services: &Arc, context: &Context) -> Result<()> { trace!("[UpdateOracle] Invalidating..."); - let oracle_id = context.tx.txid; + let oracle_id = self.oracle_id; services .oracle_history .by_id @@ -215,7 +231,12 @@ impl Index for UpdateOracle { self.oracle_id, ))?; } - let ((prev_oracle_id, _), previous) = get_previous_oracle(services, oracle_id)?; + let ((prev_oracle_id, _), previous) = + get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { + action: IndexAction::Invalidate, + r#type: "UpdateOracle".to_string(), + id: oracle_id.to_string(), + })?; let prev_oracle = Oracle { owner_address: previous.owner_address, @@ -704,7 +725,7 @@ fn backward_aggregate_value( fn get_previous_oracle( services: &Arc, oracle_id: Txid, -) -> Result<(OracleHistoryId, Oracle)> { +) -> Result> { let previous = services .oracle_history .by_id @@ -712,13 +733,5 @@ fn get_previous_oracle( .next() .transpose()?; - let Some(previous) = previous else { - return Err(Error::NotFoundIndex { - action: IndexAction::Index, - r#type: "OracleHistory".to_string(), - id: oracle_id.to_string(), - }); - }; - Ok(previous) } diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index d371ab74e19..a713dbc7f19 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -184,7 +184,7 @@ fn create_new_bucket( impl IndexBlockStart for PoolSwap { fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { - let mut pool_pairs = services.pool_pair_cache.get(); + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); for interval in AGGREGATED_INTERVALS { From 10da700d8832b8a36ed88e036e4699a8b4a5b86b Mon Sep 17 00:00:00 2001 From: jouzo <15011228+Jouzo@users.noreply.github.com> Date: Mon, 4 Nov 2024 15:27:33 +0000 Subject: [PATCH 12/12] Remove clone --- lib/ain-ocean/src/api/oracle.rs | 29 ++++++++++++++--------------- lib/ain-ocean/src/indexer/oracle.rs | 15 +++++++-------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/lib/ain-ocean/src/api/oracle.rs b/lib/ain-ocean/src/api/oracle.rs index 7a7a94f33f8..d1b309f63c3 100644 --- a/lib/ain-ocean/src/api/oracle.rs +++ b/lib/ain-ocean/src/api/oracle.rs @@ -96,34 +96,33 @@ async fn get_feed( let key = (token, currency, txid); - let price_feed_list = ctx + let oracle_price_feeds = ctx .services .oracle_price_feed .by_id .list(None, SortOrder::Descending)? .paginate(&query) .flatten() - .collect::>(); - - let mut oracle_price_feeds = Vec::new(); - - for ((token, currency, oracle_id, txid), feed) in &price_feed_list { - if key.0.eq(token) && key.1.eq(currency) && key.2.eq(oracle_id) { + .into_iter() + .filter(|((token, currency, oracle_id, _), _)| { + key.0.eq(token) && key.1.eq(currency) && key.2.eq(oracle_id) + }) + .map(|((token, currency, oracle_id, txid), feed)| { let amount = Decimal::from(feed.amount) / Decimal::from(COIN); - oracle_price_feeds.push(OraclePriceFeedResponse { + OraclePriceFeedResponse { id: format!("{}-{}-{}-{}", token, currency, oracle_id, txid), key: format!("{}-{}-{}", token, currency, oracle_id), sort: hex::encode(feed.block.height.to_string() + &txid.to_string()), - token: token.to_owned(), - currency: currency.to_owned(), - oracle_id: oracle_id.to_owned(), - txid: *txid, + token, + currency, + oracle_id, + txid, time: feed.time, amount: amount.normalize().to_string(), block: feed.block.clone(), - }); - } - } + } + }) + .collect::>(); Ok(ApiPagedResponse::of( oracle_price_feeds, diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 48a92f8dc23..295ddbcbd6e 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -110,19 +110,18 @@ impl Index for RemoveOracle { let oracle_id = self.oracle_id; services.oracle.by_id.delete(&oracle_id)?; - let (_, previous) = get_previous_oracle(services, oracle_id)? - .context(NotFoundIndexSnafu { + let (_, mut previous) = + get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { action: IndexAction::Index, r#type: "RemoveOracle".to_string(), id: oracle_id.to_string(), })?; - for PriceFeed { token, currency } in &previous.price_feeds { - services.oracle_token_currency.by_id.delete(&( - token.to_owned(), - currency.to_owned(), - oracle_id, - ))?; + for PriceFeed { token, currency } in previous.price_feeds.drain(..) { + services + .oracle_token_currency + .by_id + .delete(&(token, currency, oracle_id))?; } Ok(())