Skip to content

Commit

Permalink
Refactoring the Durability construct (#1097)
Browse files Browse the repository at this point in the history
* Refactoring the Durability construct, WIP

* Refactoring continues

* Refactoring continues

* Refactoring continues

* Finished initial refactoring

* Fix
  • Loading branch information
vigoo authored Jan 9, 2025
1 parent d1a711c commit 8e828fd
Show file tree
Hide file tree
Showing 21 changed files with 1,221 additions and 1,361 deletions.
295 changes: 173 additions & 122 deletions golem-worker-executor-base/src/durable_host/blobstore/container.rs

Large diffs are not rendered by default.

227 changes: 125 additions & 102 deletions golem-worker-executor-base/src/durable_host/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ pub mod container;
pub mod types;

use async_trait::async_trait;
use futures_util::TryFutureExt;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime::component::Resource;
use wasmtime_wasi::WasiView;

use crate::durable_host::blobstore::types::ContainerEntry;
use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::preview2::wasi::blobstore::blobstore::{
Container, ContainerName, Error, Host, ObjectId,
};
Expand All @@ -35,32 +35,26 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
&mut self,
name: ContainerName,
) -> anyhow::Result<Result<Resource<Container>, Error>> {
record_host_function_call("blobstore::blobstore", "create_container");
let account_id = self.state.owned_worker_id.account_id();
let name_clone = name.clone();
let result: Result<u64, anyhow::Error> =
Durability::<Ctx, String, u64, SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::create_container",
name.clone(),
|ctx| {
Box::pin(async move {
let _ = ctx
.state
.blob_store_service
.create_container(account_id.clone(), name_clone.clone())
.await?;
Ok(ctx
.state
.blob_store_service
.get_container(account_id, name_clone)
.await?
.unwrap())
})
},
)
.await;
let durability = Durability::<Ctx, u64, SerializableError>::new(
self,
"golem blobstore::blobstore",
"create_container",
WrappedFunctionType::WriteRemote,
)
.await?;
let result = if durability.is_live() {
let svc = self.state.blob_store_service.clone();
let result = svc
.create_container(account_id.clone(), name.clone())
.and_then(|_| svc.get_container(account_id, name.clone()))
.await
.map(|r| r.unwrap());
durability.persist(self, name.clone(), result).await
} else {
durability.replay(self).await
};

