Skip to content

Commit

Permalink
Worker executor API changes for hot update (#443)
Browse files Browse the repository at this point in the history
* Worker executor gRPC API changes, introducing update

* Initial wiring of the update feature

* Update OpenAPI spec

* Nicer metadata

* Fix
  • Loading branch information
vigoo authored Apr 19, 2024
1 parent 0216877 commit 19e4fc0
Show file tree
Hide file tree
Showing 13 changed files with 948 additions and 154 deletions.
25 changes: 25 additions & 0 deletions golem-api-grpc/proto/golem/worker/worker_metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package golem.worker;
import "golem/common/account_id.proto";
import "golem/worker/worker_id.proto";
import "golem/worker/worker_status.proto";
import "google/protobuf/timestamp.proto";

message WorkerMetadata {
WorkerId worker_id = 1;
Expand All @@ -14,4 +15,28 @@ message WorkerMetadata {
WorkerStatus status = 5;
uint64 template_version = 6;
uint64 retry_count = 7;
uint64 pending_invocation_count = 8;
repeated UpdateRecord updates = 9;
google.protobuf.Timestamp created_at = 10;
optional string last_error = 11;
}

// One entry of WorkerMetadata.updates: a timestamp, the version the update
// targets, and the state of that update.
message UpdateRecord {
google.protobuf.Timestamp timestamp = 1;
uint64 target_version = 2;
// State of this update; exactly one case is expected to be set.
oneof update {
PendingUpdate pending = 3;
FailedUpdate failed = 4;
SuccessfulUpdate successful = 5;
}
}

// Payload of the `pending` case of UpdateRecord.update; carries no fields.
message PendingUpdate {
}

// Payload of the `failed` case of UpdateRecord.update.
message FailedUpdate {
// Optional text, presumably describing why the update failed.
// NOTE(review): this is the only field of the message yet it is numbered 3 —
// confirm that is intentional; renumbering later would change the wire format.
optional string details = 3;
}

// Payload of the `successful` case of UpdateRecord.update; carries no fields.
message SuccessfulUpdate {
}
19 changes: 19 additions & 0 deletions golem-api-grpc/proto/golem/workerexecutor/worker_executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ service WorkerExecutor {
rpc ResumeWorker(ResumeWorkerRequest) returns (ResumeWorkerResponse);
rpc GetRunningWorkersMetadata(GetRunningWorkersMetadataRequest) returns (GetRunningWorkersMetadataResponse);
rpc GetWorkersMetadata(GetWorkersMetadataRequest) returns (GetWorkersMetadataResponse);
rpc UpdateWorker(UpdateWorkerRequest) returns (UpdateWorkerResponse);
}

message InvokeWorkerResponse {
Expand Down Expand Up @@ -219,4 +220,22 @@ message GetWorkersMetadataResponse {
message GetWorkersMetadataSuccessResponse {
repeated golem.worker.WorkerMetadata workers = 1;
optional uint64 cursor = 2;
}

// Request message of the WorkerExecutor.UpdateWorker RPC.
message UpdateWorkerRequest {
// The worker to update.
golem.worker.WorkerId worker_id = 1;
// The version the worker should be updated to.
uint64 target_version = 2;
// How the update should be performed; see UpdateMode.
UpdateMode mode = 3;
}

// Update strategy selected through UpdateWorkerRequest.mode.
// NOTE(review): the exact semantics of each mode are not described in this
// file — document them next to the implementation and link from here.
enum UpdateMode {
AUTOMATIC = 0;
MANUAL = 1;
}

// Response message of the WorkerExecutor.UpdateWorker RPC.
message UpdateWorkerResponse {
// Either an empty success marker or a worker execution error.
oneof result {
golem.common.Empty success = 1;
golem.worker.WorkerExecutionError failure = 2;
}
}
1 change: 1 addition & 0 deletions golem-api-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[allow(clippy::large_enum_variant)]
pub mod proto {
use uuid::Uuid;
tonic::include_proto!("mod");
Expand Down
41 changes: 19 additions & 22 deletions golem-common/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use serde_json::Value;
use uuid::Uuid;

use crate::config::RetryConfig;
use crate::model::oplog::{OplogIndex, UpdateDescription};
use crate::model::oplog::{OplogIndex, TimestampedUpdateDescription};
use crate::model::regions::DeletedRegions;
use crate::newtype_uuid;

Expand Down Expand Up @@ -224,20 +224,6 @@ impl WorkerId {
format!("{}/{}", self.template_id, self.worker_name)
}

pub fn into_proto(self) -> golem_api_grpc::proto::golem::worker::WorkerId {
golem_api_grpc::proto::golem::worker::WorkerId {
template_id: Some(self.template_id.into()),
name: self.worker_name,
}
}

/// Builds a worker id from its gRPC representation.
///
/// # Panics
///
/// Panics if `proto.template_id` is `None`, or if the contained template id
/// cannot be converted.
pub fn from_proto(proto: golem_api_grpc::proto::golem::worker::WorkerId) -> Self {
    let template_id = proto.template_id.unwrap().try_into().unwrap();

    Self {
        template_id,
        worker_name: proto.name,
    }
}

pub fn to_json_string(&self) -> String {
serde_json::to_string(self)
.unwrap_or_else(|_| panic!("failed to serialize worker id {self}"))
Expand Down Expand Up @@ -597,8 +583,8 @@ pub struct WorkerStatusRecord {
pub status: WorkerStatus,
pub deleted_regions: DeletedRegions,
pub overridden_retry_config: Option<RetryConfig>,
pub pending_invocations: Vec<WorkerInvocation>,
pub pending_updates: VecDeque<UpdateDescription>,
pub pending_invocations: Vec<TimestampedWorkerInvocation>,
pub pending_updates: VecDeque<TimestampedUpdateDescription>,
pub failed_updates: Vec<FailedUpdateRecord>,
pub successful_updates: Vec<SuccessfulUpdateRecord>,
pub component_version: ComponentVersion,
Expand Down Expand Up @@ -737,11 +723,22 @@ impl From<WorkerStatus> for i32 {
}

#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct WorkerInvocation {
pub invocation_key: InvocationKey,
pub full_function_name: String,
pub function_input: Vec<golem_wasm_rpc::Value>,
pub calling_convention: CallingConvention,
/// An invocation that can be queued for a worker.
pub enum WorkerInvocation {
/// A call to an exported function, with everything needed to perform it.
ExportedFunction {
invocation_key: InvocationKey,
full_function_name: String,
function_input: Vec<golem_wasm_rpc::Value>,
calling_convention: CallingConvention,
},
/// A manual update request targeting the given component version.
ManualUpdate {
target_version: ComponentVersion,
},
}

/// A [`WorkerInvocation`] paired with a timestamp.
#[derive(Clone, Debug, PartialEq, Encode, Decode)]
pub struct TimestampedWorkerInvocation {
/// Timestamp associated with the invocation (presumably when it was enqueued — confirm).
pub timestamp: Timestamp,
/// The invocation itself.
pub invocation: WorkerInvocation,
}

#[derive(
Expand Down
39 changes: 39 additions & 0 deletions golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,30 @@ impl OplogEntry {
_ => Ok(None),
}
}

pub fn timestamp(&self) -> Timestamp {
match self {
OplogEntry::Create { timestamp, .. }
| OplogEntry::ImportedFunctionInvoked { timestamp, .. }
| OplogEntry::ExportedFunctionInvoked { timestamp, .. }
| OplogEntry::ExportedFunctionCompleted { timestamp, .. }
| OplogEntry::Suspend { timestamp }
| OplogEntry::Error { timestamp, .. }
| OplogEntry::NoOp { timestamp }
| OplogEntry::Jump { timestamp, .. }
| OplogEntry::Interrupted { timestamp }
| OplogEntry::Exited { timestamp }
| OplogEntry::ChangeRetryPolicy { timestamp, .. }
| OplogEntry::BeginAtomicRegion { timestamp }
| OplogEntry::EndAtomicRegion { timestamp, .. }
| OplogEntry::BeginRemoteWrite { timestamp }
| OplogEntry::EndRemoteWrite { timestamp, .. }
| OplogEntry::PendingWorkerInvocation { timestamp, .. }
| OplogEntry::PendingUpdate { timestamp, .. }
| OplogEntry::SuccessfulUpdate { timestamp, .. }
| OplogEntry::FailedUpdate { timestamp, .. } => *timestamp,
}
}
}

/// Describes a pending update
Expand All @@ -327,6 +351,21 @@ pub enum UpdateDescription {
},
}

impl UpdateDescription {
pub fn target_version(&self) -> &ComponentVersion {
match self {
UpdateDescription::Automatic { target_version } => target_version,
UpdateDescription::SnapshotBased { target_version, .. } => target_version,
}
}
}

/// An [`UpdateDescription`] paired with a timestamp.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct TimestampedUpdateDescription {
/// Timestamp associated with the update (presumably when it was requested — confirm).
pub timestamp: Timestamp,
/// The update being described.
pub description: UpdateDescription,
}

#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub enum SnapshotSource {
/// Load the snapshot from the given byte array
Expand Down
129 changes: 127 additions & 2 deletions golem-service-base/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use golem_api_grpc::proto::golem::shardmanager::{
Pod as GrpcPod, RoutingTable as GrpcRoutingTable, RoutingTableEntry as GrpcRoutingTableEntry,
};
use golem_common::model::{
parse_function_name, ComponentVersion, ShardId, TemplateId, WorkerFilter, WorkerStatus,
parse_function_name, ComponentVersion, ShardId, TemplateId, Timestamp, WorkerFilter,
WorkerStatus,
};
use golem_wasm_ast::analysis::{AnalysedResourceId, AnalysedResourceMode};
use http::Uri;
Expand Down Expand Up @@ -2631,8 +2632,12 @@ pub struct WorkerMetadata {
pub args: Vec<String>,
pub env: HashMap<String, String>,
pub status: WorkerStatus,
pub template_version: u64,
pub template_version: ComponentVersion,
pub retry_count: u64,
pub pending_invocation_count: u64,
pub updates: Vec<UpdateRecord>,
pub created_at: Timestamp,
pub last_error: Option<String>,
}

impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerMetadata> for WorkerMetadata {
Expand All @@ -2648,6 +2653,14 @@ impl TryFrom<golem_api_grpc::proto::golem::worker::WorkerMetadata> for WorkerMet
status: value.status.try_into()?,
template_version: value.template_version,
retry_count: value.retry_count,
pending_invocation_count: value.pending_invocation_count,
updates: value
.updates
.into_iter()
.map(|update| update.try_into())
.collect::<Result<Vec<UpdateRecord>, String>>()?,
created_at: value.created_at.ok_or("Missing created_at")?.into(),
last_error: value.last_error,
})
}
}
Expand All @@ -2664,6 +2677,118 @@ impl From<WorkerMetadata> for golem_api_grpc::proto::golem::worker::WorkerMetada
status: value.status.into(),
template_version: value.template_version,
retry_count: value.retry_count,
pending_invocation_count: value.pending_invocation_count,
updates: value.updates.iter().cloned().map(|u| u.into()).collect(),
created_at: Some(value.created_at.into()),
last_error: value.last_error,
}
}
}

