Skip to content

Commit

Permalink
backport from metaplex
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Oct 6, 2023
1 parent 2e336b7 commit 5ea4c05
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 96 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[workspace]
members = [
"plerkle_messenger", # 1.5.3
"plerkle", # 1.5.3+solana.1.16.15
"plerkle_serialization", # 1.5.3+solana.1.16.15
"plerkle", # 1.6.0+solana.1.16.15
"plerkle_messenger", # 1.6.0
"plerkle_serialization", # 1.6.0+solana.1.16.15
]

[profile.release]
Expand Down
6 changes: 3 additions & 3 deletions plerkle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle"
description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun."
version = "1.5.3+solana.1.16.15"
version = "1.6.0+solana.1.16.15"
authors = ["Metaplex Developers <[email protected]>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand All @@ -26,8 +26,8 @@ flatbuffers = "23.1.21"
hex = "0.4.3"
lazy_static = "1.4.0"
log = "0.4.17"
plerkle_messenger = { path = "../plerkle_messenger", version = "1.5.2", features = ["redis"] }
plerkle_serialization = { path = "../plerkle_serialization", version = "1.5.2" }
plerkle_messenger = { path = "../plerkle_messenger", version = "1.6.0", features = ["redis"] }
plerkle_serialization = { path = "../plerkle_serialization", version = "1.6.0" }
serde = "1.0.144"
serde_derive = "1.0.103"
serde_json = "1.0.83"
Expand Down
12 changes: 12 additions & 0 deletions plerkle/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use thiserror::Error;

use solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError;

#[derive(Error, Debug)]
pub enum PlerkleError {
#[error("General Plugin Config Error ({msg})")]
Expand All @@ -19,4 +21,14 @@ pub enum PlerkleError {

#[error("Unable to Send Event to Stream ({msg})")]
EventStreamError { msg: String },

#[error("Unable to acquire lock for updating slots seen. Error message: ({msg})")]
SlotsSeenLockError { msg: String },
}

// Implement the From trait for the PlerkleError to convert it into GeyserPluginError
impl From<PlerkleError> for GeyserPluginError {
fn from(err: PlerkleError) -> Self {
GeyserPluginError::Custom(Box::new(err))
}
}
106 changes: 60 additions & 46 deletions plerkle/src/geyser_plugin_nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ use solana_geyser_plugin_interface::geyser_plugin_interface::{
use solana_sdk::{message::AccountKeys, pubkey::Pubkey, signature::Signature};
use std::{
collections::BTreeSet,
convert::TryFrom,
fmt::{Debug, Formatter},
fs::File,
io::Read,
net::UdpSocket,
ops::Bound::Included,
ops::RangeBounds,
ops::{Bound::Included, RangeBounds},
sync::{Arc, Mutex},
};
use tokio::{
Expand Down Expand Up @@ -89,7 +87,6 @@ impl SlotStore {
}
}

#[allow(clippy::type_complexity)]
#[derive(Default)]
pub(crate) struct Plerkle<'a> {
runtime: Option<Runtime>,
Expand All @@ -98,13 +95,54 @@ pub(crate) struct Plerkle<'a> {
sender: Option<UnboundedSender<SerializedData<'a>>>,
started_at: Option<Instant>,
handle_startup: bool,
slots_seen: Mutex<SlotStore>,
slots_seen: Arc<Mutex<SlotStore>>,
#[allow(clippy::type_complexity)]
account_event_cache: Arc<DashMap<u64, DashMap<Pubkey, (u64, SerializedData<'a>)>>>,
#[allow(clippy::type_complexity)]
transaction_event_cache: Arc<DashMap<u64, DashMap<Signature, (u64, SerializedData<'a>)>>>,
conf_level: Option<SlotStatus>,
}

#[derive(Deserialize, PartialEq, Eq, Debug)]
trait PlerklePrivateMethods {
fn get_plerkle_block_info<'b>(
&self,
blockinfo: ReplicaBlockInfoVersions<'b>,
) -> plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2<'b>;
}

impl<'a> PlerklePrivateMethods for Plerkle<'a> {
fn get_plerkle_block_info<'b>(
&self,
blockinfo: ReplicaBlockInfoVersions<'b>,
) -> plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2<'b> {
match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(block_info) => {
plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 {
parent_slot: 0,
parent_blockhash: "",
slot: block_info.slot,
blockhash: block_info.blockhash,
block_time: block_info.block_time,
block_height: block_info.block_height,
executed_transaction_count: 0,
}
}
ReplicaBlockInfoVersions::V0_0_2(block_info) => {
plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 {
parent_slot: 0,
parent_blockhash: "",
slot: block_info.slot,
blockhash: block_info.blockhash,
block_time: block_info.block_time,
block_height: block_info.block_height,
executed_transaction_count: 0,
}
}
}
}
}

