diff --git a/.github/workflows/tests-ocean.yml b/.github/workflows/tests-ocean.yml index 7f8f4ee126..5de85cbc52 100644 --- a/.github/workflows/tests-ocean.yml +++ b/.github/workflows/tests-ocean.yml @@ -5,7 +5,7 @@ on: pull_request: branches: - master - - ocean-refinements # TODO(): remove before merge to master + - ocean-catchup-on-startup concurrency: group: ${{ github.workflow }}-${{ github.ref || github.run_id }} diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index d20a19ab2e..3299e63e7b 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -298,7 +298,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> return Err(Error::NotFoundIndex { action: IndexAction::Index, - r#type: "Index script TransactionVout".to_string(), + r#type: "Script TransactionVout".to_string(), id: format!("{}-{}", vin.txid, vin.vout), }); }; @@ -386,8 +386,8 @@ fn invalidate_script(services: &Arc, ctx: &Context, txs: &[Transaction }; return Err(Error::NotFoundIndex { - action: IndexAction::Index, - r#type: "Index script TransactionVout".to_string(), + action: IndexAction::Invalidate, + r#type: "Script TransactionVout".to_string(), id: format!("{}-{}", vin.txid, vin.vout), }); }; @@ -428,7 +428,7 @@ fn invalidate_script_unspent_vin( let Some(transaction) = services.transaction.by_id.get(&vin.txid)? else { return Err(Error::NotFoundIndex { action: IndexAction::Invalidate, - r#type: "Transaction".to_string(), + r#type: "ScriptUnspentVin Transaction".to_string(), id: vin.txid.to_string(), }); }; @@ -436,7 +436,7 @@ fn invalidate_script_unspent_vin( let Some(vout) = services.transaction.vout_by_id.get(&(vin.txid, vin.vout))? else { return Err(Error::NotFoundIndex { action: IndexAction::Invalidate, - r#type: "TransactionVout".to_string(), + r#type: "ScriptUnspentVin TransactionVout".to_string(), id: format!("{}{}", vin.txid, vin.vout), }); }; diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index ca6f652003..48a92f8dc2 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -12,7 +12,8 @@ use snafu::OptionExt; use crate::{ error::{ - ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, ToPrimitiveSnafu, + ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, IndexAction, NotFoundIndexSnafu, + ToPrimitiveSnafu, }, indexer::{Context, Index, Result}, model::{ @@ -105,27 +106,37 @@ impl Index for AppointOracle { } impl Index for RemoveOracle { - fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - let oracle_id = ctx.tx.txid; + fn index(self, services: &Arc, _ctx: &Context) -> Result<()> { + let oracle_id = self.oracle_id; services.oracle.by_id.delete(&oracle_id)?; - if let Ok((_, previous)) = get_previous_oracle(services, oracle_id) { - for price_feed in &previous.price_feeds { - services.oracle_token_currency.by_id.delete(&( - price_feed.token.to_owned(), - price_feed.currency.to_owned(), - oracle_id, - ))?; - } + let (_, previous) = get_previous_oracle(services, oracle_id)? + .context(NotFoundIndexSnafu { + action: IndexAction::Index, + r#type: "RemoveOracle".to_string(), + id: oracle_id.to_string(), + })?; + + for PriceFeed { token, currency } in &previous.price_feeds { + services.oracle_token_currency.by_id.delete(&( + token.to_owned(), + currency.to_owned(), + oracle_id, + ))?; } Ok(()) } - fn invalidate(&self, services: &Arc, context: &Context) -> Result<()> { + fn invalidate(&self, services: &Arc, _ctx: &Context) -> Result<()> { trace!("[RemoveOracle] Invalidating..."); - let oracle_id = context.tx.txid; - let (_, previous) = get_previous_oracle(services, oracle_id)?; + let oracle_id = self.oracle_id; + let (_, previous) = + get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { + action: IndexAction::Invalidate, + r#type: "RemoveOracle".to_string(), + id: oracle_id.to_string(), + })?; let oracle = Oracle { owner_address: previous.owner_address, @@ -154,7 +165,7 @@ impl Index for RemoveOracle { impl Index for UpdateOracle { fn index(self, services: &Arc, ctx: &Context) -> Result<()> { - let oracle_id = ctx.tx.txid; + let oracle_id = self.oracle_id; let price_feeds = self .price_feeds .iter() @@ -176,7 +187,12 @@ impl Index for UpdateOracle { .by_id .put(&(oracle_id, ctx.block.height), &oracle)?; - let (_, previous) = get_previous_oracle(services, oracle_id)?; + let (_, previous) = + get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { + action: IndexAction::Index, + r#type: "UpdateOracle".to_string(), + id: oracle_id.to_string(), + })?; for price_feed in &previous.price_feeds { services.oracle_token_currency.by_id.delete(&( price_feed.token.to_owned(), @@ -201,7 +217,7 @@ impl Index for UpdateOracle { fn invalidate(&self, services: &Arc, context: &Context) -> Result<()> { trace!("[UpdateOracle] Invalidating..."); - let oracle_id = context.tx.txid; + let oracle_id = self.oracle_id; services .oracle_history .by_id @@ -215,7 +231,12 @@ impl Index for UpdateOracle { self.oracle_id, ))?; } - let ((prev_oracle_id, _), previous) = get_previous_oracle(services, oracle_id)?; + let ((prev_oracle_id, _), previous) = + get_previous_oracle(services, oracle_id)?.context(NotFoundIndexSnafu { + action: IndexAction::Invalidate, + r#type: "UpdateOracle".to_string(), + id: oracle_id.to_string(), + })?; let prev_oracle = Oracle { owner_address: previous.owner_address, @@ -704,7 +725,7 @@ fn backward_aggregate_value( fn get_previous_oracle( services: &Arc, oracle_id: Txid, -) -> Result<(OracleHistoryId, Oracle)> { +) -> Result> { let previous = services .oracle_history .by_id @@ -712,13 +733,5 @@ fn get_previous_oracle( .next() .transpose()?; - let Some(previous) = previous else { - return Err(Error::NotFoundIndex { - action: IndexAction::Index, - r#type: "OracleHistory".to_string(), - id: oracle_id.to_string(), - }); - }; - Ok(previous) } diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index d371ab74e1..a713dbc7f1 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -184,7 +184,7 @@ fn create_new_bucket( impl IndexBlockStart for PoolSwap { fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { - let mut pool_pairs = services.pool_pair_cache.get(); + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); for interval in AGGREGATED_INTERVALS {