From 37f604fcf24653e5777d9a8d4bddb468e889560a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?= Date: Wed, 30 Oct 2024 09:39:33 -0700 Subject: [PATCH] Pindexer remove unused appviews (#4910) ## Describe your changes This removes: - The Dex AppView - The ShieldedPool AppView The Dex AppView shouldn't be needed, given that we have an app view specific to the dex explorer now. The shielded pool app view was just an example, which can be removed given the many other examples now. Other AppViews which should be removed in a later PR after making sure they're not used: - the supply AppView - superceded by the insights app view, but is known to be used - the ibc AppView - I think there's ongoing development using this, potentially? ## Testing The other app views should continue to work. ## Checklist before requesting a review - [x] I have added guiding text to explain how a reviewer should test these changes. - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > indexing only --- crates/bin/pindexer/src/dex/dex.sql | 149 ----- crates/bin/pindexer/src/dex/mod.rs | 633 ------------------- crates/bin/pindexer/src/indexer_ext.rs | 2 +- crates/bin/pindexer/src/lib.rs | 3 - crates/bin/pindexer/src/shielded_pool.rs | 1 - crates/bin/pindexer/src/shielded_pool/fmd.rs | 61 -- crates/bin/pindexer/src/sql.rs | 84 --- 7 files changed, 1 insertion(+), 932 deletions(-) delete mode 100644 crates/bin/pindexer/src/dex/dex.sql delete mode 100644 crates/bin/pindexer/src/dex/mod.rs delete mode 100644 crates/bin/pindexer/src/shielded_pool.rs delete mode 100644 crates/bin/pindexer/src/shielded_pool/fmd.rs delete mode 100644 crates/bin/pindexer/src/sql.rs diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql deleted file mode 100644 index a5d1c3c13d..0000000000 --- a/crates/bin/pindexer/src/dex/dex.sql +++ /dev/null @@ -1,149 +0,0 @@ --- This component is responsible for processing events related to the DEX. - --- # Design Choices --- --- ## Asset IDs --- --- We represent them as raw bytes---i.e. BYTEA---, rather than using a 1:1 table. --- This is probably more efficient, and makes our lives much easier by the fact --- that given an `penumbra_asset::asset::Id`, we always know exactly how to filter --- tables, rather than needing to do a join with another table. - - -DROP DOMAIN IF EXISTS Amount; -CREATE DOMAIN Amount AS NUMERIC(39, 0) NOT NULL; - -DROP TYPE IF EXISTS Value CASCADE; -CREATE TYPE Value AS ( - amount Amount, - asset BYTEA -); - --- Keeps track of changes to the dex's value circuit breaker. -CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change ( - -- The asset being moved into or out of the dex. - asset_id BYTEA NOT NULL, - -- The flow, either positive, or negative, into the dex via this particular asset. - -- - -- Because we're dealing with arbitrary assets, we need to use something which can store u128 - flow Amount -); - --- One step of an execution trace. -CREATE TABLE IF NOT EXISTS dex_trace_step ( - id SERIAL PRIMARY KEY, - value Value -); - --- A single trace, showing what a small amount of an input asset was exchanged for. -CREATE TABLE IF NOT EXISTS dex_trace ( - id SERIAL PRIMARY KEY, - step_start INTEGER REFERENCES dex_trace_step(id), - step_end INTEGER REFERENCES dex_trace_step(id) -); - ---- Represents instances where arb executions happened. -CREATE TABLE IF NOT EXISTS dex_arb ( - height BIGINT PRIMARY KEY, - input Value, - output Value, - trace_start INTEGER REFERENCES dex_trace(id), - trace_end INTEGER REFERENCES dex_trace(id) -); - -DROP DOMAIN IF EXISTS Bps; -CREATE DOMAIN Bps AS INTEGER CHECK(VALUE BETWEEN 0 and 10000); - --- Holds the current state of a given liquidity position -CREATE TABLE IF NOT EXISTS dex_lp ( - id BYTEA PRIMARY KEY, - -- The enum for the current state of the position - state TEXT NOT NULL, - -- The first asset of the position - asset1 BYTEA NOT NULL, - -- The second asset of the position - asset2 BYTEA NOT NULL, - p Amount, - q Amount, - -- The fee, in basis points - fee_bps Bps, - -- How much of asset2 you get when swapping asset1. - price12 NUMERIC GENERATED ALWAYS AS (((1 - fee_bps::NUMERIC / 10000) * (p / q))) STORED, - -- How much of asset1 you get when swapping asset2. - price21 NUMERIC GENERATED ALWAYS AS (((1 - fee_bps::NUMERIC / 10000) * (q / p))) STORED, - -- Whether the position will be closed when all reserves are depleted - close_on_fill BOOLEAN NOT NULL, - -- The amount of reserves of asset 1. - reserves1 Amount, - -- The amount of reserves of asset 2. - reserves2 Amount -); --- So that we can easily query positions by ascending price -CREATE INDEX ON dex_lp(asset1, price12); -CREATE INDEX ON dex_lp(asset2, price21); - --- Holds update events to liquidity position -CREATE TABLE IF NOT EXISTS dex_lp_update ( - id SERIAL PRIMARY KEY, - -- The block height where the update occurred. - height BIGINT NOT NULL, - -- The identifier for the position - position_id BYTEA NOT NULL, - -- The new state of the position - state TEXT NOT NULL, - -- The new reserves of asset1 (potentially null) - reserves1 NUMERIC(39, 0), - -- The new reserves of asset2 (potentially null) - reserves2 NUMERIC(39, 0), - -- If present, a reference to the execution table, for execution events - execution_id INTEGER -); -CREATE INDEX ON dex_lp_update(position_id, height DESC, id DESC); -CREATE INDEX ON dex_lp_update(execution_id); - --- Holds data related to execution events -CREATE TABLE IF NOT EXISTS dex_lp_execution ( - id SERIAL PRIMARY KEY, - -- The amount of asset1 that was pushed into (or pulled out of, if negative) this position. - inflow1 Amount, - -- As above, with asset2. - inflow2 Amount, - -- The start asset for this execution. - context_start BYTEA NOT NULL, - -- The end asset for this execution. - context_end BYTEA NOT NULL -); - ---- Represents instances where swap executions happened. -CREATE TABLE IF NOT EXISTS dex_batch_swap ( - id SERIAL PRIMARY KEY, - height BIGINT NOT NULL, - trace12_start INTEGER REFERENCES dex_trace (id), - trace12_end INTEGER REFERENCES dex_trace (id), - trace21_start INTEGER REFERENCES dex_trace (id), - trace21_end INTEGER REFERENCES dex_trace (id), - asset1 BYTEA NOT NULL, - asset2 BYTEA NOT NULL, - unfilled1 Amount NOT NULL, - unfilled2 Amount NOT NULL, - delta1 Amount NOT NULL, - delta2 Amount NOT NULL, - lambda1 Amount NOT NULL, - lambda2 Amount NOT NULL -); - -CREATE INDEX ON dex_batch_swap(height); -CREATE INDEX ON dex_batch_swap(asset1, height); -CREATE INDEX ON dex_batch_swap(asset2, height); - --- Represents instances of invididual swaps into the batch. -CREATE TABLE IF NOT EXISTS dex_swap ( - id SERIAL PRIMARY KEY, - height BIGINT NOT NULL, - value1 Value, - value2 Value -); - -CREATE INDEX ON dex_swap(height, id); -CREATE INDEX ON dex_swap(((value1).asset)); -CREATE INDEX ON dex_swap(((value2).asset)); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs deleted file mode 100644 index ec668d7bb8..0000000000 --- a/crates/bin/pindexer/src/dex/mod.rs +++ /dev/null @@ -1,633 +0,0 @@ -use std::collections::HashSet; - -use anyhow::{anyhow, Context}; -use cometindex::async_trait; -use penumbra_asset::asset::Id as AssetId; -use penumbra_dex::lp::position::{Id, Position}; -use penumbra_dex::lp::{self, TradingFunction}; -use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair}; -use penumbra_num::Amount; -use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; -use sqlx::{PgPool, Postgres, Transaction}; - -use crate::sql::Sql; -use crate::{AppView, ContextualizedEvent, PgTransaction}; - -/// Insert a swap execution into the database. -/// -/// This returns the start and end indices of its trace. -async fn insert_swap_execution<'d>( - dbtx: &mut Transaction<'d, Postgres>, - execution: Option<&SwapExecution>, -) -> anyhow::Result<(Option, Option)> { - let execution = match execution { - None => return Ok((None, None)), - Some(e) => e, - }; - let mut trace_start = None; - let mut trace_end = None; - for trace in &execution.traces { - let mut step_start = None; - let mut step_end = None; - for step in trace { - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, - ) - .bind(step.amount.to_string()) - .bind(Sql::from(step.asset_id)) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = step_start { - step_start = Some(id); - } - step_end = Some(id); - } - let (id,): (i32,) = - sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#) - .bind(step_start) - .bind(step_end) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = trace_start { - trace_start = Some(id); - } - trace_end = Some(id); - } - Ok((trace_start, trace_end)) -} - -/// One of the possible events that we care about. -#[derive(Clone, Debug)] -enum Event { - /// A parsed version of [pb::EventValueCircuitBreakerCredit]. - CircuitBreakerCredit { - asset_id: AssetId, - previous_balance: Amount, - new_balance: Amount, - }, - /// A parsed version of [pb::EventValueCircuitBreakerDebit] - CircuitBreakerDebit { - asset_id: AssetId, - previous_balance: Amount, - new_balance: Amount, - }, - /// A parsed version of [pb::EventArbExecution] - ArbExecution { - height: u64, - execution: SwapExecution, - }, - /// A parsed version of [pb::EventBatchSwap] - BatchSwap { - height: u64, - execution12: Option, - execution21: Option, - output_data: BatchSwapOutputData, - }, - /// A parsed version of [pb::EventPositionOpen] - PositionOpen { height: u64, position: Position }, - /// A parsed version of [pb::EventPositionWithdraw] - PositionWithdraw { - height: u64, - position_id: Id, - reserves_1: Amount, - reserves_2: Amount, - sequence: u64, - }, - /// A parsed version of [pb::EventPositionClose] - PositionClose { height: u64, position_id: Id }, - /// A parsed version of [pb::EventPositionExecution] - PositionExecution { - height: u64, - position_id: Id, - reserves_1: Amount, - reserves_2: Amount, - prev_reserves_1: Amount, - prev_reserves_2: Amount, - context: DirectedTradingPair, - }, - /// A parsed version of [pb::EventSwap] - Swap { - height: u64, - trading_pair: TradingPair, - delta_1_i: Amount, - delta_2_i: Amount, - }, -} - -impl Event { - const NAMES: [&'static str; 9] = [ - "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", - "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", - "penumbra.core.component.dex.v1.EventArbExecution", - "penumbra.core.component.dex.v1.EventPositionWithdraw", - "penumbra.core.component.dex.v1.EventPositionOpen", - "penumbra.core.component.dex.v1.EventPositionClose", - "penumbra.core.component.dex.v1.EventPositionExecution", - "penumbra.core.component.dex.v1.EventBatchSwap", - "penumbra.core.component.dex.v1.EventSwap", - ]; - - /// Index this event, using the handle to the postgres transaction. - async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> { - match self { - Event::CircuitBreakerCredit { - asset_id, - previous_balance, - new_balance, - } => { - let amount = new_balance.checked_sub(&previous_balance).ok_or(anyhow!( - "balance decreased after dex credit: previous: {}, new: {}", - previous_balance, - new_balance - ))?; - sqlx::query( - r#" - INSERT INTO dex_value_circuit_breaker_change - VALUES ($1, CAST($2 AS Amount)); - "#, - ) - .bind(Sql::from(*asset_id)) - .bind(amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::CircuitBreakerDebit { - asset_id, - previous_balance, - new_balance, - } => { - let amount = previous_balance.checked_sub(&new_balance).ok_or(anyhow!( - "balance increased after dex credit: previous: {}, new: {}", - previous_balance, - new_balance - ))?; - sqlx::query( - r#" - INSERT INTO dex_value_circuit_breaker_change - VALUES ($1, -(CAST($2 AS Amount))); - "#, - ) - .bind(Sql::from(*asset_id)) - .bind(amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::ArbExecution { height, execution } => { - let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?; - sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#) - .bind(i64::try_from(*height)?) - .bind(execution.input.amount.to_string()) - .bind(Sql::from(execution.input.asset_id)) - .bind(execution.output.amount.to_string()) - .bind(Sql::from(execution.output.asset_id)) - .bind(trace_start) - .bind(trace_end) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionOpen { height, position } => { - let Position { - state, - phi: TradingFunction { pair, component }, - .. - } = position; - let id = position.id().0; - tracing::debug!( - p = component.p.to_string(), - q = component.q.to_string(), - r1 = position.reserves_1().amount.to_string(), - r2 = position.reserves_2().amount.to_string() - ); - sqlx::query( - " - INSERT INTO dex_lp (id, state, asset1, asset2, p, q, fee_bps, close_on_fill, reserves1, reserves2) - VALUES ($1, $2, $3, $4, CAST($5 as Amount), CAST($6 AS Amount), $7, $8, CAST($9 AS Amount), CAST($10 AS Amount)); - ", - ) - .bind(id) - .bind(state.to_string()) - .bind(pair.asset_1().to_bytes()) - .bind(pair.asset_2().to_bytes()) - .bind(component.p.to_string()) - .bind(component.q.to_string()) - .bind(i32::try_from(component.fee)?) - .bind(position.close_on_fill) - .bind(position.reserves_1().amount.to_string()) - .bind(position.reserves_2().amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state, reserves1, reserves2) - VALUES ($1, $2, $3, CAST($4 AS Amount), CAST($5 AS Amount)); - ", - ) - .bind(i64::try_from(*height)?) - .bind(id) - .bind(state.to_string()) - .bind(position.reserves_1().amount.to_string()) - .bind(position.reserves_2().amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionClose { - height, - position_id, - } => { - let state = lp::position::State::Closed; - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state) - VALUES ($1, $2, $3) - ", - ) - .bind(i64::try_from(*height)?) - .bind(position_id.0) - .bind(state.to_string()) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - UPDATE dex_lp - SET state = $2 - WHERE id = $1; - ", - ) - .bind(position_id.0) - .bind(state.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionWithdraw { - height, - position_id, - reserves_1: reserves1, - reserves_2: reserves2, - sequence, - } => { - let state = lp::position::State::Withdrawn { - sequence: *sequence, - }; - let reserves1 = reserves1.to_string(); - let reserves2 = reserves2.to_string(); - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state, reserves1, reserves2) - VALUES ($1, $2, $3, CAST($4 AS Amount), CAST($5 AS Amount)) - ", - ) - .bind(i64::try_from(*height)?) - .bind(position_id.0) - .bind(state.to_string()) - .bind(&reserves1) - .bind(&reserves2) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - UPDATE dex_lp - SET state = $2, reserves1 = CAST($3 AS Amount), reserves2 = CAST($4 AS Amount) - WHERE id = $1; - ", - ) - .bind(position_id.0) - .bind(state.to_string()) - .bind(&reserves1) - .bind(&reserves2) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionExecution { - height, - position_id, - reserves_1: reserves1, - reserves_2: reserves2, - context, - prev_reserves_1: prev_reserves1, - prev_reserves_2: prev_reserves2, - } => { - let state = lp::position::State::Opened; - let reserves1 = reserves1.to_string(); - let reserves2 = reserves2.to_string(); - let prev_reserves1 = prev_reserves1.to_string(); - let prev_reserves2 = prev_reserves2.to_string(); - let id: i32 = sqlx::query_scalar( - " - INSERT INTO dex_lp_execution (inflow1, inflow2, context_start, context_end) - VALUES (CAST($1 AS Amount) - CAST($2 AS Amount), CAST($3 AS Amount) - CAST($4 AS Amount), $5, $6) - RETURNING id; - ",) - .bind(&reserves1) - .bind(prev_reserves1) - .bind(&reserves2) - .bind(prev_reserves2) - .bind(context.start.to_bytes()) - .bind(context.end.to_bytes()) - .fetch_one(dbtx.as_mut()).await?; - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state, reserves1, reserves2, execution_id) - VALUES ($1, $2, $3, CAST($4 AS Amount), CAST($5 AS Amount), $6) - ", - ) - .bind(i64::try_from(*height)?) - .bind(position_id.0) - .bind(state.to_string()) - .bind(&reserves1) - .bind(&reserves2) - .bind(id) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - UPDATE dex_lp - SET reserves1 = CAST($2 AS Amount), reserves2 = CAST($3 AS Amount) - WHERE id = $1; - ", - ) - .bind(position_id.0) - .bind(&reserves1) - .bind(&reserves2) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::BatchSwap { - height, - execution12, - execution21, - output_data, - } => { - let (trace12_start, trace12_end) = - insert_swap_execution(dbtx, execution12.as_ref()).await?; - let (trace21_start, trace21_end) = - insert_swap_execution(dbtx, execution21.as_ref()).await?; - sqlx::query(r#"INSERT INTO dex_batch_swap VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#) - .bind(i64::try_from(*height)?) - .bind(trace12_start) - .bind(trace12_end) - .bind(trace21_start) - .bind(trace21_end) - .bind(Sql::from(output_data.trading_pair.asset_1())) - .bind(Sql::from(output_data.trading_pair.asset_2())) - .bind(output_data.unfilled_1.to_string()) - .bind(output_data.unfilled_2.to_string()) - .bind(output_data.delta_1.to_string()) - .bind(output_data.delta_2.to_string()) - .bind(output_data.lambda_1.to_string()) - .bind(output_data.lambda_2.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::Swap { - height, - trading_pair, - delta_1_i, - delta_2_i, - } => { - sqlx::query(r#"INSERT INTO dex_swap VALUES (DEFAULT, $1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5));"#) - .bind(i64::try_from(*height)?) - .bind(delta_1_i.to_string()) - .bind(Sql::from(trading_pair.asset_1())) - .bind(delta_2_i.to_string()) - .bind(Sql::from(trading_pair.asset_2())) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - } - } -} - -impl<'a> TryFrom<&'a ContextualizedEvent> for Event { - type Error = anyhow::Error; - - fn try_from(event: &'a ContextualizedEvent) -> Result { - match event.event.kind.as_str() { - // Credit - x if x == Event::NAMES[0] => { - let pe = pb::EventValueCircuitBreakerCredit::from_event(event.as_ref())?; - let asset_id = - AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?; - let previous_balance = Amount::try_from( - pe.previous_balance - .ok_or(anyhow!("event missing previous_balance"))?, - )?; - let new_balance = - Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?; - Ok(Self::CircuitBreakerCredit { - asset_id, - previous_balance, - new_balance, - }) - } - // Debit - x if x == Event::NAMES[1] => { - let pe = pb::EventValueCircuitBreakerDebit::from_event(event.as_ref())?; - let asset_id = - AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?; - let previous_balance = Amount::try_from( - pe.previous_balance - .ok_or(anyhow!("event missing previous_balance"))?, - )?; - let new_balance = - Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?; - Ok(Self::CircuitBreakerDebit { - asset_id, - previous_balance, - new_balance, - }) - } - // Arb - x if x == Event::NAMES[2] => { - let pe = pb::EventArbExecution::from_event(event.as_ref())?; - let height = pe.height; - let execution = pe - .swap_execution - .ok_or(anyhow!("missing swap execution"))? - .try_into()?; - Ok(Self::ArbExecution { height, execution }) - } - // LP Withdraw - x if x == Event::NAMES[3] => { - let pe = pb::EventPositionWithdraw::from_event(event.as_ref())?; - let height = event.block_height; - let position_id = pe - .position_id - .ok_or(anyhow!("missing position id"))? - .try_into()?; - let reserves_1 = pe - .reserves_1 - .ok_or(anyhow!("missing reserves_1"))? - .try_into()?; - let reserves_2 = pe - .reserves_2 - .ok_or(anyhow!("missing reserves_2"))? - .try_into()?; - let sequence = pe.sequence; - Ok(Self::PositionWithdraw { - height, - position_id, - reserves_1, - reserves_2, - sequence, - }) - } - // LP Open - x if x == Event::NAMES[4] => { - let pe = pb::EventPositionOpen::from_event(event.as_ref())?; - let height = event.block_height; - let position = pe - .position - .ok_or(anyhow!("missing position")) - .context("(make sure you're using pd >= 0.79.3)")? - .try_into()?; - Ok(Self::PositionOpen { height, position }) - } - // LP Close - x if x == Event::NAMES[5] => { - let pe = pb::EventPositionClose::from_event(event.as_ref())?; - let height = event.block_height; - let position_id = pe - .position_id - .ok_or(anyhow!("missing position id"))? - .try_into()?; - - Ok(Self::PositionClose { - height, - position_id, - }) - } - // LP Execution - x if x == Event::NAMES[6] => { - let pe = pb::EventPositionExecution::from_event(event.as_ref())?; - let height = event.block_height; - let position_id = pe - .position_id - .ok_or(anyhow!("missing position id"))? - .try_into()?; - let reserves_1 = pe - .reserves_1 - .ok_or(anyhow!("missing reserves_1"))? - .try_into()?; - let reserves_2 = pe - .reserves_2 - .ok_or(anyhow!("missing reserves_2"))? - .try_into()?; - let prev_reserves_1 = pe - .prev_reserves_1 - .ok_or(anyhow!("missing reserves_1"))? - .try_into()?; - let prev_reserves_2 = pe - .prev_reserves_2 - .ok_or(anyhow!("missing reserves_2"))? - .try_into()?; - let context: DirectedTradingPair = - pe.context.ok_or(anyhow!("missing context"))?.try_into()?; - Ok(Self::PositionExecution { - height, - position_id, - reserves_1, - reserves_2, - prev_reserves_1, - prev_reserves_2, - context, - }) - } - // Batch Swap - x if x == Event::NAMES[7] => { - let pe = pb::EventBatchSwap::from_event(event.as_ref())?; - let height = event.block_height; - let output_data = pe - .batch_swap_output_data - .ok_or(anyhow!("missing swap execution"))? - .try_into()?; - let execution12 = pe - .swap_execution_1_for_2 - .map(|x| x.try_into()) - .transpose()?; - let execution21 = pe - .swap_execution_2_for_1 - .map(|x| x.try_into()) - .transpose()?; - Ok(Self::BatchSwap { - height, - execution12, - execution21, - output_data, - }) - } - // Swap - x if x == Event::NAMES[8] => { - let pe = pb::EventSwap::from_event(event.as_ref())?; - let height = event.block_height; - let trading_pair = pe - .trading_pair - .expect("trading_pair should be present") - .try_into()?; - let delta_1_i = pe - .delta_1_i - .expect("delta_1_i should be present") - .try_into()?; - let delta_2_i = pe - .delta_2_i - .expect("delta_2_i should be present") - .try_into()?; - Ok(Self::Swap { - height, - trading_pair, - delta_1_i, - delta_2_i, - }) - } - x => Err(anyhow!(format!("unrecognized event kind: {x}"))), - } - } -} - -#[derive(Debug)] -pub struct Component { - event_strings: HashSet<&'static str>, -} - -impl Component { - pub fn new() -> Self { - let event_strings = Event::NAMES.into_iter().collect(); - Self { event_strings } - } -} - -#[async_trait] -impl AppView for Component { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - _app_state: &serde_json::Value, - ) -> anyhow::Result<()> { - for statement in include_str!("dex.sql").split(";") { - sqlx::query(statement).execute(dbtx.as_mut()).await?; - } - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - self.event_strings.contains(type_str) - } - - #[tracing::instrument(skip_all, fields(height = event.block_height, name = event.event.kind.as_str()))] - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> anyhow::Result<()> { - Event::try_from(event)?.index(dbtx).await - } -} diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index 90600e43ab..ddaa667635 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -6,7 +6,7 @@ pub trait IndexerExt: Sized { impl IndexerExt for cometindex::Indexer { fn with_default_penumbra_app_views(self) -> Self { - self.with_index(crate::shielded_pool::fmd::ClueSet {}) + self.with_index(crate::block::Block {}) .with_index(crate::stake::ValidatorSet {}) .with_index(crate::stake::Slashings {}) .with_index(crate::stake::DelegationTxs {}) diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index 47429f1c10..529b56102f 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -3,13 +3,10 @@ pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool mod indexer_ext; pub use indexer_ext::IndexerExt; pub mod block; -pub mod dex; pub mod dex_ex; pub mod ibc; pub mod insights; mod parsing; -pub mod shielded_pool; -mod sql; pub mod stake; pub mod supply; diff --git a/crates/bin/pindexer/src/shielded_pool.rs b/crates/bin/pindexer/src/shielded_pool.rs deleted file mode 100644 index b0d0276891..0000000000 --- a/crates/bin/pindexer/src/shielded_pool.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod fmd; diff --git a/crates/bin/pindexer/src/shielded_pool/fmd.rs b/crates/bin/pindexer/src/shielded_pool/fmd.rs deleted file mode 100644 index 50f4942dd3..0000000000 --- a/crates/bin/pindexer/src/shielded_pool/fmd.rs +++ /dev/null @@ -1,61 +0,0 @@ -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; -use penumbra_proto::{core::component::shielded_pool::v1 as pb, event::ProtoEvent}; - -#[derive(Debug)] -pub struct ClueSet {} - -#[async_trait] -impl AppView for ClueSet { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - _app_state: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - sqlx::query( - // table name is module path + struct name - " -CREATE TABLE IF NOT EXISTS shielded_pool_fmd_clue_set ( - id SERIAL PRIMARY KEY, - clue_bytes BYTEA NOT NULL, - tx_hash BYTEA NOT NULL -); -", - ) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.shielded_pool.v1.EventBroadcastClue" - } - - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> Result<(), anyhow::Error> { - let pe = pb::EventBroadcastClue::from_event(event.as_ref())?; - - let clue_bytes = pe - .clue - .ok_or_else(|| anyhow::anyhow!("clue event missing clue"))? - .inner; - - let tx_hash = event.tx_hash.as_ref().expect("tx_hash not found").to_vec(); - - sqlx::query( - " - INSERT INTO shielded_pool_fmd_clue_set (clue_bytes, tx_hash) - VALUES ($1, $2) - ", - ) - .bind(&clue_bytes) - .bind(&tx_hash) - .execute(dbtx.as_mut()) - .await?; - - Ok(()) - } -} diff --git a/crates/bin/pindexer/src/sql.rs b/crates/bin/pindexer/src/sql.rs deleted file mode 100644 index 9d17faa978..0000000000 --- a/crates/bin/pindexer/src/sql.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::error::Error; - -use penumbra_asset::asset::Id as AssetId; -use sqlx::{Decode, Encode, Postgres, Type}; - -/// An extension trait to make it easier to implement serialization for existing Penumbra types. -/// -/// Types that implement this trait can then be shoved into [Sql] and passed along -/// to the various sqlx functions. -pub trait SqlExt: Clone + Sized { - type SqlT; - - fn to_sql_type(&self) -> Self::SqlT; - fn from_sql_type(value: Self::SqlT) -> anyhow::Result; -} - -/// A wrapper over `T` allowing for SQL serialization and deserialization. -/// -/// When `T` implements [SqlExt] then this type will be encodeable and decodeable -/// from a Postgres database. -pub struct Sql(T); - -impl Sql { - #[allow(dead_code)] - pub fn into(self) -> T { - self.0 - } -} - -impl From for Sql { - fn from(value: T) -> Self { - Self(value) - } -} - -impl<'q, T> Encode<'q, Postgres> for Sql -where - T: SqlExt, - T::SqlT: Encode<'q, Postgres>, -{ - fn encode_by_ref( - &self, - buf: &mut >::ArgumentBuffer, - ) -> sqlx::encode::IsNull { - ::to_sql_type(&self.0).encode_by_ref(buf) - } -} - -impl<'q, T> Decode<'q, Postgres> for Sql -where - T: SqlExt, - T::SqlT: Decode<'q, Postgres>, -{ - fn decode( - value: >::ValueRef, - ) -> Result { - let sql_t = ::SqlT::decode(value)?; - let t = T::from_sql_type(sql_t) - .map_err(|e| Box::::from(e))?; - Ok(Sql(t)) - } -} - -impl Type for Sql -where - T: SqlExt, - T::SqlT: Type, -{ - fn type_info() -> ::TypeInfo { - <[u8; 32]>::type_info() - } -} - -impl SqlExt for AssetId { - type SqlT = [u8; 32]; - - fn to_sql_type(&self) -> Self::SqlT { - self.to_bytes() - } - - fn from_sql_type(value: Self::SqlT) -> anyhow::Result { - Ok(AssetId::try_from(value.as_slice())?) - } -}