Skip to content

Commit

Permalink
Defined the DurabilityHost interface and migrated Durability to it (#…
Browse files Browse the repository at this point in the history
…1236)

* Defined the DurabilityHost interface and migrated Durability to it

* Updated OpenAPI spec
  • Loading branch information
vigoo authored Jan 13, 2025
1 parent be5e969 commit 376c1c7
Show file tree
Hide file tree
Showing 54 changed files with 877 additions and 711 deletions.
14 changes: 7 additions & 7 deletions golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub enum OplogEntry {
timestamp: Timestamp,
function_name: String,
response: OplogPayload,
wrapped_function_type: WrappedFunctionType,
wrapped_function_type: DurableFunctionType, // TODO: rename in Golem 2.0
},
/// The worker has been invoked
ExportedFunctionInvoked {
Expand Down Expand Up @@ -433,7 +433,7 @@ pub enum OplogEntry {
function_name: String,
request: OplogPayload,
response: OplogPayload,
wrapped_function_type: WrappedFunctionType,
wrapped_function_type: DurableFunctionType, // TODO: rename in Golem 2.0
},
/// The current version of the Create entry (previous is CreateV1)
Create {
Expand Down Expand Up @@ -676,14 +676,14 @@ impl OplogEntry {
wrapped_function_type,
..
} => match wrapped_function_type {
WrappedFunctionType::WriteRemoteBatched(Some(begin_index))
DurableFunctionType::WriteRemoteBatched(Some(begin_index))
if *begin_index == idx =>
{
true
}
WrappedFunctionType::ReadLocal => true,
WrappedFunctionType::WriteLocal => true,
WrappedFunctionType::ReadRemote => true,
DurableFunctionType::ReadLocal => true,
DurableFunctionType::WriteLocal => true,
DurableFunctionType::ReadRemote => true,
_ => false,
},
OplogEntry::ExportedFunctionCompleted { .. } => false,
Expand Down Expand Up @@ -857,7 +857,7 @@ pub enum OplogPayload {
}

#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub enum WrappedFunctionType {
pub enum DurableFunctionType {
/// The side-effect reads from the worker's local state (for example local file system,
/// random generator, etc.)
ReadLocal,
Expand Down
78 changes: 39 additions & 39 deletions golem-common/src/model/public_oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use crate::model::lucene::{LeafQuery, Query};
use crate::model::oplog::{LogLevel, OplogIndex, WorkerResourceId, WrappedFunctionType};
use crate::model::oplog::{DurableFunctionType, LogLevel, OplogIndex, WorkerResourceId};
use crate::model::plugin::PluginInstallation;
use crate::model::regions::OplogRegion;
use crate::model::RetryConfig;
Expand Down Expand Up @@ -78,7 +78,7 @@ pub struct WriteRemoteBatchedParameters {
#[cfg_attr(feature = "poem", derive(poem_openapi::Union))]
#[cfg_attr(feature = "poem", oai(discriminator_name = "type", one_of = true))]
#[serde(tag = "type")]
pub enum PublicWrappedFunctionType {
pub enum PublicDurableFunctionType {
/// The side-effect reads from the worker's local state (for example local file system,
/// random generator, etc.)
ReadLocal(Empty),
Expand All @@ -98,42 +98,42 @@ pub enum PublicWrappedFunctionType {
WriteRemoteBatched(WriteRemoteBatchedParameters),
}

impl From<WrappedFunctionType> for PublicWrappedFunctionType {
fn from(wrapped_function_type: WrappedFunctionType) -> Self {
match wrapped_function_type {
WrappedFunctionType::ReadLocal => PublicWrappedFunctionType::ReadLocal(Empty {}),
WrappedFunctionType::WriteLocal => PublicWrappedFunctionType::WriteLocal(Empty {}),
WrappedFunctionType::ReadRemote => PublicWrappedFunctionType::ReadRemote(Empty {}),
WrappedFunctionType::WriteRemote => PublicWrappedFunctionType::WriteRemote(Empty {}),
WrappedFunctionType::WriteRemoteBatched(index) => {
PublicWrappedFunctionType::WriteRemoteBatched(WriteRemoteBatchedParameters {
impl From<DurableFunctionType> for PublicDurableFunctionType {
fn from(function_type: DurableFunctionType) -> Self {
match function_type {
DurableFunctionType::ReadLocal => PublicDurableFunctionType::ReadLocal(Empty {}),
DurableFunctionType::WriteLocal => PublicDurableFunctionType::WriteLocal(Empty {}),
DurableFunctionType::ReadRemote => PublicDurableFunctionType::ReadRemote(Empty {}),
DurableFunctionType::WriteRemote => PublicDurableFunctionType::WriteRemote(Empty {}),
DurableFunctionType::WriteRemoteBatched(index) => {
PublicDurableFunctionType::WriteRemoteBatched(WriteRemoteBatchedParameters {
index,
})
}
}
}
}

impl IntoValue for PublicWrappedFunctionType {
impl IntoValue for PublicDurableFunctionType {
fn into_value(self) -> Value {
match self {
PublicWrappedFunctionType::ReadLocal(_) => Value::Variant {
PublicDurableFunctionType::ReadLocal(_) => Value::Variant {
case_idx: 0,
case_value: None,
},
PublicWrappedFunctionType::WriteLocal(_) => Value::Variant {
PublicDurableFunctionType::WriteLocal(_) => Value::Variant {
case_idx: 1,
case_value: None,
},
PublicWrappedFunctionType::ReadRemote(_) => Value::Variant {
PublicDurableFunctionType::ReadRemote(_) => Value::Variant {
case_idx: 2,
case_value: None,
},
PublicWrappedFunctionType::WriteRemote(_) => Value::Variant {
PublicDurableFunctionType::WriteRemote(_) => Value::Variant {
case_idx: 3,
case_value: None,
},
PublicWrappedFunctionType::WriteRemoteBatched(params) => Value::Variant {
PublicDurableFunctionType::WriteRemoteBatched(params) => Value::Variant {
case_idx: 4,
case_value: Some(Box::new(params.index.into_value())),
},
Expand Down Expand Up @@ -381,7 +381,7 @@ pub struct ImportedFunctionInvokedParameters {
pub function_name: String,
pub request: ValueAndType,
pub response: ValueAndType,
pub wrapped_function_type: PublicWrappedFunctionType,
pub wrapped_function_type: PublicDurableFunctionType, // TODO: rename in Golem 2.0
}

impl IntoValue for ImportedFunctionInvokedParameters {
Expand All @@ -405,7 +405,7 @@ impl IntoValue for ImportedFunctionInvokedParameters {
field("response", WitValue::get_type()),
field(
"wrapped_function_type",
PublicWrappedFunctionType::get_type(),
PublicDurableFunctionType::get_type(),
),
])
}
Expand Down Expand Up @@ -1504,10 +1504,10 @@ mod protobuf {
ExportedFunctionParameters, FailedUpdateParameters, GrowMemoryParameters,
ImportedFunctionInvokedParameters, JumpParameters, LogParameters, ManualUpdateParameters,
OplogCursor, PendingUpdateParameters, PendingWorkerInvocationParameters,
PluginInstallationDescription, PublicOplogEntry, PublicRetryConfig,
PublicUpdateDescription, PublicWorkerInvocation, PublicWrappedFunctionType,
ResourceParameters, SnapshotBasedUpdateParameters, SuccessfulUpdateParameters,
TimestampParameter, WriteRemoteBatchedParameters,
PluginInstallationDescription, PublicDurableFunctionType, PublicOplogEntry,
PublicRetryConfig, PublicUpdateDescription, PublicWorkerInvocation, ResourceParameters,
SnapshotBasedUpdateParameters, SuccessfulUpdateParameters, TimestampParameter,
WriteRemoteBatchedParameters,
};
use crate::model::regions::OplogRegion;
use crate::model::Empty;
Expand Down Expand Up @@ -2186,7 +2186,7 @@ mod protobuf {
}

impl TryFrom<golem_api_grpc::proto::golem::worker::WrappedFunctionType>
for PublicWrappedFunctionType
for PublicDurableFunctionType
{
type Error = String;

Expand All @@ -2195,54 +2195,54 @@ mod protobuf {
) -> Result<Self, Self::Error> {
match value.r#type() {
wrapped_function_type::Type::ReadLocal => {
Ok(PublicWrappedFunctionType::ReadLocal(Empty {}))
Ok(PublicDurableFunctionType::ReadLocal(Empty {}))
}
wrapped_function_type::Type::WriteLocal => {
Ok(PublicWrappedFunctionType::WriteLocal(Empty {}))
Ok(PublicDurableFunctionType::WriteLocal(Empty {}))
}
wrapped_function_type::Type::ReadRemote => {
Ok(PublicWrappedFunctionType::ReadRemote(Empty {}))
Ok(PublicDurableFunctionType::ReadRemote(Empty {}))
}
wrapped_function_type::Type::WriteRemote => {
Ok(PublicWrappedFunctionType::WriteRemote(Empty {}))
Ok(PublicDurableFunctionType::WriteRemote(Empty {}))
}
wrapped_function_type::Type::WriteRemoteBatched => Ok(
PublicWrappedFunctionType::WriteRemoteBatched(WriteRemoteBatchedParameters {
PublicDurableFunctionType::WriteRemoteBatched(WriteRemoteBatchedParameters {
index: value.oplog_index.map(OplogIndex::from_u64),
}),
),
}
}
}

impl From<PublicWrappedFunctionType> for golem_api_grpc::proto::golem::worker::WrappedFunctionType {
fn from(value: PublicWrappedFunctionType) -> Self {
impl From<PublicDurableFunctionType> for golem_api_grpc::proto::golem::worker::WrappedFunctionType {
fn from(value: PublicDurableFunctionType) -> Self {
match value {
PublicWrappedFunctionType::ReadLocal(_) => {
PublicDurableFunctionType::ReadLocal(_) => {
golem_api_grpc::proto::golem::worker::WrappedFunctionType {
r#type: wrapped_function_type::Type::ReadLocal as i32,
oplog_index: None,
}
}
PublicWrappedFunctionType::WriteLocal(_) => {
PublicDurableFunctionType::WriteLocal(_) => {
golem_api_grpc::proto::golem::worker::WrappedFunctionType {
r#type: wrapped_function_type::Type::WriteLocal as i32,
oplog_index: None,
}
}
PublicWrappedFunctionType::ReadRemote(_) => {
PublicDurableFunctionType::ReadRemote(_) => {
golem_api_grpc::proto::golem::worker::WrappedFunctionType {
r#type: wrapped_function_type::Type::ReadRemote as i32,
oplog_index: None,
}
}
PublicWrappedFunctionType::WriteRemote(_) => {
PublicDurableFunctionType::WriteRemote(_) => {
golem_api_grpc::proto::golem::worker::WrappedFunctionType {
r#type: wrapped_function_type::Type::WriteRemote as i32,
oplog_index: None,
}
}
PublicWrappedFunctionType::WriteRemoteBatched(parameters) => {
PublicDurableFunctionType::WriteRemoteBatched(parameters) => {
golem_api_grpc::proto::golem::worker::WrappedFunctionType {
r#type: wrapped_function_type::Type::WriteRemoteBatched as i32,
oplog_index: parameters.index.map(|index| index.into()),
Expand Down Expand Up @@ -2449,8 +2449,8 @@ mod tests {
ExportedFunctionInvokedParameters, ExportedFunctionParameters, FailedUpdateParameters,
GrowMemoryParameters, ImportedFunctionInvokedParameters, JumpParameters, LogParameters,
PendingUpdateParameters, PendingWorkerInvocationParameters, PluginInstallationDescription,
PublicOplogEntry, PublicRetryConfig, PublicUpdateDescription, PublicWorkerInvocation,
PublicWrappedFunctionType, ResourceParameters, SnapshotBasedUpdateParameters,
PublicDurableFunctionType, PublicOplogEntry, PublicRetryConfig, PublicUpdateDescription,
PublicWorkerInvocation, ResourceParameters, SnapshotBasedUpdateParameters,
SuccessfulUpdateParameters, TimestampParameter,
};
use crate::model::{
Expand Down Expand Up @@ -2525,7 +2525,7 @@ mod tests {
value: Value::List(vec![Value::U64(1)]),
typ: list(u64()),
},
wrapped_function_type: PublicWrappedFunctionType::ReadRemote(Empty {}),
wrapped_function_type: PublicDurableFunctionType::ReadRemote(Empty {}),
});
let serialized = entry.to_json_string();
let deserialized: PublicOplogEntry = serde_json::from_str(&serialized).unwrap();
Expand Down
Loading

0 comments on commit 376c1c7

Please sign in to comment.