Skip to content

Commit

Permalink
pindexer: implement batch processing API
Browse files Browse the repository at this point in the history
  • Loading branch information
cronokirby committed Nov 1, 2024
1 parent 37f604f commit 1d93574
Show file tree
Hide file tree
Showing 21 changed files with 886 additions and 760 deletions.
49 changes: 26 additions & 23 deletions crates/bin/pindexer/src/block.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction};
use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent};
use sqlx::{types::chrono::DateTime, PgPool};
use sqlx::types::chrono::DateTime;

#[derive(Debug)]
pub struct Block {}

#[async_trait]
impl AppView for Block {
fn name(&self) -> String {
"block".to_string()
}

async fn init_chain(
&self,
dbtx: &mut PgTransaction,
Expand All @@ -27,34 +31,33 @@ CREATE TABLE IF NOT EXISTS block_details (
Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
type_str == "penumbra.core.component.sct.v1.EventBlockRoot"
}

async fn index_event(
async fn index_batch(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
batch: EventBatch,
) -> Result<(), anyhow::Error> {
let pe = pb::EventBlockRoot::from_event(event.as_ref())?;
let timestamp = pe.timestamp.unwrap_or_default();
for event in batch.events() {
let pe = match pb::EventBlockRoot::from_event(event.as_ref()) {
Ok(pe) => pe,
Err(_) => continue,
};
let timestamp = pe.timestamp.unwrap_or_default();

sqlx::query(
"
sqlx::query(
"
INSERT INTO block_details (height, timestamp, root)
VALUES ($1, $2, $3)
",
)
.bind(i64::try_from(pe.height)?)
.bind(DateTime::from_timestamp(
timestamp.seconds,
u32::try_from(timestamp.nanos)?,
))
.bind(pe.root.unwrap().inner)
.execute(dbtx.as_mut())
.await?;

)
.bind(i64::try_from(pe.height)?)
.bind(DateTime::from_timestamp(
timestamp.seconds,
u32::try_from(timestamp.nanos)?,
))
.bind(pe.root.unwrap().inner)
.execute(dbtx.as_mut())
.await?;
}
Ok(())
}
}
60 changes: 31 additions & 29 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ use std::fmt::Display;

use anyhow::{anyhow, Context};
use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc};
use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction};
use penumbra_asset::asset;
use penumbra_dex::{event::EventCandlestickData, CandlestickData};
use penumbra_proto::{event::EventDomainType, DomainType};
use penumbra_sct::event::EventBlockRoot;
use prost::Name as _;
use sqlx::PgPool;

type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;

Expand Down Expand Up @@ -453,35 +451,11 @@ impl Component {
pub fn new() -> Self {
Self {}
}
}

#[async_trait]
impl AppView for Component {
async fn init_chain(
&self,
dbtx: &mut PgTransaction,
_: &serde_json::Value,
) -> Result<(), anyhow::Error> {
for statement in include_str!("schema.sql").split(";") {
sqlx::query(statement).execute(dbtx.as_mut()).await?;
}
Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
[
<EventCandlestickData as DomainType>::Proto::full_name(),
<EventBlockRoot as DomainType>::Proto::full_name(),
]
.into_iter()
.any(|x| type_str == x)
}

async fn index_event(
&self,
dbtx: &mut PgTransaction,
dbtx: &mut PgTransaction<'_>,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
let height = event.block_height;
Expand All @@ -504,7 +478,35 @@ impl AppView for Component {
}
summary::update_all(dbtx, time).await?;
}
tracing::debug!(?event, "unrecognized event");
Ok(())
}
}

#[async_trait]
impl AppView for Component {
async fn init_chain(
&self,
dbtx: &mut PgTransaction,
_: &serde_json::Value,
) -> Result<(), anyhow::Error> {
for statement in include_str!("schema.sql").split(";") {
sqlx::query(statement).execute(dbtx.as_mut()).await?;
}
Ok(())
}

fn name(&self) -> String {
"dex_ex".to_string()
}

async fn index_batch(
&self,
dbtx: &mut PgTransaction,
batch: EventBatch,
) -> Result<(), anyhow::Error> {
for event in batch.events() {
self.index_event(dbtx, event).await?;
}
Ok(())
}
}
Loading

0 comments on commit 1d93574

Please sign in to comment.