Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Commit

Permalink
feat(rpc) : support for eth_newPendingTransactionFilter full rpc func…
Browse files Browse the repository at this point in the history
…tion (paradigmxyz#5206)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
DoTheBestToGetTheBest and mattsse authored Oct 30, 2023
1 parent 64d5064 commit ed9b9a7
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 25 deletions.
8 changes: 5 additions & 3 deletions crates/rpc/rpc-api/src/eth_filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use reth_rpc_types::{Filter, FilterChanges, FilterId, Log};

use reth_rpc_types::{Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind};
/// Rpc Interface for poll-based ethereum filter API.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
Expand All @@ -15,7 +14,10 @@ pub trait EthFilterApi {

/// Creates a pending transaction filter and returns its id.
#[method(name = "newPendingTransactionFilter")]
async fn new_pending_transaction_filter(&self) -> RpcResult<FilterId>;
async fn new_pending_transaction_filter(
&self,
kind: Option<PendingTransactionFilterKind>,
) -> RpcResult<FilterId>;

/// Returns all filter changes since last poll.
#[method(name = "getFilterChanges")]
Expand Down
13 changes: 11 additions & 2 deletions crates/rpc/rpc-builder/tests/it/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use reth_rpc_api::{
Web3ApiClient,
};
use reth_rpc_builder::RethRpcModule;
use reth_rpc_types::{trace::filter::TraceFilter, CallRequest, Filter, Index, TransactionRequest};
use reth_rpc_types::{
trace::filter::TraceFilter, CallRequest, Filter, Index, PendingTransactionFilterKind,
TransactionRequest,
};
use std::collections::HashSet;

fn is_unimplemented(err: Error) -> bool {
Expand All @@ -36,7 +39,13 @@ where
C: ClientT + SubscriptionClientT + Sync,
{
EthFilterApiClient::new_filter(client, Filter::default()).await.unwrap();
EthFilterApiClient::new_pending_transaction_filter(client).await.unwrap();
EthFilterApiClient::new_pending_transaction_filter(client, None).await.unwrap();
EthFilterApiClient::new_pending_transaction_filter(
client,
Some(PendingTransactionFilterKind::Full),
)
.await
.unwrap();
let id = EthFilterApiClient::new_block_filter(client).await.unwrap();
EthFilterApiClient::filter_changes(client, id.clone()).await.unwrap();
EthFilterApiClient::logs(client, Filter::default()).await.unwrap();
Expand Down
51 changes: 49 additions & 2 deletions crates/rpc/rpc-types/src/eth/filter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{eth::log::Log as RpcLog, BlockNumberOrTag, Log};
use crate::{eth::log::Log as RpcLog, BlockNumberOrTag, Log, Transaction};
use alloy_primitives::{keccak256, Address, Bloom, BloomInput, B256, U256, U64};
use itertools::{EitherOrBoth::*, Itertools};
use serde::{
Expand Down Expand Up @@ -820,14 +820,15 @@ impl FilteredParams {
true
}
}

/// Response of the `eth_getFilterChanges` RPC.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FilterChanges {
/// New logs.
Logs(Vec<RpcLog>),
/// New hashes (block or transactions)
Hashes(Vec<B256>),
/// New transactions.
Transactions(Vec<Transaction>),
/// Empty result,
Empty,
}
Expand All @@ -840,6 +841,7 @@ impl Serialize for FilterChanges {
match self {
FilterChanges::Logs(logs) => logs.serialize(s),
FilterChanges::Hashes(hashes) => hashes.serialize(s),
FilterChanges::Transactions(transactions) => transactions.serialize(s),
FilterChanges::Empty => (&[] as &[serde_json::Value]).serialize(s),
}
}
Expand Down Expand Up @@ -908,6 +910,51 @@ impl From<jsonrpsee_types::SubscriptionId<'_>> for FilterId {
}
}
}
/// Specifies the kind of information you wish to receive from the `eth_newPendingTransactionFilter`
/// RPC endpoint.
///
/// When this type is used in a request, it determines whether the client wishes to receive:
/// - Only the transaction hashes (`Hashes` variant), or
/// - Full transaction details (`Full` variant).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PendingTransactionFilterKind {
/// Receive only the hashes of the transactions.
#[default]
Hashes,
/// Receive full details of the transactions.
Full,
}

