From 74f85971fb7e9c2c804e38e649bd0ea08fa2a798 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Mon, 9 Dec 2024 13:08:26 -0800 Subject: [PATCH] Fix deadlock and race condition in proposal fetching (#2379) * Fix deadlock and race condition in proposal fetching * Change from broadcast channel to multi-consumer channel. This means only one fetcher task will receive each proposal to be fetched, which is the actual behavior we want. Before, with broadcast, we had multiple fetchers always fetching the same proposal, which is why we saw race conditions causing database serialization errors. It should now be possible to reenable multiple workers. * Use an unbounded channel. This prevents a deadlock where a consumer sends back into the channel (e.g. to recursively fetch the parent of the proposal it had just fetched), but the channel is full, blocking the consumer, the very task responsible for clearing the blockage. * Add metrics for proposal fetcher --- Cargo.lock | 1 + Cargo.toml | 1 + justfile | 11 +- sequencer/Cargo.toml | 1 + sequencer/src/context.rs | 201 ++--------------------- sequencer/src/lib.rs | 4 +- sequencer/src/options.rs | 2 +- sequencer/src/proposal_fetcher.rs | 254 ++++++++++++++++++++++++++++++ 8 files changed, 281 insertions(+), 194 deletions(-) create mode 100644 sequencer/src/proposal_fetcher.rs diff --git a/Cargo.lock b/Cargo.lock index 9d6ccbb794..e5553dd861 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8656,6 +8656,7 @@ dependencies = [ "ark-ff", "ark-serialize", "async-broadcast", + "async-channel 2.3.1", "async-lock 3.4.0", "async-once-cell", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index db2fd96b81..90625b5486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ ark-poly = "0.4" ark-serialize = "0.4" ark-srs = "0.3.1" async-broadcast = "0.7.0" +async-channel = "2" async-lock = "3" async-once-cell = "0.5" async-trait = "0.1" diff --git a/justfile b/justfile index 1bfd6f3eeb..abff229762 100644 --- a/justfile +++ b/justfile @@ -7,9 +7,14 @@ doc *args: demo *args: docker compose up {{args}} -demo-native: - cargo build --profile test - scripts/demo-native +demo-native *args: build + scripts/demo-native {{args}} + +lint: + cargo clippy --workspace --features testing --all-targets -- -D warnings + +build profile="test": + cargo build --profile {{profile}} demo-native-mp: cargo build --release diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index eb1789656c..5d9f8e6ab6 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -44,6 +44,7 @@ anyhow = { workspace = true } ark-ff = { workspace = true } ark-serialize = { workspace = true, features = ["derive"] } async-broadcast = { workspace = true } +async-channel = { workspace = true } async-lock = { workspace = true } async-once-cell = { workspace = true } async-trait = { workspace = true } diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index 9e3a84776f..10910b4102 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -1,13 +1,9 @@ use std::{fmt::Display, sync::Arc}; use anyhow::Context; -use async_broadcast::{broadcast, Receiver, Sender}; use async_lock::RwLock; -use clap::Parser; -use committable::{Commitment, Committable}; use derivative::Derivative; use espresso_types::{ - parse_duration, v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence}, NodeState, PubKey, Transaction, ValidatedState, }; @@ -21,33 +17,27 @@ use hotshot::{ }; use hotshot_events_service::events_source::{EventConsumer, EventsStreamer}; use parking_lot::Mutex; -use tokio::{ - spawn, - task::JoinHandle, - time::{sleep, timeout}, -}; +use tokio::{spawn, task::JoinHandle}; use hotshot_orchestrator::client::OrchestratorClient; use hotshot_query_service::Leaf; use hotshot_types::{ consensus::ConsensusMetricsValue, - data::{EpochNumber, ViewNumber}, + data::ViewNumber, network::NetworkConfig, traits::{ metrics::Metrics, network::ConnectedNetwork, - node_implementation::{ConsensusTime, NodeType, Versions}, - ValidatedState as _, + node_implementation::{NodeType, Versions}, }, - utils::{View, ViewInner}, PeerConfig, ValidatorConfig, }; -use std::time::Duration; use tracing::{Instrument, Level}; use url::Url; use crate::{ external_event_handler::{self, ExternalEventHandler}, + proposal_fetcher::ProposalFetcherConfig, state_signature::StateSigner, static_stake_table_commitment, Node, SeqTypes, SequencerApiVersion, }; @@ -55,37 +45,6 @@ use crate::{ /// The consensus handle pub type Consensus = SystemContextHandle, V>; -#[derive(Clone, Copy, Debug, Parser)] -pub struct ProposalFetcherConfig { - #[clap( - long = "proposal-fetcher-num-workers", - env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS", - default_value = "2" - )] - pub num_workers: usize, - - #[clap( - long = "proposal-fetcher-channel-capacity", - env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY", - default_value = "100" - )] - pub channel_capacity: usize, - - #[clap( - long = "proposal-fetcher-fetch-timeout", - env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT", - default_value = "2s", - value_parser = parse_duration, - )] - pub fetch_timeout: Duration, -} - -impl Default for ProposalFetcherConfig { - fn default() -> Self { - Self::parse_from(std::iter::empty::()) - } -} - /// The sequencer context contains a consensus handle and other sequencer specific information. #[derive(Derivative, Clone)] #[derivative(Debug(bound = ""))] @@ -210,6 +169,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence event_consumer, anchor_view, proposal_fetcher_cfg, + metrics, ) .with_task_list(tasks)) } @@ -228,6 +188,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence event_consumer: impl PersistenceEventConsumer + 'static, anchor_view: Option, proposal_fetcher_cfg: ProposalFetcherConfig, + metrics: &dyn Metrics, ) -> Self { let events = handle.event_stream(); @@ -245,19 +206,12 @@ impl, P: SequencerPersistence, V: Versions> Sequence }; // Spawn proposal fetching tasks. - let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity); - ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send)); - for i in 0..proposal_fetcher_cfg.num_workers { - ctx.spawn( - format!("proposal fetcher {i}"), - fetch_proposals( - ctx.handle.clone(), - persistence.clone(), - recv.clone(), - proposal_fetcher_cfg.fetch_timeout, - ), - ); - } + proposal_fetcher_cfg.spawn( + &mut ctx.tasks, + ctx.handle.clone(), + persistence.clone(), + metrics, + ); // Spawn event handling loop. ctx.spawn( @@ -475,137 +429,6 @@ async fn handle_events( } } -#[tracing::instrument(skip_all)] -async fn scan_proposals( - consensus: Arc>>, - fetcher: Sender<(ViewNumber, Commitment>)>, -) where - N: ConnectedNetwork, - P: SequencerPersistence, - V: Versions, -{ - let mut events = consensus.read().await.event_stream(); - while let Some(event) = events.next().await { - let EventType::QuorumProposal { proposal, .. } = event.event else { - continue; - }; - // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back - // to the anchor. This allows state replay from the decided state. - let parent_view = proposal.data.justify_qc.view_number; - let parent_leaf = proposal.data.justify_qc.data.leaf_commit; - fetcher - .broadcast_direct((parent_view, parent_leaf)) - .await - .ok(); - } -} - -#[tracing::instrument(skip_all)] -async fn fetch_proposals( - consensus: Arc>>, - persistence: Arc, - mut scanner: Receiver<(ViewNumber, Commitment>)>, - fetch_timeout: Duration, -) where - N: ConnectedNetwork, - P: SequencerPersistence, - V: Versions, -{ - let sender = scanner.new_sender(); - while let Some((view, leaf)) = scanner.next().await { - let span = tracing::warn_span!("fetch proposal", ?view, %leaf); - let res: anyhow::Result<()> = async { - let anchor_view = load_anchor_view(&*persistence).await; - if view <= anchor_view { - tracing::debug!(?anchor_view, "skipping already-decided proposal"); - return Ok(()); - } - - match persistence.load_quorum_proposal(view).await { - Ok(proposal) => { - // If we already have the proposal in storage, keep traversing the chain to its - // parent. - let view = proposal.data.justify_qc.view_number; - let leaf = proposal.data.justify_qc.data.leaf_commit; - sender.broadcast_direct((view, leaf)).await.ok(); - return Ok(()); - } - Err(err) => { - tracing::info!("proposal missing from storage; fetching from network: {err:#}"); - } - } - - let future = - consensus - .read() - .await - .request_proposal(view, EpochNumber::genesis(), leaf)?; - let proposal = timeout(fetch_timeout, future) - .await - .context("timed out fetching proposal")? - .context("error fetching proposal")?; - persistence - .append_quorum_proposal(&proposal) - .await - .context("error saving fetched proposal")?; - - // Add the fetched leaf to HotShot state, so consensus can make use of it. - let leaf = Leaf::from_quorum_proposal(&proposal.data); - let handle = consensus.read().await; - let consensus = handle.consensus(); - let mut consensus = consensus.write().await; - if matches!( - consensus.validated_state_map().get(&view), - None | Some(View { - // Replace a Da-only view with a Leaf view, which has strictly more information. - view_inner: ViewInner::Da { .. } - }) - ) { - let v = View { - view_inner: ViewInner::Leaf { - leaf: Committable::commit(&leaf), - state: Arc::new(ValidatedState::from_header(leaf.block_header())), - delta: None, - }, - }; - if let Err(err) = consensus.update_validated_state_map(view, v) { - tracing::warn!("unable to update validated state map: {err:#}"); - } - consensus - .update_saved_leaves(leaf, &handle.hotshot.upgrade_lock) - .await; - tracing::debug!("added view to validated state map view proposal fetcher"); - } - - Ok(()) - } - .instrument(span) - .await; - if let Err(err) = res { - tracing::warn!("failed to fetch proposal: {err:#}"); - - // Avoid busy loop when operations are failing. - sleep(Duration::from_secs(1)).await; - - // If we fail fetching the proposal, don't let it clog up the fetching task. Just push - // it back onto the queue and move onto the next proposal. - sender.broadcast_direct((view, leaf)).await.ok(); - } - } -} - -async fn load_anchor_view(persistence: &impl SequencerPersistence) -> ViewNumber { - loop { - match persistence.load_anchor_view().await { - Ok(view) => break view, - Err(err) => { - tracing::warn!("error loading anchor view: {err:#}"); - sleep(Duration::from_secs(1)).await; - } - } - } -} - #[derive(Debug, Default, Clone)] #[allow(clippy::type_complexity)] pub(crate) struct TaskList(Arc)>>>); diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 143e229b01..d5e2d6ec5b 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -2,6 +2,7 @@ pub mod api; pub mod catchup; pub mod context; pub mod genesis; +mod proposal_fetcher; mod external_event_handler; pub mod options; @@ -12,7 +13,7 @@ mod message_compat_tests; use anyhow::Context; use async_lock::RwLock; use catchup::StatePeers; -use context::{ProposalFetcherConfig, SequencerContext}; +use context::SequencerContext; use espresso_types::{ traits::EventConsumer, BackoffParams, L1Client, L1ClientOptions, NodeState, PubKey, SeqTypes, SolverAuctionResultsProvider, ValidatedState, @@ -22,6 +23,7 @@ use futures::FutureExt; use genesis::L1Finalized; use hotshot::traits::election::static_committee::StaticCommittee; use hotshot_types::traits::election::Membership; +use proposal_fetcher::ProposalFetcherConfig; use std::sync::Arc; // Should move `STAKE_TABLE_CAPACITY` in the sequencer repo when we have variate stake table support use libp2p::Multiaddr; diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs index 88544ec0f6..9f76aea7e9 100644 --- a/sequencer/src/options.rs +++ b/sequencer/src/options.rs @@ -19,7 +19,7 @@ use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey}; use libp2p::Multiaddr; use url::Url; -use crate::{api, context::ProposalFetcherConfig, persistence}; +use crate::{api, persistence, proposal_fetcher::ProposalFetcherConfig}; // This options struct is a bit unconventional. The sequencer has multiple optional modules which // can be added, in any combination, to the service. These include, for example, the API server. diff --git a/sequencer/src/proposal_fetcher.rs b/sequencer/src/proposal_fetcher.rs new file mode 100644 index 0000000000..fff240b024 --- /dev/null +++ b/sequencer/src/proposal_fetcher.rs @@ -0,0 +1,254 @@ +use std::sync::Arc; + +use anyhow::Context; +use async_channel::{Receiver, Sender}; +use async_lock::RwLock; +use clap::Parser; +use committable::{Commitment, Committable}; +use derivative::Derivative; +use espresso_types::{parse_duration, v0::traits::SequencerPersistence, PubKey, ValidatedState}; +use futures::stream::StreamExt; +use hotshot::types::EventType; +use hotshot_types::{ + data::{EpochNumber, Leaf, ViewNumber}, + traits::{ + metrics::{Counter, Gauge, Metrics}, + network::ConnectedNetwork, + node_implementation::{ConsensusTime, Versions}, + ValidatedState as _, + }, + utils::{View, ViewInner}, +}; +use std::time::Duration; +use tokio::time::{sleep, timeout}; +use tracing::Instrument; + +use crate::{ + context::{Consensus, TaskList}, + SeqTypes, +}; + +#[derive(Clone, Copy, Debug, Parser)] +pub struct ProposalFetcherConfig { + #[clap( + long = "proposal-fetcher-num-workers", + env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS", + default_value = "2" + )] + pub num_workers: usize, + + #[clap( + long = "proposal-fetcher-fetch-timeout", + env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT", + default_value = "2s", + value_parser = parse_duration, + )] + pub fetch_timeout: Duration, +} + +impl Default for ProposalFetcherConfig { + fn default() -> Self { + Self::parse_from(std::iter::empty::()) + } +} + +impl ProposalFetcherConfig { + pub(crate) fn spawn( + self, + tasks: &mut TaskList, + consensus: Arc>>, + persistence: Arc

, + metrics: &(impl Metrics + ?Sized), + ) where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, + { + let (sender, receiver) = async_channel::unbounded(); + let fetcher = ProposalFetcher { + sender, + consensus, + persistence, + cfg: self, + metrics: ProposalFetcherMetrics::new(metrics), + }; + + tasks.spawn("proposal scanner", fetcher.clone().scan()); + for i in 0..self.num_workers { + tasks.spawn( + format!("proposal fetcher {i}"), + fetcher.clone().fetch(receiver.clone()), + ); + } + } +} + +#[derive(Clone, Debug)] +struct ProposalFetcherMetrics { + fetched: Arc, + failed: Arc, + queue_len: Arc, + last_seen: Arc, + last_fetched: Arc, +} + +impl ProposalFetcherMetrics { + fn new(metrics: &(impl Metrics + ?Sized)) -> Self { + let metrics = metrics.subgroup("proposal_fetcher".into()); + Self { + fetched: metrics.create_counter("fetched".into(), None).into(), + failed: metrics.create_counter("failed".into(), None).into(), + queue_len: metrics.create_gauge("queue_len".into(), None).into(), + last_seen: metrics + .create_gauge("last_seen".into(), Some("view".into())) + .into(), + last_fetched: metrics + .create_gauge("last_fetched".into(), Some("view".into())) + .into(), + } + } +} + +type Request = (ViewNumber, Commitment>); + +#[derive(Derivative)] +#[derivative(Clone(bound = ""), Debug(bound = ""))] +struct ProposalFetcher +where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + sender: Sender, + #[derivative(Debug = "ignore")] + consensus: Arc>>, + #[derivative(Debug = "ignore")] + persistence: Arc