// One update entry of a worker, in one of three states.
// Variant names are camelCased by serde; the OpenAPI union is a `one_of`
// with a `type` discriminator.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Union)]
#[serde(rename_all = "camelCase")]
#[oai(discriminator_name = "type", one_of = true, rename_all = "camelCase")]
pub enum UpdateRecord {
PendingUpdate(PendingUpdate),
SuccessfulUpdate(SuccessfulUpdate),
FailedUpdate(FailedUpdate),
}

// Payload of `UpdateRecord::PendingUpdate`.
// The fields are private to this module; they are exposed through the derived
// serde / OpenAPI representations under camelCase names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct PendingUpdate {
timestamp: Timestamp,
target_version: ComponentVersion,
}

// Payload of `UpdateRecord::SuccessfulUpdate`.
// The fields are private to this module; they are exposed through the derived
// serde / OpenAPI representations under camelCase names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct SuccessfulUpdate {
timestamp: Timestamp,
target_version: ComponentVersion,
}

// Payload of `UpdateRecord::FailedUpdate`.
// The fields are private to this module; they are exposed through the derived
// serde / OpenAPI representations under camelCase names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object)]
#[serde(rename_all = "camelCase")]
#[oai(rename_all = "camelCase")]
pub struct FailedUpdate {
timestamp: Timestamp,
target_version: ComponentVersion,
// Optional text accompanying the failure; absent when none was provided.
details: Option<String>,
}

