From 8e828fd91d7a51d5ab25a1bc8df107d335401386 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Thu, 9 Jan 2025 13:20:08 +0100 Subject: [PATCH] Refactoring the Durability construct (#1097) * Refactoring the Durability construct, WIP * Refactoring continues * Refactoring continues * Refactoring continues * Finished initial refactoring * Fix --- .../src/durable_host/blobstore/container.rs | 295 ++++---- .../src/durable_host/blobstore/mod.rs | 227 +++--- .../src/durable_host/cli/environment.rs | 52 +- .../durable_host/clocks/monotonic_clock.rs | 52 +- .../src/durable_host/clocks/wall_clock.rs | 35 +- .../src/durable_host/durability.rs | 674 ++++-------------- .../src/durable_host/filesystem/preopens.rs | 64 +- .../src/durable_host/filesystem/types.rs | 153 ++-- .../src/durable_host/golem/mod.rs | 146 ++-- .../src/durable_host/http/types.rs | 87 ++- .../src/durable_host/io/poll.rs | 23 +- .../src/durable_host/io/streams.rs | 129 ++-- .../src/durable_host/keyvalue/eventual.rs | 100 ++- .../durable_host/keyvalue/eventual_batch.rs | 113 +-- .../src/durable_host/mod.rs | 23 +- .../src/durable_host/random/insecure.rs | 39 +- .../src/durable_host/random/insecure_seed.rs | 17 +- .../src/durable_host/random/random.rs | 35 +- .../durable_host/sockets/ip_name_lookup.rs | 25 +- .../src/durable_host/wasm_rpc/mod.rs | 266 +++---- .../src/services/oplog/mod.rs | 27 +- 21 files changed, 1221 insertions(+), 1361 deletions(-) diff --git a/golem-worker-executor-base/src/durable_host/blobstore/container.rs b/golem-worker-executor-base/src/durable_host/blobstore/container.rs index cd5ccb56cb..a88767d8fe 100644 --- a/golem-worker-executor-base/src/durable_host/blobstore/container.rs +++ b/golem-worker-executor-base/src/durable_host/blobstore/container.rs @@ -67,31 +67,33 @@ impl HostContainer for DurableWorkerCtx { start: u64, end: u64, ) -> anyhow::Result, Error>> { - record_host_function_call("blobstore::container::container", "get_data"); - let account_id = 
self.state.owned_worker_id.account_id(); + let durability = Durability::, SerializableError>::new( + self, + "golem blobstore::container", + "get_data", + WrappedFunctionType::ReadRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self .as_wasi_view() .table() .get::(&container) .map(|container_entry| container_entry.name.clone())?; - let result = - Durability::, SerializableError>::wrap( - self, - WrappedFunctionType::ReadRemote, - "golem blobstore::container::get_data", - (container_name.clone(), name.clone(), start, end), - |ctx| { - ctx.state.blob_store_service.get_data( - account_id, - container_name, - name, - start, - end, - ) - }, - ) - .await; + + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .get_data(account_id, container_name.clone(), name.clone(), start, end) + .await; + durability + .persist(self, (container_name, name, start, end), result) + .await + } else { + durability.replay(self).await + }; match result { Ok(get_data) => { let incoming_value = self @@ -110,9 +112,15 @@ impl HostContainer for DurableWorkerCtx { name: ObjectName, data: Resource, ) -> anyhow::Result> { - record_host_function_call("blobstore::container::container", "write_data"); - let account_id = self.state.owned_worker_id.account_id(); + let durability = Durability::::new( + self, + "golem blobstore::container", + "write_data", + WrappedFunctionType::WriteRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self .as_wasi_view() .table() @@ -123,18 +131,21 @@ impl HostContainer for DurableWorkerCtx { .table() .get::(&data) .map(|outgoing_value_entry| outgoing_value_entry.body.read().unwrap().clone())?; - let result = Durability::::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem blobstore::container::write_data", - (container_name.clone(), name.clone(), data.len() as u64), - |ctx| { - ctx.state - .blob_store_service - 
.write_data(account_id, container_name, name, data) - }, - ) - .await; + + let result = if durability.is_live() { + let len = data.len() as u64; + let result = self + .state + .blob_store_service + .write_data(account_id, container_name.clone(), name.clone(), data) + .await; + durability + .persist(self, (container_name, name, len), result) + .await + } else { + durability.replay(self).await + }; + match result { Ok(_) => Ok(Ok(())), Err(e) => Ok(Err(format!("{:?}", e))), @@ -145,26 +156,32 @@ impl HostContainer for DurableWorkerCtx { &mut self, container: Resource, ) -> anyhow::Result, Error>> { - record_host_function_call("blobstore::container::container", "list_objects"); - let account_id = self.state.owned_worker_id.account_id(); + let durability = Durability::, SerializableError>::new( + self, + "golem blobstore::container", + "list_object", + WrappedFunctionType::ReadRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self .as_wasi_view() .table() .get::(&container) .map(|container_entry| container_entry.name.clone())?; - let result = Durability::, SerializableError>::wrap( - self, - WrappedFunctionType::ReadRemote, - "golem blobstore::container::list_objects", - container_name.clone(), - |ctx| { - ctx.state - .blob_store_service - .list_objects(account_id, container_name) - }, - ) - .await; + + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .list_objects(account_id, container_name.clone()) + .await; + durability.persist(self, container_name, result).await + } else { + durability.replay(self).await + }; + match result { Ok(list_objects) => { let stream_object_names = self @@ -182,26 +199,34 @@ impl HostContainer for DurableWorkerCtx { container: Resource, name: ObjectName, ) -> anyhow::Result> { - record_host_function_call("blobstore::container::container", "delete_object"); - let account_id = self.state.owned_worker_id.account_id(); + let durability = 
Durability::::new( + self, + "golem blobstore::container", + "delete_object", + WrappedFunctionType::WriteRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self .as_wasi_view() .table() .get::(&container) .map(|container_entry| container_entry.name.clone())?; - let result = Durability::::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem blobstore::container::delete_object", - (container_name.clone(), name.clone()), - |ctx| { - ctx.state - .blob_store_service - .delete_object(account_id, container_name, name) - }, - ) - .await; + + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .delete_object(account_id, container_name.clone(), name.clone()) + .await; + durability + .persist(self, (container_name, name), result) + .await + } else { + durability.replay(self).await + }; + match result { Ok(_) => Ok(Ok(())), Err(e) => Ok(Err(format!("{:?}", e))), @@ -213,26 +238,34 @@ impl HostContainer for DurableWorkerCtx { container: Resource, names: Vec, ) -> anyhow::Result> { - record_host_function_call("blobstore::container::container", "delete_objects"); - let account_id = self.state.owned_worker_id.account_id(); + let durability = Durability::::new( + self, + "golem blobstore::container", + "delete_objects", + WrappedFunctionType::WriteRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self .as_wasi_view() .table() .get::(&container) .map(|container_entry| container_entry.name.clone())?; - let result = Durability::), (), SerializableError>::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem blobstore::container::delete_objects", - (container_name.clone(), names.clone()), - |ctx| { - ctx.state - .blob_store_service - .delete_objects(account_id, container_name, names) - }, - ) - .await; + + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .delete_objects(account_id, 
container_name.clone(), names.clone()) + .await; + durability + .persist(self, (container_name, names), result) + .await + } else { + durability.replay(self).await + }; + match result { Ok(_) => Ok(Ok(())), Err(e) => Ok(Err(format!("{:?}", e))), @@ -244,26 +277,34 @@ impl HostContainer for DurableWorkerCtx { container: Resource, name: ObjectName, ) -> anyhow::Result> { - record_host_function_call("blobstore::container::container", "has_object"); - let account_id = self.state.owned_worker_id.account_id(); + let durability = Durability::::new( + self, + "golem blobstore::container", + "has_object", + WrappedFunctionType::ReadRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self .as_wasi_view() .table() .get::(&container) .map(|container_entry| container_entry.name.clone())?; - let result = Durability::::wrap( - self, - WrappedFunctionType::ReadRemote, - "golem blobstore::container::has_object", - (container_name.clone(), name.clone()), - |ctx| { - ctx.state - .blob_store_service - .has_object(account_id, container_name, name) - }, - ) - .await; + + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .has_object(account_id, container_name.clone(), name.clone()) + .await; + durability + .persist(self, (container_name, name), result) + .await + } else { + durability.replay(self).await + }; + match result { Ok(has_object) => Ok(Ok(has_object)), Err(e) => Ok(Err(format!("{:?}", e))), @@ -275,31 +316,35 @@ impl HostContainer for DurableWorkerCtx { container: Resource, name: ObjectName, ) -> anyhow::Result> { - record_host_function_call("blobstore::container::container", "object_info"); - let account_id = self.state.owned_worker_id.account_id(); + let durability = + Durability::::new( + self, + "golem blobstore::container", + "object_info", + WrappedFunctionType::ReadRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self 
.as_wasi_view() .table() .get::(&container) .map(|container_entry| container_entry.name.clone())?; - let result = Durability::< - Ctx, - (String, String), - crate::services::blob_store::ObjectMetadata, - SerializableError, - >::wrap( - self, - WrappedFunctionType::ReadRemote, - "golem blobstore::container::object_info", - (container_name.clone(), name.clone()), - |ctx| { - ctx.state - .blob_store_service - .object_info(account_id, container_name, name) - }, - ) - .await; + + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .object_info(account_id, container_name.clone(), name.clone()) + .await; + durability + .persist(self, (container_name, name), result) + .await + } else { + durability.replay(self).await + }; + match result { Ok(object_info) => { let object_info = ObjectMetadata { @@ -315,26 +360,32 @@ impl HostContainer for DurableWorkerCtx { } async fn clear(&mut self, container: Resource) -> anyhow::Result> { - record_host_function_call("blobstore::container::container", "clear"); - let account_id = self.state.owned_worker_id.account_id(); + let durability = Durability::::new( + self, + "golem blobstore::container", + "clear", + WrappedFunctionType::WriteRemote, + ) + .await?; + let account_id = self.state.owned_worker_id.account_id(); let container_name = self .as_wasi_view() .table() .get::(&container) .map(|container_entry| container_entry.name.clone())?; - Durability::::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem blobstore::container::clear", - container_name.clone(), - |ctx| { - ctx.state - .blob_store_service - .clear(account_id, container_name) - }, - ) - .await?; + + if durability.is_live() { + let result = self + .state + .blob_store_service + .clear(account_id, container_name.clone()) + .await; + durability.persist(self, container_name, result).await + } else { + durability.replay(self).await + }?; + Ok(Ok(())) } diff --git a/golem-worker-executor-base/src/durable_host/blobstore/mod.rs 
b/golem-worker-executor-base/src/durable_host/blobstore/mod.rs index 8120e92425..cea9ae06ed 100644 --- a/golem-worker-executor-base/src/durable_host/blobstore/mod.rs +++ b/golem-worker-executor-base/src/durable_host/blobstore/mod.rs @@ -16,6 +16,7 @@ pub mod container; pub mod types; use async_trait::async_trait; +use futures_util::TryFutureExt; use golem_common::model::oplog::WrappedFunctionType; use wasmtime::component::Resource; use wasmtime_wasi::WasiView; @@ -23,7 +24,6 @@ use wasmtime_wasi::WasiView; use crate::durable_host::blobstore::types::ContainerEntry; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::preview2::wasi::blobstore::blobstore::{ Container, ContainerName, Error, Host, ObjectId, }; @@ -35,32 +35,26 @@ impl Host for DurableWorkerCtx { &mut self, name: ContainerName, ) -> anyhow::Result, Error>> { - record_host_function_call("blobstore::blobstore", "create_container"); let account_id = self.state.owned_worker_id.account_id(); - let name_clone = name.clone(); - let result: Result = - Durability::::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem blobstore::blobstore::create_container", - name.clone(), - |ctx| { - Box::pin(async move { - let _ = ctx - .state - .blob_store_service - .create_container(account_id.clone(), name_clone.clone()) - .await?; - Ok(ctx - .state - .blob_store_service - .get_container(account_id, name_clone) - .await? 
- .unwrap()) - }) - }, - ) - .await; + let durability = Durability::::new( + self, + "golem blobstore::blobstore", + "create_container", + WrappedFunctionType::WriteRemote, + ) + .await?; + let result = if durability.is_live() { + let svc = self.state.blob_store_service.clone(); + let result = svc + .create_container(account_id.clone(), name.clone()) + .and_then(|_| svc.get_container(account_id, name.clone())) + .await + .map(|r| r.unwrap()); + durability.persist(self, name.clone(), result).await + } else { + durability.replay(self).await + }; + match result { Ok(created_at) => { let container = self @@ -77,20 +71,25 @@ impl Host for DurableWorkerCtx { &mut self, name: ContainerName, ) -> anyhow::Result, Error>> { - record_host_function_call("blobstore::blobstore", "get_container"); let account_id = self.state.owned_worker_id.account_id(); - let result = Durability::, SerializableError>::wrap( + let durability = Durability::, SerializableError>::new( self, + "golem blobstore::blobstore", + "get_container", WrappedFunctionType::ReadRemote, - "golem blobstore::blobstore::get_container", - name.clone(), - |ctx| { - ctx.state - .blob_store_service - .get_container(account_id, name.clone()) - }, ) - .await; + .await?; + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .get_container(account_id, name.clone()) + .await; + durability.persist(self, name.clone(), result).await + } else { + durability.replay(self).await + }; + match result { Ok(Some(created_at)) => { let container = self @@ -105,20 +104,25 @@ impl Host for DurableWorkerCtx { } async fn delete_container(&mut self, name: ContainerName) -> anyhow::Result> { - record_host_function_call("blobstore::blobstore", "delete_container"); let account_id = self.state.owned_worker_id.account_id(); - let result = Durability::::wrap( + let durability = Durability::::new( self, + "golem blobstore::blobstore", + "delete_container", WrappedFunctionType::WriteRemote, - "golem 
blobstore::blobstore::delete_container", - name.clone(), - |ctx| { - ctx.state - .blob_store_service - .delete_container(account_id, name) - }, ) - .await; + .await?; + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .delete_container(account_id, name.clone()) + .await; + durability.persist(self, name.clone(), result).await + } else { + durability.replay(self).await + }; + match result { Ok(_) => Ok(Ok(())), Err(e) => Ok(Err(format!("{:?}", e))), @@ -129,20 +133,25 @@ impl Host for DurableWorkerCtx { &mut self, name: ContainerName, ) -> anyhow::Result> { - record_host_function_call("blobstore::blobstore", "container_exists"); let account_id = self.state.owned_worker_id.account_id(); - let result = Durability::::wrap( + let durability = Durability::::new( self, + "golem blobstore::blobstore", + "container_exists", WrappedFunctionType::ReadRemote, - "golem blobstore::blobstore::container_exists", - name.clone(), - |ctx| { - ctx.state - .blob_store_service - .container_exists(account_id, name) - }, ) - .await; + .await?; + let result = if durability.is_live() { + let result = self + .state + .blob_store_service + .container_exists(account_id, name.clone()) + .await; + durability.persist(self, name, result).await + } else { + durability.replay(self).await + }; + match result { Ok(exists) => Ok(Ok(exists)), Err(e) => Ok(Err(format!("{:?}", e))), @@ -154,30 +163,37 @@ impl Host for DurableWorkerCtx { src: ObjectId, dest: ObjectId, ) -> anyhow::Result> { - record_host_function_call("blobstore::blobstore", "copy_object"); let account_id = self.state.owned_worker_id.account_id(); - let result = - Durability::::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem blobstore::blobstore::copy_object", - ( - src.container.clone(), - src.object.clone(), - dest.container.clone(), - dest.object.clone(), - ), - |ctx| { - ctx.state.blob_store_service.copy_object( - account_id, - src.container, - src.object, - dest.container, - 
dest.object, - ) - }, - ) - .await; + let durability = Durability::::new( + self, + "golem blobstore::blobstore", + "copy_object", + WrappedFunctionType::WriteRemote, + ) + .await?; + let result = if durability.is_live() { + let input = ( + src.container.clone(), + src.object.clone(), + dest.container.clone(), + dest.object.clone(), + ); + let result = self + .state + .blob_store_service + .copy_object( + account_id, + src.container, + src.object, + dest.container, + dest.object, + ) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(_) => Ok(Ok(())), Err(e) => Ok(Err(format!("{:?}", e))), @@ -189,30 +205,37 @@ impl Host for DurableWorkerCtx { src: ObjectId, dest: ObjectId, ) -> anyhow::Result> { - record_host_function_call("blobstore::blobstore", "move_object"); let account_id = self.state.owned_worker_id.account_id(); - let result = - Durability::::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem blobstore::blobstore::move_object", - ( - src.container.clone(), - src.object.clone(), - dest.container.clone(), - dest.object.clone(), - ), - |ctx| { - ctx.state.blob_store_service.move_object( - account_id, - src.container, - src.object, - dest.container, - dest.object, - ) - }, - ) - .await; + let durability = Durability::::new( + self, + "golem blobstore::blobstore", + "move_object", + WrappedFunctionType::WriteRemote, + ) + .await?; + let result = if durability.is_live() { + let input = ( + src.container.clone(), + src.object.clone(), + dest.container.clone(), + dest.object.clone(), + ); + let result = self + .state + .blob_store_service + .move_object( + account_id, + src.container, + src.object, + dest.container, + dest.object, + ) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(_) => Ok(Ok(())), Err(e) => Ok(Err(format!("{:?}", e))), diff --git 
a/golem-worker-executor-base/src/durable_host/cli/environment.rs b/golem-worker-executor-base/src/durable_host/cli/environment.rs index 6d23699d61..a0bdbbcfa0 100644 --- a/golem-worker-executor-base/src/durable_host/cli/environment.rs +++ b/golem-worker-executor-base/src/durable_host/cli/environment.rs @@ -16,7 +16,6 @@ use async_trait::async_trait; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::workerctx::WorkerCtx; use golem_common::model::oplog::WrappedFunctionType; use wasmtime_wasi::bindings::cli::environment::Host; @@ -24,38 +23,53 @@ use wasmtime_wasi::bindings::cli::environment::Host; #[async_trait] impl Host for DurableWorkerCtx { async fn get_environment(&mut self) -> anyhow::Result> { - record_host_function_call("cli::environment", "get_environment"); - Durability::, SerializableError>::wrap( + let durability = Durability::, SerializableError>::new( self, + "golem_environment", + "get_environment", WrappedFunctionType::ReadLocal, - "golem_environment::get_environment", - (), - |ctx| Box::pin(async { Host::get_environment(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + + if durability.is_live() { + let result = Host::get_environment(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } async fn get_arguments(&mut self) -> anyhow::Result> { - record_host_function_call("cli::environment", "get_arguments"); - Durability::, SerializableError>::wrap( + let durability = Durability::, SerializableError>::new( self, + "golem_environment", + "get_arguments", WrappedFunctionType::ReadLocal, - "golem_environment::get_arguments", - (), - |ctx| Box::pin(async { Host::get_arguments(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + + if durability.is_live() { + let result = Host::get_arguments(&mut self.as_wasi_view()).await; + 
durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } async fn initial_cwd(&mut self) -> anyhow::Result> { - record_host_function_call("cli::environment", "initial_cwd"); - Durability::, SerializableError>::wrap( + let durability = Durability::, SerializableError>::new( self, + "golem_environment", + "get_arguments", // TODO: fix in 2.0 - for backward compatibility with Golem 1.0 WrappedFunctionType::ReadLocal, - "golem_environment::get_arguments", // NOTE: for backward compatibility with Golem 1.0 - (), - |ctx| Box::pin(async { Host::initial_cwd(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + + if durability.is_live() { + let result = Host::initial_cwd(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } } diff --git a/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs b/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs index f537922773..d3ef9e267e 100644 --- a/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs +++ b/golem-worker-executor-base/src/durable_host/clocks/monotonic_clock.rs @@ -26,27 +26,37 @@ use wasmtime_wasi::bindings::clocks::monotonic_clock::{Duration, Host, Instant, #[async_trait] impl Host for DurableWorkerCtx { async fn now(&mut self) -> anyhow::Result { - record_host_function_call("clocks::monotonic_clock", "now"); - Durability::::wrap( + let durability = Durability::::new( self, + "monotonic_clock", + "now", WrappedFunctionType::ReadLocal, - "monotonic_clock::now", - (), - |ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + + if durability.is_live() { + let result = Host::now(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } async fn resolution(&mut self) -> anyhow::Result { - record_host_function_call("clocks::monotonic_clock", "resolution"); - 
Durability::::wrap( + let durability = Durability::::new( self, + "monotonic_clock", + "resolution", WrappedFunctionType::ReadLocal, - "monotonic_clock::resolution", - (), - |ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + + if durability.is_live() { + let result = Host::resolution(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } async fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result> { @@ -55,15 +65,23 @@ impl Host for DurableWorkerCtx { } async fn subscribe_duration(&mut self, when: Duration) -> anyhow::Result> { - record_host_function_call("clocks::monotonic_clock", "subscribe_duration"); - let now = Durability::::wrap( + let durability = Durability::::new( self, + "monotonic_clock", + "now", // TODO: fix in 2.0 - should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0 WrappedFunctionType::ReadLocal, - "monotonic_clock::now", // should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0 - (), - |ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }), ) .await?; + + let now = { + if durability.is_live() { + let result = Host::now(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } + }?; + self.state.oplog.commit(CommitLevel::DurableOnly).await; let when = now.saturating_add(when); Host::subscribe_instant(&mut self.as_wasi_view(), when).await diff --git a/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs b/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs index 31e066f402..78dfb268c5 100644 --- a/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs +++ b/golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs @@ -16,7 +16,6 @@ use async_trait::async_trait; use crate::durable_host::serialized::{SerializableDateTime, 
SerializableError}; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::workerctx::WorkerCtx; use golem_common::model::oplog::WrappedFunctionType; use wasmtime_wasi::bindings::clocks::wall_clock::{Datetime, Host}; @@ -24,26 +23,36 @@ use wasmtime_wasi::bindings::clocks::wall_clock::{Datetime, Host}; #[async_trait] impl Host for DurableWorkerCtx { async fn now(&mut self) -> anyhow::Result { - record_host_function_call("clocks::wall_clock", "now"); - Durability::::wrap( + let durability = Durability::::new( self, + "wall_clock", + "now", WrappedFunctionType::ReadLocal, - "wall_clock::now", - (), - |ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + + if durability.is_live() { + let result = Host::now(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } async fn resolution(&mut self) -> anyhow::Result { - record_host_function_call("clocks::wall_clock", "resolution"); - Durability::::wrap( + let durability = Durability::::new( self, + "wall_clock", + "resolution", WrappedFunctionType::ReadLocal, - "wall_clock::resolution", - (), - |ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + + if durability.is_live() { + let result = Host::resolution(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } } diff --git a/golem-worker-executor-base/src/durable_host/durability.rs b/golem-worker-executor-base/src/durable_host/durability.rs index a05885731d..4f3e4bb72d 100644 --- a/golem-worker-executor-base/src/durable_host/durability.rs +++ b/golem-worker-executor-base/src/durable_host/durability.rs @@ -14,570 +14,210 @@ use crate::durable_host::DurableWorkerCtx; use crate::error::GolemError; +use crate::metrics::wasm::record_host_function_call; use crate::model::PersistenceLevel; -use 
crate::services::oplog::{CommitLevel, Oplog, OplogOps}; +use crate::services::oplog::{CommitLevel, OplogOps}; use crate::workerctx::WorkerCtx; -use async_trait::async_trait; use bincode::{Decode, Encode}; +use bytes::Bytes; use golem_common::model::oplog::{OplogEntry, OplogIndex, WrappedFunctionType}; +use golem_common::serialization::try_deserialize; use std::fmt::Debug; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; +use std::marker::PhantomData; use tracing::error; -#[async_trait] -pub trait Durability { - /// A version of `wrap` allowing conversion between the success value and the serialized value within the mutable worker context. - /// - /// This can be used to fetch/register resources. - /// - /// Live mode: - /// value|error <- function() - /// serialized|serialized_err <- get_serializable(value) | error.into() - /// write_to_oplog(serialized|serialized_err) - /// return value|error - /// - /// Replay mode: - /// serialized|serialized_err <- read_from_oplog(serialized|serialized_err) - /// value|error <- put_serializable(serialized) | serialized_err.into() - /// return value|error - async fn custom_wrap( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - function: AsyncFn, - to_serializable: ToSerializable, - from_serializable: FromSerializable, - ) -> Result - where - Success: Send + Sync, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - ToSerializable: - FnOnce(&mut DurableWorkerCtx, &Success) -> Result + Send, - FromSerializable: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - SerializableSuccess, - ) - -> Pin> + 'b + Send>> - + Send - + 'static, - SerializableInput: Encode + Debug + Send + Sync + 'static, - SerializableSuccess: Encode + Decode + Debug + Send + Sync, - SerializableErr: Encode - + Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync; - - /// A version of `wrap` allowing 
conversion between the success value and the serialized value within the mutable worker context. - /// Deserialization from oplog is also fully customizable, which makes this version suitable to implement - /// backward compatibility tricks. - /// - /// Live mode: - /// value|error <- function() - /// serialized|serialized_err <- get_serializable(value) | error.into() - /// write_to_oplog(serialized|serialized_err) - /// return value|error - /// - /// Replay mode: - /// value|error <- load(oplog) - /// return value|error - async fn full_custom_wrap< - Intermediate, - Success, - Err, - AsyncFn, - ToSerializable, - ToResult, - FromSerialized, - >( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - function: AsyncFn, - to_serializable: ToSerializable, - to_result: ToResult, - from_serialized: FromSerialized, - ) -> Result - where - Intermediate: Send + Sync, - Success: Send, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - ToSerializable: FnOnce(&mut DurableWorkerCtx, &Intermediate) -> Result - + Send, - ToResult: FnOnce(&mut DurableWorkerCtx, Intermediate) -> Result + Send, - FromSerialized: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - Arc, - &'b OplogEntry, - ) - -> Pin> + 'b + Send>> - + Send, - SerializableInput: Encode + Debug + Send + Sync + 'static, - SerializableSuccess: Encode + Decode + Debug + Send + Sync, - SerializableErr: Encode - + Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync; - - /// Wrap a WASI call with durability handling - /// - /// The function checks if the execution is live, and if so performs the function and then - /// saves its results into the oplog. If the execution is not live, it reads the previously - /// saved results from the oplog and returns them. 
- /// - /// Type parameters: - /// - `AsyncFn`: the async WASI function to perform, expected to return with a Result of `Success` or `Err` - /// - `Success`: The type of the success value returned by the WASI function - /// - `Err`: The type of the error value returned by the WASI function. There need to be a conversion from `GolemError` - /// to `Err` to be able to return internal failures. - /// - `SerializedSuccess`: The type of the success value serialized into the oplog. It has to be encodeable/decodeable - /// and convertable from/to `Success` - /// - `SerializedErr`: The type of the error value serialized into the oplog. It has to be encodeable/decodeable and - /// convertable from/to `Err` - /// - /// Parameters: - /// - `wrapped_function_type`: The type of the wrapped function, it is a combination of being local or remote, and - /// being read or write - /// - `function_name`: The name of the function, used for logging - /// - `function`: The async WASI function to perform - async fn wrap( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - function: AsyncFn, - ) -> Result - where - Success: Clone + Send, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - SerializableInput: Encode + Debug + Send + Sync + 'static, - SerializableSuccess: - Encode + Decode + From + Into + Debug + Send + Sync + 'static, - SerializableErr: Encode - + Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync - + 'static; - - async fn wrap_conditionally( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - function: AsyncFn, - persist: ConditionFn, - ) -> Result - where - Success: Clone + Send, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - ConditionFn: FnOnce(&Result) -> bool + Send, - SerializableInput: Encode 
+ Debug + Send + Sync + 'static, - SerializableSuccess: Encode + Decode + From + Into + Debug + Send + Sync, - SerializableErr: Encode - + Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync; +pub enum OplogEntryVersion { + V1, + V2, } -#[async_trait] -impl - Durability - for DurableWorkerCtx -{ - async fn custom_wrap( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - function: AsyncFn, - to_serializable: ToSerializable, - from_serializable: FromSerializable, - ) -> Result - where - Success: Send + Sync, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - ToSerializable: - FnOnce(&mut DurableWorkerCtx, &Success) -> Result + Send, - FromSerializable: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - SerializableSuccess, - ) - -> Pin> + 'b + Send>> - + Send - + 'static, - SerializableInput: Encode + Debug + Send + Sync + 'static, - SerializableSuccess: Encode + Decode + Debug + Send + Sync, - SerializableErr: Encode - + Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync, - { - as Durability< - Ctx, - SerializableInput, - SerializableSuccess, - SerializableErr, - >>::full_custom_wrap::( - self, - wrapped_function_type, - function_name, - input, - function, - to_serializable, - |_, result| Ok(result), - |ctx, oplog, entry| { - Box::pin(async move { - let response: Result = - DurableWorkerCtx::::default_load(oplog, entry).await; - match response { - Ok(serialized_success) => { - let success: Success = - from_serializable(ctx, serialized_success).await?; - Ok(success) - } - Err(serialized_err) => Err(serialized_err.into()), - } - }) - }, - ) - .await - } - - async fn full_custom_wrap< - Intermediate, - Success, - Err, - AsyncFn, - ToSerializable, - ToResult, - FromSerialized, - >( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - 
function: AsyncFn, - to_serializable: ToSerializable, - to_result: ToResult, - from_serialized: FromSerialized, - ) -> Result - where - Intermediate: Send + Sync, - Success: Send, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - ToSerializable: FnOnce(&mut DurableWorkerCtx, &Intermediate) -> Result - + Send, - ToResult: FnOnce(&mut DurableWorkerCtx, Intermediate) -> Result + Send, - FromSerialized: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - Arc, - &'b OplogEntry, - ) - -> Pin> + 'b + Send>> - + Send, - SerializableInput: Encode + Debug + Send + Sync + 'static, - SerializableSuccess: Encode + Decode + Debug + Send + Sync, - SerializableErr: Encode - + Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync, - { - let begin_index = self - .state - .begin_function(&wrapped_function_type.clone()) - .await?; - if self.state.is_live() || self.state.persistence_level == PersistenceLevel::PersistNothing - { - let intermediate = function(self).await; - let serializable_result: Result = intermediate - .as_ref() - .map_err(|err| err.into()) - .and_then(|result| to_serializable(self, result).map_err(|err| (&err).into())); - - self.write_to_oplog( - &wrapped_function_type, - function_name, - begin_index, - &input, - &serializable_result, - ) - .await?; - - intermediate.and_then(|value| to_result(self, value)) - } else { - let (_, oplog_entry) = crate::get_oplog_entry!( - self.state.replay_state, - OplogEntry::ImportedFunctionInvoked, - OplogEntry::ImportedFunctionInvokedV1 - )?; - DurableWorkerCtx::::validate_oplog_entry(&oplog_entry, function_name)?; - - let oplog = self.state.oplog.clone(); - let result = from_serialized(self, oplog, &oplog_entry).await; +pub struct Durability { + package: &'static str, + function: &'static str, + function_type: WrappedFunctionType, + begin_index: OplogIndex, + is_live: bool, + persistence_level: PersistenceLevel, + _ctx: PhantomData, + _sok: 
PhantomData, + _serr: PhantomData, +} - self.state - .end_function(&wrapped_function_type, begin_index) - .await?; +impl Durability { + pub async fn new( + ctx: &mut DurableWorkerCtx, + package: &'static str, + function: &'static str, + function_type: WrappedFunctionType, + ) -> Result { + record_host_function_call(package, function); - result - } - } + let begin_index = ctx.state.begin_function(&function_type).await?; - async fn wrap( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - function: AsyncFn, - ) -> Result - where - Success: Clone + Send, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - SerializableInput: Encode + Debug + Send + Sync + 'static, - SerializableSuccess: - Encode + Decode + From + Into + Debug + Send + Sync + 'static, - SerializableErr: Encode - + Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync - + 'static, - { - as Durability< - Ctx, - SerializableInput, - SerializableSuccess, - SerializableErr, - >>::wrap_conditionally::( - self, - wrapped_function_type, - function_name, - input, + Ok(Self { + package, function, - |_: &Result| true, - ) - .await + function_type, + begin_index, + is_live: ctx.state.is_live(), + persistence_level: ctx.state.persistence_level.clone(), + _ctx: PhantomData, + _sok: PhantomData, + _serr: PhantomData, + }) } - async fn wrap_conditionally( - &mut self, - wrapped_function_type: WrappedFunctionType, - function_name: &str, - input: SerializableInput, - function: AsyncFn, - persist: ConditionFn, - ) -> Result - where - Success: Clone + Send, - Err: From + Send, - AsyncFn: for<'b> FnOnce( - &'b mut DurableWorkerCtx, - ) - -> Pin> + 'b + Send>> - + Send, - ConditionFn: FnOnce(&Result) -> bool + Send, - SerializableInput: Encode + Debug + Send + Sync + 'static, - SerializableSuccess: Encode + Decode + From + Into + Debug + Send + Sync, - SerializableErr: Encode - + 
Decode - + for<'b> From<&'b Err> - + From - + Into - + Debug - + Send - + Sync, - { - let begin_index = self - .state - .begin_function(&wrapped_function_type.clone()) - .await?; - if self.state.is_live() || self.state.persistence_level == PersistenceLevel::PersistNothing - { - let result = function(self).await; - if persist(&result) { - let serializable_result: Result = result - .as_ref() - .map(|result| result.clone().into()) - .map_err(|err| err.into()); - - self.write_to_oplog( - &wrapped_function_type, - function_name, - begin_index, - &input, - &serializable_result, - ) - .await?; - } - result - } else { - let (_, oplog_entry) = crate::get_oplog_entry!( - self.state.replay_state, - OplogEntry::ImportedFunctionInvoked, - OplogEntry::ImportedFunctionInvokedV1 - )?; - DurableWorkerCtx::::validate_oplog_entry(&oplog_entry, function_name)?; - let response: Result = - DurableWorkerCtx::::default_load(self.state.oplog.clone(), &oplog_entry).await; - - self.state - .end_function(&wrapped_function_type, begin_index) - .await?; - - response - .map(|serialized_success| serialized_success.into()) - .map_err(|serialized_err| serialized_err.into()) - } + pub fn is_live(&self) -> bool { + self.is_live || self.persistence_level == PersistenceLevel::PersistNothing } -} -impl DurableWorkerCtx { - pub async fn default_load( - oplog: Arc, - entry: &OplogEntry, - ) -> Result + pub async fn persist( + &self, + ctx: &mut DurableWorkerCtx, + input: SIn, + result: Result, + ) -> Result where - SerializableSuccess: Encode + Decode + Debug + Send + Sync, - SerializableErr: Encode + Decode + Debug + From + Send + Sync, + Ok: Clone, + Err: From + From + Send + Sync, + SIn: Debug + Encode + Send + Sync, + SErr: Debug + Encode + for<'a> From<&'a Err> + From + Send + Sync, + SOk: Debug + Encode + From + Send + Sync, { - oplog - .get_payload_of_entry::>(entry) - .await - .map_err(|err| { - GolemError::unexpected_oplog_entry("ImportedFunctionInvoked payload", err) - })? 
- .unwrap() - } + let serializable_result: Result = result + .as_ref() + .map(|result| result.clone().into()) + .map_err(|err| err.into()); - pub async fn try_default_load( - oplog: Arc, - entry: &OplogEntry, - ) -> Result, GolemError> - where - SerializableSuccess: Encode + Decode + Debug + Send + Sync, - SerializableErr: Encode + Decode + Debug + From + Send + Sync, - { - oplog - .get_payload_of_entry::>(entry) + self.persist_serializable(ctx, input, serializable_result) .await .map_err(|err| { - GolemError::unexpected_oplog_entry("ImportedFunctionInvoked payload", err) - }) - .map(|result| result.unwrap()) + let err: SErr = err.into(); + let err: Err = err.into(); + err + })?; + result } - async fn write_to_oplog( - &mut self, - wrapped_function_type: &WrappedFunctionType, - function_name: &str, - begin_index: OplogIndex, - serializable_input: &SerializedInput, - serializable_result: &Result, - ) -> Result<(), Err> + pub async fn persist_serializable( + &self, + ctx: &mut DurableWorkerCtx, + input: SIn, + serializable_result: Result, + ) -> Result<(), GolemError> where - Err: Send, - SerializedInput: Encode + Debug + Send + Sync, - SerializedSuccess: Encode + Debug + Send + Sync, - SerializedErr: Encode + Debug + From + Into + Send + Sync, + SIn: Debug + Encode + Send + Sync, + SOk: Debug + Encode + Send + Sync, + SErr: Debug + Encode + Send + Sync, { - if self.state.persistence_level != PersistenceLevel::PersistNothing { - self.state + let function_name = self.function_name(); + if ctx.state.persistence_level != PersistenceLevel::PersistNothing { + ctx.state .oplog .add_imported_function_invoked( function_name.to_string(), - &serializable_input, + &input, &serializable_result, - wrapped_function_type.clone(), + self.function_type.clone(), ) .await .unwrap_or_else(|err| { panic!( "failed to serialize and store function request ({:?}) and response ({:?}): {err}", - serializable_input, + input, serializable_result ) }); - self.state - 
.end_function(wrapped_function_type, begin_index) - .await - .map_err(|err| Into::::into(err).into())?; - if *wrapped_function_type == WrappedFunctionType::WriteRemote + ctx.state + .end_function(&self.function_type, self.begin_index) + .await?; + if self.function_type == WrappedFunctionType::WriteRemote || matches!( - *wrapped_function_type, + self.function_type, WrappedFunctionType::WriteRemoteBatched(_) ) { - self.state.oplog.commit(CommitLevel::DurableOnly).await; + ctx.state.oplog.commit(CommitLevel::DurableOnly).await; } } Ok(()) } + pub async fn replay_raw( + &self, + ctx: &mut DurableWorkerCtx, + ) -> Result<(Bytes, OplogEntryVersion), GolemError> { + let (_, oplog_entry) = crate::get_oplog_entry!( + ctx.state.replay_state, + OplogEntry::ImportedFunctionInvoked, + OplogEntry::ImportedFunctionInvokedV1 + )?; + + let version = if matches!(oplog_entry, OplogEntry::ImportedFunctionInvokedV1 { .. }) { + OplogEntryVersion::V1 + } else { + OplogEntryVersion::V2 + }; + + let function_name = self.function_name(); + Self::validate_oplog_entry(&oplog_entry, &function_name)?; + + let bytes = ctx + .state + .oplog + .get_raw_payload_of_entry(&oplog_entry) + .await + .map_err(|err| { + GolemError::unexpected_oplog_entry("ImportedFunctionInvoked payload", err) + })? + .unwrap(); + + ctx.state + .end_function(&self.function_type, self.begin_index) + .await?; + + Ok((bytes, version)) + } + + pub async fn replay_serializable( + &self, + ctx: &mut DurableWorkerCtx, + ) -> Result, GolemError> + where + SOk: Decode, + SErr: Decode, + { + let (bytes, _) = self.replay_raw(ctx).await?; + let result: Result = try_deserialize(&bytes) + .map_err(|err| { + GolemError::unexpected_oplog_entry("ImportedFunctionInvoked payload", err) + })? 
+ .expect("Payload is empty"); + Ok(result) + } + + pub async fn replay(&self, ctx: &mut DurableWorkerCtx) -> Result + where + Ok: From, + Err: From + From, + SErr: Debug + Encode + Decode + From + Send + Sync, + SOk: Debug + Encode + Decode + Send + Sync, + { + Self::replay_serializable(self, ctx) + .await? + .map(|sok| sok.into()) + .map_err(|serr| serr.into()) + } + + fn function_name(&self) -> String { + if self.package.is_empty() { + // For backward compatibility - some of the recorded function names were not following the pattern + self.function.to_string() + } else { + format!("{}::{}", self.package, self.function) + } + } + fn validate_oplog_entry( oplog_entry: &OplogEntry, expected_function_name: &str, diff --git a/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs b/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs index f10e1d7f53..1667e9cf3a 100644 --- a/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs +++ b/golem-worker-executor-base/src/durable_host/filesystem/preopens.rs @@ -18,7 +18,6 @@ use wasmtime::component::Resource; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::workerctx::WorkerCtx; use golem_common::model::oplog::WrappedFunctionType; use wasmtime_wasi::bindings::filesystem::preopens::{Descriptor, Host}; @@ -26,42 +25,41 @@ use wasmtime_wasi::bindings::filesystem::preopens::{Descriptor, Host}; #[async_trait] impl Host for DurableWorkerCtx { async fn get_directories(&mut self) -> anyhow::Result, String)>> { - record_host_function_call("cli_base::preopens", "get_directories"); - - let current_dirs1 = Host::get_directories(&mut self.as_wasi_view()).await?; - let current_dirs2 = Host::get_directories(&mut self.as_wasi_view()).await?; - Durability::, SerializableError>::custom_wrap( + let durability = Durability::, SerializableError>::new( self, + "cli::preopens", + 
"get_directories", WrappedFunctionType::ReadLocal, - "cli::preopens::get_directories", - (), - |_ctx| Box::pin(async move { Ok(current_dirs1) }), - |_ctx, dirs| { - // We can only serialize the names - Ok(dirs + ) + .await?; + + let current_dirs = Host::get_directories(&mut self.as_wasi_view()).await?; + + let names = { + if durability.is_live() { + let result: Result, anyhow::Error> = Ok(current_dirs .iter() .map(|(_, name)| name.clone()) - .collect::>()) - }, - move |_ctx, names| { - Box::pin(async move { - // Filtering the current set of pre-opened directories by the serialized names - let filtered = current_dirs2 - .into_iter() - .filter(|(_, name)| names.contains(name)) - .collect::>(); + .collect::>()); + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } + }?; - if filtered.len() == names.len() { - // All directories were found - Ok(filtered) - } else { - Err(anyhow!( - "Not all previously available pre-opened directories were found" - )) - } - }) - }, - ) - .await + // Filtering the current set of pre-opened directories by the serialized names + let filtered = current_dirs + .into_iter() + .filter(|(_, name)| names.contains(name)) + .collect::>(); + + if filtered.len() == names.len() { + // All directories were found + Ok(filtered) + } else { + Err(anyhow!( + "Not all previously available pre-opened directories were found" + )) + } } } diff --git a/golem-worker-executor-base/src/durable_host/filesystem/types.rs b/golem-worker-executor-base/src/durable_host/filesystem/types.rs index 0e75f8ed06..aed6257eac 100644 --- a/golem-worker-executor-base/src/durable_host/filesystem/types.rs +++ b/golem-worker-executor-base/src/durable_host/filesystem/types.rs @@ -186,7 +186,14 @@ impl HostDescriptor for DurableWorkerCtx { } async fn stat(&mut self, self_: Resource) -> Result { - record_host_function_call("filesystem::types::descriptor", "stat"); + let durability = Durability::::new( + self, + "filesystem::types::descriptor", + 
"stat", + WrappedFunctionType::ReadLocal, + ) + .await + .map_err(FsError::trap)?; let path = match self.table().get(&self_)? { Descriptor::File(f) => f.path.clone(), @@ -196,43 +203,36 @@ impl HostDescriptor for DurableWorkerCtx { let mut stat = HostDescriptor::stat(&mut self.as_wasi_view(), self_).await?; stat.status_change_timestamp = None; // We cannot guarantee this to be the same during replays, so we rather not support it - let stat_clone1 = stat; - Durability::::custom_wrap( - self, - WrappedFunctionType::ReadLocal, - "filesystem::types::descriptor::stat", - path.to_string_lossy().to_string(), - |_ctx| { - Box::pin(async move { Ok(stat_clone1) as Result }) - }, - |_ctx, stat| { - Ok(SerializableFileTimes { - data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()), - data_modification_timestamp: stat.data_modification_timestamp.map(|t| t.into()), - }) - }, - move |_ctx, times| { - Box::pin(async move { - let accessed = times.data_access_timestamp.as_ref().map(|t| { - SystemTimeSpec::from(>::into( - t.clone(), - )) - }); - let modified = times.data_modification_timestamp.as_ref().map(|t| { - SystemTimeSpec::from(>::into( - t.clone(), - )) - }); - spawn_blocking(|| set_symlink_times(path, accessed, modified)).await?; - stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into()); - stat.data_modification_timestamp = - times.data_modification_timestamp.map(|t| t.into()); - Ok(stat) - }) - }, - ) - .await - .map_err(FsError::trap) + let times = if durability.is_live() { + durability + .persist( + self, + path.to_string_lossy().to_string(), + Ok(SerializableFileTimes { + data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()), + data_modification_timestamp: stat + .data_modification_timestamp + .map(|t| t.into()), + }), + ) + .await + } else { + durability + .replay::(self) + .await + } + .map_err(FsError::trap)?; + + let accessed = times.data_access_timestamp.as_ref().map(|t| { + SystemTimeSpec::from(>::into(t.clone())) + 
}); + let modified = times.data_modification_timestamp.as_ref().map(|t| { + SystemTimeSpec::from(>::into(t.clone())) + }); + spawn_blocking(|| set_symlink_times(path, accessed, modified)).await?; + stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into()); + stat.data_modification_timestamp = times.data_modification_timestamp.map(|t| t.into()); + Ok(stat) } async fn stat_at( @@ -241,7 +241,15 @@ impl HostDescriptor for DurableWorkerCtx { path_flags: PathFlags, path: String, ) -> Result { - record_host_function_call("filesystem::types::descriptor", "stat_at"); + let durability = Durability::::new( + self, + "filesystem::types::descriptor", + "stat_at", + WrappedFunctionType::ReadLocal, + ) + .await + .map_err(FsError::trap)?; + let full_path = match self.table().get(&self_)? { Descriptor::File(f) => f.path.join(path.clone()), Descriptor::Dir(d) => d.path.join(path.clone()), @@ -251,43 +259,36 @@ impl HostDescriptor for DurableWorkerCtx { HostDescriptor::stat_at(&mut self.as_wasi_view(), self_, path_flags, path).await?; stat.status_change_timestamp = None; // We cannot guarantee this to be the same during replays, so we rather not support it - let stat_clone1 = stat; - Durability::::custom_wrap( - self, - WrappedFunctionType::ReadLocal, - "filesystem::types::descriptor::stat_at", - full_path.to_string_lossy().to_string(), - |_ctx| { - Box::pin(async move { Ok(stat_clone1) as Result }) - }, - |_ctx, stat| { - Ok(SerializableFileTimes { - data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()), - data_modification_timestamp: stat.data_modification_timestamp.map(|t| t.into()), - }) - }, - move |_ctx, times| { - Box::pin(async move { - let accessed = times.data_access_timestamp.as_ref().map(|t| { - SystemTimeSpec::from(>::into( - t.clone(), - )) - }); - let modified = times.data_modification_timestamp.as_ref().map(|t| { - SystemTimeSpec::from(>::into( - t.clone(), - )) - }); - spawn_blocking(|| set_symlink_times(full_path, accessed, 
modified)).await?; - stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into()); - stat.data_modification_timestamp = - times.data_modification_timestamp.map(|t| t.into()); - Ok(stat) - }) - }, - ) - .await - .map_err(FsError::trap) + let times = if durability.is_live() { + durability + .persist( + self, + full_path.to_string_lossy().to_string(), + Ok(SerializableFileTimes { + data_access_timestamp: stat.data_access_timestamp.map(|t| t.into()), + data_modification_timestamp: stat + .data_modification_timestamp + .map(|t| t.into()), + }), + ) + .await + } else { + durability + .replay::(self) + .await + } + .map_err(FsError::trap)?; + + let accessed = times.data_access_timestamp.as_ref().map(|t| { + SystemTimeSpec::from(>::into(t.clone())) + }); + let modified = times.data_modification_timestamp.as_ref().map(|t| { + SystemTimeSpec::from(>::into(t.clone())) + }); + spawn_blocking(|| set_symlink_times(full_path, accessed, modified)).await?; + stat.data_access_timestamp = times.data_access_timestamp.map(|t| t.into()); + stat.data_modification_timestamp = times.data_modification_timestamp.map(|t| t.into()); + Ok(stat) } async fn set_times_at( diff --git a/golem-worker-executor-base/src/durable_host/golem/mod.rs b/golem-worker-executor-base/src/durable_host/golem/mod.rs index a2d9184acd..35288a5b4a 100644 --- a/golem-worker-executor-base/src/durable_host/golem/mod.rs +++ b/golem-worker-executor-base/src/durable_host/golem/mod.rs @@ -26,7 +26,6 @@ use wasmtime_wasi::WasiView; use crate::durable_host::serialized::SerializableError; use crate::durable_host::wasm_rpc::UrnExtensions; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::error::GolemError; use crate::get_oplog_entry; use crate::metrics::wasm::record_host_function_call; use crate::model::InterruptKind; @@ -171,45 +170,54 @@ impl golem::api0_2_0::host::Host for DurableWorkerCtx { promise_id: golem::api0_2_0::host::PromiseId, data: Vec, ) -> Result { - 
record_host_function_call("golem::api", "complete_promise"); - let promise_id: PromiseId = promise_id.into(); - Durability::::wrap( + let durability = Durability::::new( self, - WrappedFunctionType::WriteLocal, + "", // TODO: fix in 2.0 "golem_complete_promise", - promise_id.clone(), - |ctx| { - Box::pin(async move { - Ok(ctx - .public_state - .promise_service - .complete(promise_id, data) - .await?) - }) - }, + WrappedFunctionType::WriteLocal, ) - .await + .await?; + + let promise_id: PromiseId = promise_id.into(); + let result = if durability.is_live() { + let result = self + .public_state + .promise_service + .complete(promise_id.clone(), data) + .await; + + durability.persist(self, promise_id, result).await + } else { + durability.replay(self).await + }?; + Ok(result) } async fn delete_promise( &mut self, promise_id: golem::api0_2_0::host::PromiseId, ) -> Result<(), anyhow::Error> { - record_host_function_call("golem::api", "delete_promise"); - let promise_id: PromiseId = promise_id.into(); - Durability::::wrap( + let durability = Durability::::new( self, - WrappedFunctionType::WriteLocal, + "", // TODO: fix in 2.0 "golem_delete_promise", - promise_id.clone(), - |ctx| { - Box::pin(async move { - ctx.public_state.promise_service.delete(promise_id).await; - Ok(()) - }) - }, + WrappedFunctionType::WriteLocal, ) - .await + .await?; + + let promise_id: PromiseId = promise_id.into(); + if durability.is_live() { + let result = { + self.public_state + .promise_service + .delete(promise_id.clone()) + .await; + Ok(()) + }; + durability.persist(self, promise_id, result).await + } else { + durability.replay(self).await + } } async fn get_self_uri( @@ -432,7 +440,13 @@ impl golem::api0_2_0::host::Host for DurableWorkerCtx { } async fn generate_idempotency_key(&mut self) -> anyhow::Result { - record_host_function_call("golem::api", "generate_idempotency_key"); + let durability = Durability::::new( + self, + "golem api", + "generate_idempotency_key", + 
WrappedFunctionType::WriteRemote, + ) + .await?; let current_idempotency_key = self .get_current_idempotency_key() @@ -440,25 +454,16 @@ impl golem::api0_2_0::host::Host for DurableWorkerCtx { .unwrap_or(IdempotencyKey::fresh()); let oplog_index = self.state.current_oplog_index().await; - // NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs - let uuid = Durability::::custom_wrap( - self, - WrappedFunctionType::WriteRemote, - "golem api::generate_idempotency_key", - (), - |_ctx| { - Box::pin(async move { - let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index); - let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid - Ok::(uuid) - }) - }, - |_ctx, uuid: &Uuid| Ok(uuid.as_u64_pair()), - |_ctx, (high_bits, low_bits)| { - Box::pin(async move { Ok(Uuid::from_u64_pair(high_bits, low_bits)) }) - }, - ) - .await?; + // TODO: Fix in 2.0 - Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs + let (hi, lo) = if durability.is_live() { + let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index); + let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid + let result: Result<(u64, u64), anyhow::Error> = Ok(uuid.as_u64_pair()); + durability.persist(self, (), result).await + } else { + durability.replay(self).await + }?; + let uuid = Uuid::from_u64_pair(hi, lo); Ok(uuid.into()) } @@ -468,7 +473,13 @@ impl golem::api0_2_0::host::Host for DurableWorkerCtx { target_version: ComponentVersion, mode: UpdateMode, ) -> anyhow::Result<()> { - record_host_function_call("golem::api", "update_worker"); + let durability = Durability::::new( + self, + "golem::api", + "update-worker", + WrappedFunctionType::WriteRemote, + ) + .await?; let worker_id: WorkerId = worker_id.into(); let owned_worker_id = OwnedWorkerId::new(&self.owned_worker_id.account_id, 
&worker_id); @@ -477,30 +488,19 @@ impl golem::api0_2_0::host::Host for DurableWorkerCtx { UpdateMode::Automatic => golem_api_grpc::proto::golem::worker::UpdateMode::Automatic, UpdateMode::SnapshotBased => golem_api_grpc::proto::golem::worker::UpdateMode::Manual, }; - Durability::< - Ctx, - ( - WorkerId, - u64, - golem_api_grpc::proto::golem::worker::UpdateMode, - ), - (), - SerializableError, - >::wrap( - self, - WrappedFunctionType::WriteRemote, - "golem::api::update-worker", - (worker_id, target_version, mode), - |ctx| { - Box::pin(async move { - ctx.state - .worker_proxy - .update(&owned_worker_id, target_version, mode) - .await - }) - }, - ) - .await?; + + if durability.is_live() { + let result = self + .state + .worker_proxy + .update(&owned_worker_id, target_version, mode) + .await; + durability + .persist(self, (worker_id, target_version, mode), result) + .await + } else { + durability.replay(self).await + }?; Ok(()) } diff --git a/golem-worker-executor-base/src/durable_host/http/types.rs b/golem-worker-executor-base/src/durable_host/http/types.rs index a7bab8e6af..45f0865392 100644 --- a/golem-worker-executor-base/src/durable_host/http/types.rs +++ b/golem-worker-executor-base/src/durable_host/http/types.rs @@ -35,8 +35,7 @@ use wasmtime_wasi_http::{HttpError, HttpResult}; use golem_common::model::oplog::{OplogEntry, WrappedFunctionType}; use crate::durable_host::http::serialized::{ - SerializableErrorCode, SerializableHttpRequest, SerializableResponse, - SerializableResponseHeaders, + SerializableErrorCode, SerializableResponse, SerializableResponseHeaders, }; use crate::durable_host::http::{continue_http_request, end_http_request}; use crate::durable_host::serialized::SerializableError; @@ -459,64 +458,64 @@ impl HostFutureTrailers for DurableWorkerCtx { .ok_or_else(|| { anyhow!("No matching BeginRemoteWrite index was found for the open HTTP request") })?; + let request = request_state.request.clone(); - Durability::< + let durability = Durability::< 
Ctx, - SerializableHttpRequest, Option>>, SerializableErrorCode>, ()>>, SerializableError, - >::custom_wrap( + >::new( self, + "golem http::types::future_trailers", + "get", WrappedFunctionType::WriteRemoteBatched(Some(*begin_idx)), - "golem http::types::future_trailers::get", - request_state.request.clone(), - |ctx| { - Box::pin(async move { - HostFutureTrailers::get(&mut ctx.as_wasi_http_view(), self_).await - }) - }, - |ctx, result| match result { - Some(Ok(Ok(None))) => Ok(Some(Ok(Ok(None)))), - Some(Ok(Ok(Some(trailers)))) => { + ) + .await?; + + if durability.is_live() { + let result = HostFutureTrailers::get(&mut self.as_wasi_http_view(), self_).await; + let to_serialize = match &result { + Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))), + Ok(Some(Ok(Ok(Some(trailers))))) => { let mut serialized_trailers = HashMap::new(); let host_fields: &Resource = unsafe { std::mem::transmute(trailers) }; - for (key, value) in get_fields(ctx.table(), host_fields)? { + for (key, value) in get_fields(self.table(), host_fields)? 
{ serialized_trailers .insert(key.as_str().to_string(), value.as_bytes().to_vec()); } Ok(Some(Ok(Ok(Some(serialized_trailers))))) } - Some(Ok(Err(error_code))) => Ok(Some(Ok(Err(error_code.into())))), - Some(Err(_)) => Ok(Some(Err(()))), - None => Ok(None), - }, - |ctx, serialized| { - Box::pin(async { - match serialized { - Some(Ok(Ok(None))) => Ok(Some(Ok(Ok(None)))), - Some(Ok(Ok(Some(serialized_trailers)))) => { - let mut fields = FieldMap::new(); - for (key, value) in serialized_trailers { - fields.insert( - HeaderName::from_str(&key)?, - HeaderValue::try_from(value)?, - ); - } - let hdrs = ctx - .table() - .push(wasmtime_wasi_http::types::HostFields::Owned { fields })?; - Ok(Some(Ok(Ok(Some(hdrs))))) - } - Some(Ok(Err(error_code))) => Ok(Some(Ok(Err(error_code.into())))), - Some(Err(_)) => Ok(Some(Err(()))), - None => Ok(None), + Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))), + Ok(Some(Err(_))) => Ok(Some(Err(()))), + Ok(None) => Ok(None), + Err(err) => Err(SerializableError::from(err)), + }; + let _ = durability + .persist_serializable(self, request, to_serialize) + .await; + result + } else { + let serialized = durability.replay(self).await; + match serialized { + Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))), + Ok(Some(Ok(Ok(Some(serialized_trailers))))) => { + let mut fields = FieldMap::new(); + for (key, value) in serialized_trailers { + fields.insert(HeaderName::from_str(&key)?, HeaderValue::try_from(value)?); } - }) - }, - ) - .await + let hdrs = self + .table() + .push(wasmtime_wasi_http::types::HostFields::Owned { fields })?; + Ok(Some(Ok(Ok(Some(hdrs))))) + } + Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))), + Ok(Some(Err(_))) => Ok(Some(Err(()))), + Ok(None) => Ok(None), + Err(error) => Err(error), + } + } } fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { diff --git a/golem-worker-executor-base/src/durable_host/io/poll.rs b/golem-worker-executor-base/src/durable_host/io/poll.rs index 
9faa8f5905..f12a1cb030 100644 --- a/golem-worker-executor-base/src/durable_host/io/poll.rs +++ b/golem-worker-executor-base/src/durable_host/io/poll.rs @@ -48,17 +48,24 @@ impl HostPollable for DurableWorkerCtx { #[async_trait] impl Host for DurableWorkerCtx { async fn poll(&mut self, in_: Vec>) -> anyhow::Result> { - record_host_function_call("io::poll", "poll"); - - let result = Durability::, SerializableError>::wrap_conditionally( + let durability = Durability::, SerializableError>::new( self, + "golem io::poll", + "poll", WrappedFunctionType::ReadLocal, - "golem io::poll::poll", - (), - |ctx| Box::pin(async move { Host::poll(&mut ctx.as_wasi_view(), in_).await }), - |result| is_suspend_for_sleep(result).is_none(), // We must not persist the suspend signal ) - .await; + .await?; + + let result = if durability.is_live() { + let result = Host::poll(&mut self.as_wasi_view(), in_).await; + if is_suspend_for_sleep(&result).is_none() { + durability.persist(self, (), result).await + } else { + result + } + } else { + durability.replay(self).await + }; match is_suspend_for_sleep(&result) { Some(duration) => { diff --git a/golem-worker-executor-base/src/durable_host/io/streams.rs b/golem-worker-executor-base/src/durable_host/io/streams.rs index 00ff066595..fd13adc659 100644 --- a/golem-worker-executor-base/src/durable_host/io/streams.rs +++ b/golem-worker-executor-base/src/durable_host/io/streams.rs @@ -39,28 +39,30 @@ impl HostInputStream for DurableWorkerCtx { self_: Resource, len: u64, ) -> Result, StreamError> { - record_host_function_call("io::streams::input_stream", "read"); if is_incoming_http_body_stream(self.table(), &self_) { let handle = self_.rep(); let begin_idx = get_http_request_begin_idx(self, handle)?; - let request = get_http_stream_request(self, handle)?; - let result = - Durability::, SerializableStreamError>::wrap( - self, - WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), - "http::types::incoming_body_stream::read", - request, - |ctx| { - 
Box::pin(async move { - HostInputStream::read(&mut ctx.as_wasi_view(), self_, len).await - }) - }, - ) - .await; + let durability = Durability::, SerializableStreamError>::new( + self, + "http::types::incoming_body_stream", + "read", + WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), + ) + .await?; + + let result = if durability.is_live() { + let request = get_http_stream_request(self, handle)?; + let result = HostInputStream::read(&mut self.as_wasi_view(), self_, len).await; + durability.persist(self, request, result).await + } else { + durability.replay(self).await + }; + end_http_request_if_closed(self, handle, &result).await?; result } else { + record_host_function_call("io::streams::input_stream", "read"); HostInputStream::read(&mut self.as_wasi_view(), self_, len).await } } @@ -70,56 +72,58 @@ impl HostInputStream for DurableWorkerCtx { self_: Resource, len: u64, ) -> Result, StreamError> { - record_host_function_call("io::streams::input_stream", "blocking_read"); if is_incoming_http_body_stream(self.table(), &self_) { let handle = self_.rep(); let begin_idx = get_http_request_begin_idx(self, handle)?; - let request = get_http_stream_request(self, handle)?; - let result = - Durability::, SerializableStreamError>::wrap( - self, - WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), - "http::types::incoming_body_stream::blocking_read", - request, - |ctx| { - Box::pin(async move { - HostInputStream::blocking_read(&mut ctx.as_wasi_view(), self_, len) - .await - }) - }, - ) - .await; + let durability = Durability::, SerializableStreamError>::new( + self, + "http::types::incoming_body_stream", + "blocking_read", + WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), + ) + .await?; + let result = if durability.is_live() { + let request = get_http_stream_request(self, handle)?; + let result = + HostInputStream::blocking_read(&mut self.as_wasi_view(), self_, len).await; + durability.persist(self, request, result).await + } else { + 
durability.replay(self).await + }; + end_http_request_if_closed(self, handle, &result).await?; result } else { + record_host_function_call("io::streams::input_stream", "blocking_read"); HostInputStream::blocking_read(&mut self.as_wasi_view(), self_, len).await } } async fn skip(&mut self, self_: Resource, len: u64) -> Result { - record_host_function_call("io::streams::input_stream", "skip"); if is_incoming_http_body_stream(self.table(), &self_) { let handle = self_.rep(); let begin_idx = get_http_request_begin_idx(self, handle)?; - let request = get_http_stream_request(self, handle)?; - let result = - Durability::::wrap( - self, - WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), - "http::types::incoming_body_stream::skip", - request, - |ctx| { - Box::pin(async move { - HostInputStream::skip(&mut ctx.as_wasi_view(), self_, len).await - }) - }, - ) - .await; + let durability = Durability::::new( + self, + "http::types::incoming_body_stream", + "skip", + WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), + ) + .await?; + let result = if durability.is_live() { + let request = get_http_stream_request(self, handle)?; + let result = HostInputStream::skip(&mut self.as_wasi_view(), self_, len).await; + durability.persist(self, request, result).await + } else { + durability.replay(self).await + }; + end_http_request_if_closed(self, handle, &result).await?; result } else { + record_host_function_call("io::streams::input_stream", "skip"); HostInputStream::skip(&mut self.as_wasi_view(), self_, len).await } } @@ -129,29 +133,30 @@ impl HostInputStream for DurableWorkerCtx { self_: Resource, len: u64, ) -> Result { - record_host_function_call("io::streams::input_stream", "blocking_skip"); if is_incoming_http_body_stream(self.table(), &self_) { let handle = self_.rep(); let begin_idx = get_http_request_begin_idx(self, handle)?; - let request = get_http_stream_request(self, handle)?; - let result = - Durability::::wrap( - self, - 
WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), - "http::types::incoming_body_stream::blocking_skip", - request, - |ctx| { - Box::pin(async move { - HostInputStream::blocking_skip(&mut ctx.as_wasi_view(), self_, len) - .await - }) - }, - ) - .await; + let durability = Durability::::new( + self, + "http::types::incoming_body_stream", + "blocking_skip", + WrappedFunctionType::WriteRemoteBatched(Some(begin_idx)), + ) + .await?; + + let result = if durability.is_live() { + let request = get_http_stream_request(self, handle)?; + let result = + HostInputStream::blocking_skip(&mut self.as_wasi_view(), self_, len).await; + durability.persist(self, request, result).await + } else { + durability.replay(self).await + }; end_http_request_if_closed(self, handle, &result).await?; result } else { + record_host_function_call("io::streams::input_stream", "blocking_skip"); HostInputStream::blocking_skip(&mut self.as_wasi_view(), self_, len).await } } diff --git a/golem-worker-executor-base/src/durable_host/keyvalue/eventual.rs b/golem-worker-executor-base/src/durable_host/keyvalue/eventual.rs index 072499a6e7..945e6ef10a 100644 --- a/golem-worker-executor-base/src/durable_host/keyvalue/eventual.rs +++ b/golem-worker-executor-base/src/durable_host/keyvalue/eventual.rs @@ -21,7 +21,6 @@ use crate::durable_host::keyvalue::error::ErrorEntry; use crate::durable_host::keyvalue::types::{BucketEntry, IncomingValueEntry, OutgoingValueEntry}; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::preview2::wasi::keyvalue::eventual::{ Bucket, Error, Host, IncomingValue, Key, OutgoingValue, }; @@ -34,7 +33,6 @@ impl Host for DurableWorkerCtx { bucket: Resource, key: Key, ) -> anyhow::Result>, Resource>> { - record_host_function_call("keyvalue::eventual", "get"); let account_id = self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -42,14 +40,26 
@@ impl Host for DurableWorkerCtx { .get::(&bucket)? .name .clone(); - let result = Durability::>, SerializableError>::wrap( + + let durability = Durability::>, SerializableError>::new( self, + "golem keyvalue::eventual", + "get", WrappedFunctionType::ReadRemote, - "golem keyvalue::eventual::get", - (bucket.clone(), key.clone()), - |ctx| ctx.state.key_value_service.get(account_id, bucket, key), ) - .await; + .await?; + + let result = if durability.is_live() { + let result = self + .state + .key_value_service + .get(account_id, bucket.clone(), key.clone()) + .await; + durability.persist(self, (bucket, key), result).await + } else { + durability.replay(self).await + }; + match result { Ok(Some(value)) => { let incoming_value = self @@ -75,7 +85,6 @@ impl Host for DurableWorkerCtx { key: Key, outgoing_value: Resource, ) -> anyhow::Result>> { - record_host_function_call("keyvalue::eventual", "set"); let account_id = self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -91,18 +100,27 @@ impl Host for DurableWorkerCtx { .read() .unwrap() .clone(); - let result = Durability::::wrap( + + let durability = Durability::::new( self, + "golem keyvalue::eventual", + "set", WrappedFunctionType::WriteRemote, - "golem keyvalue::eventual::set", - (bucket.clone(), key.clone(), outgoing_value.len() as u64), - |ctx| { - ctx.state - .key_value_service - .set(account_id, bucket, key, outgoing_value) - }, ) - .await; + .await?; + + let result = if durability.is_live() { + let input = (bucket.clone(), key.clone(), outgoing_value.len() as u64); + let result = self + .state + .key_value_service + .set(account_id, bucket, key, outgoing_value) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(()) => Ok(Ok(())), Err(e) => { @@ -120,7 +138,6 @@ impl Host for DurableWorkerCtx { bucket: Resource, key: Key, ) -> anyhow::Result>> { - record_host_function_call("keyvalue::eventual", "delete"); let account_id 
= self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -128,14 +145,27 @@ impl Host for DurableWorkerCtx { .get::(&bucket)? .name .clone(); - let result = Durability::::wrap( + + let durability = Durability::::new( self, + "golem keyvalue::eventual", + "delete", WrappedFunctionType::WriteRemote, - "golem keyvalue::eventual::delete", - (bucket.clone(), key.clone()), - |ctx| ctx.state.key_value_service.delete(account_id, bucket, key), ) - .await; + .await?; + + let result = if durability.is_live() { + let input = (bucket.clone(), key.clone()); + let result = self + .state + .key_value_service + .delete(account_id, bucket, key) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(()) => Ok(Ok(())), Err(e) => { @@ -153,7 +183,6 @@ impl Host for DurableWorkerCtx { bucket: Resource, key: Key, ) -> anyhow::Result>> { - record_host_function_call("keyvalue::eventual", "exists"); let account_id = self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -161,14 +190,27 @@ impl Host for DurableWorkerCtx { .get::(&bucket)? 
.name .clone(); - let result = Durability::::wrap( + + let durability = Durability::::new( self, + "golem keyvalue::eventual", + "exists", WrappedFunctionType::ReadRemote, - "golem keyvalue::eventual::exists", - (bucket.clone(), key.clone()), - |ctx| ctx.state.key_value_service.exists(account_id, bucket, key), ) - .await; + .await?; + + let result = if durability.is_live() { + let input = (bucket.clone(), key.clone()); + let result = self + .state + .key_value_service + .exists(account_id, bucket, key) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(exists) => Ok(Ok(exists)), Err(e) => { diff --git a/golem-worker-executor-base/src/durable_host/keyvalue/eventual_batch.rs b/golem-worker-executor-base/src/durable_host/keyvalue/eventual_batch.rs index b2b3b12b4d..001b6f84a3 100644 --- a/golem-worker-executor-base/src/durable_host/keyvalue/eventual_batch.rs +++ b/golem-worker-executor-base/src/durable_host/keyvalue/eventual_batch.rs @@ -21,7 +21,6 @@ use crate::durable_host::keyvalue::error::ErrorEntry; use crate::durable_host::keyvalue::types::{BucketEntry, IncomingValueEntry, OutgoingValueEntry}; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::preview2::wasi::keyvalue::eventual_batch::{ Bucket, Error, Host, IncomingValue, Key, OutgoingValue, }; @@ -34,7 +33,6 @@ impl Host for DurableWorkerCtx { bucket: Resource, keys: Vec, ) -> anyhow::Result>>, Resource>> { - record_host_function_call("keyvalue::eventual_batch", "get_many"); let account_id = self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -42,23 +40,26 @@ impl Host for DurableWorkerCtx { .get::(&bucket)? 
.name .clone(); - let result: anyhow::Result>>> = Durability::< - Ctx, - (String, Vec), - Vec>>, - SerializableError, - >::wrap( + + let durability = Durability::>>, SerializableError>::new( self, + "golem keyvalue::eventual_batch", + "get_many", WrappedFunctionType::ReadRemote, - "golem keyvalue::eventual_batch::get_many", - (bucket.clone(), keys.clone()), - |ctx| { - ctx.state - .key_value_service - .get_many(account_id, bucket, keys) - }, ) - .await; + .await?; + let result = if durability.is_live() { + let input = (bucket.clone(), keys.clone()); + let result = self + .state + .key_value_service + .get_many(account_id, bucket, keys) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(values) => { let mut result = Vec::new(); @@ -92,7 +93,6 @@ impl Host for DurableWorkerCtx { &mut self, bucket: Resource, ) -> anyhow::Result, Resource>> { - record_host_function_call("keyvalue::eventual_batch", "get_keys"); let account_id = self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -100,14 +100,25 @@ impl Host for DurableWorkerCtx { .get::(&bucket)? 
.name .clone(); - let keys = Durability::, SerializableError>::wrap( + + let durability = Durability::, SerializableError>::new( self, + "golem keyvalue::eventual_batch", + "get_keys", WrappedFunctionType::ReadRemote, - "golem keyvalue::eventual_batch::get_keys", - bucket.clone(), - |ctx| ctx.state.key_value_service.get_keys(account_id, bucket), ) .await?; + let keys = if durability.is_live() { + let result = self + .state + .key_value_service + .get_keys(account_id, bucket.clone()) + .await; + durability.persist(self, bucket, result).await + } else { + durability.replay(self).await + }?; + Ok(Ok(keys)) } @@ -116,7 +127,6 @@ impl Host for DurableWorkerCtx { bucket: Resource, key_values: Vec<(Key, Resource)>, ) -> anyhow::Result>> { - record_host_function_call("keyvalue::eventual_batch", "set_many"); let account_id = self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -138,24 +148,33 @@ impl Host for DurableWorkerCtx { Ok((key, outgoing_value)) }) .collect::)>, ResourceTableError>>()?; - let result = Durability::), (), SerializableError>::wrap( + + let durability = Durability::::new( self, + "golem keyvalue::eventual_batch", + "set_many", WrappedFunctionType::WriteRemote, - "golem keyvalue::eventual_batch::set_many", - ( + ) + .await?; + + let result = if durability.is_live() { + let input: (String, Vec<(String, u64)>) = ( bucket.clone(), key_values .iter() .map(|(k, v)| (k.clone(), v.len() as u64)) .collect(), - ), - |ctx| { - ctx.state - .key_value_service - .set_many(account_id, bucket, key_values) - }, - ) - .await; + ); + let result = self + .state + .key_value_service + .set_many(account_id, bucket, key_values) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(()) => Ok(Ok(())), Err(e) => { @@ -173,7 +192,6 @@ impl Host for DurableWorkerCtx { bucket: Resource, keys: Vec, ) -> anyhow::Result>> { - record_host_function_call("keyvalue::eventual_batch", "delete_many"); 
let account_id = self.owned_worker_id.account_id(); let bucket = self .as_wasi_view() @@ -181,18 +199,27 @@ impl Host for DurableWorkerCtx { .get::(&bucket)? .name .clone(); - let result = Durability::), (), SerializableError>::wrap( + + let durability = Durability::::new( self, + "golem keyvalue::eventual_batch", + "delete_many", WrappedFunctionType::WriteRemote, - "golem keyvalue::eventual_batch::delete_many", - (bucket.clone(), keys.clone()), - |ctx| { - ctx.state - .key_value_service - .delete_many(account_id, bucket, keys) - }, ) - .await; + .await?; + + let result = if durability.is_live() { + let input = (bucket.clone(), keys.clone()); + let result = self + .state + .key_value_service + .delete_many(account_id, bucket, keys) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; + match result { Ok(()) => Ok(Ok(())), Err(e) => { diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index 818a9e4922..64f9047ee0 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -480,20 +480,23 @@ impl DurableWorkerCtx { match remote_worker_id.clone().try_into_worker_id() { Some(worker_id) => Ok(worker_id), None => { - let worker_id = Durability::::wrap( + let durability = Durability::::new( self, + "golem::rpc::wasm-rpc", + "generate_unique_local_worker_id", WrappedFunctionType::ReadLocal, - "golem::rpc::wasm-rpc::generate_unique_local_worker_id", - (), - |ctx| { - Box::pin(async move { - ctx.rpc() - .generate_unique_local_worker_id(remote_worker_id) - .await - }) - }, ) .await?; + let worker_id = if durability.is_live() { + let result = self + .rpc() + .generate_unique_local_worker_id(remote_worker_id) + .await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + }?; + Ok(worker_id) } } diff --git 
a/golem-worker-executor-base/src/durable_host/random/insecure.rs b/golem-worker-executor-base/src/durable_host/random/insecure.rs index 7fec05a6e0..80b031fa93 100644 --- a/golem-worker-executor-base/src/durable_host/random/insecure.rs +++ b/golem-worker-executor-base/src/durable_host/random/insecure.rs @@ -16,7 +16,6 @@ use async_trait::async_trait; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::workerctx::WorkerCtx; use golem_common::model::oplog::WrappedFunctionType; use wasmtime_wasi::bindings::random::insecure::Host; @@ -24,30 +23,36 @@ use wasmtime_wasi::bindings::random::insecure::Host; #[async_trait] impl Host for DurableWorkerCtx { async fn get_insecure_random_bytes(&mut self, len: u64) -> anyhow::Result> { - record_host_function_call("random::insecure", "get_insecure_random_bytes"); - Durability::, SerializableError>::wrap( + let durability = Durability::, SerializableError>::new( self, + "golem random::insecure", + "get_insecure_random_bytes", WrappedFunctionType::ReadLocal, - "golem random::insecure::get_insecure_random_bytes", - (), - |ctx| { - Box::pin(async move { - Host::get_insecure_random_bytes(&mut ctx.as_wasi_view(), len).await - }) - }, ) - .await + .await?; + + if durability.is_live() { + let result = Host::get_insecure_random_bytes(&mut self.as_wasi_view(), len).await; + durability.persist(self, len, result).await + } else { + durability.replay(self).await + } } async fn get_insecure_random_u64(&mut self) -> anyhow::Result { - record_host_function_call("random::insecure", "get_insecure_random_u64"); - Durability::::wrap( + let durability = Durability::::new( self, + "golem random::insecure", + "get_insecure_random_u64", WrappedFunctionType::ReadLocal, - "golem random::insecure::get_insecure_random_u64", - (), - |ctx| Box::pin(async { Host::get_insecure_random_u64(&mut ctx.as_wasi_view()).await }), ) - .await + 
.await?; + + if durability.is_live() { + let result = Host::get_insecure_random_u64(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } } diff --git a/golem-worker-executor-base/src/durable_host/random/insecure_seed.rs b/golem-worker-executor-base/src/durable_host/random/insecure_seed.rs index 9fa4caf179..2e6215ddc8 100644 --- a/golem-worker-executor-base/src/durable_host/random/insecure_seed.rs +++ b/golem-worker-executor-base/src/durable_host/random/insecure_seed.rs @@ -16,7 +16,6 @@ use async_trait::async_trait; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::workerctx::WorkerCtx; use golem_common::model::oplog::WrappedFunctionType; use wasmtime_wasi::bindings::random::insecure_seed::Host; @@ -24,14 +23,18 @@ use wasmtime_wasi::bindings::random::insecure_seed::Host; #[async_trait] impl Host for DurableWorkerCtx { async fn insecure_seed(&mut self) -> anyhow::Result<(u64, u64)> { - record_host_function_call("random::insecure_seed", "insecure_seed"); - Durability::::wrap( + let durability = Durability::::new( self, + "golem random::insecure_seed", + "insecure_seed", WrappedFunctionType::ReadLocal, - "golem random::insecure_seed::insecure_seed", - (), - |ctx| Box::pin(async { Host::insecure_seed(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + if durability.is_live() { + let result = Host::insecure_seed(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } } diff --git a/golem-worker-executor-base/src/durable_host/random/random.rs b/golem-worker-executor-base/src/durable_host/random/random.rs index 2beaef5318..6497c9a583 100644 --- a/golem-worker-executor-base/src/durable_host/random/random.rs +++ b/golem-worker-executor-base/src/durable_host/random/random.rs @@ -16,7 +16,6 @@ use 
async_trait::async_trait; use crate::durable_host::serialized::SerializableError; use crate::durable_host::{Durability, DurableWorkerCtx}; -use crate::metrics::wasm::record_host_function_call; use crate::workerctx::WorkerCtx; use golem_common::model::oplog::WrappedFunctionType; use wasmtime_wasi::bindings::random::random::Host; @@ -24,28 +23,34 @@ use wasmtime_wasi::bindings::random::random::Host; #[async_trait] impl Host for DurableWorkerCtx { async fn get_random_bytes(&mut self, len: u64) -> anyhow::Result> { - record_host_function_call("random::random", "get_random_bytes"); - Durability::, SerializableError>::wrap( + let durability = Durability::, SerializableError>::new( self, + "golem random", + "get_random_bytes", WrappedFunctionType::ReadLocal, - "golem random::get_random_bytes", - (), - |ctx| { - Box::pin(async move { Host::get_random_bytes(&mut ctx.as_wasi_view(), len).await }) - }, ) - .await + .await?; + if durability.is_live() { + let result = Host::get_random_bytes(&mut self.as_wasi_view(), len).await; + durability.persist(self, len, result).await + } else { + durability.replay(self).await + } } async fn get_random_u64(&mut self) -> anyhow::Result { - record_host_function_call("random::random", "get_random_u64"); - Durability::::wrap( + let durability = Durability::::new( self, + "golem random", + "get_random_u64", WrappedFunctionType::ReadLocal, - "golem random::get_random_u64", - (), - |ctx| Box::pin(async { Host::get_random_u64(&mut ctx.as_wasi_view()).await }), ) - .await + .await?; + if durability.is_live() { + let result = Host::get_random_u64(&mut self.as_wasi_view()).await; + durability.persist(self, (), result).await + } else { + durability.replay(self).await + } } } diff --git a/golem-worker-executor-base/src/durable_host/sockets/ip_name_lookup.rs b/golem-worker-executor-base/src/durable_host/sockets/ip_name_lookup.rs index fb1a4ae81d..37e134dbd1 100644 --- a/golem-worker-executor-base/src/durable_host/sockets/ip_name_lookup.rs +++ 
b/golem-worker-executor-base/src/durable_host/sockets/ip_name_lookup.rs @@ -64,19 +64,20 @@ impl Host for DurableWorkerCtx { network: Resource, name: String, ) -> Result, SocketError> { - record_host_function_call("sockets::ip_name_lookup", "resolve_addresses"); + let durability = Durability::::new( + self, + "sockets::ip_name_lookup", + "resolve_addresses", + WrappedFunctionType::ReadRemote, + ) + .await?; - let addresses: Result, SocketError> = - Durability::::wrap( - self, - WrappedFunctionType::ReadRemote, - "sockets::ip_name_lookup::resolve_addresses", - name.clone(), - |ctx| { - Box::pin(async move { resolve_and_drain_addresses(ctx, network, name).await }) - }, - ) - .await; + let addresses = if durability.is_live() { + let result = resolve_and_drain_addresses(self, network, name.clone()).await; + durability.persist(self, name, result).await + } else { + durability.replay(self).await + }; let stream = ResolveAddressStream::Done(Ok(addresses?.into_iter())); Ok(self.table().push(stream)?) 
diff --git a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs index 7acc8aeb00..c5560a4284 100644 --- a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs +++ b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs @@ -18,7 +18,7 @@ use crate::durable_host::serialized::SerializableError; use crate::durable_host::wasm_rpc::serialized::{ SerializableInvokeRequest, SerializableInvokeResult, SerializableInvokeResultV1, }; -use crate::durable_host::{Durability, DurableWorkerCtx}; +use crate::durable_host::{Durability, DurableWorkerCtx, OplogEntryVersion}; use crate::error::GolemError; use crate::get_oplog_entry; use crate::metrics::wasm::record_host_function_call; @@ -34,6 +34,7 @@ use golem_common::model::oplog::{OplogEntry, WrappedFunctionType}; use golem_common::model::{ AccountId, ComponentId, IdempotencyKey, OwnedWorkerId, TargetWorkerId, WorkerId, }; +use golem_common::serialization::try_deserialize; use golem_common::uri::oss::urn::{WorkerFunctionUrn, WorkerOrFunctionUrn}; use golem_wasm_rpc::golem::rpc::types::{ FutureInvokeResult, HostFutureInvokeResult, Pollable, Uri, @@ -88,7 +89,6 @@ impl HostWasmRpc for DurableWorkerCtx { function_name: String, mut function_params: Vec, ) -> anyhow::Result> { - record_host_function_call("golem::rpc::wasm-rpc", "invoke-and-await"); let args = self.get_arguments().await?; let env = self.get_environment().await?; @@ -105,37 +105,36 @@ impl HostWasmRpc for DurableWorkerCtx { let oplog_index = self.state.current_oplog_index().await; // NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs - let uuid = Durability::::custom_wrap( + let durability = Durability::::new( self, + "golem::rpc::wasm-rpc", + "invoke-and-await idempotency key", WrappedFunctionType::ReadLocal, - "golem::rpc::wasm-rpc::invoke-and-await idempotency key", - (), - |_ctx| { - Box::pin(async move { - let 
key = IdempotencyKey::derived(¤t_idempotency_key, oplog_index); - let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid - Ok::(uuid) - }) - }, - |_ctx, uuid: &Uuid| Ok(uuid.as_u64_pair()), - |_ctx, (high_bits, low_bits)| { - Box::pin(async move { Ok(Uuid::from_u64_pair(high_bits, low_bits)) }) - }, ) .await?; + let uuid = if durability.is_live() { + let key = IdempotencyKey::derived(¤t_idempotency_key, oplog_index); + let uuid = Uuid::parse_str(&key.value.to_string())?; // this is guaranteed to be a uuid + durability + .persist_serializable(self, (), Ok(uuid.as_u64_pair())) + .await?; + uuid + } else { + let (high_bits, low_bits) = + durability.replay::<(u64, u64), anyhow::Error>(self).await?; + Uuid::from_u64_pair(high_bits, low_bits) + }; let idempotency_key = IdempotencyKey::from_uuid(uuid); - // NOTE: Could be Durability::::wrap but need to support old WitValue values during recovery - let result: Result = Durability::< - Ctx, - SerializableInvokeRequest, - TypeAnnotatedValue, - SerializableError, - >::full_custom_wrap( + let durability = Durability::::new( self, + "golem::rpc::wasm-rpc", + "invoke-and-await result", WrappedFunctionType::WriteRemote, - "golem::rpc::wasm-rpc::invoke-and-await", - SerializableInvokeRequest { + ) + .await?; + let result: Result = if durability.is_live() { + let input = SerializableInvokeRequest { remote_worker_id: remote_worker_id.worker_id(), idempotency_key: idempotency_key.clone(), function_name: function_name.clone(), @@ -147,62 +146,65 @@ impl HostWasmRpc for DurableWorkerCtx { &function_params, ) .await, - }, - |ctx| { - Box::pin(async move { - ctx.rpc() - .invoke_and_await( - &remote_worker_id, - Some(idempotency_key), - function_name, - function_params, - ctx.worker_id(), - &args, - &env, - ) - .await - }) - }, - |_, typed_value| Ok(typed_value.clone()), - |_, typed_value| { - typed_value - .clone() - .try_into() + }; + let result = self + .rpc() + .invoke_and_await( + 
&remote_worker_id, + Some(idempotency_key), + function_name, + function_params, + self.worker_id(), + &args, + &env, + ) + .await; + durability + .persist_serializable(self, input, result.clone().map_err(|err| (&err).into())) + .await?; + result.and_then(|tav| { + tav.try_into() .map_err(|s: String| RpcError::ProtocolError { details: s }) - }, - |_, oplog, entry| { - Box::pin(async move { - match entry { - OplogEntry::ImportedFunctionInvokedV1 { .. } => { - // Legacy oplog entry, used WitValue in its payload - let wit_value = DurableWorkerCtx::::default_load::< - WitValue, - SerializableError, - >(oplog, entry) - .await; - wit_value.map_err(|err| err.into()) - } - OplogEntry::ImportedFunctionInvoked { .. } => { - // New oplog entry, uses TypeAnnotatedValue in its payload - let typed_value = DurableWorkerCtx::::try_default_load::< - TypeAnnotatedValue, - SerializableError, - >(oplog.clone(), entry) - .await; - match typed_value { - Ok(Ok(typed_value)) => typed_value - .try_into() - .map_err(|s: String| RpcError::ProtocolError { details: s }), - Ok(Err(err)) => Err(err.into()), - Err(err) => Err(err.into()), - } - } - _ => unreachable!(), + }) + } else { + let (bytes, oplog_entry_version) = durability.replay_raw(self).await?; + match oplog_entry_version { + OplogEntryVersion::V1 => { + // Legacy oplog entry, used WitValue in its payload + let wit_value: Result = try_deserialize(&bytes) + .map_err(|err| { + GolemError::unexpected_oplog_entry( + "ImportedFunctionInvoked payload", + err, + ) + })? 
+ .expect("Empty payload"); + wit_value.map_err(|err| err.into()) + } + OplogEntryVersion::V2 => { + // New oplog entry, uses TypeAnnotatedValue in its payload + let typed_value: Result< + Result, + GolemError, + > = try_deserialize(&bytes) + .map_err(|err| { + GolemError::unexpected_oplog_entry( + "ImportedFunctionInvoked payload", + err, + ) + }) + .map(|ok| ok.expect("Empty payload")); + + match typed_value { + Ok(Ok(typed_value)) => typed_value + .try_into() + .map_err(|s: String| RpcError::ProtocolError { details: s }), + Ok(Err(err)) => Err(err.into()), + Err(err) => Err(err.into()), } - }) - }, - ) - .await; + } + } + }; match result { Ok(wit_value) => Ok(Ok(wit_value)), @@ -219,7 +221,6 @@ impl HostWasmRpc for DurableWorkerCtx { function_name: String, mut function_params: Vec, ) -> anyhow::Result> { - record_host_function_call("golem::rpc::wasm-rpc", "invoke"); let args = self.get_arguments().await?; let env = self.get_environment().await?; @@ -236,31 +237,37 @@ impl HostWasmRpc for DurableWorkerCtx { let oplog_index = self.state.current_oplog_index().await; // NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs - let uuid = Durability::::custom_wrap( + let durability = Durability::::new( self, + "golem::rpc::wasm-rpc", + "invoke-and-await idempotency key", // NOTE: must keep invoke-and-await in the name for compatibility with Golem 1.0 WrappedFunctionType::ReadLocal, - "golem::rpc::wasm-rpc::invoke-and-await idempotency key", // NOTE: must keep invoke-and-await in the name for compatibility with Golem 1.0 - (), - |_ctx| { - Box::pin(async move { - let key = IdempotencyKey::derived(¤t_idempotency_key, oplog_index); - let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid - Ok::(uuid) - }) - }, - |_ctx, uuid: &Uuid| Ok(uuid.as_u64_pair()), - |_ctx, (high_bits, low_bits)| { - Box::pin(async move { Ok(Uuid::from_u64_pair(high_bits, low_bits)) }) 
- }, ) .await?; + let uuid = if durability.is_live() { + let key = IdempotencyKey::derived(¤t_idempotency_key, oplog_index); + let uuid = Uuid::parse_str(&key.value.to_string())?; // this is guaranteed to be a uuid + durability + .persist_serializable(self, (), Ok(uuid.as_u64_pair())) + .await?; + uuid + } else { + let (high_bits, low_bits) = + durability.replay::<(u64, u64), anyhow::Error>(self).await?; + Uuid::from_u64_pair(high_bits, low_bits) + }; + let idempotency_key = IdempotencyKey::from_uuid(uuid); - let result = Durability::::wrap( + let durability = Durability::::new( self, + "golem::rpc::wasm-rpc", + "invoke", WrappedFunctionType::WriteRemote, - "golem::rpc::wasm-rpc::invoke", - SerializableInvokeRequest { + ) + .await?; + let result: Result<(), RpcError> = if durability.is_live() { + let input = SerializableInvokeRequest { remote_worker_id: remote_worker_id.worker_id(), idempotency_key: idempotency_key.clone(), function_name: function_name.clone(), @@ -272,24 +279,23 @@ impl HostWasmRpc for DurableWorkerCtx { &function_params, ) .await, - }, - |ctx| { - Box::pin(async move { - ctx.rpc() - .invoke( - &remote_worker_id, - Some(idempotency_key), - function_name, - function_params, - ctx.worker_id(), - &args, - &env, - ) - .await - }) - }, - ) - .await; + }; + let result = self + .rpc() + .invoke( + &remote_worker_id, + Some(idempotency_key), + function_name, + function_params, + self.worker_id(), + &args, + &env, + ) + .await; + durability.persist(self, input, result).await + } else { + durability.replay(self).await + }; match result { Ok(result) => Ok(Ok(result)), @@ -306,7 +312,6 @@ impl HostWasmRpc for DurableWorkerCtx { function_name: String, mut function_params: Vec, ) -> anyhow::Result> { - record_host_function_call("golem::rpc::wasm-rpc", "async-invoke-and-await"); let args = self.get_arguments().await?; let env = self.get_environment().await?; @@ -328,24 +333,25 @@ impl HostWasmRpc for DurableWorkerCtx { let oplog_index = 
self.state.current_oplog_index().await; // NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs - let uuid = Durability::::custom_wrap( + let durability = Durability::::new( self, + "golem::rpc::wasm-rpc", + "invoke-and-await idempotency key", // NOTE: must keep invoke-and-await in the name for compatibility with Golem 1.0 WrappedFunctionType::ReadLocal, - "golem::rpc::wasm-rpc::invoke-and-await idempotency key", // NOTE: must keep invoke-and-await in the name for compatibility with Golem 1.0 - (), - |_ctx| { - Box::pin(async move { - let key = IdempotencyKey::derived(¤t_idempotency_key, oplog_index); - let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid - Ok::(uuid) - }) - }, - |_ctx, uuid: &Uuid| Ok(uuid.as_u64_pair()), - |_ctx, (high_bits, low_bits)| { - Box::pin(async move { Ok(Uuid::from_u64_pair(high_bits, low_bits)) }) - }, ) .await?; + let uuid = if durability.is_live() { + let key = IdempotencyKey::derived(¤t_idempotency_key, oplog_index); + let uuid = Uuid::parse_str(&key.value.to_string())?; // this is guaranteed to be a uuid + durability + .persist_serializable(self, (), Ok(uuid.as_u64_pair())) + .await?; + uuid + } else { + let (high_bits, low_bits) = + durability.replay::<(u64, u64), anyhow::Error>(self).await?; + Uuid::from_u64_pair(high_bits, low_bits) + }; let idempotency_key = IdempotencyKey::from_uuid(uuid); let worker_id = self.worker_id().clone(); let request = SerializableInvokeRequest { diff --git a/golem-worker-executor-base/src/services/oplog/mod.rs b/golem-worker-executor-base/src/services/oplog/mod.rs index 8989fb30b9..80f078bce1 100644 --- a/golem-worker-executor-base/src/services/oplog/mod.rs +++ b/golem-worker-executor-base/src/services/oplog/mod.rs @@ -290,31 +290,34 @@ pub trait OplogOps: Oplog { }) } - async fn get_payload_of_entry( - &self, - entry: &OplogEntry, - ) -> Result, String> { + async fn 
get_raw_payload_of_entry(&self, entry: &OplogEntry) -> Result, String> { match entry { OplogEntry::ImportedFunctionInvokedV1 { response, .. } => { - let response_bytes: Bytes = self.download_payload(response).await?; - try_deserialize(&response_bytes) + Ok(Some(self.download_payload(response).await?)) } OplogEntry::ImportedFunctionInvoked { response, .. } => { - let response_bytes: Bytes = self.download_payload(response).await?; - try_deserialize(&response_bytes) + Ok(Some(self.download_payload(response).await?)) } OplogEntry::ExportedFunctionInvoked { request, .. } => { - let response_bytes: Bytes = self.download_payload(request).await?; - try_deserialize(&response_bytes) + Ok(Some(self.download_payload(request).await?)) } OplogEntry::ExportedFunctionCompleted { response, .. } => { - let response_bytes: Bytes = self.download_payload(response).await?; - try_deserialize(&response_bytes) + Ok(Some(self.download_payload(response).await?)) } _ => Ok(None), } } + async fn get_payload_of_entry( + &self, + entry: &OplogEntry, + ) -> Result, String> { + match self.get_raw_payload_of_entry(entry).await? { + Some(response_bytes) => try_deserialize(&response_bytes), + None => Ok(None), + } + } + async fn get_upload_description_payload( &self, description: &UpdateDescription,