, + cfg: ProposalFetcherConfig, + metrics: ProposalFetcherMetrics, +} + +impl ProposalFetcher +where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + #[tracing::instrument(skip_all)] + async fn scan(self) { + let mut events = self.consensus.read().await.event_stream(); + while let Some(event) = events.next().await { + let EventType::QuorumProposal { proposal, .. } = event.event else { + continue; + }; + // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back + // to the anchor. This allows state replay from the decided state. + let parent_view = proposal.data.justify_qc.view_number; + let parent_leaf = proposal.data.justify_qc.data.leaf_commit; + self.request((parent_view, parent_leaf)).await; + } + } + + #[tracing::instrument(skip_all)] + async fn fetch(self, receiver: Receiver<(ViewNumber, Commitment>)>) { + let mut receiver = std::pin::pin!(receiver); + while let Some(req) = receiver.next().await { + self.fetch_request(req).await; + } + } + + async fn request(&self, req: Request) { + self.sender.send(req).await.ok(); + self.metrics.queue_len.set(self.sender.len()); + self.metrics.last_seen.set(req.0.u64() as usize); + } + + async fn fetch_request(&self, (view, leaf): Request) { + let span = tracing::warn_span!("fetch proposal", ?view, %leaf); + let res: anyhow::Result<()> = async { + let anchor_view = self + .persistence + .load_anchor_view() + .await + .context("loading anchor view")?; + if view <= anchor_view { + tracing::debug!(?anchor_view, "skipping already-decided proposal"); + return Ok(()); + } + + match self.persistence.load_quorum_proposal(view).await { + Ok(proposal) => { + // If we already have the proposal in storage, keep traversing the chain to its + // parent. + let view = proposal.data.justify_qc.view_number; + let leaf = proposal.data.justify_qc.data.leaf_commit; + self.request((view, leaf)).await; + return Ok(()); + } + Err(err) => { + tracing::info!("proposal missing from storage; fetching from network: {err:#}"); + } + } + + let future = + self.consensus + .read() + .await + .request_proposal(view, EpochNumber::genesis(), leaf)?; + let proposal = timeout(self.cfg.fetch_timeout, future) + .await + .context("timed out fetching proposal")? + .context("error fetching proposal")?; + self.persistence + .append_quorum_proposal(&proposal) + .await + .context("error saving fetched proposal")?; + + // Add the fetched leaf to HotShot state, so consensus can make use of it. + let leaf = Leaf::from_quorum_proposal(&proposal.data); + let handle = self.consensus.read().await; + let consensus = handle.consensus(); + let mut consensus = consensus.write().await; + if matches!( + consensus.validated_state_map().get(&view), + None | Some(View { + // Replace a Da-only view with a Leaf view, which has strictly more information. + view_inner: ViewInner::Da { .. } + }) + ) { + let v = View { + view_inner: ViewInner::Leaf { + leaf: Committable::commit(&leaf), + state: Arc::new(ValidatedState::from_header(leaf.block_header())), + delta: None, + }, + }; + if let Err(err) = consensus.update_validated_state_map(view, v) { + tracing::warn!("unable to update validated state map: {err:#}"); + } + consensus + .update_saved_leaves(leaf, &handle.hotshot.upgrade_lock) + .await; + tracing::debug!("added view to validated state map view proposal fetcher"); + } + + self.metrics.last_fetched.set(view.u64() as usize); + self.metrics.fetched.add(1); + + Ok(()) + } + .instrument(span) + .await; + if let Err(err) = res { + tracing::warn!("failed to fetch proposal: {err:#}"); + self.metrics.failed.add(1); + + // Avoid busy loop when operations are failing. + sleep(Duration::from_secs(1)).await; + + // If we fail fetching the proposal, don't let it clog up the fetching task. Just push + // it back onto the queue and move onto the next proposal. + self.request((view, leaf)).await; + } + } +}