Skip to content

Commit

Permalink
program_transformers: remove plerkle
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Feb 21, 2024
1 parent 9e3f11a commit f7728ee
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 115 deletions.
33 changes: 16 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ clone_on_ref_ptr = "deny"
missing_const_for_fn = "deny"
trivially_copy_pass_by_ref = "deny"

[patch.crates-io]
blockbuster = { git = "https://github.com/rpcpool/blockbuster.git", tag = "blockbuster-v1.0.1-no-plerkle" }

[profile.release]
codegen-units = 1
lto = true
35 changes: 31 additions & 4 deletions integration_tests/tests/integration_tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ use migration::sea_orm::{
use migration::{Migrator, MigratorTrait};
use mpl_token_metadata::accounts::Metadata;

use nft_ingester::config;
use nft_ingester::{
config,
plerkle::{
parse_account_keys, parse_message_instructions, parse_meta_inner_instructions,
parse_pubkey, parse_signature, parse_slice,
},
};
use once_cell::sync::Lazy;
use plerkle_serialization::root_as_account_info;
use plerkle_serialization::root_as_transaction_info;
use plerkle_serialization::serializer::serialize_account;
use plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2;
use program_transformers::ProgramTransformer;
use program_transformers::{AccountInfo, ProgramTransformer, TransactionInfo};

use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
Expand Down Expand Up @@ -356,7 +362,12 @@ pub async fn index_account_bytes(setup: &TestSetup, account_bytes: Vec<u8>) {

setup
.transformer
.handle_account_update(account)
.handle_account_update(&AccountInfo {
slot: account.slot(),
pubkey: &parse_pubkey(account.pubkey()).expect("failed to parse account"),
owner: &parse_pubkey(account.owner()).expect("failed to parse account"),
data: parse_slice(account.data()).expect("failed to parse account"),
})
.await
.unwrap();
}
Expand Down Expand Up @@ -419,7 +430,23 @@ async fn cached_fetch_transaction(setup: &TestSetup, sig: Signature) -> Vec<u8>
pub async fn index_transaction(setup: &TestSetup, sig: Signature) {
let txn_bytes: Vec<u8> = cached_fetch_transaction(setup, sig).await;
let txn = root_as_transaction_info(&txn_bytes).unwrap();
setup.transformer.handle_transaction(&txn).await.unwrap();
setup
.transformer
.handle_transaction(&TransactionInfo {
slot: txn.slot(),
signature: &parse_signature(txn.signature()).expect("failed to parse transaction"),
account_keys: &parse_account_keys(txn.account_keys())
.expect("failed to parse transaction"),
message_instructions: &parse_message_instructions(txn.outer_instructions())
.expect("failed to parse transaction"),
meta_inner_instructions: &parse_meta_inner_instructions(
txn.compiled_inner_instructions(),
txn.inner_instructions(),
)
.expect("failed to parse transaction"),
})
.await
.unwrap();
}

async fn cached_fetch_largest_token_account_id(client: &RpcClient, mint: Pubkey) -> Pubkey {
Expand Down
19 changes: 17 additions & 2 deletions nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use {
crate::{
metric,
metrics::capture_result,
plerkle::{parse_pubkey, parse_slice},
tasks::{create_download_metadata_notifier, TaskData},
},
cadence_macros::{is_global_default_set, statsd_count, statsd_time},
chrono::Utc,
log::{debug, error},
plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData},
plerkle_serialization::root_as_account_info,
program_transformers::ProgramTransformer,
program_transformers::{error::ProgramTransformerResult, AccountInfo, ProgramTransformer},
sqlx::{Pool, Postgres},
std::sync::Arc,
tokio::{
Expand Down Expand Up @@ -103,7 +104,7 @@ async fn handle_account(
account = Some(bs58::encode(pubkey.0.as_slice()).into_string());
}
let begin_processing = Instant::now();
let res = manager.handle_account_update(account_update).await;
let res = handle_account_update(manager, account_update).await;
let should_ack = capture_result(
id.clone(),
stream_key,
Expand All @@ -120,3 +121,17 @@ async fn handle_account(
}
ret_id
}

async fn handle_account_update<'a>(
manager: Arc<ProgramTransformer>,
account_update: plerkle_serialization::AccountInfo<'_>,
) -> ProgramTransformerResult<()> {
manager
.handle_account_update(&AccountInfo {
slot: account_update.slot(),
pubkey: &parse_pubkey(account_update.pubkey())?,
owner: &parse_pubkey(account_update.owner())?,
data: parse_slice(account_update.data())?,
})
.await
}
2 changes: 1 addition & 1 deletion nft_ingester/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
pub mod account_updates;
pub mod ack;
pub mod backfiller;
pub mod config;
pub mod database;
pub mod error;
pub mod metrics;
pub mod plerkle;
pub mod stream;
pub mod tasks;
pub mod transaction_notifications;
1 change: 1 addition & 0 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod config;
mod database;
pub mod error;
pub mod metrics;
mod plerkle;
mod stream;
pub mod tasks;
mod transaction_notifications;
Expand Down
116 changes: 116 additions & 0 deletions nft_ingester/src/plerkle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use {
flatbuffers::{ForwardsUOffset, Vector},
plerkle_serialization::{
CompiledInnerInstructions as FBCompiledInnerInstructions,
CompiledInstruction as FBCompiledInstruction, InnerInstructions as FBInnerInstructions,
Pubkey as FBPubkey,
},
program_transformers::error::{ProgramTransformerError, ProgramTransformerResult},
solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature},
solana_transaction_status::{InnerInstruction, InnerInstructions},
};

fn deser_err() -> ProgramTransformerError {
ProgramTransformerError::DeserializationError("Could not deserialize data".to_owned())
}

pub fn parse_pubkey(pubkey: Option<&FBPubkey>) -> ProgramTransformerResult<Pubkey> {
Ok(Pubkey::try_from(pubkey.ok_or_else(deser_err)?.0.as_slice())
.expect("valid key from FlatBuffer"))
}

pub fn parse_slice(data: Option<Vector<'_, u8>>) -> ProgramTransformerResult<&[u8]> {
data.map(|data| data.bytes()).ok_or_else(deser_err)
}

pub fn parse_signature(data: Option<&str>) -> ProgramTransformerResult<Signature> {
data.ok_or_else(deser_err)?
.parse()
.map_err(|_error| deser_err())
}

pub fn parse_account_keys(
keys: Option<Vector<'_, FBPubkey>>,
) -> ProgramTransformerResult<Vec<Pubkey>> {
keys.ok_or_else(deser_err).map(|keys| {
keys.iter()
.map(|key| Pubkey::try_from(key.0.as_slice()).expect("valid key from FlatBuffer"))
.collect()
})
}

pub fn parse_message_instructions(
vec_cix: Option<Vector<'_, ForwardsUOffset<FBCompiledInstruction>>>,
) -> ProgramTransformerResult<Vec<CompiledInstruction>> {
vec_cix.ok_or_else(deser_err).and_then(|vec| {
vec.iter()
.map(|cix| {
Ok(CompiledInstruction {
program_id_index: cix.program_id_index(),
accounts: cix.accounts().ok_or_else(deser_err)?.bytes().to_vec(),
data: cix.data().ok_or_else(deser_err)?.bytes().to_vec(),
})
})
.collect()
})
}

pub fn parse_meta_inner_instructions(
vec_ciixs: Option<Vector<'_, ForwardsUOffset<FBCompiledInnerInstructions>>>,
vec_iixs: Option<Vector<'_, ForwardsUOffset<FBInnerInstructions>>>,
) -> ProgramTransformerResult<Vec<InnerInstructions>> {
if let Some(vec_ciixs) = vec_ciixs {
vec_ciixs
.iter()
.map(|ciix| {
Ok(InnerInstructions {
index: ciix.index(),
instructions: ciix
.instructions()
.ok_or_else(deser_err)?
.iter()
.map(|ix| {
let cix = ix.compiled_instruction().ok_or_else(deser_err)?;
Ok(InnerInstruction {
instruction: CompiledInstruction {
program_id_index: cix.program_id_index(),
accounts: cix
.accounts()
.ok_or_else(deser_err)?
.bytes()
.to_vec(),
data: cix.data().ok_or_else(deser_err)?.bytes().to_vec(),
},
stack_height: Some(ix.stack_height() as u32),
})
})
.collect::<Result<_, ProgramTransformerError>>()?,
})
})
.collect()
} else if let Some(vec_iixs) = vec_iixs {
vec_iixs
.iter()
.map(|iixs| {
Ok(InnerInstructions {
index: iixs.index(),
instructions: iixs
.instructions()
.expect("valid instructions")
.iter()
.map(|cix| InnerInstruction {
instruction: CompiledInstruction {
program_id_index: cix.program_id_index(),
accounts: cix.accounts().expect("valid accounts").bytes().to_vec(),
data: cix.data().expect("valid data").bytes().to_vec(),
},
stack_height: Some(0),
})
.collect(),
})
})
.collect()
} else {
Err(deser_err())
}
}
Loading

0 comments on commit f7728ee

Please sign in to comment.