Commit 8cd4f97

Fix deadlock and race condition in proposal fetching (#2379)
* Fix deadlock and race condition in proposal fetching

* Change from a broadcast channel to a multi-consumer channel. This means
  only one fetcher task receives each proposal to be fetched, which is the
  behavior we actually want. Before, with broadcast, every fetcher received
  (and fetched) the same proposal, which is why we saw race conditions
  causing database serialization errors. It should now be possible to
  re-enable multiple workers (see the sketch after this list).

* Use an unbounded channel. This prevents a deadlock where a consumer sends
  back into the channel (e.g. to recursively fetch the parent of the
  proposal it just fetched) while the channel is full: the send blocks the
  consumer, which is the very task responsible for clearing the blockage.

* Add metrics for proposal fetcher
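
As a minimal sketch of the new channel shape (assuming the `async-channel` crate added to the workspace below; the item type, worker count, and names are illustrative, not the actual proposal fetcher module):

```rust
// Sketch only: each queued item is delivered to exactly ONE worker
// (multi-consumer), unlike async-broadcast where every receiver sees every
// item, and the unbounded queue means a worker can re-enqueue work without
// blocking itself.
use async_channel::unbounded;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Unbounded MPMC queue of (view, leaf commitment) work items.
    let (tx, rx) = unbounded::<(u64, String)>();

    // Seed the queue with one proposal to fetch.
    tx.send((10, "leaf-10".into())).await.unwrap();

    for worker in 0..2 {
        let (tx, rx) = (tx.clone(), rx.clone());
        tokio::spawn(async move {
            while let Ok((view, leaf)) = rx.recv().await {
                println!("worker {worker}: fetching proposal {leaf} for view {view}");
                // Recursively enqueue the parent. Because the channel is
                // unbounded, this send never blocks the very task that is
                // responsible for draining the queue.
                if view > 1 {
                    tx.send((view - 1, format!("leaf-{}", view - 1))).await.ok();
                }
            }
        });
    }

    // Give the toy workers a moment to drain the queue.
    tokio::time::sleep(Duration::from_millis(100)).await;
}
```

With two workers sharing clones of one receiver, views 10 down to 1 are each fetched once rather than once per worker, which is why re-enabling multiple workers should no longer produce conflicting database writes.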
jbearer authored Dec 9, 2024
1 parent a38dce3 commit 8cd4f97
Showing 9 changed files with 275 additions and 187 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -40,6 +40,7 @@ ark-poly = "0.4"
ark-serialize = "0.4"
ark-srs = "0.3.1"
async-broadcast = "0.7.0"
async-channel = "2"
async-lock = "3"
async-once-cell = "0.5"
async-trait = "0.1"
14 changes: 11 additions & 3 deletions justfile
@@ -10,13 +10,21 @@ demo *args:
demo-native *args: build
scripts/demo-native {{args}}

build:
lint:
#!/usr/bin/env bash
set -euxo pipefail
# Use the same target dir for both `clippy` invocations
export CARGO_TARGET_DIR=${CARGO_TARGET_DIR:-target}
cargo clippy --workspace --features testing --all-targets -- -D warnings
cargo clippy --workspace --all-targets --manifest-path sequencer-sqlite/Cargo.toml -- -D warnings
build profile="test":
#!/usr/bin/env bash
set -euxo pipefail
# Use the same target dir for both `build` invocations
export CARGO_TARGET_DIR=${CARGO_TARGET_DIR:-target}
cargo build --profile test
cargo build --profile test --manifest-path ./sequencer-sqlite/Cargo.toml
cargo build --profile {{profile}}
cargo build --profile {{profile}} --manifest-path ./sequencer-sqlite/Cargo.toml
demo-native-mp *args: build
scripts/demo-native -f process-compose.yaml -f process-compose-mp.yml {{args}}
2 changes: 1 addition & 1 deletion sequencer-sqlite/Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion sequencer/Cargo.toml
@@ -43,7 +43,7 @@ vergen = { workspace = true }
anyhow = { workspace = true }
ark-ff = { workspace = true }
ark-serialize = { workspace = true, features = ["derive"] }
async-broadcast = { workspace = true }
async-channel = { workspace = true }
async-lock = { workspace = true }
async-once-cell = { workspace = true }
async-trait = { workspace = true }
191 changes: 12 additions & 179 deletions sequencer/src/context.rs
@@ -1,13 +1,9 @@
use std::{fmt::Display, sync::Arc};

use anyhow::Context;
use async_broadcast::{broadcast, Receiver, Sender};
use async_lock::RwLock;
use clap::Parser;
use committable::Commitment;
use derivative::Derivative;
use espresso_types::{
parse_duration,
v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
NodeState, PubKey, Transaction, ValidatedState,
};
@@ -22,70 +18,33 @@ use hotshot::{
};
use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
use parking_lot::Mutex;
use tokio::{
spawn,
task::JoinHandle,
time::{sleep, timeout},
};
use tokio::{spawn, task::JoinHandle};

use hotshot_orchestrator::client::OrchestratorClient;
use hotshot_types::{
consensus::ConsensusMetricsValue,
data::{EpochNumber, Leaf2, ViewNumber},
data::{Leaf2, ViewNumber},
network::NetworkConfig,
traits::{
metrics::Metrics,
network::ConnectedNetwork,
node_implementation::{ConsensusTime, NodeType, Versions},
ValidatedState as _,
node_implementation::{NodeType, Versions},
},
utils::{View, ViewInner},
PeerConfig, ValidatorConfig,
};
use std::time::Duration;
use tracing::{Instrument, Level};
use url::Url;

use crate::{
external_event_handler::{self, ExternalEventHandler},
proposal_fetcher::ProposalFetcherConfig,
state_signature::StateSigner,
static_stake_table_commitment, Node, SeqTypes, SequencerApiVersion,
};

/// The consensus handle
pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;

#[derive(Clone, Copy, Debug, Parser)]
pub struct ProposalFetcherConfig {
#[clap(
long = "proposal-fetcher-num-workers",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
default_value = "2"
)]
pub num_workers: usize,

#[clap(
long = "proposal-fetcher-channel-capacity",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY",
default_value = "100"
)]
pub channel_capacity: usize,

#[clap(
long = "proposal-fetcher-fetch-timeout",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
default_value = "2s",
value_parser = parse_duration,
)]
pub fetch_timeout: Duration,
}

impl Default for ProposalFetcherConfig {
fn default() -> Self {
Self::parse_from(std::iter::empty::<String>())
}
}

/// The sequencer context contains a consensus handle and other sequencer specific information.
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
@@ -210,6 +169,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
event_consumer,
anchor_view,
proposal_fetcher_cfg,
metrics,
)
.with_task_list(tasks))
}
@@ -228,6 +188,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
event_consumer: impl PersistenceEventConsumer + 'static,
anchor_view: Option<ViewNumber>,
proposal_fetcher_cfg: ProposalFetcherConfig,
metrics: &dyn Metrics,
) -> Self {
let events = handle.event_stream();

@@ -245,19 +206,12 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
};

// Spawn proposal fetching tasks.
let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity);
ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send));
for i in 0..proposal_fetcher_cfg.num_workers {
ctx.spawn(
format!("proposal fetcher {i}"),
fetch_proposals(
ctx.handle.clone(),
persistence.clone(),
recv.clone(),
proposal_fetcher_cfg.fetch_timeout,
),
);
}
proposal_fetcher_cfg.spawn(
&mut ctx.tasks,
ctx.handle.clone(),
persistence.clone(),
metrics,
);

// Spawn event handling loop.
ctx.spawn(
@@ -475,127 +429,6 @@ async fn handle_events<V: Versions>(
}
}

#[tracing::instrument(skip_all)]
async fn scan_proposals<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
fetcher: Sender<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
let mut events = consensus.read().await.event_stream();
while let Some(event) = events.next().await {
let EventType::QuorumProposal { proposal, .. } = event.event else {
continue;
};
// Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back
// to the anchor. This allows state replay from the decided state.
let parent_view = proposal.data.justify_qc.view_number;
let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
fetcher
.broadcast_direct((parent_view, parent_leaf))
.await
.ok();
}
}

#[tracing::instrument(skip_all)]
async fn fetch_proposals<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
persistence: Arc<impl SequencerPersistence>,
mut scanner: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
fetch_timeout: Duration,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
let sender = scanner.new_sender();
while let Some((view, leaf)) = scanner.next().await {
let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
let res: anyhow::Result<()> = async {
let anchor_view = load_anchor_view(&*persistence).await;
if view <= anchor_view {
tracing::debug!(?anchor_view, "skipping already-decided proposal");
return Ok(());
}

match persistence.load_quorum_proposal(view).await {
Ok(proposal) => {
// If we already have the proposal in storage, keep traversing the chain to its
// parent.
let view = proposal.data.justify_qc.view_number;
let leaf = proposal.data.justify_qc.data.leaf_commit;
sender.broadcast_direct((view, leaf)).await.ok();
return Ok(());
}
Err(err) => {
tracing::info!("proposal missing from storage; fetching from network: {err:#}");
}
}

let future =
consensus
.read()
.await
.request_proposal(view, EpochNumber::genesis(), leaf)?;
let proposal = timeout(fetch_timeout, future)
.await
.context("timed out fetching proposal")?
.context("error fetching proposal")?;
persistence
.append_quorum_proposal(&proposal)
.await
.context("error saving fetched proposal")?;

// Add the fetched leaf to HotShot state, so consensus can make use of it.
let leaf = Leaf2::from_quorum_proposal(&proposal.data);
let handle = consensus.read().await;
let consensus = handle.consensus();
let mut consensus = consensus.write().await;
if matches!(
consensus.validated_state_map().get(&view),
None | Some(View {
// Replace a Da-only view with a Leaf view, which has strictly more information.
view_inner: ViewInner::Da { .. }
})
) {
let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
if let Err(err) = consensus.update_leaf(leaf, state, None) {
tracing::warn!("unable to update leaf: {err:#}");
}
}

Ok(())
}
.instrument(span)
.await;
if let Err(err) = res {
tracing::warn!("failed to fetch proposal: {err:#}");

// Avoid busy loop when operations are failing.
sleep(Duration::from_secs(1)).await;

// If we fail fetching the proposal, don't let it clog up the fetching task. Just push
// it back onto the queue and move onto the next proposal.
sender.broadcast_direct((view, leaf)).await.ok();
}
}
}

async fn load_anchor_view(persistence: &impl SequencerPersistence) -> ViewNumber {
loop {
match persistence.load_anchor_view().await {
Ok(view) => break view,
Err(err) => {
tracing::warn!("error loading anchor view: {err:#}");
sleep(Duration::from_secs(1)).await;
}
}
}
}

#[derive(Debug, Default, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
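One concrete way to see the two bugs in the deleted code above: `fetch_proposals` was both a consumer of the scanner channel and a producer into it (it calls `sender.broadcast_direct(..)` from inside its own receive loop, to walk to the parent proposal and to re-queue failures), and with `async-broadcast` every worker's receiver got every `(view, leaf)` pair, so all workers raced to persist the same proposal. Below is a toy reproduction of the self-deadlock on a full queue, using `async_channel::bounded` for brevity rather than the broadcast channel (the blocking behavior is what matters; the names and the tiny capacity are assumptions made for the sketch):

```rust
use std::time::Duration;

use async_channel::bounded;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // A bounded work queue; the old fetcher used capacity 100, a capacity of
    // 1 just makes the hang immediate.
    let (tx, rx) = bounded::<u64>(1);
    tx.send(2).await.unwrap(); // the queue is now full

    let worker = tokio::spawn(async move {
        while let Ok(view) = rx.recv().await {
            // Re-enqueue two "parents" before receiving again. The second
            // send finds the queue full and waits forever: the only task
            // that could drain the queue is the one stuck in `send`.
            tx.send(view + 1).await.ok();
            tx.send(view + 2).await.ok();
        }
    });

    // The worker never finishes; it has deadlocked on its own queue.
    assert!(timeout(Duration::from_secs(1), worker).await.is_err());
    println!("worker deadlocked on its own bounded queue, as expected");
}
```

Switching to an unbounded, single-delivery queue removes both failure modes; that is what the `proposal_fetcher_cfg.spawn(..)` call above wires up, with the new module itself living in the part of the diff not shown here.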
4 changes: 3 additions & 1 deletion sequencer/src/lib.rs
@@ -2,6 +2,7 @@ pub mod api;
pub mod catchup;
pub mod context;
pub mod genesis;
mod proposal_fetcher;

mod external_event_handler;
pub mod options;
@@ -13,14 +14,15 @@ mod message_compat_tests;

use anyhow::Context;
use catchup::StatePeers;
use context::{ProposalFetcherConfig, SequencerContext};
use context::SequencerContext;
use espresso_types::{
traits::EventConsumer, BackoffParams, L1ClientOptions, NodeState, PubKey, SeqTypes,
SolverAuctionResultsProvider, ValidatedState,
};
use genesis::L1Finalized;
use hotshot::traits::election::static_committee::StaticCommittee;
use hotshot_types::traits::election::Membership;
use proposal_fetcher::ProposalFetcherConfig;
use std::sync::Arc;
use tokio::select;
// Should move `STAKE_TABLE_CAPACITY` in the sequencer repo when we have variate stake table support
2 changes: 1 addition & 1 deletion sequencer/src/options.rs
@@ -21,7 +21,7 @@ use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey};
use libp2p::Multiaddr;
use url::Url;

use crate::{api, context::ProposalFetcherConfig, persistence};
use crate::{api, persistence, proposal_fetcher::ProposalFetcherConfig};

// This options struct is a bit unconventional. The sequencer has multiple optional modules which
// can be added, in any combination, to the service. These include, for example, the API server.
(The diff for the remaining changed file, presumably the new sequencer/src/proposal_fetcher.rs module referenced from lib.rs and options.rs above, did not load.)