#[derive(Deserialize, PartialEq, Debug)]
pub enum ConfirmationLevel {
Processed,
Rooted,
Expand Down Expand Up @@ -145,7 +183,7 @@ impl<'a> Plerkle<'a> {
sender: None,
started_at: None,
handle_startup: false,
slots_seen: Mutex::new(SlotStore::new()),
slots_seen: Arc::new(Mutex::new(SlotStore::new())),
account_event_cache: Arc::new(DashMap::new()),
transaction_event_cache: Arc::new(DashMap::new()),
conf_level: None,
Expand Down Expand Up @@ -527,10 +565,13 @@ impl GeyserPlugin for Plerkle<'static> {
status: SlotStatus,
) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> {
info!("Slot status update: {:?} {:?}", slot, status);
let mut slots_seen = self.slots_seen.lock().unwrap();
if status == SlotStatus::Processed {
if let Some(parent) = parent {
slots_seen.insert(parent);
let mut seen = self
.slots_seen
.lock()
.map_err(|e| PlerkleError::SlotsSeenLockError { msg: e.to_string() })?;
seen.insert(parent)
}
}
if status == self.get_confirmation_level() {
Expand All @@ -557,11 +598,15 @@ impl GeyserPlugin for Plerkle<'static> {
}
}

let slots_to_purge = slots_seen.needs_purge(slot);
let mut seen: std::sync::MutexGuard<'_, SlotStore> = self
.slots_seen
.lock()
.map_err(|e| PlerkleError::SlotsSeenLockError { msg: e.to_string() })?;
let slots_to_purge = seen.needs_purge(slot);
if let Some(purgable) = slots_to_purge {
debug!("Purging slots: {:?}", purgable);
for slot in &purgable {
slots_seen.remove(*slot);
seen.remove(*slot);
}

let cl = self.account_event_cache.clone();
Expand Down Expand Up @@ -662,48 +707,17 @@ impl GeyserPlugin for Plerkle<'static> {
Ok(())
}

fn notify_block_metadata(
&self,
blockinfo: ReplicaBlockInfoVersions,
) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> {
fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> {
let seen = Instant::now();
let plerkle_blockinfo = self.get_plerkle_block_info(blockinfo);

// Get runtime and sender channel.
let runtime = self.get_runtime()?;
let sender = self.get_sender_clone()?;

// Serialize data.
let rep: plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2;
let builder = FlatBufferBuilder::new();

let block_info = match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(block_info) => {
// Hope to remove this when coupling is not an issue.
rep = plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 {
parent_slot: 0,
parent_blockhash: "",
slot: block_info.slot,
blockhash: block_info.blockhash,
block_time: block_info.block_time,
block_height: block_info.block_height,
executed_transaction_count: 0,
};
&rep
}
ReplicaBlockInfoVersions::V0_0_2(block_info) => {
rep = plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaBlockInfoV2 {
parent_slot: block_info.parent_slot,
parent_blockhash: block_info.parent_blockhash,
slot: block_info.slot,
blockhash: block_info.blockhash,
block_time: block_info.block_time,
block_height: block_info.block_height,
executed_transaction_count: block_info.executed_transaction_count,
};
&rep
}
};

let builder = serialize_block(builder, block_info);
let builder = serialize_block(builder, &plerkle_blockinfo);

// Send block info over channel.
runtime.spawn(async move {
Expand Down
2 changes: 1 addition & 1 deletion plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_messenger"
description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns."
version = "1.5.3"
version = "1.6.0"
authors = ["Metaplex Developers <[email protected]>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
4 changes: 2 additions & 2 deletions plerkle_serialization/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_serialization"
description = "Metaplex Flatbuffers Plerkle Serialization for Geyser plugin producer/consumer patterns."
version = "1.5.3+solana.1.16.15"
version = "1.6.0+solana.1.16.15"
authors = ["Metaplex Developers <[email protected]>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand All @@ -13,7 +13,7 @@ publish = false
bs58 = "0.4.0"
chrono = "0.4.22"
flatbuffers = "23.1.21"
serde = { version = "1.0.152"}
serde = "1.0.152"
solana-sdk = "=1.16.15"
solana-transaction-status = "=1.16.15"
thiserror = "1.0.32"
Expand Down
4 changes: 2 additions & 2 deletions plerkle_serialization/src/account_info_generated.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#![allow(clippy::all)]
// automatically generated by the FlatBuffers compiler, do not modify

// @generated

use crate::common_generated::*;
use core::cmp::Ordering;
use core::mem;
use core::{cmp::Ordering, mem};

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
Expand Down
4 changes: 2 additions & 2 deletions plerkle_serialization/src/block_info_generated.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#![allow(clippy::all)]
// automatically generated by the FlatBuffers compiler, do not modify

// @generated

use core::cmp::Ordering;
use core::mem;
use core::{cmp::Ordering, mem};

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
Expand Down
4 changes: 2 additions & 2 deletions plerkle_serialization/src/common_generated.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#![allow(clippy::all)]
// automatically generated by the FlatBuffers compiler, do not modify

// @generated

use core::cmp::Ordering;
use core::mem;
use core::{cmp::Ordering, mem};

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
Expand Down
4 changes: 2 additions & 2 deletions plerkle_serialization/src/compiled_instruction_generated.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#![allow(clippy::all)]
// automatically generated by the FlatBuffers compiler, do not modify

// @generated

use crate::common_generated::*;
use core::cmp::Ordering;
use core::mem;
use core::{cmp::Ordering, mem};

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
Expand Down
Loading

0 comments on commit 5ea4c05

Please sign in to comment.