diff --git a/Cargo.lock b/Cargo.lock index 727655059..9a06129d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8595,7 +8595,7 @@ dependencies = [ "anyhow", "ark-ff", "ark-serialize", - "async-broadcast", + "async-channel 2.3.1", "async-lock 3.4.0", "async-once-cell", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 011701805..12738fd66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ ark-poly = "0.4" ark-serialize = "0.4" ark-srs = "0.3.1" async-broadcast = "0.7.0" +async-channel = "2" async-lock = "3" async-once-cell = "0.5" async-trait = "0.1" diff --git a/justfile b/justfile index 5d7521a92..f2badd3a5 100644 --- a/justfile +++ b/justfile @@ -10,13 +10,21 @@ demo *args: demo-native *args: build scripts/demo-native {{args}} -build: +lint: + #!/usr/bin/env bash + set -euxo pipefail + # Use the same target dir for both `clippy` invocations + export CARGO_TARGET_DIR=${CARGO_TARGET_DIR:-target} + cargo clippy --workspace --features testing --all-targets -- -D warnings + cargo clippy --workspace --all-targets --manifest-path sequencer-sqlite/Cargo.toml -- -D warnings + +build profile="test": #!/usr/bin/env bash set -euxo pipefail # Use the same target dir for both `build` invocations export CARGO_TARGET_DIR=${CARGO_TARGET_DIR:-target} - cargo build --profile test - cargo build --profile test --manifest-path ./sequencer-sqlite/Cargo.toml + cargo build --profile {{profile}} + cargo build --profile {{profile}} --manifest-path ./sequencer-sqlite/Cargo.toml demo-native-mp *args: build scripts/demo-native -f process-compose.yaml -f process-compose-mp.yml {{args}} diff --git a/sequencer-sqlite/Cargo.lock b/sequencer-sqlite/Cargo.lock index 1b0040642..b2f2e85e6 100644 --- a/sequencer-sqlite/Cargo.lock +++ b/sequencer-sqlite/Cargo.lock @@ -8319,7 +8319,7 @@ dependencies = [ "anyhow", "ark-ff", "ark-serialize", - "async-broadcast", + "async-channel 2.3.1", "async-lock 3.4.0", "async-once-cell", "async-trait", diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index 5fb19d8f6..f9948fc79 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -43,7 +43,7 @@ vergen = { workspace = true } anyhow = { workspace = true } ark-ff = { workspace = true } ark-serialize = { workspace = true, features = ["derive"] } -async-broadcast = { workspace = true } +async-channel = { workspace = true } async-lock = { workspace = true } async-once-cell = { workspace = true } async-trait = { workspace = true } diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index 06e27e3b1..d0cbfe61e 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -1,13 +1,9 @@ use std::{fmt::Display, sync::Arc}; use anyhow::Context; -use async_broadcast::{broadcast, Receiver, Sender}; use async_lock::RwLock; -use clap::Parser; -use committable::Commitment; use derivative::Derivative; use espresso_types::{ - parse_duration, v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence}, NodeState, PubKey, Transaction, ValidatedState, }; @@ -22,32 +18,26 @@ use hotshot::{ }; use hotshot_events_service::events_source::{EventConsumer, EventsStreamer}; use parking_lot::Mutex; -use tokio::{ - spawn, - task::JoinHandle, - time::{sleep, timeout}, -}; +use tokio::{spawn, task::JoinHandle}; use hotshot_orchestrator::client::OrchestratorClient; use hotshot_types::{ consensus::ConsensusMetricsValue, - data::{EpochNumber, Leaf2, ViewNumber}, + data::{Leaf2, ViewNumber}, network::NetworkConfig, traits::{ metrics::Metrics, network::ConnectedNetwork, - node_implementation::{ConsensusTime, NodeType, Versions}, - ValidatedState as _, + node_implementation::{NodeType, Versions}, }, - utils::{View, ViewInner}, PeerConfig, ValidatorConfig, }; -use std::time::Duration; use tracing::{Instrument, Level}; use url::Url; use crate::{ external_event_handler::{self, ExternalEventHandler}, + proposal_fetcher::ProposalFetcherConfig, state_signature::StateSigner, static_stake_table_commitment, Node, SeqTypes, SequencerApiVersion, }; @@ -55,37 +45,6 @@ use crate::{ /// The consensus handle pub type Consensus = SystemContextHandle, V>; -#[derive(Clone, Copy, Debug, Parser)] -pub struct ProposalFetcherConfig { - #[clap( - long = "proposal-fetcher-num-workers", - env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS", - default_value = "2" - )] - pub num_workers: usize, - - #[clap( - long = "proposal-fetcher-channel-capacity", - env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY", - default_value = "100" - )] - pub channel_capacity: usize, - - #[clap( - long = "proposal-fetcher-fetch-timeout", - env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT", - default_value = "2s", - value_parser = parse_duration, - )] - pub fetch_timeout: Duration, -} - -impl Default for ProposalFetcherConfig { - fn default() -> Self { - Self::parse_from(std::iter::empty::()) - } -} - /// The sequencer context contains a consensus handle and other sequencer specific information. #[derive(Derivative, Clone)] #[derivative(Debug(bound = ""))] @@ -210,6 +169,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence event_consumer, anchor_view, proposal_fetcher_cfg, + metrics, ) .with_task_list(tasks)) } @@ -228,6 +188,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence event_consumer: impl PersistenceEventConsumer + 'static, anchor_view: Option, proposal_fetcher_cfg: ProposalFetcherConfig, + metrics: &dyn Metrics, ) -> Self { let events = handle.event_stream(); @@ -245,19 +206,12 @@ impl, P: SequencerPersistence, V: Versions> Sequence }; // Spawn proposal fetching tasks. - let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity); - ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send)); - for i in 0..proposal_fetcher_cfg.num_workers { - ctx.spawn( - format!("proposal fetcher {i}"), - fetch_proposals( - ctx.handle.clone(), - persistence.clone(), - recv.clone(), - proposal_fetcher_cfg.fetch_timeout, - ), - ); - } + proposal_fetcher_cfg.spawn( + &mut ctx.tasks, + ctx.handle.clone(), + persistence.clone(), + metrics, + ); // Spawn event handling loop. ctx.spawn( @@ -475,127 +429,6 @@ async fn handle_events( } } -#[tracing::instrument(skip_all)] -async fn scan_proposals( - consensus: Arc>>, - fetcher: Sender<(ViewNumber, Commitment>)>, -) where - N: ConnectedNetwork, - P: SequencerPersistence, - V: Versions, -{ - let mut events = consensus.read().await.event_stream(); - while let Some(event) = events.next().await { - let EventType::QuorumProposal { proposal, .. } = event.event else { - continue; - }; - // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back - // to the anchor. This allows state replay from the decided state. - let parent_view = proposal.data.justify_qc.view_number; - let parent_leaf = proposal.data.justify_qc.data.leaf_commit; - fetcher - .broadcast_direct((parent_view, parent_leaf)) - .await - .ok(); - } -} - -#[tracing::instrument(skip_all)] -async fn fetch_proposals( - consensus: Arc>>, - persistence: Arc, - mut scanner: Receiver<(ViewNumber, Commitment>)>, - fetch_timeout: Duration, -) where - N: ConnectedNetwork, - P: SequencerPersistence, - V: Versions, -{ - let sender = scanner.new_sender(); - while let Some((view, leaf)) = scanner.next().await { - let span = tracing::warn_span!("fetch proposal", ?view, %leaf); - let res: anyhow::Result<()> = async { - let anchor_view = load_anchor_view(&*persistence).await; - if view <= anchor_view { - tracing::debug!(?anchor_view, "skipping already-decided proposal"); - return Ok(()); - } - - match persistence.load_quorum_proposal(view).await { - Ok(proposal) => { - // If we already have the proposal in storage, keep traversing the chain to its - // parent. - let view = proposal.data.justify_qc.view_number; - let leaf = proposal.data.justify_qc.data.leaf_commit; - sender.broadcast_direct((view, leaf)).await.ok(); - return Ok(()); - } - Err(err) => { - tracing::info!("proposal missing from storage; fetching from network: {err:#}"); - } - } - - let future = - consensus - .read() - .await - .request_proposal(view, EpochNumber::genesis(), leaf)?; - let proposal = timeout(fetch_timeout, future) - .await - .context("timed out fetching proposal")? - .context("error fetching proposal")?; - persistence - .append_quorum_proposal(&proposal) - .await - .context("error saving fetched proposal")?; - - // Add the fetched leaf to HotShot state, so consensus can make use of it. - let leaf = Leaf2::from_quorum_proposal(&proposal.data); - let handle = consensus.read().await; - let consensus = handle.consensus(); - let mut consensus = consensus.write().await; - if matches!( - consensus.validated_state_map().get(&view), - None | Some(View { - // Replace a Da-only view with a Leaf view, which has strictly more information. - view_inner: ViewInner::Da { .. } - }) - ) { - let state = Arc::new(ValidatedState::from_header(leaf.block_header())); - if let Err(err) = consensus.update_leaf(leaf, state, None) { - tracing::warn!("unable to update leaf: {err:#}"); - } - } - - Ok(()) - } - .instrument(span) - .await; - if let Err(err) = res { - tracing::warn!("failed to fetch proposal: {err:#}"); - - // Avoid busy loop when operations are failing. - sleep(Duration::from_secs(1)).await; - - // If we fail fetching the proposal, don't let it clog up the fetching task. Just push - // it back onto the queue and move onto the next proposal. - sender.broadcast_direct((view, leaf)).await.ok(); - } - } -} - -async fn load_anchor_view(persistence: &impl SequencerPersistence) -> ViewNumber { - loop { - match persistence.load_anchor_view().await { - Ok(view) => break view, - Err(err) => { - tracing::warn!("error loading anchor view: {err:#}"); - sleep(Duration::from_secs(1)).await; - } - } - } -} - #[derive(Debug, Default, Clone)] #[allow(clippy::type_complexity)] pub(crate) struct TaskList(Arc)>>>); diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 98fc95893..b4d2a4e5e 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -2,6 +2,7 @@ pub mod api; pub mod catchup; pub mod context; pub mod genesis; +mod proposal_fetcher; mod external_event_handler; pub mod options; @@ -13,7 +14,7 @@ mod message_compat_tests; use anyhow::Context; use catchup::StatePeers; -use context::{ProposalFetcherConfig, SequencerContext}; +use context::SequencerContext; use espresso_types::{ traits::EventConsumer, BackoffParams, L1ClientOptions, NodeState, PubKey, SeqTypes, SolverAuctionResultsProvider, ValidatedState, @@ -21,6 +22,7 @@ use espresso_types::{ use genesis::L1Finalized; use hotshot::traits::election::static_committee::StaticCommittee; use hotshot_types::traits::election::Membership; +use proposal_fetcher::ProposalFetcherConfig; use std::sync::Arc; use tokio::select; // Should move `STAKE_TABLE_CAPACITY` in the sequencer repo when we have variate stake table support diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs index 9882283d3..b7fabe1d7 100644 --- a/sequencer/src/options.rs +++ b/sequencer/src/options.rs @@ -21,7 +21,7 @@ use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey}; use libp2p::Multiaddr; use url::Url; -use crate::{api, context::ProposalFetcherConfig, persistence}; +use crate::{api, persistence, proposal_fetcher::ProposalFetcherConfig}; // This options struct is a bit unconventional. The sequencer has multiple optional modules which // can be added, in any combination, to the service. These include, for example, the API server. diff --git a/sequencer/src/proposal_fetcher.rs b/sequencer/src/proposal_fetcher.rs new file mode 100644 index 000000000..5c753ece8 --- /dev/null +++ b/sequencer/src/proposal_fetcher.rs @@ -0,0 +1,244 @@ +use std::sync::Arc; + +use anyhow::Context; +use async_channel::{Receiver, Sender}; +use async_lock::RwLock; +use clap::Parser; +use committable::Commitment; +use derivative::Derivative; +use espresso_types::{parse_duration, v0::traits::SequencerPersistence, PubKey, ValidatedState}; +use futures::stream::StreamExt; +use hotshot::types::EventType; +use hotshot_types::{ + data::{EpochNumber, Leaf2, ViewNumber}, + traits::{ + metrics::{Counter, Gauge, Metrics}, + network::ConnectedNetwork, + node_implementation::{ConsensusTime, Versions}, + ValidatedState as _, + }, + utils::{View, ViewInner}, +}; +use std::time::Duration; +use tokio::time::{sleep, timeout}; +use tracing::Instrument; + +use crate::{ + context::{Consensus, TaskList}, + SeqTypes, +}; + +#[derive(Clone, Copy, Debug, Parser)] +pub struct ProposalFetcherConfig { + #[clap( + long = "proposal-fetcher-num-workers", + env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS", + default_value = "2" + )] + pub num_workers: usize, + + #[clap( + long = "proposal-fetcher-fetch-timeout", + env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT", + default_value = "2s", + value_parser = parse_duration, + )] + pub fetch_timeout: Duration, +} + +impl Default for ProposalFetcherConfig { + fn default() -> Self { + Self::parse_from(std::iter::empty::()) + } +} + +impl ProposalFetcherConfig { + pub(crate) fn spawn( + self, + tasks: &mut TaskList, + consensus: Arc>>, + persistence: Arc

, + metrics: &(impl Metrics + ?Sized), + ) where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, + { + let (sender, receiver) = async_channel::unbounded(); + let fetcher = ProposalFetcher { + sender, + consensus, + persistence, + cfg: self, + metrics: ProposalFetcherMetrics::new(metrics), + }; + + tasks.spawn("proposal scanner", fetcher.clone().scan()); + for i in 0..self.num_workers { + tasks.spawn( + format!("proposal fetcher {i}"), + fetcher.clone().fetch(receiver.clone()), + ); + } + } +} + +#[derive(Clone, Debug)] +struct ProposalFetcherMetrics { + fetched: Arc, + failed: Arc, + queue_len: Arc, + last_seen: Arc, + last_fetched: Arc, +} + +impl ProposalFetcherMetrics { + fn new(metrics: &(impl Metrics + ?Sized)) -> Self { + let metrics = metrics.subgroup("proposal_fetcher".into()); + Self { + fetched: metrics.create_counter("fetched".into(), None).into(), + failed: metrics.create_counter("failed".into(), None).into(), + queue_len: metrics.create_gauge("queue_len".into(), None).into(), + last_seen: metrics + .create_gauge("last_seen".into(), Some("view".into())) + .into(), + last_fetched: metrics + .create_gauge("last_fetched".into(), Some("view".into())) + .into(), + } + } +} + +type Request = (ViewNumber, Commitment>); + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""))] +struct ProposalFetcher +where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + sender: Sender, + #[derivative(Debug = "ignore")] + consensus: Arc>>, + #[derivative(Debug = "ignore")] + persistence: Arc