impl TryFrom<golem_api_grpc::proto::golem::worker::UpdateRecord> for UpdateRecord {
type Error = String;

fn try_from(
value: golem_api_grpc::proto::golem::worker::UpdateRecord,
) -> Result<Self, Self::Error> {
match value.update.ok_or("Missing update field")? {
golem_api_grpc::proto::golem::worker::update_record::Update::Failed(failed) => {
Ok(Self::FailedUpdate(FailedUpdate {
timestamp: value.timestamp.ok_or("Missing timestamp")?.into(),
target_version: value.target_version,
details: { failed.details },
}))
}
golem_api_grpc::proto::golem::worker::update_record::Update::Pending(_) => {
Ok(Self::PendingUpdate(PendingUpdate {
timestamp: value.timestamp.ok_or("Missing timestamp")?.into(),
target_version: value.target_version,
}))
}
golem_api_grpc::proto::golem::worker::update_record::Update::Successful(_) => {
Ok(Self::SuccessfulUpdate(SuccessfulUpdate {
timestamp: value.timestamp.ok_or("Missing timestamp")?.into(),
target_version: value.target_version,
}))
}
}
}
}

impl From<UpdateRecord> for golem_api_grpc::proto::golem::worker::UpdateRecord {
    /// Converts the service-level update record into its gRPC representation.
    ///
    /// The timestamp and target version are shared by all variants; only the
    /// `update` oneof differs, and only the failed case carries extra data.
    fn from(value: UpdateRecord) -> Self {
        use golem_api_grpc::proto::golem::worker as grpc;

        let (timestamp, target_version, update) = match value {
            UpdateRecord::FailedUpdate(failed) => (
                failed.timestamp,
                failed.target_version,
                grpc::update_record::Update::Failed(grpc::FailedUpdate {
                    details: failed.details,
                }),
            ),
            UpdateRecord::PendingUpdate(pending) => (
                pending.timestamp,
                pending.target_version,
                grpc::update_record::Update::Pending(grpc::PendingUpdate {}),
            ),
            UpdateRecord::SuccessfulUpdate(successful) => (
                successful.timestamp,
                successful.target_version,
                grpc::update_record::Update::Successful(grpc::SuccessfulUpdate {}),
            ),
        };

        Self {
            timestamp: Some(timestamp.into()),
            target_version,
            update: Some(update),
        }
    }
}
Expand Down
4 changes: 2 additions & 2 deletions golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
deleted_regions: self.state.deleted_regions.clone(),
overridden_retry_config: self.state.overridden_retry_policy.clone(),
pending_invocations: self.public_state.invocation_queue().pending_invocations(),
pending_updates: last_known_status.pending_updates,
pending_updates: self.public_state.invocation_queue().pending_updates(),
failed_updates: last_known_status.failed_updates,
successful_updates: last_known_status.successful_updates,
component_version: last_known_status.component_version,
Expand Down Expand Up @@ -340,7 +340,7 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
deleted_regions: self.state.deleted_regions.clone(),
overridden_retry_config: self.state.overridden_retry_policy.clone(),
pending_invocations: self.public_state.invocation_queue().pending_invocations(),
pending_updates: last_known_status.pending_updates,
pending_updates: self.public_state.invocation_queue().pending_updates(),
failed_updates: last_known_status.failed_updates,
successful_updates: last_known_status.successful_updates,
component_version: last_known_status.component_version,
Expand Down
Loading

0 comments on commit 19e4fc0

Please sign in to comment.