Skip to content

Commit

Permalink
Tests and fixes for oplog corruption bug (#962)
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo authored Sep 23, 2024
1 parent a567b23 commit 08eedaa
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 32 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ futures-core = "0.3.29"
futures-util = "0.3.29"
git-version = "0.3.9"
golem-wasm-ast = "1.0.0"
golem-wasm-rpc = { version = "1.0.2", default-features = false, features = [
golem-wasm-rpc = { version = "1.0.3", default-features = false, features = [
"host",
] }
http = "1.0.0" # keep in sync with wasmtime
Expand Down
2 changes: 1 addition & 1 deletion golem-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ futures-util = { workspace = true }
golem-examples = "1.0.5"
golem-wasm-ast = { workspace = true }
golem-wasm-rpc = { workspace = true }
golem-wasm-rpc-stubgen = { version = "1.0.2", optional = true }
golem-wasm-rpc-stubgen = { version = "1.0.3", optional = true }
h2 = "0.3.24"
http = { workspace = true }
humansize = { workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion golem-worker-executor-base/src/services/oplog/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ impl OplogService for OplogServiceMock {
unimplemented!()
}

async fn open(&self, _owned_worker_id: &OwnedWorkerId) -> Arc<dyn Oplog + Send + Sync> {
async fn open(
&self,
_owned_worker_id: &OwnedWorkerId,
_last_oplog_index: OplogIndex,
) -> Arc<dyn Oplog + Send + Sync> {
unimplemented!()
}

Expand Down
7 changes: 5 additions & 2 deletions golem-worker-executor-base/src/services/oplog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ pub trait OplogService: Debug {
owned_worker_id: &OwnedWorkerId,
initial_entry: OplogEntry,
) -> Arc<dyn Oplog + Send + Sync + 'static>;
async fn open(&self, owned_worker_id: &OwnedWorkerId)
-> Arc<dyn Oplog + Send + Sync + 'static>;
async fn open(
&self,
owned_worker_id: &OwnedWorkerId,
last_oplog_index: OplogIndex,
) -> Arc<dyn Oplog + Send + Sync + 'static>;

async fn get_last_index(&self, owned_worker_id: &OwnedWorkerId) -> OplogIndex;

Expand Down
16 changes: 14 additions & 2 deletions golem-worker-executor-base/src/services/oplog/multilayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ struct CreateOplogConstructor {
initial_entry: Option<OplogEntry>,
primary: Arc<dyn OplogService + Send + Sync>,
service: MultiLayerOplogService,
last_oplog_index: OplogIndex,
}

impl CreateOplogConstructor {
Expand All @@ -156,12 +157,14 @@ impl CreateOplogConstructor {
initial_entry: Option<OplogEntry>,
primary: Arc<dyn OplogService + Send + Sync>,
service: MultiLayerOplogService,
last_oplog_index: OplogIndex,
) -> Self {
Self {
owned_worker_id,
initial_entry,
primary,
service,
last_oplog_index,
}
}
}
Expand All @@ -177,7 +180,9 @@ impl OplogConstructor for CreateOplogConstructor {
.create(&self.owned_worker_id, initial_entry)
.await
} else {
self.primary.open(&self.owned_worker_id).await
self.primary
.open(&self.owned_worker_id, self.last_oplog_index)
.await
};
Arc::new(MultiLayerOplog::new(self.owned_worker_id, primary, self.service, close).await)
}
Expand All @@ -198,12 +203,18 @@ impl OplogService for MultiLayerOplogService {
Some(initial_entry),
self.primary.clone(),
self.clone(),
OplogIndex::INITIAL,
),
)
.await
}

async fn open(&self, owned_worker_id: &OwnedWorkerId) -> Arc<dyn Oplog + Send + Sync> {
async fn open(
&self,
owned_worker_id: &OwnedWorkerId,
last_oplog_index: OplogIndex,
) -> Arc<dyn Oplog + Send + Sync> {
debug!("MultiLayerOplogService::open {owned_worker_id}");
self.oplogs
.get_or_open(
&owned_worker_id.worker_id,
Expand All @@ -212,6 +223,7 @@ impl OplogService for MultiLayerOplogService {
None,
self.primary.clone(),
self.clone(),
last_oplog_index,
),
)
.await
Expand Down
9 changes: 6 additions & 3 deletions golem-worker-executor-base/src/services/oplog/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,17 @@ impl OplogService for PrimaryOplogService {
)
});

self.open(owned_worker_id).await
self.open(owned_worker_id, OplogIndex::INITIAL).await
}

async fn open(&self, owned_worker_id: &OwnedWorkerId) -> Arc<dyn Oplog + Send + Sync> {
async fn open(
&self,
owned_worker_id: &OwnedWorkerId,
last_oplog_index: OplogIndex,
) -> Arc<dyn Oplog + Send + Sync> {
record_oplog_call("open");

let key = Self::oplog_key(&owned_worker_id.worker_id);
let last_oplog_index = self.get_last_index(owned_worker_id).await;

self.oplogs
.get_or_open(
Expand Down
Loading

0 comments on commit 08eedaa

Please sign in to comment.