, + cfg: ProposalFetcherConfig, + metrics: ProposalFetcherMetrics, +} + +impl ProposalFetcher +where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + #[tracing::instrument(skip_all)] + async fn scan(self) { + let mut events = self.consensus.read().await.event_stream(); + while let Some(event) = events.next().await { + let EventType::QuorumProposal { proposal, .. } = event.event else { + continue; + }; + // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back + // to the anchor. This allows state replay from the decided state. + let parent_view = proposal.data.justify_qc.view_number; + let parent_leaf = proposal.data.justify_qc.data.leaf_commit; + self.request((parent_view, parent_leaf)).await; + } + } + + #[tracing::instrument(skip_all)] + async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment>)>) { + let mut receiver = std::pin::pin!(receiver); + while let Some(req) = receiver.next().await { + self.fetch_request(req).await; + } + } + + async fn request(&self, req: Request) { + self.sender.send(req).await.ok(); + self.metrics.queue_len.set(self.sender.len()); + self.metrics.last_seen.set(req.0.u64() as usize); + } + + async fn fetch_request(&self, (view, leaf): Request) { + let span = tracing::warn_span!("fetch proposal", ?view, %leaf); + let res: anyhow::Result<()> = async { + let anchor_view = self + .persistence + .load_anchor_view() + .await + .context("loading anchor view")?; + if view <= anchor_view { + tracing::debug!(?anchor_view, "skipping already-decided proposal"); + return Ok(()); + } + + match self.persistence.load_quorum_proposal(view).await { + Ok(proposal) => { + // If we already have the proposal in storage, keep traversing the chain to its + // parent. + let view = proposal.data.justify_qc.view_number; + let leaf = proposal.data.justify_qc.data.leaf_commit; + self.request((view, leaf)).await; + return Ok(()); + } + Err(err) => { + tracing::info!("proposal missing from storage; fetching from network: {err:#}"); + } + } + + let future = + self.consensus + .read() + .await + .request_proposal(view, EpochNumber::genesis(), leaf)?; + let proposal = timeout(self.cfg.fetch_timeout, future) + .await + .context("timed out fetching proposal")? + .context("error fetching proposal")?; + self.persistence + .append_quorum_proposal(&proposal) + .await + .context("error saving fetched proposal")?; + + // Add the fetched leaf to HotShot state, so consensus can make use of it. + let leaf = Leaf2::from_quorum_proposal(&proposal.data); + let handle = self.consensus.read().await; + let consensus = handle.consensus(); + let mut consensus = consensus.write().await; + if matches!( + consensus.validated_state_map().get(&view), + None | Some(View { + // Replace a Da-only view with a Leaf view, which has strictly more information. + view_inner: ViewInner::Da { .. } + }) + ) { + let state = Arc::new(ValidatedState::from_header(leaf.block_header())); + if let Err(err) = consensus.update_leaf(leaf, state, None) { + tracing::warn!("unable to update leaf: {err:#}"); + } + } + + self.metrics.last_fetched.set(view.u64() as usize); + self.metrics.fetched.add(1); + + Ok(()) + } + .instrument(span) + .await; + if let Err(err) = res { + tracing::warn!("failed to fetch proposal: {err:#}"); + self.metrics.failed.add(1); + + // Avoid busy loop when operations are failing. + sleep(Duration::from_secs(1)).await; + + // If we fail fetching the proposal, don't let it clog up the fetching task. Just push + // it back onto the queue and move onto the next proposal. + self.request((view, leaf)).await; + } + } +}