From 3267f983b5b937b67c6565c62511b369d440f997 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Mon, 23 Sep 2024 13:01:29 +0200 Subject: [PATCH] Tests and fixes for oplog corruption bug --- Cargo.lock | 8 +- Cargo.toml | 2 +- golem-cli/Cargo.toml | 2 +- .../src/services/oplog/mock.rs | 6 +- .../src/services/oplog/mod.rs | 7 +- .../src/services/oplog/multilayer.rs | 16 +- .../src/services/oplog/primary.rs | 9 +- .../src/services/oplog/tests.rs | 292 +++++++++++++++++- .../src/services/scheduler.rs | 5 +- .../src/services/worker.rs | 32 +- golem-worker-executor-base/src/worker.rs | 6 +- 11 files changed, 353 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46ad0792e1..9bb31df48d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3815,9 +3815,9 @@ dependencies = [ [[package]] name = "golem-wasm-rpc" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aeeb1d3e3cfd9c96a6032ed5969533b048ee6d995f1713e717ea5103916b6c70" +checksum = "6e5137fe5950679be704177c8fec9ef5d4c195bb874707d6d35035a892a9183f" dependencies = [ "arbitrary", "async-recursion", @@ -3839,9 +3839,9 @@ dependencies = [ [[package]] name = "golem-wasm-rpc-stubgen" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc7c8772c322c6203949f0ceea33510bce3b2cbda08f5d9f385c36c7414e745" +checksum = "5be1bfec5496fc4f5e913abe3ff49e78ce78b6065977ab73c9b2b0b662426542" dependencies = [ "anyhow", "cargo-component", diff --git a/Cargo.toml b/Cargo.toml index d6e036fe5e..b9beb8ffff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ futures-core = "0.3.29" futures-util = "0.3.29" git-version = "0.3.9" golem-wasm-ast = "1.0.0" -golem-wasm-rpc = { version = "1.0.2", default-features = false, features = [ +golem-wasm-rpc = { version = "1.0.3", default-features = false, features = [ "host", ] } http = "1.0.0" # keep in sync with wasmtime diff --git a/golem-cli/Cargo.toml b/golem-cli/Cargo.toml index a46e3e044c..a31e953d44 100644 --- a/golem-cli/Cargo.toml +++ b/golem-cli/Cargo.toml @@ -39,7 +39,7 @@ futures-util = { workspace = true } golem-examples = "1.0.5" golem-wasm-ast = { workspace = true } golem-wasm-rpc = { workspace = true } -golem-wasm-rpc-stubgen = { version = "1.0.2", optional = true } +golem-wasm-rpc-stubgen = { version = "1.0.3", optional = true } h2 = "0.3.24" http = { workspace = true } humansize = { workspace = true } diff --git a/golem-worker-executor-base/src/services/oplog/mock.rs b/golem-worker-executor-base/src/services/oplog/mock.rs index 2f677a977c..415e643396 100644 --- a/golem-worker-executor-base/src/services/oplog/mock.rs +++ b/golem-worker-executor-base/src/services/oplog/mock.rs @@ -49,7 +49,11 @@ impl OplogService for OplogServiceMock { unimplemented!() } - async fn open(&self, _owned_worker_id: &OwnedWorkerId) -> Arc { + async fn open( + &self, + _owned_worker_id: &OwnedWorkerId, + _last_oplog_index: OplogIndex, + ) -> Arc { unimplemented!() } diff --git a/golem-worker-executor-base/src/services/oplog/mod.rs b/golem-worker-executor-base/src/services/oplog/mod.rs index 92dd686ff0..bc85be7dd4 100644 --- a/golem-worker-executor-base/src/services/oplog/mod.rs +++ b/golem-worker-executor-base/src/services/oplog/mod.rs @@ -71,8 +71,11 @@ pub trait OplogService: Debug { owned_worker_id: &OwnedWorkerId, initial_entry: OplogEntry, ) -> Arc; - async fn open(&self, owned_worker_id: &OwnedWorkerId) - -> Arc; + async fn open( + &self, + owned_worker_id: &OwnedWorkerId, + last_oplog_index: OplogIndex, + ) -> Arc; async fn get_last_index(&self, owned_worker_id: &OwnedWorkerId) -> OplogIndex; diff --git a/golem-worker-executor-base/src/services/oplog/multilayer.rs b/golem-worker-executor-base/src/services/oplog/multilayer.rs index 1b70e3df57..6b64e1c905 100644 --- a/golem-worker-executor-base/src/services/oplog/multilayer.rs +++ b/golem-worker-executor-base/src/services/oplog/multilayer.rs @@ -148,6 +148,7 @@ struct CreateOplogConstructor { initial_entry: Option, primary: Arc, service: MultiLayerOplogService, + last_oplog_index: OplogIndex, } impl CreateOplogConstructor { @@ -156,12 +157,14 @@ impl CreateOplogConstructor { initial_entry: Option, primary: Arc, service: MultiLayerOplogService, + last_oplog_index: OplogIndex, ) -> Self { Self { owned_worker_id, initial_entry, primary, service, + last_oplog_index, } } } @@ -177,7 +180,9 @@ impl OplogConstructor for CreateOplogConstructor { .create(&self.owned_worker_id, initial_entry) .await } else { - self.primary.open(&self.owned_worker_id).await + self.primary + .open(&self.owned_worker_id, self.last_oplog_index) + .await }; Arc::new(MultiLayerOplog::new(self.owned_worker_id, primary, self.service, close).await) } @@ -198,12 +203,18 @@ impl OplogService for MultiLayerOplogService { Some(initial_entry), self.primary.clone(), self.clone(), + OplogIndex::INITIAL, ), ) .await } - async fn open(&self, owned_worker_id: &OwnedWorkerId) -> Arc { + async fn open( + &self, + owned_worker_id: &OwnedWorkerId, + last_oplog_index: OplogIndex, + ) -> Arc { + debug!("MultiLayerOplogService::open {owned_worker_id}"); self.oplogs .get_or_open( &owned_worker_id.worker_id, @@ -212,6 +223,7 @@ impl OplogService for MultiLayerOplogService { None, self.primary.clone(), self.clone(), + last_oplog_index, ), ) .await diff --git a/golem-worker-executor-base/src/services/oplog/primary.rs b/golem-worker-executor-base/src/services/oplog/primary.rs index 30a96b0c69..f84d47bb2a 100644 --- a/golem-worker-executor-base/src/services/oplog/primary.rs +++ b/golem-worker-executor-base/src/services/oplog/primary.rs @@ -122,14 +122,17 @@ impl OplogService for PrimaryOplogService { ) }); - self.open(owned_worker_id).await + self.open(owned_worker_id, OplogIndex::INITIAL).await } - async fn open(&self, owned_worker_id: &OwnedWorkerId) -> Arc { + async fn open( + &self, + owned_worker_id: &OwnedWorkerId, + last_oplog_index: OplogIndex, + ) -> Arc { record_oplog_call("open"); let key = Self::oplog_key(&owned_worker_id.worker_id); - let last_oplog_index = self.get_last_index(owned_worker_id).await; self.oplogs .get_or_open( diff --git a/golem-worker-executor-base/src/services/oplog/tests.rs b/golem-worker-executor-base/src/services/oplog/tests.rs index f339852178..da1d515b5a 100644 --- a/golem-worker-executor-base/src/services/oplog/tests.rs +++ b/golem-worker-executor-base/src/services/oplog/tests.rs @@ -236,7 +236,8 @@ async fn open_add_and_read_back() { worker_name: "test".to_string(), }; let owned_worker_id = OwnedWorkerId::new(&account_id, &worker_id); - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; let entry1 = rounded(OplogEntry::jump(OplogRegion { start: OplogIndex::from_u64(5), @@ -282,7 +283,8 @@ async fn entries_with_small_payload() { }; let owned_worker_id = OwnedWorkerId::new(&account_id, &worker_id); - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; let last_oplog_idx = oplog.current_oplog_index().await; let entry1 = rounded( @@ -387,7 +389,8 @@ async fn entries_with_large_payload() { worker_name: "test".to_string(), }; let owned_worker_id = OwnedWorkerId::new(&account_id, &worker_id); - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; let large_payload1 = vec![0u8; 1024 * 1024]; let large_payload2 = vec![1u8; 1024 * 1024]; @@ -566,7 +569,8 @@ async fn multilayer_transfers_entries_after_limit_reached( }; let owned_worker_id = OwnedWorkerId::new(&account_id, &worker_id); - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; let mut entries = Vec::new(); for i in 0..n { @@ -589,7 +593,10 @@ async fn multilayer_transfers_entries_after_limit_reached( debug!("Fetching information to evaluate the test"); let primary_length = primary_oplog_service - .open(&owned_worker_id) + .open( + &owned_worker_id, + primary_oplog_service.get_last_index(&owned_worker_id).await, + ) .await .length() .await; @@ -657,7 +664,8 @@ async fn read_from_archive_impl(use_blob: bool) { }; let owned_worker_id = OwnedWorkerId::new(&account_id, &worker_id); - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; let timestamp = Timestamp::now_utc(); let entries: Vec = (0..100) @@ -678,7 +686,10 @@ async fn read_from_archive_impl(use_blob: bool) { tokio::time::sleep(Duration::from_secs(2)).await; let primary_length = primary_oplog_service - .open(&owned_worker_id) + .open( + &owned_worker_id, + primary_oplog_service.get_last_index(&owned_worker_id).await, + ) .await .length() .await; @@ -697,6 +708,249 @@ async fn read_from_archive_impl(use_blob: bool) { assert_eq!(first10.into_values().collect::>(), original_first10); } +#[tokio::test] +async fn write_after_archive() { + write_after_archive_impl(false, Reopen::No).await; +} + +#[tokio::test] +async fn blob_write_after_archive() { + write_after_archive_impl(true, Reopen::No).await; +} + +#[tokio::test] +async fn write_after_archive_reopen() { + write_after_archive_impl(false, Reopen::Yes).await; +} + +#[tokio::test] +async fn blob_write_after_archive_reopen() { + write_after_archive_impl(true, Reopen::Yes).await; +} + +#[tokio::test] +async fn write_after_archive_reopen_full() { + write_after_archive_impl(false, Reopen::Full).await; +} + +#[tokio::test] +async fn blob_write_after_archive_reopen_full() { + write_after_archive_impl(true, Reopen::Full).await; +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum Reopen { + No, + Yes, + Full, +} + +async fn write_after_archive_impl(use_blob: bool, reopen: Reopen) { + let indexed_storage = Arc::new(InMemoryIndexedStorage::new()); + let blob_storage = Arc::new(InMemoryBlobStorage::new()); + let mut primary_oplog_service = Arc::new( + PrimaryOplogService::new(indexed_storage.clone(), blob_storage.clone(), 1, 100).await, + ); + let secondary_layer: Arc = if use_blob { + Arc::new(BlobOplogArchiveService::new(blob_storage.clone(), 1)) + } else { + Arc::new(CompressedOplogArchiveService::new( + indexed_storage.clone(), + 1, + )) + }; + let tertiary_layer: Arc = if use_blob { + Arc::new(BlobOplogArchiveService::new(blob_storage.clone(), 2)) + } else { + Arc::new(CompressedOplogArchiveService::new( + indexed_storage.clone(), + 2, + )) + }; + let mut oplog_service = Arc::new(MultiLayerOplogService::new( + primary_oplog_service.clone(), + nev![secondary_layer.clone(), tertiary_layer.clone()], + 10, + )); + let account_id = AccountId { + value: "user1".to_string(), + }; + let worker_id = WorkerId { + component_id: ComponentId(Uuid::new_v4()), + worker_name: "test".to_string(), + }; + let owned_worker_id = OwnedWorkerId::new(&account_id, &worker_id); + + info!("FIRST OPEN"); + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; + info!("FIRST OPEN DONE"); + + let timestamp = Timestamp::now_utc(); + let entries: Vec = (0..100) + .map(|i| { + rounded(OplogEntry::Error { + timestamp, + error: WorkerError::Unknown(i.to_string()), + }) + }) + .collect(); + + let initial_oplog_idx = oplog.current_oplog_index().await; + + for entry in &entries { + oplog.add(entry.clone()).await; + } + oplog.commit().await; + tokio::time::sleep(Duration::from_secs(2)).await; + + let primary_length = primary_oplog_service + .open( + &owned_worker_id, + primary_oplog_service.get_last_index(&owned_worker_id).await, + ) + .await + .length() + .await; + let secondary_length = secondary_layer.open(&owned_worker_id).await.length().await; + let tertiary_length = tertiary_layer.open(&owned_worker_id).await.length().await; + + info!("initial oplog index: {}", initial_oplog_idx); + info!("primary_length: {}", primary_length); + info!("secondary_length: {}", secondary_length); + info!("tertiary_length: {}", tertiary_length); + + let oplog = if reopen == Reopen::Yes { + drop(oplog); + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + oplog_service.open(&owned_worker_id, last_oplog_index).await + } else if reopen == Reopen::Full { + drop(oplog); + primary_oplog_service = Arc::new( + PrimaryOplogService::new(indexed_storage.clone(), blob_storage.clone(), 1, 100).await, + ); + oplog_service = Arc::new(MultiLayerOplogService::new( + primary_oplog_service.clone(), + nev![secondary_layer.clone(), tertiary_layer.clone()], + 10, + )); + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + oplog_service.open(&owned_worker_id, last_oplog_index).await + } else { + oplog + }; + + let entries: Vec = (100..1000) + .map(|i| { + rounded(OplogEntry::Error { + timestamp, + error: WorkerError::Unknown(i.to_string()), + }) + }) + .collect(); + + for (n, entry) in entries.iter().enumerate() { + oplog.add(entry.clone()).await; + if n % 100 == 0 { + oplog.commit().await; + } + } + oplog.commit().await; + tokio::time::sleep(Duration::from_secs(2)).await; + + let primary_length = primary_oplog_service + .open( + &owned_worker_id, + primary_oplog_service.get_last_index(&owned_worker_id).await, + ) + .await + .length() + .await; + let secondary_length = secondary_layer.open(&owned_worker_id).await.length().await; + let tertiary_length = tertiary_layer.open(&owned_worker_id).await.length().await; + + info!("initial oplog index: {}", initial_oplog_idx); + info!("primary_length: {}", primary_length); + info!("secondary_length: {}", secondary_length); + info!("tertiary_length: {}", tertiary_length); + + let oplog = if reopen == Reopen::Yes { + drop(oplog); + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + oplog_service.open(&owned_worker_id, last_oplog_index).await + } else if reopen == Reopen::Full { + drop(oplog); + primary_oplog_service = Arc::new( + PrimaryOplogService::new(indexed_storage.clone(), blob_storage.clone(), 1, 100).await, + ); + oplog_service = Arc::new(MultiLayerOplogService::new( + primary_oplog_service.clone(), + nev![secondary_layer.clone(), tertiary_layer.clone()], + 10, + )); + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + oplog_service.open(&owned_worker_id, last_oplog_index).await + } else { + oplog + }; + + oplog + .add(rounded(OplogEntry::Error { + timestamp, + error: WorkerError::Unknown("last".to_string()), + })) + .await; + oplog.commit().await; + drop(oplog); + + let entry1 = oplog_service + .read(&owned_worker_id, OplogIndex::INITIAL, 1) + .await; + let entry2 = oplog_service + .read(&owned_worker_id, OplogIndex::from_u64(100), 1) + .await; + let entry3 = oplog_service + .read(&owned_worker_id, OplogIndex::from_u64(1000), 1) + .await; + let entry4 = oplog_service + .read(&owned_worker_id, OplogIndex::from_u64(1001), 1) + .await; + + assert_eq!(entry1.len(), 1); + assert_eq!(entry2.len(), 1); + assert_eq!(entry3.len(), 1); + assert_eq!(entry4.len(), 1); + + assert_eq!( + entry1.get(&OplogIndex::INITIAL).unwrap().clone(), + rounded(OplogEntry::Error { + timestamp, + error: WorkerError::Unknown("0".to_string()), + }) + ); + assert_eq!( + entry2.get(&OplogIndex::from_u64(100)).unwrap().clone(), + rounded(OplogEntry::Error { + timestamp, + error: WorkerError::Unknown("99".to_string()), + }) + ); + assert_eq!( + entry3.get(&OplogIndex::from_u64(1000)).unwrap().clone(), + rounded(OplogEntry::Error { + timestamp, + error: WorkerError::Unknown("999".to_string()), + }) + ); + assert_eq!( + entry4.get(&OplogIndex::from_u64(1001)).unwrap().clone(), + rounded(OplogEntry::Error { + timestamp, + error: WorkerError::Unknown("last".to_string()), + }) + ); +} + #[tokio::test] async fn empty_layer_gets_deleted() { empty_layer_gets_deleted_impl(false).await; @@ -743,7 +997,8 @@ async fn empty_layer_gets_deleted_impl(use_blob: bool) { }; let owned_worker_id = OwnedWorkerId::new(&account_id, &worker_id); - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; // As we add 100 entries at once, and that exceeds the limit, we expect that all entries have // been moved to the secondary layer. By doing this 10 more times, we end up having all entries @@ -774,7 +1029,10 @@ async fn empty_layer_gets_deleted_impl(use_blob: bool) { let tertiary_exists = tertiary_layer.exists(&owned_worker_id).await; let primary_length = primary_oplog_service - .open(&owned_worker_id) + .open( + &owned_worker_id, + primary_oplog_service.get_last_index(&owned_worker_id).await, + ) .await .length() .await; @@ -852,7 +1110,8 @@ async fn scheduled_archive_impl(use_blob: bool) { // Adding 100 entries to the primary oplog, schedule archive and immediately drop the oplog let archive_result = { - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; for entry in &entries { oplog.add(entry.clone()).await; } @@ -868,7 +1127,10 @@ async fn scheduled_archive_impl(use_blob: bool) { tokio::time::sleep(Duration::from_secs(2)).await; let primary_length = primary_oplog_service - .open(&owned_worker_id) + .open( + &owned_worker_id, + primary_oplog_service.get_last_index(&owned_worker_id).await, + ) .await .length() .await; @@ -890,7 +1152,8 @@ async fn scheduled_archive_impl(use_blob: bool) { // Calling archive again let archive_result2 = { - let oplog = oplog_service.open(&owned_worker_id).await; + let last_oplog_index = oplog_service.get_last_index(&owned_worker_id).await; + let oplog = oplog_service.open(&owned_worker_id, last_oplog_index).await; let result = MultiLayerOplog::try_archive(&oplog).await; drop(oplog); result @@ -899,7 +1162,10 @@ async fn scheduled_archive_impl(use_blob: bool) { tokio::time::sleep(Duration::from_secs(2)).await; let primary_length = primary_oplog_service - .open(&owned_worker_id) + .open( + &owned_worker_id, + primary_oplog_service.get_last_index(&owned_worker_id).await, + ) .await .length() .await; diff --git a/golem-worker-executor-base/src/services/scheduler.rs b/golem-worker-executor-base/src/services/scheduler.rs index 627d545bda..2c0b69e685 100644 --- a/golem-worker-executor-base/src/services/scheduler.rs +++ b/golem-worker-executor-base/src/services/scheduler.rs @@ -159,7 +159,10 @@ impl SchedulerServiceDefault { let current_last_index = self.oplog_service.get_last_index(&owned_worker_id).await; if current_last_index == last_oplog_index { - let oplog = self.oplog_service.open(&owned_worker_id).await; + let oplog = self + .oplog_service + .open(&owned_worker_id, last_oplog_index) + .await; if let Some(more) = MultiLayerOplog::try_archive(&oplog).await { if more { self.schedule( diff --git a/golem-worker-executor-base/src/services/worker.rs b/golem-worker-executor-base/src/services/worker.rs index 8b2e57eadb..23176fdb9c 100644 --- a/golem-worker-executor-base/src/services/worker.rs +++ b/golem-worker-executor-base/src/services/worker.rs @@ -17,10 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; use golem_common::model::oplog::{OplogEntry, OplogIndex}; use golem_common::model::{ - ComponentType, OwnedWorkerId, ShardId, WorkerId, WorkerMetadata, WorkerStatus, + ComponentType, OwnedWorkerId, ShardId, Timestamp, WorkerId, WorkerMetadata, WorkerStatus, WorkerStatusRecord, }; -use tracing::debug; +use tracing::{debug, warn}; use crate::error::GolemError; use crate::metrics::workers::record_worker_call; @@ -228,7 +228,33 @@ impl WorkerService for DefaultWorkerService { Some(details) } Some((_, entry)) => { - panic!("Unexpected initial oplog entry for worker: {entry:?}") + // This should never happen, but there were some issues previously causing a corrupt oplog + // leading to this state. + // + // There is no point in panicking and restarting the executor here, as the corrupt oplog + // will most likely remain as it is. + // + // So to save the executor's state we return a "fake" failed worker metadata. + + warn!( + worker_id = owned_worker_id.to_string(), + oplog_entry = format!("{entry:?}"), + "Unexpected initial oplog entry found, returning fake failed worker metadata" + ); + let last_oplog_idx = self.oplog_service.get_last_index(owned_worker_id).await; + Some(WorkerMetadata { + worker_id: owned_worker_id.worker_id(), + args: vec![], + env: vec![], + account_id: owned_worker_id.account_id(), + created_at: Timestamp::now_utc(), + parent: None, + last_known_status: WorkerStatusRecord { + status: WorkerStatus::Failed, + oplog_idx: last_oplog_idx, + ..WorkerStatusRecord::default() + }, + }) } } } diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index 0285bdfe5d..10cf4faa31 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -173,7 +173,11 @@ impl Worker { parent, ) .await?; - let oplog = deps.oplog_service().open(&owned_worker_id).await; + let last_oplog_index = deps.oplog_service().get_last_index(&owned_worker_id).await; + let oplog = deps + .oplog_service() + .open(&owned_worker_id, last_oplog_index) + .await; let initial_pending_invocations = worker_metadata .last_known_status