impl Serialize for PendingTransactionFilterKind {
/// Serializes the `PendingTransactionFilterKind` into a boolean value:
/// - `false` for `Hashes`
/// - `true` for `Full`
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
PendingTransactionFilterKind::Hashes => false.serialize(serializer),
PendingTransactionFilterKind::Full => true.serialize(serializer),
}
}
}

impl<'a> Deserialize<'a> for PendingTransactionFilterKind {
/// Deserializes a boolean value into `PendingTransactionFilterKind`:
/// - `false` becomes `Hashes`
/// - `true` becomes `Full`
fn deserialize<D>(deserializer: D) -> Result<PendingTransactionFilterKind, D::Error>
where
D: Deserializer<'a>,
{
let val = Option::<bool>::deserialize(deserializer)?;
match val {
Some(true) => Ok(PendingTransactionFilterKind::Full),
_ => Ok(PendingTransactionFilterKind::Hashes),
}
}
}

#[cfg(test)]
mod tests {
Expand Down
1 change: 0 additions & 1 deletion crates/rpc/rpc-types/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
eth::{Filter, Transaction},
Log, RichHeader,
};

use alloy_primitives::B256;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};

Expand Down
120 changes: 103 additions & 17 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ use crate::{
result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
};
use alloy_primitives::B256;
use core::fmt;

use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_interfaces::RethError;
use reth_primitives::{BlockHashOrNumber, Receipt, SealedBlock, TxHash};
use reth_primitives::{BlockHashOrNumber, IntoRecoveredTransaction, Receipt, SealedBlock, TxHash};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log};
use reth_rpc_types::{
Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
PendingTransactionFilterKind,
};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
use std::{
collections::HashMap,
iter::StepBy,
Expand All @@ -35,7 +39,7 @@ const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500k

/// `Eth` filter RPC implementation.
pub struct EthFilter<Provider, Pool> {
/// All nested fields bundled together.
/// All nested fields bundled together
inner: Arc<EthFilterInner<Provider, Pool>>,
}

Expand Down Expand Up @@ -120,6 +124,7 @@ impl<Provider, Pool> EthFilter<Provider, Pool>
where
Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static,
Pool: TransactionPool + 'static,
<Pool as TransactionPool>::Transaction: 'static,
{
/// Returns all the filter changes for the given id, if any
pub async fn filter_changes(&self, id: FilterId) -> Result<FilterChanges, FilterError> {
Expand Down Expand Up @@ -148,10 +153,7 @@ where
};

match kind {
FilterKind::PendingTransaction(receiver) => {
let pending_txs = receiver.drain().await;
Ok(FilterChanges::Hashes(pending_txs))
}
FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
FilterKind::Block => {
// Note: we need to fetch the block hashes from inclusive range
// [start_block..best_block]
Expand Down Expand Up @@ -235,13 +237,31 @@ where
}

/// Handler for `eth_newPendingTransactionFilter`
async fn new_pending_transaction_filter(&self) -> RpcResult<FilterId> {
async fn new_pending_transaction_filter(
&self,
kind: Option<PendingTransactionFilterKind>,
) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
let receiver = self.inner.pool.pending_transactions_listener();

let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
let transaction_kind = match kind.unwrap_or_default() {
PendingTransactionFilterKind::Hashes => {
let receiver = self.inner.pool.pending_transactions_listener();
let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
}
PendingTransactionFilterKind::Full => {
let stream = self.inner.pool.new_pending_pool_transactions_listener();
let full_txs_receiver = FullTransactionsReceiver::new(stream);
FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
full_txs_receiver,
)))
}
};

//let filter = FilterKind::PendingTransaction(transaction_kind);

self.inner.install_filter(FilterKind::PendingTransaction(pending_txs_receiver)).await
// Install the filter and propagate any errors
self.inner.install_filter(transaction_kind).await
}

