Skip to content

Commit

Permalink
Allow undecided state storage to be selectively turned out (#1483)
Browse files Browse the repository at this point in the history
Intended to mitigate [incident
13](https://www.notion.so/espressosys/Unit-410-Lagging-DA-Node-783dc4aa86d84708a6bfca6a253df475)
while still retaining the ability to store undecided state in case we
need to restart a bunch of nodes at once.
  • Loading branch information
jbearer authored May 17, 2024
2 parents 3e50daa + 3061a67 commit 2943a51
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 39 deletions.
13 changes: 3 additions & 10 deletions sequencer/src/api/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl SequencerDataSource for DataSource {
type Options = Options;

async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
let path = Path::new(&opt.path);
let path = Path::new(opt.path());
let data_source = {
if reset {
FileSystemDataSource::create(path, provider).await?
Expand Down Expand Up @@ -41,18 +41,11 @@ mod impl_testable_data_source {
}

fn persistence_options(storage: &Self::Storage) -> Self::Options {
Options {
path: storage.path().into(),
}
Options::new(storage.path().into())
}

fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
opt.query_fs(
Default::default(),
Options {
path: storage.path().into(),
},
)
opt.query_fs(Default::default(), Options::new(storage.path().into()))
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,7 @@ mod test {
if let Err(err) = init_with_storage(
modules,
opt,
fs::Options {
path: tmp.path().into(),
},
fs::Options::new(tmp.path().into()),
SEQUENCER_VERSION,
)
.await
Expand Down
46 changes: 36 additions & 10 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use std::{
pub struct Options {
/// Storage path for persistent data.
#[clap(long, env = "ESPRESSO_SEQUENCER_STORAGE_PATH")]
pub path: PathBuf,
path: PathBuf,

#[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)]
store_undecided_state: bool,
}

impl Default for Options {
Expand All @@ -35,12 +38,28 @@ impl Default for Options {
}
}

impl Options {
pub fn new(path: PathBuf) -> Self {
Self {
path,
store_undecided_state: false,
}
}

pub(crate) fn path(&self) -> &Path {
&self.path
}
}

#[async_trait]
impl PersistenceOptions for Options {
type Persistence = Persistence;

async fn create(self) -> anyhow::Result<Persistence> {
Ok(Persistence(self.path))
Ok(Persistence {
path: self.path,
store_undecided_state: self.store_undecided_state,
})
}

async fn reset(self) -> anyhow::Result<()> {
Expand All @@ -50,31 +69,34 @@ impl PersistenceOptions for Options {

/// File system backed persistence.
#[derive(Clone, Debug)]
pub struct Persistence(PathBuf);
pub struct Persistence {
path: PathBuf,
store_undecided_state: bool,
}

impl Persistence {
fn config_path(&self) -> PathBuf {
self.0.join("hotshot.cfg")
self.path.join("hotshot.cfg")
}

fn voted_view_path(&self) -> PathBuf {
self.0.join("highest_voted_view")
self.path.join("highest_voted_view")
}

fn anchor_leaf_path(&self) -> PathBuf {
self.0.join("anchor_leaf")
self.path.join("anchor_leaf")
}

fn vid_dir_path(&self) -> PathBuf {
self.0.join("vid")
self.path.join("vid")
}

fn da_dir_path(&self) -> PathBuf {
self.0.join("da")
self.path.join("da")
}

fn undecided_state_path(&self) -> PathBuf {
self.0.join("undecided_state")
self.path.join("undecided_state")
}

/// Overwrite a file if a condition is met.
Expand Down Expand Up @@ -372,6 +394,10 @@ impl SequencerPersistence for Persistence {
leaves: CommitmentMap<Leaf>,
state: BTreeMap<ViewNumber, View<SeqTypes>>,
) -> anyhow::Result<()> {
if !self.store_undecided_state {
return Ok(());
}

self.replace(
&self.undecided_state_path(),
|_| {
Expand Down Expand Up @@ -403,7 +429,7 @@ mod testing {
}

async fn connect(storage: &Self::Storage) -> Self {
Persistence(storage.path().into())
Options::new(storage.path().into()).create().await.unwrap()
}
}
}
Expand Down
53 changes: 37 additions & 16 deletions sequencer/src/persistence/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,33 @@ pub struct Options {
/// addition, there are some parameters which cannot be set via the URI, such as TLS.
// Hide from debug output since may contain sensitive data.
#[derivative(Debug = "ignore")]
pub uri: Option<String>,
pub(crate) uri: Option<String>,

/// Hostname for the remote Postgres database server.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
pub host: Option<String>,
pub(crate) host: Option<String>,

/// Port for the remote Postgres database server.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
pub port: Option<u16>,
pub(crate) port: Option<u16>,

/// Name of database to connect to.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
pub database: Option<String>,
pub(crate) database: Option<String>,

/// Postgres user to connect as.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
pub user: Option<String>,
pub(crate) user: Option<String>,

/// Password for Postgres user.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
// Hide from debug output since may contain sensitive data.
#[derivative(Debug = "ignore")]
pub password: Option<String>,
pub(crate) password: Option<String>,

/// Use TLS for an encrypted connection to the database.
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
pub use_tls: bool,
pub(crate) use_tls: bool,

/// This will enable the pruner and set the default pruning parameters unless provided.
/// Default parameters:
Expand All @@ -82,11 +82,14 @@ pub struct Options {
/// - max_usage: 80%
/// - interval: 1 hour
#[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PRUNE")]
pub prune: bool,
pub(crate) prune: bool,

/// Pruning parameters.
#[clap(flatten)]
pub pruning: PruningOptions,
pub(crate) pruning: PruningOptions,

#[clap(long, env = "ESPRESSO_SEQUENCER_STORE_UNDECIDED_STATE", hide = true)]
pub(crate) store_undecided_state: bool,
}

impl TryFrom<Options> for Config {
Expand Down Expand Up @@ -205,7 +208,10 @@ impl PersistenceOptions for Options {
type Persistence = Persistence;

async fn create(self) -> anyhow::Result<Persistence> {
SqlStorage::connect(self.try_into()?).await
Ok(Persistence {
store_undecided_state: self.store_undecided_state,
db: SqlStorage::connect(self.try_into()?).await?,
})
}

async fn reset(self) -> anyhow::Result<()> {
Expand All @@ -215,21 +221,24 @@ impl PersistenceOptions for Options {
}

/// Postgres-backed persistence.
pub type Persistence = SqlStorage;
pub struct Persistence {
db: SqlStorage,
store_undecided_state: bool,
}

async fn transaction(
db: &mut Persistence,
persistence: &mut Persistence,
f: impl FnOnce(Transaction) -> BoxFuture<anyhow::Result<()>>,
) -> anyhow::Result<()> {
let tx = db.transaction().await?;
let tx = persistence.db.transaction().await?;
match f(tx).await {
Ok(_) => {
db.commit().await?;
persistence.db.commit().await?;
Ok(())
}
Err(err) => {
tracing::warn!("transaction failed, reverting: {err:#}");
db.revert().await;
persistence.db.revert().await;
Err(err)
}
}
Expand All @@ -238,14 +247,17 @@ async fn transaction(
#[async_trait]
impl SequencerPersistence for Persistence {
fn into_catchup_provider(self) -> anyhow::Result<Arc<dyn StateCatchup>> {
Ok(Arc::new(SqlStateCatchup::from(Arc::new(RwLock::new(self)))))
Ok(Arc::new(SqlStateCatchup::from(Arc::new(RwLock::new(
self.db,
)))))
}

async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
tracing::info!("loading config from Postgres");

// Select the most recent config (although there should only be one).
let Some(row) = self
.db
.query_opt_static("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
.await?
else {
Expand Down Expand Up @@ -339,6 +351,7 @@ impl SequencerPersistence for Persistence {

async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
Ok(self
.db
.query_opt_static("SELECT view FROM highest_voted_view WHERE id = 0")
.await?
.map(|row| {
Expand All @@ -351,6 +364,7 @@ impl SequencerPersistence for Persistence {
&self,
) -> anyhow::Result<Option<(Leaf, QuorumCertificate<SeqTypes>)>> {
let Some(row) = self
.db
.query_opt_static("SELECT leaf, qc FROM anchor_leaf WHERE id = 0")
.await?
else {
Expand All @@ -370,6 +384,7 @@ impl SequencerPersistence for Persistence {
&self,
) -> anyhow::Result<Option<(CommitmentMap<Leaf>, BTreeMap<ViewNumber, View<SeqTypes>>)>> {
let Some(row) = self
.db
.query_opt_static("SELECT leaves, state FROM undecided_state WHERE id = 0")
.await?
else {
Expand All @@ -390,6 +405,7 @@ impl SequencerPersistence for Persistence {
view: ViewNumber,
) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal<SeqTypes>>>> {
let result = self
.db
.query_opt(
"SELECT data FROM da_proposal where view = $1",
[&(view.get_u64() as i64)],
Expand All @@ -409,6 +425,7 @@ impl SequencerPersistence for Persistence {
view: ViewNumber,
) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
let result = self
.db
.query_opt(
"SELECT data FROM vid_share where view = $1",
[&(view.get_u64() as i64)],
Expand Down Expand Up @@ -493,6 +510,10 @@ impl SequencerPersistence for Persistence {
leaves: CommitmentMap<Leaf>,
state: BTreeMap<ViewNumber, View<SeqTypes>>,
) -> anyhow::Result<()> {
if !self.store_undecided_state {
return Ok(());
}

let leaves_bytes = bincode::serialize(&leaves).context("serializing leaves")?;
let state_bytes = bincode::serialize(&state).context("serializing state")?;

Expand Down

0 comments on commit 2943a51

Please sign in to comment.