Skip to content

Commit

Permalink
fix poolswap index_block_start
Browse files Browse the repository at this point in the history
  • Loading branch information
canonbrother committed Oct 25, 2024
1 parent 3cefefa commit 0d16235
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions lib/ain-ocean/src/indexer/poolswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@ use snafu::OptionExt;

use super::{Context, IndexBlockStart};
use crate::{
error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu},
indexer::{tx_result, Index, Result},
model::{
error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu}, indexer::{tx_result, Index, Result}, model::{
self, BlockContext, PoolSwapAggregated, PoolSwapAggregatedAggregated, PoolSwapResult,
TxResult,
},
storage::{RepositoryOps, SortOrder},
Services,
}, storage::{RepositoryOps, SortOrder}, PoolSwapAggregatedService, Services
};

pub const AGGREGATED_INTERVALS: [u32; 2] = [
Expand All @@ -38,7 +34,7 @@ fn index_swap_aggregated(
txid: Txid,
) -> Result<()> {
for interval in AGGREGATED_INTERVALS {
let repo: &crate::PoolSwapAggregatedService = &services.pool_swap_aggregated;
let repo = &services.pool_swap_aggregated;
let prevs = repo
.by_key
.list(Some((pool_id, interval, i64::MAX)), SortOrder::Descending)?
Expand Down Expand Up @@ -150,16 +146,49 @@ fn invalidate_swap_aggregated(
Ok(())
}

fn create_new_bucket(
repo: &PoolSwapAggregatedService,
bucket: i64,
pool_pair_id: u32,
interval: u32,
block: &BlockContext
) -> Result<()> {
let aggregated = PoolSwapAggregated {
bucket,
aggregated: PoolSwapAggregatedAggregated {
amounts: Default::default(),
},
block: BlockContext {
hash: block.hash,
height: block.height,
time: block.time,
median_time: block.median_time,
},
};

let pool_swap_aggregated_key = (pool_pair_id, interval, bucket);
let pool_swap_aggregated_id = (pool_pair_id, interval, block.hash);

repo
.by_key
.put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?;
repo
.by_id
.put(&pool_swap_aggregated_id, &aggregated)?;

Ok(())
}

impl IndexBlockStart for PoolSwap {
fn index_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
for pool_pair in &pool_pairs {
let repository = &services.pool_swap_aggregated;
let repo = &services.pool_swap_aggregated;

let prevs = repository
let prev = repo
.by_key
.list(
Some((pool_pair.id, interval, i64::MAX)),
Expand All @@ -172,44 +201,23 @@ impl IndexBlockStart for PoolSwap {
.next()
.transpose()?;

let Some((_, prev_id)) = prevs else {
break;
};

let prev = repository.by_id.get(&prev_id)?;
let bucket = block.median_time - (block.median_time % interval as i64);

let Some(prev) = prev else {
break;
let Some((_, prev_id)) = prev else {
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
continue;
};

let bucket = block.median_time - (block.median_time % interval as i64);
let Some(prev) = repo.by_id.get(&prev_id)? else {
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
continue;
};

if prev.bucket >= bucket {
break;
}

let aggregated = PoolSwapAggregated {
bucket,
aggregated: PoolSwapAggregatedAggregated {
amounts: Default::default(),
},
block: BlockContext {
hash: block.hash,
height: block.height,
time: block.time,
median_time: block.median_time,
},
};

let pool_swap_aggregated_key = (pool_pair.id, interval, bucket);
let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash);

repository
.by_key
.put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?;
repository
.by_id
.put(&pool_swap_aggregated_id, &aggregated)?;
create_new_bucket(repo, bucket, pool_pair.id, interval, block)?;
}
}

Expand Down

0 comments on commit 0d16235

Please sign in to comment.