Skip to content

Commit

Permalink
Refactoring continues
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Jan 9, 2025
1 parent e140fa4 commit 3600d31
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 260 deletions.
200 changes: 105 additions & 95 deletions golem-worker-executor-base/src/durable_host/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ pub mod container;
pub mod types;

use async_trait::async_trait;
use futures_util::TryFutureExt;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime::component::Resource;
use wasmtime_wasi::WasiView;

use crate::durable_host::blobstore::types::ContainerEntry;
use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::durable_host::{Durability2, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::preview2::wasi::blobstore::blobstore::{
Container, ContainerName, Error, Host, ObjectId,
Expand All @@ -35,32 +36,24 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
&mut self,
name: ContainerName,
) -> anyhow::Result<Result<Resource<Container>, Error>> {
record_host_function_call("blobstore::blobstore", "create_container");
let account_id = self.state.owned_worker_id.account_id();
let name_clone = name.clone();
let result: Result<u64, anyhow::Error> =
Durability::<Ctx, String, u64, SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::create_container",
name.clone(),
|ctx| {
Box::pin(async move {
let _ = ctx
.state
.blob_store_service
.create_container(account_id.clone(), name_clone.clone())
.await?;
Ok(ctx
.state
.blob_store_service
.get_container(account_id, name_clone)
.await?
.unwrap())
})
},
)
.await;
let durability = Durability2::<Ctx, u64, SerializableError>::new(
self,
"golem blobstore::blobstore",
"create_container",
WrappedFunctionType::WriteRemote
).await?;
let result = if durability.is_live() {
let svc = self.state.blob_store_service.clone();
let result =
svc.create_container(account_id.clone(), name.clone())
.and_then(|_| svc.get_container(account_id, name.clone())).await
.map(|r| r.unwrap());
durability.persist(self, name.clone(), result).await
} else {
durability.replay(self).await
};

match result {
Ok(created_at) => {
let container = self
Expand All @@ -77,20 +70,20 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
&mut self,
name: ContainerName,
) -> anyhow::Result<Result<Resource<Container>, Error>> {
record_host_function_call("blobstore::blobstore", "get_container");
let account_id = self.state.owned_worker_id.account_id();
let result = Durability::<Ctx, String, Option<u64>, SerializableError>::wrap(
let durability = Durability2::<Ctx, Option<u64>, SerializableError>::new(
self,
"golem blobstore::blobstore",
"get_container",
WrappedFunctionType::ReadRemote,
"golem blobstore::blobstore::get_container",
name.clone(),
|ctx| {
ctx.state
.blob_store_service
.get_container(account_id, name.clone())
},
)
.await;
).await?;
let result = if durability.is_live() {
let result = self.state.blob_store_service.get_container(account_id, name.clone()).await;
durability.persist(self, name.clone(), result).await
} else {
durability.replay(self).await
};

match result {
Ok(Some(created_at)) => {
let container = self
Expand All @@ -105,20 +98,24 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
}

async fn delete_container(&mut self, name: ContainerName) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "delete_container");
let account_id = self.state.owned_worker_id.account_id();
let result = Durability::<Ctx, String, (), SerializableError>::wrap(
let durability = Durability2::<Ctx, (), SerializableError>::new(
self,
"golem blobstore::blobstore",
"delete_container",
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::delete_container",
name.clone(),
|ctx| {
ctx.state
.blob_store_service
.delete_container(account_id, name)
},
)
.await;
).await?;
let result = if durability.is_live() {
let result = self
.state
.blob_store_service
.delete_container(account_id, name.clone())
.await;
durability.persist(self, name.clone(), result).await
} else {
durability.replay(self).await
};

match result {
Ok(_) => Ok(Ok(())),
Err(e) => Ok(Err(format!("{:?}", e))),
Expand All @@ -129,7 +126,6 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
&mut self,
name: ContainerName,
) -> anyhow::Result<Result<bool, Error>> {
record_host_function_call("blobstore::blobstore", "container_exists");
let account_id = self.state.owned_worker_id.account_id();
let result = Durability::<Ctx, String, bool, SerializableError>::wrap(
self,
Expand All @@ -154,30 +150,37 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
src: ObjectId,
dest: ObjectId,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "copy_object");
let account_id = self.state.owned_worker_id.account_id();
let result =
Durability::<Ctx, (String, String, String, String), (), SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::copy_object",
(
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
),
|ctx| {
ctx.state.blob_store_service.copy_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
},
)
.await;
let durability = Durability2::<Ctx, (), SerializableError>::new(
self,
"golem blobstore::blobstore",
"copy_object",
WrappedFunctionType::WriteRemote,
)
.await?;
let result = if durability.is_live() {
let input = (
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
);
let result = self
.state
.blob_store_service
.copy_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
.await;
durability.persist(self, input, result).await
} else {
durability.replay(self).await
};

match result {
Ok(_) => Ok(Ok(())),
Err(e) => Ok(Err(format!("{:?}", e))),
Expand All @@ -189,30 +192,37 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
src: ObjectId,
dest: ObjectId,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "move_object");
let account_id = self.state.owned_worker_id.account_id();
let result =
Durability::<Ctx, (String, String, String, String), (), SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::move_object",
(
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
),
|ctx| {
ctx.state.blob_store_service.move_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
},
)
.await;
let durability = Durability2::<Ctx, (), SerializableError>::new(
self,
"golem blobstore::blobstore",
"move_object",
WrappedFunctionType::WriteRemote,
)
.await?;
let result = if durability.is_live() {
let input = (
src.container.clone(),
src.object.clone(),
dest.container.clone(),
dest.object.clone(),
);
let result = self
.state
.blob_store_service
.move_object(
account_id,
src.container,
src.object,
dest.container,
dest.object,
)
.await;
durability.persist(self, input, result).await
} else {
durability.replay(self).await
};

match result {
Ok(_) => Ok(Ok(())),
Err(e) => Ok(Err(format!("{:?}", e))),
Expand Down
Loading

0 comments on commit 3600d31

Please sign in to comment.