Skip to content

Commit

Permalink
fix(block-reverter): Fix reverting snapshot files (#2064)
Browse files Browse the repository at this point in the history
## What ❔

- Does not retry "not found" errors when removing objects.
- Makes rolling back snapshot files optional.

## Why ❔

The current implementation leads to very slow reverts because of
retries.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored Jun 3, 2024
1 parent 7c7d352 commit 17a7e78
Show file tree
Hide file tree
Showing 12 changed files with 410 additions and 105 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 13 additions & 7 deletions core/bin/block_reverter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ enum Command {
/// Flag that specifies if RocksDB with state keeper cache should be rolled back.
#[arg(long)]
rollback_sk_cache: bool,
/// Flag that specifies if snapshot files in GCS should be rolled back.
#[arg(long, requires = "rollback_postgres")]
rollback_snapshots: bool,
/// Flag that allows to roll back already executed blocks. It's ultra dangerous and required only for fixing external nodes.
#[arg(long)]
allow_executed_block_reversion: bool,
Expand Down Expand Up @@ -187,6 +190,7 @@ async fn main() -> anyhow::Result<()> {
rollback_postgres,
rollback_tree,
rollback_sk_cache,
rollback_snapshots,
allow_executed_block_reversion,
} => {
if !rollback_tree && rollback_postgres {
Expand Down Expand Up @@ -219,13 +223,15 @@ async fn main() -> anyhow::Result<()> {

if rollback_postgres {
block_reverter.enable_rolling_back_postgres();
let object_store_config = SnapshotsObjectStoreConfig::from_env()
.context("SnapshotsObjectStoreConfig::from_env()")?;
block_reverter.enable_rolling_back_snapshot_objects(
ObjectStoreFactory::new(object_store_config.0)
.create_store()
.await,
);
if rollback_snapshots {
let object_store_config = SnapshotsObjectStoreConfig::from_env()
.context("SnapshotsObjectStoreConfig::from_env()")?;
block_reverter.enable_rolling_back_snapshot_objects(
ObjectStoreFactory::new(object_store_config.0)
.create_store()
.await,
);
}
}
if rollback_tree {
block_reverter.enable_rolling_back_merkle_tree(db_config.merkle_tree.path);
Expand Down
2 changes: 2 additions & 0 deletions core/lib/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ google-cloud-auth.workspace = true
http.workspace = true
serde_json.workspace = true
flate2.workspace = true
rand.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
prost.workspace = true

[dev-dependencies]
assert_matches.workspace = true
tempfile.workspace = true
45 changes: 22 additions & 23 deletions core/lib/object_store/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ impl From<io::Error> for ObjectStoreError {
fn from(err: io::Error) -> Self {
match err.kind() {
io::ErrorKind::NotFound => ObjectStoreError::KeyNotFound(err.into()),
_ => ObjectStoreError::Other(err.into()),
kind => ObjectStoreError::Other {
is_transient: matches!(kind, io::ErrorKind::Interrupted | io::ErrorKind::TimedOut),
source: err.into(),
},
}
}
}
Expand All @@ -20,7 +23,7 @@ pub(crate) struct FileBackedObjectStore {
}

impl FileBackedObjectStore {
pub async fn new(base_dir: String) -> Self {
pub async fn new(base_dir: String) -> Result<Self, ObjectStoreError> {
for bucket in &[
Bucket::ProverJobs,
Bucket::WitnessInput,
Expand All @@ -36,13 +39,9 @@ impl FileBackedObjectStore {
Bucket::TeeVerifierInput,
] {
let bucket_path = format!("{base_dir}/{bucket}");
fs::create_dir_all(&bucket_path)
.await
.unwrap_or_else(|err| {
panic!("failed creating bucket `{bucket_path}`: {err}");
});
fs::create_dir_all(&bucket_path).await?;
}
FileBackedObjectStore { base_dir }
Ok(FileBackedObjectStore { base_dir })
}

fn filename(&self, bucket: Bucket, key: &str) -> String {
Expand Down Expand Up @@ -87,12 +86,12 @@ mod test {
async fn test_get() {
let dir = TempDir::new().unwrap();
let path = dir.into_path().into_os_string().into_string().unwrap();
let object_store = FileBackedObjectStore::new(path).await;
let object_store = FileBackedObjectStore::new(path).await.unwrap();
let expected = vec![9, 0, 8, 9, 0, 7];
let result = object_store
object_store
.put_raw(Bucket::ProverJobs, "test-key.bin", expected.clone())
.await;
assert!(result.is_ok(), "result must be OK");
.await
.unwrap();
let bytes = object_store
.get_raw(Bucket::ProverJobs, "test-key.bin")
.await
Expand All @@ -104,26 +103,26 @@ mod test {
async fn test_put() {
let dir = TempDir::new().unwrap();
let path = dir.into_path().into_os_string().into_string().unwrap();
let object_store = FileBackedObjectStore::new(path).await;
let object_store = FileBackedObjectStore::new(path).await.unwrap();
let bytes = vec![9, 0, 8, 9, 0, 7];
let result = object_store
object_store
.put_raw(Bucket::ProverJobs, "test-key.bin", bytes)
.await;
assert!(result.is_ok(), "result must be OK");
.await
.unwrap();
}

#[tokio::test]
async fn test_remove() {
let dir = TempDir::new().unwrap();
let path = dir.into_path().into_os_string().into_string().unwrap();
let object_store = FileBackedObjectStore::new(path).await;
let result = object_store
let object_store = FileBackedObjectStore::new(path).await.unwrap();
object_store
.put_raw(Bucket::ProverJobs, "test-key.bin", vec![0, 1])
.await;
assert!(result.is_ok(), "result must be OK");
let result = object_store
.await
.unwrap();
object_store
.remove_raw(Bucket::ProverJobs, "test-key.bin")
.await;
assert!(result.is_ok(), "result must be OK");
.await
.unwrap();
}
}
Loading

0 comments on commit 17a7e78

Please sign in to comment.