Skip to content

Commit

Permalink
Refactoring continues
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Jan 7, 2025
1 parent c2650a7 commit e7bceac
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 56 deletions.
39 changes: 28 additions & 11 deletions golem-worker-executor-base/src/durable_host/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,43 @@ impl<Ctx: WorkerCtx, SOk, SErr> Durability2<Ctx, SOk, SErr> {
Err: From<SErr> + From<GolemError> + Send + Sync,
SIn: Debug + Encode + Send + Sync,
SErr: Debug + Encode + for<'a> From<&'a Err> + From<GolemError> + Send + Sync,
SOk: Debug + Encode + From<Ok> + Encode + Send + Sync,
SOk: Debug + Encode + From<Ok> + Send + Sync,
{
let serializable_result: Result<SOk, SErr> = result
.as_ref()
.map(|result| result.clone().into())
.map_err(|err| err.into());

self.persist_serializable(ctx, input, serializable_result)
.await
.map_err(|err| {
let err: SErr = err.into();
let err: Err = err.into();
err
})?;
result
}

pub async fn persist_serializable<SIn>(
&self,
ctx: &mut DurableWorkerCtx<Ctx>,
input: SIn,
serializable_result: Result<SOk, SErr>,
) -> Result<(), GolemError>
where
SIn: Debug + Encode + Send + Sync,
SOk: Debug + Encode + Send + Sync,
SErr: Debug + Encode + Send + Sync,
{
let function_name = self.function_name();
ctx.write_to_oplog::<SIn, SOk, Err, SErr>(
ctx.write_to_oplog::<SIn, SOk, SErr>(
&self.function_type,
&function_name,
self.begin_index,
&input,
&serializable_result,
)
.await?;

result
.await
}

pub async fn replay<Ok, Err>(&self, ctx: &mut DurableWorkerCtx<Ctx>) -> Result<Ok, Err>
Expand Down Expand Up @@ -641,19 +660,18 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
.map(|result| result.unwrap())
}

async fn write_to_oplog<SerializedInput, SerializedSuccess, Err, SerializedErr>(
async fn write_to_oplog<SerializedInput, SerializedSuccess, SerializedErr>(
&mut self,
wrapped_function_type: &WrappedFunctionType,
function_name: &str,
begin_index: OplogIndex,
serializable_input: &SerializedInput,
serializable_result: &Result<SerializedSuccess, SerializedErr>,
) -> Result<(), Err>
) -> Result<(), GolemError>
where
Err: Send,
SerializedInput: Encode + Debug + Send + Sync,
SerializedSuccess: Encode + Debug + Send + Sync,
SerializedErr: Encode + Debug + From<GolemError> + Into<Err> + Send + Sync,
SerializedErr: Encode + Debug + Send + Sync,
{
if self.state.persistence_level != PersistenceLevel::PersistNothing {
self.state
Expand All @@ -674,8 +692,7 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
});
self.state
.end_function(wrapped_function_type, begin_index)
.await
.map_err(|err| Into::<SerializedErr>::into(err).into())?;
.await?;
if *wrapped_function_type == WrappedFunctionType::WriteRemote
|| matches!(
*wrapped_function_type,
Expand Down
89 changes: 44 additions & 45 deletions golem-worker-executor-base/src/durable_host/http/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ use wasmtime_wasi_http::{HttpError, HttpResult};
use golem_common::model::oplog::{OplogEntry, WrappedFunctionType};

use crate::durable_host::http::serialized::{
SerializableErrorCode, SerializableHttpRequest, SerializableResponse,
SerializableResponseHeaders,
SerializableErrorCode, SerializableResponse, SerializableResponseHeaders,
};
use crate::durable_host::http::{continue_http_request, end_http_request};
use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx, HttpRequestCloseOwner};
use crate::durable_host::{Durability2, DurableWorkerCtx, HttpRequestCloseOwner};
use crate::get_oplog_entry;
use crate::metrics::wasm::record_host_function_call;
use crate::model::PersistenceLevel;
Expand Down Expand Up @@ -459,64 +458,64 @@ impl<Ctx: WorkerCtx> HostFutureTrailers for DurableWorkerCtx<Ctx> {
.ok_or_else(|| {
anyhow!("No matching BeginRemoteWrite index was found for the open HTTP request")
})?;
let request = request_state.request.clone();

Durability::<
let durability = Durability2::<
Ctx,
SerializableHttpRequest,
Option<Result<Result<Option<HashMap<String, Vec<u8>>>, SerializableErrorCode>, ()>>,
SerializableError,
>::custom_wrap(
>::new(
self,
"golem http::types::future_trailers",
"get",
WrappedFunctionType::WriteRemoteBatched(Some(*begin_idx)),
"golem http::types::future_trailers::get",
request_state.request.clone(),
|ctx| {
Box::pin(async move {
HostFutureTrailers::get(&mut ctx.as_wasi_http_view(), self_).await
})
},
|ctx, result| match result {
Some(Ok(Ok(None))) => Ok(Some(Ok(Ok(None)))),
Some(Ok(Ok(Some(trailers)))) => {
)
.await?;

if durability.is_live() {
let result = HostFutureTrailers::get(&mut self.as_wasi_http_view(), self_).await;
let to_serialize = match &result {
Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))),
Ok(Some(Ok(Ok(Some(trailers))))) => {
let mut serialized_trailers = HashMap::new();
let host_fields: &Resource<wasmtime_wasi_http::types::HostFields> =
unsafe { std::mem::transmute(trailers) };

for (key, value) in get_fields(ctx.table(), host_fields)? {
for (key, value) in get_fields(self.table(), host_fields)? {
serialized_trailers
.insert(key.as_str().to_string(), value.as_bytes().to_vec());
}
Ok(Some(Ok(Ok(Some(serialized_trailers)))))
}
Some(Ok(Err(error_code))) => Ok(Some(Ok(Err(error_code.into())))),
Some(Err(_)) => Ok(Some(Err(()))),
None => Ok(None),
},
|ctx, serialized| {
Box::pin(async {
match serialized {
Some(Ok(Ok(None))) => Ok(Some(Ok(Ok(None)))),
Some(Ok(Ok(Some(serialized_trailers)))) => {
let mut fields = FieldMap::new();
for (key, value) in serialized_trailers {
fields.insert(
HeaderName::from_str(&key)?,
HeaderValue::try_from(value)?,
);
}
let hdrs = ctx
.table()
.push(wasmtime_wasi_http::types::HostFields::Owned { fields })?;
Ok(Some(Ok(Ok(Some(hdrs)))))
}
Some(Ok(Err(error_code))) => Ok(Some(Ok(Err(error_code.into())))),
Some(Err(_)) => Ok(Some(Err(()))),
None => Ok(None),
Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))),
Ok(Some(Err(_))) => Ok(Some(Err(()))),
Ok(None) => Ok(None),
Err(err) => Err(SerializableError::from(err)),
};
let _ = durability
.persist_serializable(self, request, to_serialize)
.await;
result
} else {
let serialized = durability.replay(self).await;
match serialized {
Ok(Some(Ok(Ok(None)))) => Ok(Some(Ok(Ok(None)))),
Ok(Some(Ok(Ok(Some(serialized_trailers))))) => {
let mut fields = FieldMap::new();
for (key, value) in serialized_trailers {
fields.insert(HeaderName::from_str(&key)?, HeaderValue::try_from(value)?);
}
})
},
)
.await
let hdrs = self
.table()
.push(wasmtime_wasi_http::types::HostFields::Owned { fields })?;
Ok(Some(Ok(Ok(Some(hdrs)))))
}
Ok(Some(Ok(Err(error_code)))) => Ok(Some(Ok(Err(error_code.into())))),
Ok(Some(Err(_))) => Ok(Some(Err(()))),
Ok(None) => Ok(None),
Err(error) => Err(error),
}
}
}

fn drop(&mut self, rep: Resource<FutureTrailers>) -> anyhow::Result<()> {
Expand Down

0 comments on commit e7bceac

Please sign in to comment.