Skip to content

Commit

Permalink
Ocean: fix rm/update oracle index (#3107)
Browse files Browse the repository at this point in the history
* fix

* update ocean ci

* revert to direct ffi:get_pp
  • Loading branch information
canonbrother authored Nov 4, 2024
1 parent e161cc4 commit 1b527c3
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests-ocean.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
pull_request:
branches:
- master
- ocean-refinements # TODO(): remove before merge to master
- ocean-catchup-on-startup

concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
Expand Down
10 changes: 5 additions & 5 deletions lib/ain-ocean/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ fn index_script(services: &Arc<Services>, ctx: &Context, txs: &[Transaction]) ->

return Err(Error::NotFoundIndex {
action: IndexAction::Index,
r#type: "Index script TransactionVout".to_string(),
r#type: "Script TransactionVout".to_string(),
id: format!("{}-{}", vin.txid, vin.vout),
});
};
Expand Down Expand Up @@ -386,8 +386,8 @@ fn invalidate_script(services: &Arc<Services>, ctx: &Context, txs: &[Transaction
};

return Err(Error::NotFoundIndex {
action: IndexAction::Index,
r#type: "Index script TransactionVout".to_string(),
action: IndexAction::Invalidate,
r#type: "Script TransactionVout".to_string(),
id: format!("{}-{}", vin.txid, vin.vout),
});
};
Expand Down Expand Up @@ -428,15 +428,15 @@ fn invalidate_script_unspent_vin(
let Some(transaction) = services.transaction.by_id.get(&vin.txid)? else {
return Err(Error::NotFoundIndex {
action: IndexAction::Invalidate,
r#type: "Transaction".to_string(),
r#type: "ScriptUnspentVin Transaction".to_string(),
id: vin.txid.to_string(),
});
};

let Some(vout) = services.transaction.vout_by_id.get(&(vin.txid, vin.vout))? else {
return Err(Error::NotFoundIndex {
action: IndexAction::Invalidate,
r#type: "TransactionVout".to_string(),
r#type: "ScriptUnspentVin TransactionVout".to_string(),
id: format!("{}{}", vin.txid, vin.vout),
});
};
Expand Down
67 changes: 40 additions & 27 deletions lib/ain-ocean/src/indexer/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use snafu::OptionExt;

use crate::{
error::{
ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, ToPrimitiveSnafu,
ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, NotFoundIndexSnafu,
ToPrimitiveSnafu,
},
indexer::{Context, Index, Result},
model::{
Expand Down Expand Up @@ -105,27 +106,37 @@ impl Index for AppointOracle {
}

impl Index for RemoveOracle {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
let oracle_id = ctx.tx.txid;
fn index(self, services: &Arc<Services>, _ctx: &Context) -> Result<()> {
let oracle_id = self.oracle_id;
services.oracle.by_id.delete(&oracle_id)?;

if let Ok((_, previous)) = get_previous_oracle(services, oracle_id) {
for price_feed in &previous.price_feeds {
services.oracle_token_currency.by_id.delete(&(
price_feed.token.to_owned(),
price_feed.currency.to_owned(),
oracle_id,
))?;
}
let (_, previous) = get_previous_oracle(services, oracle_id)?
.context(NotFoundIndexSnafu {
action: IndexAction::Index,
r#type: "RemoveOracle".to_string(),
id: oracle_id.to_string(),
})?;

for PriceFeed { token, currency } in &previous.price_feeds {
services.oracle_token_currency.by_id.delete(&(
token.to_owned(),
currency.to_owned(),
oracle_id,
))?;
}

Ok(())
}

fn invalidate(&self, services: &Arc<Services>, context: &Context) -> Result<()> {
fn invalidate(&self, services: &Arc<Services>, _ctx: &Context) -> Result<()> {
trace!("[RemoveOracle] Invalidating...");
let oracle_id = context.tx.txid;
let (_, previous) = get_previous_oracle(services, oracle_id)?;
let oracle_id = self.oracle_id;
let (_, previous) =
get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu {
action: IndexAction::Invalidate,
r#type: "RemoveOracle".to_string(),
id: oracle_id.to_string(),
})?;

let oracle = Oracle {
owner_address: previous.owner_address,
Expand Down Expand Up @@ -154,7 +165,7 @@ impl Index for RemoveOracle {

impl Index for UpdateOracle {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
let oracle_id = ctx.tx.txid;
let oracle_id = self.oracle_id;
let price_feeds = self
.price_feeds
.iter()
Expand All @@ -176,7 +187,12 @@ impl Index for UpdateOracle {
.by_id
.put(&(oracle_id, ctx.block.height), &oracle)?;

let (_, previous) = get_previous_oracle(services, oracle_id)?;
let (_, previous) =
get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu {
action: IndexAction::Index,
r#type: "UpdateOracle".to_string(),
id: oracle_id.to_string(),
})?;
for price_feed in &previous.price_feeds {
services.oracle_token_currency.by_id.delete(&(
price_feed.token.to_owned(),
Expand All @@ -201,7 +217,7 @@ impl Index for UpdateOracle {

fn invalidate(&self, services: &Arc<Services>, context: &Context) -> Result<()> {
trace!("[UpdateOracle] Invalidating...");
let oracle_id = context.tx.txid;
let oracle_id = self.oracle_id;
services
.oracle_history
.by_id
Expand All @@ -215,7 +231,12 @@ impl Index for UpdateOracle {
self.oracle_id,
))?;
}
let ((prev_oracle_id, _), previous) = get_previous_oracle(services, oracle_id)?;
let ((prev_oracle_id, _), previous) =
get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu {
action: IndexAction::Invalidate,
r#type: "UpdateOracle".to_string(),
id: oracle_id.to_string(),
})?;

let prev_oracle = Oracle {
owner_address: previous.owner_address,
Expand Down Expand Up @@ -704,21 +725,13 @@ fn backward_aggregate_value(
fn get_previous_oracle(
services: &Arc<Services>,
oracle_id: Txid,
) -> Result<(OracleHistoryId, Oracle)> {
) -> Result<Option<(OracleHistoryId, Oracle)>> {
let previous = services
.oracle_history
.by_id
.list(Some((oracle_id, u32::MAX)), SortOrder::Descending)?
.next()
.transpose()?;

let Some(previous) = previous else {
return Err(Error::NotFoundIndex {
action: IndexAction::Index,
r#type: "OracleHistory".to_string(),
id: oracle_id.to_string(),
});
};

Ok(previous)
}
2 changes: 1 addition & 1 deletion lib/ain-ocean/src/indexer/poolswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ fn create_new_bucket(

impl IndexBlockStart for PoolSwap {
fn index_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
let mut pool_pairs = services.pool_pair_cache.get();
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
Expand Down

0 comments on commit 1b527c3

Please sign in to comment.