/// Handler for `eth_getFilterChanges`
Expand Down Expand Up @@ -490,24 +510,90 @@ impl PendingTransactionsReceiver {
}

/// Returns all new pending transactions received since the last poll.
async fn drain(&self) -> Vec<B256> {
async fn drain(&self) -> FilterChanges {
let mut pending_txs = Vec::new();
let mut prepared_stream = self.txs_receiver.lock().await;

while let Ok(tx_hash) = prepared_stream.try_recv() {
pending_txs.push(tx_hash);
}
pending_txs

// Convert the vector of hashes into FilterChanges::Hashes
FilterChanges::Hashes(pending_txs)
}
}

/// A structure to manage and provide access to a stream of full transaction details.
#[derive(Debug, Clone)]
struct FullTransactionsReceiver<T: PoolTransaction> {
txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
}

impl<T> FullTransactionsReceiver<T>
where
T: PoolTransaction + 'static,
{
/// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
fn new(stream: NewSubpoolTransactionStream<T>) -> Self {
FullTransactionsReceiver { txs_stream: Arc::new(Mutex::new(stream)) }
}

/// Returns all new pending transactions received since the last poll.
async fn drain(&self) -> FilterChanges {
let mut pending_txs = Vec::new();
let mut prepared_stream = self.txs_stream.lock().await;

while let Ok(tx) = prepared_stream.try_recv() {
pending_txs.push(reth_rpc_types_compat::transaction::from_recovered(
tx.transaction.to_recovered_transaction(),
))
}
FilterChanges::Transactions(pending_txs)
}
}

/// Helper trait for [FullTransactionsReceiver] to erase the `Transaction` type.
#[async_trait]
trait FullTransactionsFilter: fmt::Debug + Send + Sync + Unpin + 'static {
async fn drain(&self) -> FilterChanges;
}

#[async_trait]
impl<T> FullTransactionsFilter for FullTransactionsReceiver<T>
where
T: PoolTransaction + 'static,
{
async fn drain(&self) -> FilterChanges {
FullTransactionsReceiver::drain(self).await
}
}

/// Represents the kind of pending transaction data that can be retrieved.
///
/// This enum differentiates between two kinds of pending transaction data:
/// - Just the transaction hashes.
/// - Full transaction details.
#[derive(Debug, Clone)]
enum PendingTransactionKind {
Hashes(PendingTransactionsReceiver),
FullTransaction(Arc<dyn FullTransactionsFilter>),
}

impl PendingTransactionKind {
async fn drain(&self) -> FilterChanges {
match self {
PendingTransactionKind::Hashes(receiver) => receiver.drain().await,
PendingTransactionKind::FullTransaction(receiver) => receiver.drain().await,
}
}
}

#[derive(Clone, Debug)]
enum FilterKind {
Log(Box<Filter>),
Block,
PendingTransaction(PendingTransactionsReceiver),
PendingTransaction(PendingTransactionKind),
}

/// Errors that can occur in the handler implementation
#[derive(Debug, thiserror::Error)]
pub enum FilterError {
Expand Down
16 changes: 16 additions & 0 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,22 @@ impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
pub fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
Self { st, subpool }
}

/// Tries to receive the next value for this stream.
pub fn try_recv(
&mut self,
) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
loop {
match self.st.try_recv() {
Ok(event) => {
if event.subpool == self.subpool {
return Ok(event)
}
}
Err(e) => return Err(e),
}
}
}
}

impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
Expand Down

0 comments on commit ed9b9a7

Please sign in to comment.