match result {
Ok(created_at) => {
let container = self
Expand All @@ -77,20 +71,25 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
&mut self,
name: ContainerName,
) -> anyhow::Result<Result<Resource<Container>, Error>> {
record_host_function_call("blobstore::blobstore", "get_container");
let account_id = self.state.owned_worker_id.account_id();
let result = Durability::<Ctx, String, Option<u64>, SerializableError>::wrap(
let durability = Durability::<Ctx, Option<u64>, SerializableError>::new(
self,
"golem blobstore::blobstore",
"get_container",
WrappedFunctionType::ReadRemote,
"golem blobstore::blobstore::get_container",
name.clone(),
|ctx| {
ctx.state
.blob_store_service
.get_container(account_id, name.clone())
},
)
.await;
.await?;
let result = if durability.is_live() {
let result = self
.state
.blob_store_service
.get_container(account_id, name.clone())
.await;
durability.persist(self, name.clone(), result).await
} else {
durability.replay(self).await
};

match result {
Ok(Some(created_at)) => {
let container = self
Expand All @@ -105,20 +104,25 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
}

async fn delete_container(&mut self, name: ContainerName) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "delete_container");
let account_id = self.state.owned_worker_id.account_id();
let result = Durability::<Ctx, String, (), SerializableError>::wrap(
let durability = Durability::<Ctx, (), SerializableError>::new(
self,
"golem blobstore::blobstore",
"delete_container",
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::delete_container",
name.clone(),
|ctx| {
ctx.state
.blob_store_service
.delete_container(account_id, name)
},
)
.await;
.await?;
let result = if durability.is_live() {
let result = self
.state
.blob_store_service
.delete_container(account_id, name.clone())
.await;
durability.persist(self, name.clone(), result).await
} else {
durability.replay(self).await
};

match result {
Ok(_) => Ok(Ok(())),
Err(e) => Ok(Err(format!("{:?}", e))),
Expand All @@ -129,20 +133,25 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
&mut self,
name: ContainerName,
) -> anyhow::Result<Result<bool, Error>> {
record_host_function_call("blobstore::blobstore", "container_exists");
let account_id = self.state.owned_worker_id.account_id();
let result = Durability::<Ctx, String, bool, SerializableError>::wrap(
let durability = Durability::<Ctx, bool, SerializableError>::new(
self,
"golem blobstore::blobstore",
"container_exists",
WrappedFunctionType::ReadRemote,
"golem blobstore::blobstore::container_exists",
name.clone(),
|ctx| {
ctx.state
.blob_store_service
.container_exists(account_id, name)
},
)
.await;
.await?;
let result = if durability.is_live() {
let result = self
.state
.blob_store_service
.container_exists(account_id, name.clone())
.await;
durability.persist(self, name, result).await
} else {
durability.replay(self).await
};

match result {
Ok(exists) => Ok(Ok(exists)),
Err(e) => Ok(Err(format!("{:?}", e))),
Expand All @@ -154,30 +163,37 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
src: ObjectId,
dest: ObjectId,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "copy_object");
let account_id = self.state.owned_worker_id.account_id();
let result =
Durability::<Ctx, (String, String, String, String), (), SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::copy_object",
(
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
),
|ctx| {
ctx.state.blob_store_service.copy_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
},
)
.await;
let durability = Durability::<Ctx, (), SerializableError>::new(
self,
"golem blobstore::blobstore",
"copy_object",
WrappedFunctionType::WriteRemote,
)
.await?;
let result = if durability.is_live() {
let input = (
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
);
let result = self
.state
.blob_store_service
.copy_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
.await;
durability.persist(self, input, result).await
} else {
durability.replay(self).await
};

match result {
Ok(_) => Ok(Ok(())),
Err(e) => Ok(Err(format!("{:?}", e))),
Expand All @@ -189,30 +205,37 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
src: ObjectId,
dest: ObjectId,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "move_object");
let account_id = self.state.owned_worker_id.account_id();
let result =
Durability::<Ctx, (String, String, String, String), (), SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::move_object",
(
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
),
|ctx| {
ctx.state.blob_store_service.move_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
},
)
.await;
let durability = Durability::<Ctx, (), SerializableError>::new(
self,
"golem blobstore::blobstore",
"move_object",
WrappedFunctionType::WriteRemote,
)
.await?;
let result = if durability.is_live() {
let input = (
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
);
let result = self
.state
.blob_store_service
.move_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
.await;
durability.persist(self, input, result).await
} else {
durability.replay(self).await
};

match result {
Ok(_) => Ok(Ok(())),
Err(e) => Ok(Err(format!("{:?}", e))),
Expand Down
52 changes: 33 additions & 19 deletions golem-worker-executor-base/src/durable_host/cli/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,60 @@ use async_trait::async_trait;

use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime_wasi::bindings::cli::environment::Host;

#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn get_environment(&mut self) -> anyhow::Result<Vec<(String, String)>> {
record_host_function_call("cli::environment", "get_environment");
Durability::<Ctx, (), Vec<(String, String)>, SerializableError>::wrap(
let durability = Durability::<Ctx, Vec<(String, String)>, SerializableError>::new(
self,
"golem_environment",
"get_environment",
WrappedFunctionType::ReadLocal,
"golem_environment::get_environment",
(),
|ctx| Box::pin(async { Host::get_environment(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::get_environment(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn get_arguments(&mut self) -> anyhow::Result<Vec<String>> {
record_host_function_call("cli::environment", "get_arguments");
Durability::<Ctx, (), Vec<String>, SerializableError>::wrap(
let durability = Durability::<Ctx, Vec<String>, SerializableError>::new(
self,
"golem_environment",
"get_arguments",
WrappedFunctionType::ReadLocal,
"golem_environment::get_arguments",
(),
|ctx| Box::pin(async { Host::get_arguments(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::get_arguments(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn initial_cwd(&mut self) -> anyhow::Result<Option<String>> {
record_host_function_call("cli::environment", "initial_cwd");
Durability::<Ctx, (), Option<String>, SerializableError>::wrap(
let durability = Durability::<Ctx, Option<String>, SerializableError>::new(
self,
"golem_environment",
"get_arguments", // TODO: fix in 2.0 - for backward compatibility with Golem 1.0
WrappedFunctionType::ReadLocal,
"golem_environment::get_arguments", // NOTE: for backward compatibility with Golem 1.0
(),
|ctx| Box::pin(async { Host::initial_cwd(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::initial_cwd(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}
}
Loading

0 comments on commit 8e828fd

Please sign in to comment.