From e7bceac8713a18cfc192864d03e0fa58de72a8f6 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Tue, 7 Jan 2025 18:42:24 +0100 Subject: [PATCH] Refactoring continues --- .../src/durable_host/durability.rs | 39 +++++--- .../src/durable_host/http/types.rs | 89 +++++++++---------- 2 files changed, 72 insertions(+), 56 deletions(-) diff --git a/golem-worker-executor-base/src/durable_host/durability.rs b/golem-worker-executor-base/src/durable_host/durability.rs index b97637a091..c8d9a44fea 100644 --- a/golem-worker-executor-base/src/durable_host/durability.rs +++ b/golem-worker-executor-base/src/durable_host/durability.rs @@ -81,24 +81,43 @@ impl Durability2 { Err: From + From + Send + Sync, SIn: Debug + Encode + Send + Sync, SErr: Debug + Encode + for<'a> From<&'a Err> + From + Send + Sync, - SOk: Debug + Encode + From + Encode + Send + Sync, + SOk: Debug + Encode + From + Send + Sync, { let serializable_result: Result = result .as_ref() .map(|result| result.clone().into()) .map_err(|err| err.into()); + self.persist_serializable(ctx, input, serializable_result) + .await + .map_err(|err| { + let err: SErr = err.into(); + let err: Err = err.into(); + err + })?; + result + } + + pub async fn persist_serializable( + &self, + ctx: &mut DurableWorkerCtx, + input: SIn, + serializable_result: Result, + ) -> Result<(), GolemError> + where + SIn: Debug + Encode + Send + Sync, + SOk: Debug + Encode + Send + Sync, + SErr: Debug + Encode + Send + Sync, + { let function_name = self.function_name(); - ctx.write_to_oplog::( + ctx.write_to_oplog::( &self.function_type, &function_name, self.begin_index, &input, &serializable_result, ) - .await?; - - result + .await } pub async fn replay(&self, ctx: &mut DurableWorkerCtx) -> Result @@ -641,19 +660,18 @@ impl DurableWorkerCtx { .map(|result| result.unwrap()) } - async fn write_to_oplog( + async fn write_to_oplog( &mut self, wrapped_function_type: &WrappedFunctionType, function_name: &str, begin_index: OplogIndex, serializable_input: &SerializedInput, serializable_result: &Result, - ) -> Result<(), Err> + ) -> Result<(), GolemError> where - Err: Send, SerializedInput: Encode + Debug + Send + Sync, SerializedSuccess: Encode + Debug + Send + Sync, - SerializedErr: Encode + Debug + From + Into + Send + Sync, + SerializedErr: Encode + Debug + Send + Sync, { if self.state.persistence_level != PersistenceLevel::PersistNothing { self.state @@ -674,8 +692,7 @@ impl DurableWorkerCtx { }); self.state .end_function(wrapped_function_type, begin_index) - .await - .map_err(|err| Into::::into(err).into())?; + .await?; if *wrapped_function_type == WrappedFunctionType::WriteRemote || matches!( *wrapped_function_type, diff --git a/golem-worker-executor-base/src/durable_host/http/types.rs b/golem-worker-executor-base/src/durable_host/http/types.rs index a7bab8e6af..9bdf10c2e4 100644 --- a/golem-worker-executor-base/src/durable_host/http/types.rs +++ b/golem-worker-executor-base/src/durable_host/http/types.rs @@ -35,12 +35,11 @@ use wasmtime_wasi_http::{HttpError, HttpResult}; use golem_common::model::oplog::{OplogEntry, WrappedFunctionType}; use crate::durable_host::http::serialized::{ - SerializableErrorCode, SerializableHttpRequest, SerializableResponse, - SerializableResponseHeaders, + SerializableErrorCode, SerializableResponse, SerializableResponseHeaders, }; use crate::durable_host::http::{continue_http_request, end_http_request}; use crate::durable_host::serialized::SerializableError; -use crate::durable_host::{Durability, DurableWorkerCtx, HttpRequestCloseOwner}; +use crate::durable_host::{Durability2, DurableWorkerCtx, HttpRequestCloseOwner}; use crate::get_oplog_entry; use crate::metrics::wasm::record_host_function_call; use crate::model::PersistenceLevel; @@ -459,64 +458,64 @@ impl HostFutureTrailers for DurableWorkerCtx { .ok_or_else(|| { anyhow!("No matching BeginRemoteWrite index was found for the open HTTP request") })?; + let request = request_state.request.clone(); - Durability::< + let durability = Durability2::< Ctx, - SerializableHttpRequest, Option>>, SerializableErrorCode>, ()>>, SerializableError, - >::custom_wrap( + >::new( self, + "golem http::types::future_trailers", + "get", WrappedFunctionType::WriteRemoteBatched(Some(*begin_idx)), - "golem http::types::future_trailers::get", - request_state.request.clone(), - |ctx| { - Box::pin(async move { - HostFutureTrailers::get(&mut ctx.as_wasi_http_view(), self_).await - }) - }, - |ctx, result| match result { - Some(Ok(Ok(None))) => Ok(Some(Ok(Ok(None)))), - Some(Ok(Ok(Some(trailers)))) => { + ) + .await?; + + if durability.is_live() { + let result = HostFutureTrailers::get(&mut self.as_wasi_http_view(), self_).await; + let to_serialize = match &result { + Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))), + Ok(Some(Ok(Ok(Some(trailers))))) => { let mut serialized_trailers = HashMap::new(); let host_fields: &Resource = unsafe { std::mem::transmute(trailers) }; - for (key, value) in get_fields(ctx.table(), host_fields)? { + for (key, value) in get_fields(self.table(), host_fields)? { serialized_trailers .insert(key.as_str().to_string(), value.as_bytes().to_vec()); } Ok(Some(Ok(Ok(Some(serialized_trailers))))) } - Some(Ok(Err(error_code))) => Ok(Some(Ok(Err(error_code.into())))), - Some(Err(_)) => Ok(Some(Err(()))), - None => Ok(None), - }, - |ctx, serialized| { - Box::pin(async { - match serialized { - Some(Ok(Ok(None))) => Ok(Some(Ok(Ok(None)))), - Some(Ok(Ok(Some(serialized_trailers)))) => { - let mut fields = FieldMap::new(); - for (key, value) in serialized_trailers { - fields.insert( - HeaderName::from_str(&key)?, - HeaderValue::try_from(value)?, - ); - } - let hdrs = ctx - .table() - .push(wasmtime_wasi_http::types::HostFields::Owned { fields })?; - Ok(Some(Ok(Ok(Some(hdrs))))) - } - Some(Ok(Err(error_code))) => Ok(Some(Ok(Err(error_code.into())))), - Some(Err(_)) => Ok(Some(Err(()))), - None => Ok(None), + Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))), + Ok(Some(Err(_))) => Ok(Some(Err(()))), + Ok(None) => Ok(None), + Err(err) => Err(SerializableError::from(err)), + }; + let _ = durability + .persist_serializable(self, request, to_serialize) + .await; + result + } else { + let serialized = durability.replay(self).await; + match serialized { + Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))), + Ok(Some(Ok(Ok(Some(serialized_trailers))))) => { + let mut fields = FieldMap::new(); + for (key, value) in serialized_trailers { + fields.insert(HeaderName::from_str(&key)?, HeaderValue::try_from(value)?); } - }) - }, - ) - .await + let hdrs = self + .table() + .push(wasmtime_wasi_http::types::HostFields::Owned { fields })?; + Ok(Some(Ok(Ok(Some(hdrs))))) + } + Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))), + Ok(Some(Err(_))) => Ok(Some(Err(()))), + Ok(None) => Ok(None), + Err(error) => Err(error), + } + } } fn drop(&mut self, rep: Resource) -> anyhow::Result<()> {