Skip to content

Commit

Permalink
Implement AsyncIterator and PurgeableAsyncIterator (#27)
Browse files Browse the repository at this point in the history
* super simple implementation

* fix test

* extend down stack and with specular batches

* fix linting

* missed unused import

* final lints that were missed

* really final lint fix

* fix issues clippy brought up
  • Loading branch information
chalkandpaste authored May 14, 2024
1 parent ea7f8a5 commit b053c0d
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 76 deletions.
8 changes: 8 additions & 0 deletions src/derive/async_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use async_trait::async_trait;

#[async_trait]
pub trait AsyncIterator {
type Item;

async fn next(&mut self) -> Option<Self::Item>;
}
24 changes: 14 additions & 10 deletions src/derive/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{mpsc, Arc, RwLock};

use async_trait::async_trait;
use eyre::Result;

use crate::specular::stages::{
Expand All @@ -8,6 +9,7 @@ use crate::specular::stages::{
use crate::{config::Config, engine::PayloadAttributes};

use self::{
async_iterator::AsyncIterator,
stages::{
attributes::Attributes,
batcher_transactions::{BatcherTransactionMessage, BatcherTransactions},
Expand All @@ -17,34 +19,36 @@ use self::{
state::State,
};

pub mod async_iterator;
pub mod stages;
pub mod state;

mod purgeable;
pub use purgeable::PurgeableIterator;
pub use purgeable::PurgeableAsyncIterator;

pub struct Pipeline {
batcher_transaction_sender: mpsc::Sender<BatcherTransactionMessage>,
attributes: Attributes,
pending_attributes: Option<PayloadAttributes>,
}

impl Iterator for Pipeline {
#[async_trait]
impl AsyncIterator for Pipeline {
type Item = PayloadAttributes;

fn next(&mut self) -> Option<Self::Item> {
async fn next(&mut self) -> Option<Self::Item> {
if self.pending_attributes.is_some() {
self.pending_attributes.take()
} else {
self.attributes.next()
self.attributes.next().await
}
}
}

impl Pipeline {
pub fn new(state: Arc<RwLock<State>>, config: Arc<Config>, seq: u64) -> Result<Self> {
let (tx, rx) = mpsc::channel();
let batch_iter: Box<dyn PurgeableIterator<Item = Batch>> =
let batch_iter: Box<dyn PurgeableAsyncIterator<Item = Batch> + Send> =
if config.chain.meta.enable_full_derivation {
let batcher_transactions = BatcherTransactions::new(rx);
let channels = Channels::new(batcher_transactions, config.clone());
Expand All @@ -71,17 +75,17 @@ impl Pipeline {
Ok(())
}

pub fn peek(&mut self) -> Option<&PayloadAttributes> {
pub async fn peek(&mut self) -> Option<&PayloadAttributes> {
if self.pending_attributes.is_none() {
let next_attributes = self.next();
let next_attributes = self.next().await;
self.pending_attributes = next_attributes;
}

self.pending_attributes.as_ref()
}

pub fn purge(&mut self) -> Result<()> {
self.attributes.purge();
pub async fn purge(&mut self) -> Result<()> {
self.attributes.purge().await;
Ok(())
}
}
Expand Down Expand Up @@ -159,7 +163,7 @@ mod tests {

state.write().unwrap().update_l1_info(l1_info);

if let Some(payload) = pipeline.next() {
if let Some(payload) = pipeline.next().await {
let hashes = get_tx_hashes(&payload.transactions.unwrap());
let expected_hashes = get_expected_hashes(config.chain.l2_genesis.number + 1).await;

Expand Down
10 changes: 7 additions & 3 deletions src/derive/purgeable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
/// Iterator that can purge itself
pub trait PurgeableIterator: Iterator {
fn purge(&mut self);
use crate::derive::async_iterator::AsyncIterator;
use async_trait::async_trait;

/// AsyncIterator that can purge itself
#[async_trait]
pub trait PurgeableAsyncIterator: AsyncIterator {
async fn purge(&mut self);
}
21 changes: 13 additions & 8 deletions src/derive/stages/attributes.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use ethers::abi::{decode, encode, ParamType, Token};
use ethers::types::{Address, Log, H256, U256, U64};
use ethers::utils::{keccak256, rlp::Encodable, rlp::RlpStream};
Expand All @@ -8,42 +9,46 @@ use eyre::Result;

use crate::common::{Epoch, RawTransaction};
use crate::config::{Config, SystemAccounts};
use crate::derive::async_iterator::AsyncIterator;
use crate::derive::state::State;
use crate::derive::PurgeableIterator;
use crate::derive::PurgeableAsyncIterator;
use crate::engine::PayloadAttributes;
use crate::l1::L1Info;

use super::batches::Batch;

pub struct Attributes {
batch_iter: Box<dyn PurgeableIterator<Item = Batch>>,
batch_iter: Box<dyn PurgeableAsyncIterator<Item = Batch> + Send>,
state: Arc<RwLock<State>>,
sequence_number: u64,
epoch_hash: H256,
config: Arc<Config>,
}

impl Iterator for Attributes {
#[async_trait]
impl AsyncIterator for Attributes {
type Item = PayloadAttributes;

fn next(&mut self) -> Option<Self::Item> {
async fn next(&mut self) -> Option<Self::Item> {
self.batch_iter
.next()
.await
.map(|batch| self.derive_attributes(batch))
}
}

impl PurgeableIterator for Attributes {
fn purge(&mut self) {
self.batch_iter.purge();
#[async_trait]
impl PurgeableAsyncIterator for Attributes {
async fn purge(&mut self) {
self.batch_iter.purge().await;
self.sequence_number = 0;
self.epoch_hash = self.state.read().unwrap().safe_epoch.hash;
}
}

impl Attributes {
pub fn new(
batch_iter: Box<dyn PurgeableIterator<Item = Batch>>,
batch_iter: Box<dyn PurgeableAsyncIterator<Item = Batch> + Send>,
state: Arc<RwLock<State>>,
config: Arc<Config>,
seq: u64,
Expand Down
14 changes: 9 additions & 5 deletions src/derive/stages/batcher_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::mpsc;

use async_trait::async_trait;
use eyre::Result;
use std::collections::VecDeque;

use crate::derive::PurgeableIterator;
use crate::derive::async_iterator::AsyncIterator;
use crate::derive::PurgeableAsyncIterator;

pub struct BatcherTransactionMessage {
pub txs: Vec<Vec<u8>>,
Expand All @@ -15,17 +17,19 @@ pub struct BatcherTransactions {
transaction_rx: mpsc::Receiver<BatcherTransactionMessage>,
}

impl Iterator for BatcherTransactions {
#[async_trait]
impl AsyncIterator for BatcherTransactions {
type Item = BatcherTransaction;

fn next(&mut self) -> Option<Self::Item> {
async fn next(&mut self) -> Option<Self::Item> {
self.process_incoming();
self.txs.pop_front()
}
}

impl PurgeableIterator for BatcherTransactions {
fn purge(&mut self) {
#[async_trait]
impl PurgeableAsyncIterator for BatcherTransactions {
async fn purge(&mut self) {
// drain the channel first
while self.transaction_rx.try_recv().is_ok() {}
self.txs.clear();
Expand Down
29 changes: 16 additions & 13 deletions src/derive/stages/batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::collections::BTreeMap;
use std::io::Read;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use ethers::types::H256;
use ethers::utils::rlp::{DecoderError, Rlp};

Expand All @@ -12,8 +13,9 @@ use libflate::zlib::Decoder;

use crate::common::RawTransaction;
use crate::config::Config;
use crate::derive::async_iterator::AsyncIterator;
use crate::derive::state::State;
use crate::derive::PurgeableIterator;
use crate::derive::PurgeableAsyncIterator;

use super::channels::Channel;

Expand All @@ -25,26 +27,28 @@ pub struct Batches<I> {
config: Arc<Config>,
}

impl<I> Iterator for Batches<I>
#[async_trait]
impl<I> AsyncIterator for Batches<I>
where
I: Iterator<Item = Channel>,
I: AsyncIterator<Item = Channel> + Send,
{
type Item = Batch;

fn next(&mut self) -> Option<Self::Item> {
self.try_next().unwrap_or_else(|_| {
async fn next(&mut self) -> Option<Self::Item> {
self.try_next().await.unwrap_or_else(|_| {
tracing::debug!("Failed to decode batch");
None
})
}
}

impl<I> PurgeableIterator for Batches<I>
#[async_trait]
impl<I> PurgeableAsyncIterator for Batches<I>
where
I: PurgeableIterator<Item = Channel>,
I: PurgeableAsyncIterator<Item = Channel> + Send,
{
fn purge(&mut self) {
self.channel_iter.purge();
async fn purge(&mut self) {
self.channel_iter.purge().await;
self.batches.clear();
}
}
Expand All @@ -62,11 +66,10 @@ impl<I> Batches<I> {

impl<I> Batches<I>
where
I: Iterator<Item = Channel>,
I: AsyncIterator<Item = Channel> + Send,
{
fn try_next(&mut self) -> Result<Option<Batch>> {
let channel = self.channel_iter.next();
if let Some(channel) = channel {
async fn try_next(&mut self) -> Result<Option<Batch>> {
if let Some(channel) = self.channel_iter.next().await {
let batches = decode_batches(&channel)?;
batches.into_iter().for_each(|batch| {
tracing::debug!(
Expand Down
36 changes: 20 additions & 16 deletions src/derive/stages/channels.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::sync::Arc;

use async_trait::async_trait;

use super::batcher_transactions::{BatcherTransaction, Frame};
use crate::{config::Config, derive::PurgeableIterator};
use crate::{
config::Config, derive::async_iterator::AsyncIterator, derive::PurgeableAsyncIterator,
};

pub struct Channels<I> {
batcher_tx_iter: I,
Expand All @@ -15,23 +19,25 @@ pub struct Channels<I> {
channel_timeout: u64,
}

impl<I> Iterator for Channels<I>
#[async_trait]
impl<I> AsyncIterator for Channels<I>
where
I: Iterator<Item = BatcherTransaction>,
I: AsyncIterator<Item = BatcherTransaction> + Send,
{
type Item = Channel;

fn next(&mut self) -> Option<Self::Item> {
self.process_frames()
async fn next(&mut self) -> Option<Self::Item> {
self.process_frames().await
}
}

impl<I> PurgeableIterator for Channels<I>
#[async_trait]
impl<I> PurgeableAsyncIterator for Channels<I>
where
I: PurgeableIterator<Item = BatcherTransaction>,
I: PurgeableAsyncIterator<Item = BatcherTransaction> + Send,
{
fn purge(&mut self) {
self.batcher_tx_iter.purge();
async fn purge(&mut self) {
self.batcher_tx_iter.purge().await;
self.pending_channels.clear();
self.frame_bank.clear();
}
Expand All @@ -51,7 +57,7 @@ impl<I> Channels<I> {

impl<I> Channels<I>
where
I: Iterator<Item = BatcherTransaction>,
I: AsyncIterator<Item = BatcherTransaction> + Send,
{
/// Pushes a frame into the correct pending channel
fn push_frame(&mut self, frame: Frame) {
Expand All @@ -76,10 +82,8 @@ where
}

/// Pull the next batcher transaction from the [BatcherTransactions] stage
fn fill_bank(&mut self) {
let next_batcher_tx = self.batcher_tx_iter.next();

if let Some(tx) = next_batcher_tx {
async fn fill_bank(&mut self) {
if let Some(tx) = self.batcher_tx_iter.next().await {
self.frame_bank.append(&mut tx.frames.to_vec());
}
}
Expand All @@ -98,8 +102,8 @@ where
}

/// Processes frames until there are either none left or a channel is ready
fn process_frames(&mut self) -> Option<Channel> {
self.fill_bank();
async fn process_frames(&mut self) -> Option<Channel> {
self.fill_bank().await;

while !self.frame_bank.is_empty() {
// Append the frame to the channel
Expand Down
6 changes: 3 additions & 3 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::{
use crate::{
common::{BlockInfo, Epoch},
config::Config,
derive::{state::State, Pipeline},
derive::{async_iterator::AsyncIterator, state::State, Pipeline},
engine::{Engine, EngineApi, ExecutionPayload},
l1::{BlockUpdate, ChainWatcher},
network::{handlers::block_handler::BlockHandler, service::Service},
Expand Down Expand Up @@ -218,7 +218,7 @@ impl<E: Engine> Driver<E> {
self.handle_next_block_update().await?;
self.update_state_head().await?;

for next_attributes in self.pipeline.by_ref() {
while let Some(next_attributes) = self.pipeline.next().await {
let l1_inclusion_block = next_attributes
.l1_inclusion_block
.ok_or(eyre::eyre!("attributes without inclusion block"))?;
Expand Down Expand Up @@ -345,7 +345,7 @@ impl<E: Engine> Driver<E> {
.map_err(|_| eyre::eyre!("lock poisoned"))?
.purge(engine_driver.finalized_head, engine_driver.finalized_epoch);

self.pipeline.purge()?;
self.pipeline.purge().await?;
engine_driver.reorg();
}
BlockUpdate::FinalityUpdate(num) => {
Expand Down
Loading

0 comments on commit b053c0d

Please sign in to comment.