diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index afff0c0581..fa413f4cb7 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -9,14 +9,10 @@ use snafu::OptionExt; use super::{Context, IndexBlockStart}; use crate::{ - error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu}, - indexer::{tx_result, Index, Result}, - model::{ + error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu}, indexer::{tx_result, Index, Result}, model::{ self, BlockContext, PoolSwapAggregated, PoolSwapAggregatedAggregated, PoolSwapResult, TxResult, - }, - storage::{RepositoryOps, SortOrder}, - Services, + }, storage::{RepositoryOps, SortOrder}, PoolSwapAggregatedService, Services }; pub const AGGREGATED_INTERVALS: [u32; 2] = [ @@ -38,7 +34,7 @@ fn index_swap_aggregated( txid: Txid, ) -> Result<()> { for interval in AGGREGATED_INTERVALS { - let repo: &crate::PoolSwapAggregatedService = &services.pool_swap_aggregated; + let repo = &services.pool_swap_aggregated; let prevs = repo .by_key .list(Some((pool_id, interval, i64::MAX)), SortOrder::Descending)? @@ -150,6 +146,39 @@ fn invalidate_swap_aggregated( Ok(()) } +fn create_new_bucket( + repo: &PoolSwapAggregatedService, + bucket: i64, + pool_pair_id: u32, + interval: u32, + block: &BlockContext +) -> Result<()> { + let aggregated = PoolSwapAggregated { + bucket, + aggregated: PoolSwapAggregatedAggregated { + amounts: Default::default(), + }, + block: BlockContext { + hash: block.hash, + height: block.height, + time: block.time, + median_time: block.median_time, + }, + }; + + let pool_swap_aggregated_key = (pool_pair_id, interval, bucket); + let pool_swap_aggregated_id = (pool_pair_id, interval, block.hash); + + repo + .by_key + .put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?; + repo + .by_id + .put(&pool_swap_aggregated_id, &aggregated)?; + + Ok(()) +} + impl IndexBlockStart for PoolSwap { fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); @@ -157,9 +186,9 @@ impl IndexBlockStart for PoolSwap { for interval in AGGREGATED_INTERVALS { for pool_pair in &pool_pairs { - let repository = &services.pool_swap_aggregated; + let repo = &services.pool_swap_aggregated; - let prevs = repository + let prev = repo .by_key .list( Some((pool_pair.id, interval, i64::MAX)), @@ -172,44 +201,23 @@ impl IndexBlockStart for PoolSwap { .next() .transpose()?; - let Some((_, prev_id)) = prevs else { - break; - }; - - let prev = repository.by_id.get(&prev_id)?; + let bucket = block.median_time - (block.median_time % interval as i64); - let Some(prev) = prev else { - break; + let Some((_, prev_id)) = prev else { + create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + continue; }; - let bucket = block.median_time - (block.median_time % interval as i64); + let Some(prev) = repo.by_id.get(&prev_id)? else { + create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; + continue; + }; if prev.bucket >= bucket { break; } - let aggregated = PoolSwapAggregated { - bucket, - aggregated: PoolSwapAggregatedAggregated { - amounts: Default::default(), - }, - block: BlockContext { - hash: block.hash, - height: block.height, - time: block.time, - median_time: block.median_time, - }, - }; - - let pool_swap_aggregated_key = (pool_pair.id, interval, bucket); - let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); - - repository - .by_key - .put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?; - repository - .by_id - .put(&pool_swap_aggregated_id, &aggregated)?; + create_new_bucket(repo, bucket, pool_pair.id, interval, block)?; } }