diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql deleted file mode 100644 index a5d1c3c13d..0000000000 --- a/crates/bin/pindexer/src/dex/dex.sql +++ /dev/null @@ -1,149 +0,0 @@ --- This component is responsible for processing events related to the DEX. - --- # Design Choices --- --- ## Asset IDs --- --- We represent them as raw bytes---i.e. BYTEA---, rather than using a 1:1 table. --- This is probably more efficient, and makes our lives much easier by the fact --- that given an `penumbra_asset::asset::Id`, we always know exactly how to filter --- tables, rather than needing to do a join with another table. - - -DROP DOMAIN IF EXISTS Amount; -CREATE DOMAIN Amount AS NUMERIC(39, 0) NOT NULL; - -DROP TYPE IF EXISTS Value CASCADE; -CREATE TYPE Value AS ( - amount Amount, - asset BYTEA -); - --- Keeps track of changes to the dex's value circuit breaker. -CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change ( - -- The asset being moved into or out of the dex. - asset_id BYTEA NOT NULL, - -- The flow, either positive, or negative, into the dex via this particular asset. - -- - -- Because we're dealing with arbitrary assets, we need to use something which can store u128 - flow Amount -); - --- One step of an execution trace. -CREATE TABLE IF NOT EXISTS dex_trace_step ( - id SERIAL PRIMARY KEY, - value Value -); - --- A single trace, showing what a small amount of an input asset was exchanged for. -CREATE TABLE IF NOT EXISTS dex_trace ( - id SERIAL PRIMARY KEY, - step_start INTEGER REFERENCES dex_trace_step(id), - step_end INTEGER REFERENCES dex_trace_step(id) -); - ---- Represents instances where arb executions happened. -CREATE TABLE IF NOT EXISTS dex_arb ( - height BIGINT PRIMARY KEY, - input Value, - output Value, - trace_start INTEGER REFERENCES dex_trace(id), - trace_end INTEGER REFERENCES dex_trace(id) -); - -DROP DOMAIN IF EXISTS Bps; -CREATE DOMAIN Bps AS INTEGER CHECK(VALUE BETWEEN 0 and 10000); - --- Holds the current state of a given liquidity position -CREATE TABLE IF NOT EXISTS dex_lp ( - id BYTEA PRIMARY KEY, - -- The enum for the current state of the position - state TEXT NOT NULL, - -- The first asset of the position - asset1 BYTEA NOT NULL, - -- The second asset of the position - asset2 BYTEA NOT NULL, - p Amount, - q Amount, - -- The fee, in basis points - fee_bps Bps, - -- How much of asset2 you get when swapping asset1. - price12 NUMERIC GENERATED ALWAYS AS (((1 - fee_bps::NUMERIC / 10000) * (p / q))) STORED, - -- How much of asset1 you get when swapping asset2. - price21 NUMERIC GENERATED ALWAYS AS (((1 - fee_bps::NUMERIC / 10000) * (q / p))) STORED, - -- Whether the position will be closed when all reserves are depleted - close_on_fill BOOLEAN NOT NULL, - -- The amount of reserves of asset 1. - reserves1 Amount, - -- The amount of reserves of asset 2. - reserves2 Amount -); --- So that we can easily query positions by ascending price -CREATE INDEX ON dex_lp(asset1, price12); -CREATE INDEX ON dex_lp(asset2, price21); - --- Holds update events to liquidity position -CREATE TABLE IF NOT EXISTS dex_lp_update ( - id SERIAL PRIMARY KEY, - -- The block height where the update occurred. - height BIGINT NOT NULL, - -- The identifier for the position - position_id BYTEA NOT NULL, - -- The new state of the position - state TEXT NOT NULL, - -- The new reserves of asset1 (potentially null) - reserves1 NUMERIC(39, 0), - -- The new reserves of asset2 (potentially null) - reserves2 NUMERIC(39, 0), - -- If present, a reference to the execution table, for execution events - execution_id INTEGER -); -CREATE INDEX ON dex_lp_update(position_id, height DESC, id DESC); -CREATE INDEX ON dex_lp_update(execution_id); - --- Holds data related to execution events -CREATE TABLE IF NOT EXISTS dex_lp_execution ( - id SERIAL PRIMARY KEY, - -- The amount of asset1 that was pushed into (or pulled out of, if negative) this position. - inflow1 Amount, - -- As above, with asset2. - inflow2 Amount, - -- The start asset for this execution. - context_start BYTEA NOT NULL, - -- The end asset for this execution. - context_end BYTEA NOT NULL -); - ---- Represents instances where swap executions happened. -CREATE TABLE IF NOT EXISTS dex_batch_swap ( - id SERIAL PRIMARY KEY, - height BIGINT NOT NULL, - trace12_start INTEGER REFERENCES dex_trace (id), - trace12_end INTEGER REFERENCES dex_trace (id), - trace21_start INTEGER REFERENCES dex_trace (id), - trace21_end INTEGER REFERENCES dex_trace (id), - asset1 BYTEA NOT NULL, - asset2 BYTEA NOT NULL, - unfilled1 Amount NOT NULL, - unfilled2 Amount NOT NULL, - delta1 Amount NOT NULL, - delta2 Amount NOT NULL, - lambda1 Amount NOT NULL, - lambda2 Amount NOT NULL -); - -CREATE INDEX ON dex_batch_swap(height); -CREATE INDEX ON dex_batch_swap(asset1, height); -CREATE INDEX ON dex_batch_swap(asset2, height); - --- Represents instances of invididual swaps into the batch. -CREATE TABLE IF NOT EXISTS dex_swap ( - id SERIAL PRIMARY KEY, - height BIGINT NOT NULL, - value1 Value, - value2 Value -); - -CREATE INDEX ON dex_swap(height, id); -CREATE INDEX ON dex_swap(((value1).asset)); -CREATE INDEX ON dex_swap(((value2).asset)); diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs deleted file mode 100644 index ec668d7bb8..0000000000 --- a/crates/bin/pindexer/src/dex/mod.rs +++ /dev/null @@ -1,633 +0,0 @@ -use std::collections::HashSet; - -use anyhow::{anyhow, Context}; -use cometindex::async_trait; -use penumbra_asset::asset::Id as AssetId; -use penumbra_dex::lp::position::{Id, Position}; -use penumbra_dex::lp::{self, TradingFunction}; -use penumbra_dex::{BatchSwapOutputData, DirectedTradingPair, SwapExecution, TradingPair}; -use penumbra_num::Amount; -use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb}; -use sqlx::{PgPool, Postgres, Transaction}; - -use crate::sql::Sql; -use crate::{AppView, ContextualizedEvent, PgTransaction}; - -/// Insert a swap execution into the database. -/// -/// This returns the start and end indices of its trace. -async fn insert_swap_execution<'d>( - dbtx: &mut Transaction<'d, Postgres>, - execution: Option<&SwapExecution>, -) -> anyhow::Result<(Option, Option)> { - let execution = match execution { - None => return Ok((None, None)), - Some(e) => e, - }; - let mut trace_start = None; - let mut trace_end = None; - for trace in &execution.traces { - let mut step_start = None; - let mut step_end = None; - for step in trace { - let (id,): (i32,) = sqlx::query_as( - r#"INSERT INTO dex_trace_step VALUES (DEFAULT, (CAST($1 AS Amount), $2)) RETURNING id;"#, - ) - .bind(step.amount.to_string()) - .bind(Sql::from(step.asset_id)) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = step_start { - step_start = Some(id); - } - step_end = Some(id); - } - let (id,): (i32,) = - sqlx::query_as(r#"INSERT INTO dex_trace VALUES (DEFAULT, $1, $2) RETURNING id;"#) - .bind(step_start) - .bind(step_end) - .fetch_one(dbtx.as_mut()) - .await?; - if let None = trace_start { - trace_start = Some(id); - } - trace_end = Some(id); - } - Ok((trace_start, trace_end)) -} - -/// One of the possible events that we care about. -#[derive(Clone, Debug)] -enum Event { - /// A parsed version of [pb::EventValueCircuitBreakerCredit]. - CircuitBreakerCredit { - asset_id: AssetId, - previous_balance: Amount, - new_balance: Amount, - }, - /// A parsed version of [pb::EventValueCircuitBreakerDebit] - CircuitBreakerDebit { - asset_id: AssetId, - previous_balance: Amount, - new_balance: Amount, - }, - /// A parsed version of [pb::EventArbExecution] - ArbExecution { - height: u64, - execution: SwapExecution, - }, - /// A parsed version of [pb::EventBatchSwap] - BatchSwap { - height: u64, - execution12: Option, - execution21: Option, - output_data: BatchSwapOutputData, - }, - /// A parsed version of [pb::EventPositionOpen] - PositionOpen { height: u64, position: Position }, - /// A parsed version of [pb::EventPositionWithdraw] - PositionWithdraw { - height: u64, - position_id: Id, - reserves_1: Amount, - reserves_2: Amount, - sequence: u64, - }, - /// A parsed version of [pb::EventPositionClose] - PositionClose { height: u64, position_id: Id }, - /// A parsed version of [pb::EventPositionExecution] - PositionExecution { - height: u64, - position_id: Id, - reserves_1: Amount, - reserves_2: Amount, - prev_reserves_1: Amount, - prev_reserves_2: Amount, - context: DirectedTradingPair, - }, - /// A parsed version of [pb::EventSwap] - Swap { - height: u64, - trading_pair: TradingPair, - delta_1_i: Amount, - delta_2_i: Amount, - }, -} - -impl Event { - const NAMES: [&'static str; 9] = [ - "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit", - "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit", - "penumbra.core.component.dex.v1.EventArbExecution", - "penumbra.core.component.dex.v1.EventPositionWithdraw", - "penumbra.core.component.dex.v1.EventPositionOpen", - "penumbra.core.component.dex.v1.EventPositionClose", - "penumbra.core.component.dex.v1.EventPositionExecution", - "penumbra.core.component.dex.v1.EventBatchSwap", - "penumbra.core.component.dex.v1.EventSwap", - ]; - - /// Index this event, using the handle to the postgres transaction. - async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> { - match self { - Event::CircuitBreakerCredit { - asset_id, - previous_balance, - new_balance, - } => { - let amount = new_balance.checked_sub(&previous_balance).ok_or(anyhow!( - "balance decreased after dex credit: previous: {}, new: {}", - previous_balance, - new_balance - ))?; - sqlx::query( - r#" - INSERT INTO dex_value_circuit_breaker_change - VALUES ($1, CAST($2 AS Amount)); - "#, - ) - .bind(Sql::from(*asset_id)) - .bind(amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::CircuitBreakerDebit { - asset_id, - previous_balance, - new_balance, - } => { - let amount = previous_balance.checked_sub(&new_balance).ok_or(anyhow!( - "balance increased after dex credit: previous: {}, new: {}", - previous_balance, - new_balance - ))?; - sqlx::query( - r#" - INSERT INTO dex_value_circuit_breaker_change - VALUES ($1, -(CAST($2 AS Amount))); - "#, - ) - .bind(Sql::from(*asset_id)) - .bind(amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::ArbExecution { height, execution } => { - let (trace_start, trace_end) = insert_swap_execution(dbtx, Some(execution)).await?; - sqlx::query(r#"INSERT INTO dex_arb VALUES ($1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5), $6, $7);"#) - .bind(i64::try_from(*height)?) - .bind(execution.input.amount.to_string()) - .bind(Sql::from(execution.input.asset_id)) - .bind(execution.output.amount.to_string()) - .bind(Sql::from(execution.output.asset_id)) - .bind(trace_start) - .bind(trace_end) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionOpen { height, position } => { - let Position { - state, - phi: TradingFunction { pair, component }, - .. - } = position; - let id = position.id().0; - tracing::debug!( - p = component.p.to_string(), - q = component.q.to_string(), - r1 = position.reserves_1().amount.to_string(), - r2 = position.reserves_2().amount.to_string() - ); - sqlx::query( - " - INSERT INTO dex_lp (id, state, asset1, asset2, p, q, fee_bps, close_on_fill, reserves1, reserves2) - VALUES ($1, $2, $3, $4, CAST($5 as Amount), CAST($6 AS Amount), $7, $8, CAST($9 AS Amount), CAST($10 AS Amount)); - ", - ) - .bind(id) - .bind(state.to_string()) - .bind(pair.asset_1().to_bytes()) - .bind(pair.asset_2().to_bytes()) - .bind(component.p.to_string()) - .bind(component.q.to_string()) - .bind(i32::try_from(component.fee)?) - .bind(position.close_on_fill) - .bind(position.reserves_1().amount.to_string()) - .bind(position.reserves_2().amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state, reserves1, reserves2) - VALUES ($1, $2, $3, CAST($4 AS Amount), CAST($5 AS Amount)); - ", - ) - .bind(i64::try_from(*height)?) - .bind(id) - .bind(state.to_string()) - .bind(position.reserves_1().amount.to_string()) - .bind(position.reserves_2().amount.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionClose { - height, - position_id, - } => { - let state = lp::position::State::Closed; - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state) - VALUES ($1, $2, $3) - ", - ) - .bind(i64::try_from(*height)?) - .bind(position_id.0) - .bind(state.to_string()) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - UPDATE dex_lp - SET state = $2 - WHERE id = $1; - ", - ) - .bind(position_id.0) - .bind(state.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionWithdraw { - height, - position_id, - reserves_1: reserves1, - reserves_2: reserves2, - sequence, - } => { - let state = lp::position::State::Withdrawn { - sequence: *sequence, - }; - let reserves1 = reserves1.to_string(); - let reserves2 = reserves2.to_string(); - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state, reserves1, reserves2) - VALUES ($1, $2, $3, CAST($4 AS Amount), CAST($5 AS Amount)) - ", - ) - .bind(i64::try_from(*height)?) - .bind(position_id.0) - .bind(state.to_string()) - .bind(&reserves1) - .bind(&reserves2) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - UPDATE dex_lp - SET state = $2, reserves1 = CAST($3 AS Amount), reserves2 = CAST($4 AS Amount) - WHERE id = $1; - ", - ) - .bind(position_id.0) - .bind(state.to_string()) - .bind(&reserves1) - .bind(&reserves2) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::PositionExecution { - height, - position_id, - reserves_1: reserves1, - reserves_2: reserves2, - context, - prev_reserves_1: prev_reserves1, - prev_reserves_2: prev_reserves2, - } => { - let state = lp::position::State::Opened; - let reserves1 = reserves1.to_string(); - let reserves2 = reserves2.to_string(); - let prev_reserves1 = prev_reserves1.to_string(); - let prev_reserves2 = prev_reserves2.to_string(); - let id: i32 = sqlx::query_scalar( - " - INSERT INTO dex_lp_execution (inflow1, inflow2, context_start, context_end) - VALUES (CAST($1 AS Amount) - CAST($2 AS Amount), CAST($3 AS Amount) - CAST($4 AS Amount), $5, $6) - RETURNING id; - ",) - .bind(&reserves1) - .bind(prev_reserves1) - .bind(&reserves2) - .bind(prev_reserves2) - .bind(context.start.to_bytes()) - .bind(context.end.to_bytes()) - .fetch_one(dbtx.as_mut()).await?; - sqlx::query( - " - INSERT INTO dex_lp_update (height, position_id, state, reserves1, reserves2, execution_id) - VALUES ($1, $2, $3, CAST($4 AS Amount), CAST($5 AS Amount), $6) - ", - ) - .bind(i64::try_from(*height)?) - .bind(position_id.0) - .bind(state.to_string()) - .bind(&reserves1) - .bind(&reserves2) - .bind(id) - .execute(dbtx.as_mut()) - .await?; - sqlx::query( - " - UPDATE dex_lp - SET reserves1 = CAST($2 AS Amount), reserves2 = CAST($3 AS Amount) - WHERE id = $1; - ", - ) - .bind(position_id.0) - .bind(&reserves1) - .bind(&reserves2) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::BatchSwap { - height, - execution12, - execution21, - output_data, - } => { - let (trace12_start, trace12_end) = - insert_swap_execution(dbtx, execution12.as_ref()).await?; - let (trace21_start, trace21_end) = - insert_swap_execution(dbtx, execution21.as_ref()).await?; - sqlx::query(r#"INSERT INTO dex_batch_swap VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, CAST($8 AS Amount), CAST($9 AS Amount), CAST($10 AS Amount), CAST($11 AS Amount), CAST($12 AS Amount), CAST($13 AS Amount));"#) - .bind(i64::try_from(*height)?) - .bind(trace12_start) - .bind(trace12_end) - .bind(trace21_start) - .bind(trace21_end) - .bind(Sql::from(output_data.trading_pair.asset_1())) - .bind(Sql::from(output_data.trading_pair.asset_2())) - .bind(output_data.unfilled_1.to_string()) - .bind(output_data.unfilled_2.to_string()) - .bind(output_data.delta_1.to_string()) - .bind(output_data.delta_2.to_string()) - .bind(output_data.lambda_1.to_string()) - .bind(output_data.lambda_2.to_string()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - Event::Swap { - height, - trading_pair, - delta_1_i, - delta_2_i, - } => { - sqlx::query(r#"INSERT INTO dex_swap VALUES (DEFAULT, $1, (CAST($2 AS Amount), $3), (CAST($4 AS Amount), $5));"#) - .bind(i64::try_from(*height)?) - .bind(delta_1_i.to_string()) - .bind(Sql::from(trading_pair.asset_1())) - .bind(delta_2_i.to_string()) - .bind(Sql::from(trading_pair.asset_2())) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - } - } -} - -impl<'a> TryFrom<&'a ContextualizedEvent> for Event { - type Error = anyhow::Error; - - fn try_from(event: &'a ContextualizedEvent) -> Result { - match event.event.kind.as_str() { - // Credit - x if x == Event::NAMES[0] => { - let pe = pb::EventValueCircuitBreakerCredit::from_event(event.as_ref())?; - let asset_id = - AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?; - let previous_balance = Amount::try_from( - pe.previous_balance - .ok_or(anyhow!("event missing previous_balance"))?, - )?; - let new_balance = - Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?; - Ok(Self::CircuitBreakerCredit { - asset_id, - previous_balance, - new_balance, - }) - } - // Debit - x if x == Event::NAMES[1] => { - let pe = pb::EventValueCircuitBreakerDebit::from_event(event.as_ref())?; - let asset_id = - AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?; - let previous_balance = Amount::try_from( - pe.previous_balance - .ok_or(anyhow!("event missing previous_balance"))?, - )?; - let new_balance = - Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?; - Ok(Self::CircuitBreakerDebit { - asset_id, - previous_balance, - new_balance, - }) - } - // Arb - x if x == Event::NAMES[2] => { - let pe = pb::EventArbExecution::from_event(event.as_ref())?; - let height = pe.height; - let execution = pe - .swap_execution - .ok_or(anyhow!("missing swap execution"))? - .try_into()?; - Ok(Self::ArbExecution { height, execution }) - } - // LP Withdraw - x if x == Event::NAMES[3] => { - let pe = pb::EventPositionWithdraw::from_event(event.as_ref())?; - let height = event.block_height; - let position_id = pe - .position_id - .ok_or(anyhow!("missing position id"))? - .try_into()?; - let reserves_1 = pe - .reserves_1 - .ok_or(anyhow!("missing reserves_1"))? - .try_into()?; - let reserves_2 = pe - .reserves_2 - .ok_or(anyhow!("missing reserves_2"))? - .try_into()?; - let sequence = pe.sequence; - Ok(Self::PositionWithdraw { - height, - position_id, - reserves_1, - reserves_2, - sequence, - }) - } - // LP Open - x if x == Event::NAMES[4] => { - let pe = pb::EventPositionOpen::from_event(event.as_ref())?; - let height = event.block_height; - let position = pe - .position - .ok_or(anyhow!("missing position")) - .context("(make sure you're using pd >= 0.79.3)")? - .try_into()?; - Ok(Self::PositionOpen { height, position }) - } - // LP Close - x if x == Event::NAMES[5] => { - let pe = pb::EventPositionClose::from_event(event.as_ref())?; - let height = event.block_height; - let position_id = pe - .position_id - .ok_or(anyhow!("missing position id"))? - .try_into()?; - - Ok(Self::PositionClose { - height, - position_id, - }) - } - // LP Execution - x if x == Event::NAMES[6] => { - let pe = pb::EventPositionExecution::from_event(event.as_ref())?; - let height = event.block_height; - let position_id = pe - .position_id - .ok_or(anyhow!("missing position id"))? - .try_into()?; - let reserves_1 = pe - .reserves_1 - .ok_or(anyhow!("missing reserves_1"))? - .try_into()?; - let reserves_2 = pe - .reserves_2 - .ok_or(anyhow!("missing reserves_2"))? - .try_into()?; - let prev_reserves_1 = pe - .prev_reserves_1 - .ok_or(anyhow!("missing reserves_1"))? - .try_into()?; - let prev_reserves_2 = pe - .prev_reserves_2 - .ok_or(anyhow!("missing reserves_2"))? - .try_into()?; - let context: DirectedTradingPair = - pe.context.ok_or(anyhow!("missing context"))?.try_into()?; - Ok(Self::PositionExecution { - height, - position_id, - reserves_1, - reserves_2, - prev_reserves_1, - prev_reserves_2, - context, - }) - } - // Batch Swap - x if x == Event::NAMES[7] => { - let pe = pb::EventBatchSwap::from_event(event.as_ref())?; - let height = event.block_height; - let output_data = pe - .batch_swap_output_data - .ok_or(anyhow!("missing swap execution"))? - .try_into()?; - let execution12 = pe - .swap_execution_1_for_2 - .map(|x| x.try_into()) - .transpose()?; - let execution21 = pe - .swap_execution_2_for_1 - .map(|x| x.try_into()) - .transpose()?; - Ok(Self::BatchSwap { - height, - execution12, - execution21, - output_data, - }) - } - // Swap - x if x == Event::NAMES[8] => { - let pe = pb::EventSwap::from_event(event.as_ref())?; - let height = event.block_height; - let trading_pair = pe - .trading_pair - .expect("trading_pair should be present") - .try_into()?; - let delta_1_i = pe - .delta_1_i - .expect("delta_1_i should be present") - .try_into()?; - let delta_2_i = pe - .delta_2_i - .expect("delta_2_i should be present") - .try_into()?; - Ok(Self::Swap { - height, - trading_pair, - delta_1_i, - delta_2_i, - }) - } - x => Err(anyhow!(format!("unrecognized event kind: {x}"))), - } - } -} - -#[derive(Debug)] -pub struct Component { - event_strings: HashSet<&'static str>, -} - -impl Component { - pub fn new() -> Self { - let event_strings = Event::NAMES.into_iter().collect(); - Self { event_strings } - } -} - -#[async_trait] -impl AppView for Component { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - _app_state: &serde_json::Value, - ) -> anyhow::Result<()> { - for statement in include_str!("dex.sql").split(";") { - sqlx::query(statement).execute(dbtx.as_mut()).await?; - } - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - self.event_strings.contains(type_str) - } - - #[tracing::instrument(skip_all, fields(height = event.block_height, name = event.event.kind.as_str()))] - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> anyhow::Result<()> { - Event::try_from(event)?.index(dbtx).await - } -} diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index 90600e43ab..ddaa667635 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -6,7 +6,7 @@ pub trait IndexerExt: Sized { impl IndexerExt for cometindex::Indexer { fn with_default_penumbra_app_views(self) -> Self { - self.with_index(crate::shielded_pool::fmd::ClueSet {}) + self.with_index(crate::block::Block {}) .with_index(crate::stake::ValidatorSet {}) .with_index(crate::stake::Slashings {}) .with_index(crate::stake::DelegationTxs {}) diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index 47429f1c10..529b56102f 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -3,13 +3,10 @@ pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool mod indexer_ext; pub use indexer_ext::IndexerExt; pub mod block; -pub mod dex; pub mod dex_ex; pub mod ibc; pub mod insights; mod parsing; -pub mod shielded_pool; -mod sql; pub mod stake; pub mod supply; diff --git a/crates/bin/pindexer/src/shielded_pool.rs b/crates/bin/pindexer/src/shielded_pool.rs deleted file mode 100644 index b0d0276891..0000000000 --- a/crates/bin/pindexer/src/shielded_pool.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod fmd; diff --git a/crates/bin/pindexer/src/shielded_pool/fmd.rs b/crates/bin/pindexer/src/shielded_pool/fmd.rs deleted file mode 100644 index 50f4942dd3..0000000000 --- a/crates/bin/pindexer/src/shielded_pool/fmd.rs +++ /dev/null @@ -1,61 +0,0 @@ -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; -use penumbra_proto::{core::component::shielded_pool::v1 as pb, event::ProtoEvent}; - -#[derive(Debug)] -pub struct ClueSet {} - -#[async_trait] -impl AppView for ClueSet { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - _app_state: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - sqlx::query( - // table name is module path + struct name - " -CREATE TABLE IF NOT EXISTS shielded_pool_fmd_clue_set ( - id SERIAL PRIMARY KEY, - clue_bytes BYTEA NOT NULL, - tx_hash BYTEA NOT NULL -); -", - ) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.shielded_pool.v1.EventBroadcastClue" - } - - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> Result<(), anyhow::Error> { - let pe = pb::EventBroadcastClue::from_event(event.as_ref())?; - - let clue_bytes = pe - .clue - .ok_or_else(|| anyhow::anyhow!("clue event missing clue"))? - .inner; - - let tx_hash = event.tx_hash.as_ref().expect("tx_hash not found").to_vec(); - - sqlx::query( - " - INSERT INTO shielded_pool_fmd_clue_set (clue_bytes, tx_hash) - VALUES ($1, $2) - ", - ) - .bind(&clue_bytes) - .bind(&tx_hash) - .execute(dbtx.as_mut()) - .await?; - - Ok(()) - } -} diff --git a/crates/bin/pindexer/src/sql.rs b/crates/bin/pindexer/src/sql.rs deleted file mode 100644 index 9d17faa978..0000000000 --- a/crates/bin/pindexer/src/sql.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::error::Error; - -use penumbra_asset::asset::Id as AssetId; -use sqlx::{Decode, Encode, Postgres, Type}; - -/// An extension trait to make it easier to implement serialization for existing Penumbra types. -/// -/// Types that implement this trait can then be shoved into [Sql] and passed along -/// to the various sqlx functions. -pub trait SqlExt: Clone + Sized { - type SqlT; - - fn to_sql_type(&self) -> Self::SqlT; - fn from_sql_type(value: Self::SqlT) -> anyhow::Result; -} - -/// A wrapper over `T` allowing for SQL serialization and deserialization. -/// -/// When `T` implements [SqlExt] then this type will be encodeable and decodeable -/// from a Postgres database. -pub struct Sql(T); - -impl Sql { - #[allow(dead_code)] - pub fn into(self) -> T { - self.0 - } -} - -impl From for Sql { - fn from(value: T) -> Self { - Self(value) - } -} - -impl<'q, T> Encode<'q, Postgres> for Sql -where - T: SqlExt, - T::SqlT: Encode<'q, Postgres>, -{ - fn encode_by_ref( - &self, - buf: &mut >::ArgumentBuffer, - ) -> sqlx::encode::IsNull { - ::to_sql_type(&self.0).encode_by_ref(buf) - } -} - -impl<'q, T> Decode<'q, Postgres> for Sql -where - T: SqlExt, - T::SqlT: Decode<'q, Postgres>, -{ - fn decode( - value: >::ValueRef, - ) -> Result { - let sql_t = ::SqlT::decode(value)?; - let t = T::from_sql_type(sql_t) - .map_err(|e| Box::::from(e))?; - Ok(Sql(t)) - } -} - -impl Type for Sql -where - T: SqlExt, - T::SqlT: Type, -{ - fn type_info() -> ::TypeInfo { - <[u8; 32]>::type_info() - } -} - -impl SqlExt for AssetId { - type SqlT = [u8; 32]; - - fn to_sql_type(&self) -> Self::SqlT { - self.to_bytes() - } - - fn from_sql_type(value: Self::SqlT) -> anyhow::Result { - Ok(AssetId::try_from(value.as_slice())?) - } -}