From 19e4fc0c4082041d6b9d798252d7a1c23f4114f2 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Fri, 19 Apr 2024 16:34:00 +0200 Subject: [PATCH] Worker executor API changes for hot update (#443) * Worker executor gRPC API changes, introducing update * Initial wiring of the update feature * Update OpenAPI spec * Nicer metadata * Fix --- .../proto/golem/worker/worker_metadata.proto | 25 ++ .../workerexecutor/worker_executor.proto | 19 + golem-api-grpc/src/lib.rs | 1 + golem-common/src/model/mod.rs | 41 +- golem-common/src/model/oplog.rs | 39 ++ golem-service-base/src/model.rs | 129 +++++- .../src/durable_host/mod.rs | 4 +- golem-worker-executor-base/src/error.rs | 8 +- golem-worker-executor-base/src/grpc.rs | 384 ++++++++++++++---- .../src/services/invocation_queue.rs | 171 ++++++-- golem-worker-executor-base/src/worker.rs | 178 +++++++- .../src/service/worker/default.rs | 8 +- openapi/golem-service.yaml | 95 +++++ 13 files changed, 948 insertions(+), 154 deletions(-) diff --git a/golem-api-grpc/proto/golem/worker/worker_metadata.proto b/golem-api-grpc/proto/golem/worker/worker_metadata.proto index 9062dd0c13..8324fc4e90 100644 --- a/golem-api-grpc/proto/golem/worker/worker_metadata.proto +++ b/golem-api-grpc/proto/golem/worker/worker_metadata.proto @@ -5,6 +5,7 @@ package golem.worker; import "golem/common/account_id.proto"; import "golem/worker/worker_id.proto"; import "golem/worker/worker_status.proto"; +import "google/protobuf/timestamp.proto"; message WorkerMetadata { WorkerId worker_id = 1; @@ -14,4 +15,28 @@ message WorkerMetadata { WorkerStatus status = 5; uint64 template_version = 6; uint64 retry_count = 7; + uint64 pending_invocation_count = 8; + repeated UpdateRecord updates = 9; + google.protobuf.Timestamp created_at = 10; + optional string last_error = 11; +} + +message UpdateRecord { + google.protobuf.Timestamp timestamp = 1; + uint64 target_version = 2; + oneof update { + PendingUpdate pending = 3; + FailedUpdate failed = 4; + SuccessfulUpdate successful = 5; + } +} + +message PendingUpdate { +} + +message FailedUpdate { + optional string details = 3; +} + +message SuccessfulUpdate { } diff --git a/golem-api-grpc/proto/golem/workerexecutor/worker_executor.proto b/golem-api-grpc/proto/golem/workerexecutor/worker_executor.proto index 272208b497..c274b42b81 100644 --- a/golem-api-grpc/proto/golem/workerexecutor/worker_executor.proto +++ b/golem-api-grpc/proto/golem/workerexecutor/worker_executor.proto @@ -33,6 +33,7 @@ service WorkerExecutor { rpc ResumeWorker(ResumeWorkerRequest) returns (ResumeWorkerResponse); rpc GetRunningWorkersMetadata(GetRunningWorkersMetadataRequest) returns (GetRunningWorkersMetadataResponse); rpc GetWorkersMetadata(GetWorkersMetadataRequest) returns (GetWorkersMetadataResponse); + rpc UpdateWorker(UpdateWorkerRequest) returns (UpdateWorkerResponse); } message InvokeWorkerResponse { @@ -219,4 +220,22 @@ message GetWorkersMetadataResponse { message GetWorkersMetadataSuccessResponse { repeated golem.worker.WorkerMetadata workers = 1; optional uint64 cursor = 2; +} + +message UpdateWorkerRequest { + golem.worker.WorkerId worker_id = 1; + uint64 target_version = 2; + UpdateMode mode = 3; +} + +enum UpdateMode { + AUTOMATIC = 0; + MANUAL = 1; +} + +message UpdateWorkerResponse { + oneof result { + golem.common.Empty success = 1; + golem.worker.WorkerExecutionError failure = 2; + } } \ No newline at end of file diff --git a/golem-api-grpc/src/lib.rs b/golem-api-grpc/src/lib.rs index fad249498b..77d457bc07 100644 --- a/golem-api-grpc/src/lib.rs +++ 
b/golem-api-grpc/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[allow(clippy::large_enum_variant)] pub mod proto { use uuid::Uuid; tonic::include_proto!("mod"); diff --git a/golem-common/src/model/mod.rs b/golem-common/src/model/mod.rs index 8b67f78486..dc02e4a103 100644 --- a/golem-common/src/model/mod.rs +++ b/golem-common/src/model/mod.rs @@ -35,7 +35,7 @@ use serde_json::Value; use uuid::Uuid; use crate::config::RetryConfig; -use crate::model::oplog::{OplogIndex, UpdateDescription}; +use crate::model::oplog::{OplogIndex, TimestampedUpdateDescription}; use crate::model::regions::DeletedRegions; use crate::newtype_uuid; @@ -224,20 +224,6 @@ impl WorkerId { format!("{}/{}", self.template_id, self.worker_name) } - pub fn into_proto(self) -> golem_api_grpc::proto::golem::worker::WorkerId { - golem_api_grpc::proto::golem::worker::WorkerId { - template_id: Some(self.template_id.into()), - name: self.worker_name, - } - } - - pub fn from_proto(proto: golem_api_grpc::proto::golem::worker::WorkerId) -> Self { - Self { - template_id: proto.template_id.unwrap().try_into().unwrap(), - worker_name: proto.name, - } - } - pub fn to_json_string(&self) -> String { serde_json::to_string(self) .unwrap_or_else(|_| panic!("failed to serialize worker id {self}")) @@ -597,8 +583,8 @@ pub struct WorkerStatusRecord { pub status: WorkerStatus, pub deleted_regions: DeletedRegions, pub overridden_retry_config: Option, - pub pending_invocations: Vec, - pub pending_updates: VecDeque, + pub pending_invocations: Vec, + pub pending_updates: VecDeque, pub failed_updates: Vec, pub successful_updates: Vec, pub component_version: ComponentVersion, @@ -737,11 +723,22 @@ impl From for i32 { } #[derive(Clone, Debug, PartialEq, Encode, Decode)] -pub struct WorkerInvocation { - pub invocation_key: InvocationKey, - pub full_function_name: String, - pub function_input: Vec, - pub calling_convention: CallingConvention, +pub enum WorkerInvocation { + ExportedFunction { + invocation_key: InvocationKey, + full_function_name: String, + function_input: Vec, + calling_convention: CallingConvention, + }, + ManualUpdate { + target_version: ComponentVersion, + }, +} + +#[derive(Clone, Debug, PartialEq, Encode, Decode)] +pub struct TimestampedWorkerInvocation { + pub timestamp: Timestamp, + pub invocation: WorkerInvocation, } #[derive( diff --git a/golem-common/src/model/oplog.rs b/golem-common/src/model/oplog.rs index d96c61cf72..c599d2eb3e 100644 --- a/golem-common/src/model/oplog.rs +++ b/golem-common/src/model/oplog.rs @@ -312,6 +312,30 @@ impl OplogEntry { _ => Ok(None), } } + + pub fn timestamp(&self) -> Timestamp { + match self { + OplogEntry::Create { timestamp, .. } + | OplogEntry::ImportedFunctionInvoked { timestamp, .. } + | OplogEntry::ExportedFunctionInvoked { timestamp, .. } + | OplogEntry::ExportedFunctionCompleted { timestamp, .. } + | OplogEntry::Suspend { timestamp } + | OplogEntry::Error { timestamp, .. } + | OplogEntry::NoOp { timestamp } + | OplogEntry::Jump { timestamp, .. } + | OplogEntry::Interrupted { timestamp } + | OplogEntry::Exited { timestamp } + | OplogEntry::ChangeRetryPolicy { timestamp, .. } + | OplogEntry::BeginAtomicRegion { timestamp } + | OplogEntry::EndAtomicRegion { timestamp, .. } + | OplogEntry::BeginRemoteWrite { timestamp } + | OplogEntry::EndRemoteWrite { timestamp, .. } + | OplogEntry::PendingWorkerInvocation { timestamp, .. } + | OplogEntry::PendingUpdate { timestamp, .. 
} + | OplogEntry::SuccessfulUpdate { timestamp, .. } + | OplogEntry::FailedUpdate { timestamp, .. } => *timestamp, + } + } } /// Describes a pending update @@ -327,6 +351,21 @@ pub enum UpdateDescription { }, } +impl UpdateDescription { + pub fn target_version(&self) -> &ComponentVersion { + match self { + UpdateDescription::Automatic { target_version } => target_version, + UpdateDescription::SnapshotBased { target_version, .. } => target_version, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)] +pub struct TimestampedUpdateDescription { + pub timestamp: Timestamp, + pub description: UpdateDescription, +} + #[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)] pub enum SnapshotSource { /// Load the snapshot from the given byte array diff --git a/golem-service-base/src/model.rs b/golem-service-base/src/model.rs index ac46734bc1..e599d19206 100644 --- a/golem-service-base/src/model.rs +++ b/golem-service-base/src/model.rs @@ -16,7 +16,8 @@ use golem_api_grpc::proto::golem::shardmanager::{ Pod as GrpcPod, RoutingTable as GrpcRoutingTable, RoutingTableEntry as GrpcRoutingTableEntry, }; use golem_common::model::{ - parse_function_name, ComponentVersion, ShardId, TemplateId, WorkerFilter, WorkerStatus, + parse_function_name, ComponentVersion, ShardId, TemplateId, Timestamp, WorkerFilter, + WorkerStatus, }; use golem_wasm_ast::analysis::{AnalysedResourceId, AnalysedResourceMode}; use http::Uri; @@ -2631,8 +2632,12 @@ pub struct WorkerMetadata { pub args: Vec, pub env: HashMap, pub status: WorkerStatus, - pub template_version: u64, + pub template_version: ComponentVersion, pub retry_count: u64, + pub pending_invocation_count: u64, + pub updates: Vec, + pub created_at: Timestamp, + pub last_error: Option, } impl TryFrom for WorkerMetadata { @@ -2648,6 +2653,14 @@ impl TryFrom for WorkerMet status: value.status.try_into()?, template_version: value.template_version, retry_count: value.retry_count, + pending_invocation_count: value.pending_invocation_count, + updates: value + .updates + .into_iter() + .map(|update| update.try_into()) + .collect::, String>>()?, + created_at: value.created_at.ok_or("Missing created_at")?.into(), + last_error: value.last_error, }) } } @@ -2664,6 +2677,118 @@ impl From for golem_api_grpc::proto::golem::worker::WorkerMetada status: value.status.into(), template_version: value.template_version, retry_count: value.retry_count, + pending_invocation_count: value.pending_invocation_count, + updates: value.updates.iter().cloned().map(|u| u.into()).collect(), + created_at: Some(value.created_at.into()), + last_error: value.last_error, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Union)] +#[serde(rename_all = "camelCase")] +#[oai(discriminator_name = "type", one_of = true, rename_all = "camelCase")] +pub enum UpdateRecord { + PendingUpdate(PendingUpdate), + SuccessfulUpdate(SuccessfulUpdate), + FailedUpdate(FailedUpdate), +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct PendingUpdate { + timestamp: Timestamp, + target_version: ComponentVersion, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct SuccessfulUpdate { + timestamp: Timestamp, + target_version: ComponentVersion, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] 
+pub struct FailedUpdate { + timestamp: Timestamp, + target_version: ComponentVersion, + details: Option, +} + +impl TryFrom for UpdateRecord { + type Error = String; + + fn try_from( + value: golem_api_grpc::proto::golem::worker::UpdateRecord, + ) -> Result { + match value.update.ok_or("Missing update field")? { + golem_api_grpc::proto::golem::worker::update_record::Update::Failed(failed) => { + Ok(Self::FailedUpdate(FailedUpdate { + timestamp: value.timestamp.ok_or("Missing timestamp")?.into(), + target_version: value.target_version, + details: { failed.details }, + })) + } + golem_api_grpc::proto::golem::worker::update_record::Update::Pending(_) => { + Ok(Self::PendingUpdate(PendingUpdate { + timestamp: value.timestamp.ok_or("Missing timestamp")?.into(), + target_version: value.target_version, + })) + } + golem_api_grpc::proto::golem::worker::update_record::Update::Successful(_) => { + Ok(Self::SuccessfulUpdate(SuccessfulUpdate { + timestamp: value.timestamp.ok_or("Missing timestamp")?.into(), + target_version: value.target_version, + })) + } + } + } +} + +impl From for golem_api_grpc::proto::golem::worker::UpdateRecord { + fn from(value: UpdateRecord) -> Self { + match value { + UpdateRecord::FailedUpdate(FailedUpdate { + timestamp, + target_version, + details, + }) => Self { + timestamp: Some(timestamp.into()), + target_version, + update: Some( + golem_api_grpc::proto::golem::worker::update_record::Update::Failed( + golem_api_grpc::proto::golem::worker::FailedUpdate { details }, + ), + ), + }, + UpdateRecord::PendingUpdate(PendingUpdate { + timestamp, + target_version, + }) => Self { + timestamp: Some(timestamp.into()), + target_version, + update: Some( + golem_api_grpc::proto::golem::worker::update_record::Update::Pending( + golem_api_grpc::proto::golem::worker::PendingUpdate {}, + ), + ), + }, + UpdateRecord::SuccessfulUpdate(SuccessfulUpdate { + timestamp, + target_version, + }) => Self { + timestamp: Some(timestamp.into()), + target_version, + update: Some( + golem_api_grpc::proto::golem::worker::update_record::Update::Successful( + golem_api_grpc::proto::golem::worker::SuccessfulUpdate {}, + ), + ), + }, } } } diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index d655bdbd2e..bc72289a27 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -311,7 +311,7 @@ impl DurableWorkerCtx { deleted_regions: self.state.deleted_regions.clone(), overridden_retry_config: self.state.overridden_retry_policy.clone(), pending_invocations: self.public_state.invocation_queue().pending_invocations(), - pending_updates: last_known_status.pending_updates, + pending_updates: self.public_state.invocation_queue().pending_updates(), failed_updates: last_known_status.failed_updates, successful_updates: last_known_status.successful_updates, component_version: last_known_status.component_version, @@ -340,7 +340,7 @@ impl DurableWorkerCtx { deleted_regions: self.state.deleted_regions.clone(), overridden_retry_config: self.state.overridden_retry_policy.clone(), pending_invocations: self.public_state.invocation_queue().pending_invocations(), - pending_updates: last_known_status.pending_updates, + pending_updates: self.public_state.invocation_queue().pending_updates(), failed_updates: last_known_status.failed_updates, successful_updates: last_known_status.successful_updates, component_version: last_known_status.component_version, diff --git 
a/golem-worker-executor-base/src/error.rs b/golem-worker-executor-base/src/error.rs index f459a5fac4..bd25a645bf 100644 --- a/golem-worker-executor-base/src/error.rs +++ b/golem-worker-executor-base/src/error.rs @@ -368,7 +368,7 @@ impl From for golem::worker::WorkerExecutionError { error: Some( golem::worker::worker_execution_error::Error::WorkerAlreadyExists( golem::worker::WorkerAlreadyExists { - worker_id: Some(worker_id.into_proto()), + worker_id: Some(worker_id.into()), }, ), ), @@ -377,7 +377,7 @@ impl From for golem::worker::WorkerExecutionError { error: Some( golem::worker::worker_execution_error::Error::WorkerNotFound( golem::worker::WorkerNotFound { - worker_id: Some(worker_id.into_proto()), + worker_id: Some(worker_id.into()), }, ), ), @@ -387,7 +387,7 @@ impl From for golem::worker::WorkerExecutionError { error: Some( golem::worker::worker_execution_error::Error::WorkerCreationFailed( golem::worker::WorkerCreationFailed { - worker_id: Some(worker_id.into_proto()), + worker_id: Some(worker_id.into()), details, }, ), @@ -398,7 +398,7 @@ impl From for golem::worker::WorkerExecutionError { error: Some( golem::worker::worker_execution_error::Error::FailedToResumeWorker( golem::worker::FailedToResumeWorker { - worker_id: Some(worker_id.into_proto()), + worker_id: Some(worker_id.into()), }, ), ), diff --git a/golem-worker-executor-base/src/grpc.rs b/golem-worker-executor-base/src/grpc.rs index 2334be7901..31caa7e7fb 100644 --- a/golem-worker-executor-base/src/grpc.rs +++ b/golem-worker-executor-base/src/grpc.rs @@ -24,13 +24,14 @@ use golem_api_grpc::proto::golem::common::ResourceLimits as GrpcResourceLimits; use golem_api_grpc::proto::golem::workerexecutor::worker_executor_server::WorkerExecutor; use golem_api_grpc::proto::golem::workerexecutor::{ GetRunningWorkersMetadataRequest, GetRunningWorkersMetadataResponse, GetWorkersMetadataRequest, - GetWorkersMetadataResponse, + GetWorkersMetadataResponse, UpdateMode, UpdateWorkerRequest, UpdateWorkerResponse, }; use golem_common::cache::PendingOrFinal; use golem_common::model as common_model; +use golem_common::model::oplog::UpdateDescription; use golem_common::model::{ - AccountId, CallingConvention, InvocationKey, ShardId, WorkerFilter, WorkerMetadata, - WorkerStatus, WorkerStatusRecord, + AccountId, CallingConvention, InvocationKey, ShardId, TimestampedWorkerInvocation, + WorkerFilter, WorkerId, WorkerInvocation, WorkerMetadata, WorkerStatus, WorkerStatusRecord, }; use golem_wasm_rpc::protobuf::Val; use tokio::sync::mpsc; @@ -44,7 +45,7 @@ use crate::error::*; use crate::metrics::grpc::{ record_closed_grpc_active_stream, record_new_grpc_active_stream, RecordedGrpcRequest, }; -use crate::model::InterruptKind; +use crate::model::{InterruptKind, LastError}; use crate::services::worker_activator::{DefaultWorkerActivator, LazyWorkerActivator}; use crate::services::worker_event::LogLevel; use crate::services::{ @@ -221,7 +222,7 @@ impl + UsesAllDeps + Send + Sync + &self, request: golem::workerexecutor::CreateWorkerRequest, ) -> Result<(), GolemError> { - let proto_worker_id = request + let worker_id = request .worker_id .ok_or(GolemError::invalid_request("worker_id not found"))?; @@ -235,8 +236,7 @@ impl + UsesAllDeps + Send + Sync + } let template_version = request.template_version; - - let worker_id = common_model::WorkerId::from_proto(proto_worker_id); + let worker_id: WorkerId = worker_id.try_into().map_err(GolemError::invalid_request)?; self.validate_worker_id(&worker_id)?; @@ -271,12 +271,12 @@ impl + UsesAllDeps + Send + Sync + 
.ok_or(GolemError::invalid_request("promise_id not found"))?; let data = request.data; - let worker_id = common_model::WorkerId::from_proto( - promise_id - .worker_id - .clone() - .ok_or(GolemError::invalid_request("worker_id not found"))?, - ); + let worker_id: WorkerId = promise_id + .worker_id + .clone() + .ok_or(GolemError::invalid_request("worker_id not found"))? + .try_into() + .map_err(GolemError::invalid_request)?; self.validate_worker_id(&worker_id)?; @@ -324,7 +324,7 @@ impl + UsesAllDeps + Send + Sync + &self, inner: golem::worker::WorkerId, ) -> Result<(), GolemError> { - let worker_id = common_model::WorkerId::from_proto(inner); + let worker_id: WorkerId = inner.try_into().map_err(GolemError::invalid_request)?; self.validate_worker_id(&worker_id)?; @@ -369,11 +369,11 @@ impl + UsesAllDeps + Send + Sync + &self, request: golem::workerexecutor::GetInvocationKeyRequest, ) -> Result { - let worker_id = common_model::WorkerId::from_proto( - request - .worker_id - .ok_or(GolemError::invalid_request("worker_id not found"))?, - ); + let worker_id: WorkerId = request + .worker_id + .ok_or(GolemError::invalid_request("worker_id not found"))? + .try_into() + .map_err(GolemError::invalid_request)?; self.validate_worker_id(&worker_id)?; @@ -392,7 +392,7 @@ impl + UsesAllDeps + Send + Sync + .worker_id .ok_or(GolemError::invalid_request("worker_id not found"))?; - let worker_id = common_model::WorkerId::from_proto(worker_id); + let worker_id: WorkerId = worker_id.try_into().map_err(GolemError::invalid_request)?; let metadata = self.worker_service().get(&worker_id).await; let worker_status = Ctx::compute_latest_worker_status(self, &worker_id, &metadata).await?; @@ -466,7 +466,7 @@ impl + UsesAllDeps + Send + Sync + .worker_id .ok_or(GolemError::invalid_request("worker_id not found"))?; - let worker_id = common_model::WorkerId::from_proto(worker_id); + let worker_id: WorkerId = worker_id.try_into().map_err(GolemError::invalid_request)?; self.validate_worker_id(&worker_id)?; @@ -699,7 +699,10 @@ impl + UsesAllDeps + Send + Sync + &self, worker_id: &golem::worker::WorkerId, ) -> Result { - let worker_id = common_model::WorkerId::from_proto(worker_id.clone()); + let worker_id: WorkerId = worker_id + .clone() + .try_into() + .map_err(GolemError::invalid_request)?; let metadata = self .worker_service() .get(&worker_id) @@ -711,19 +714,11 @@ impl + UsesAllDeps + Send + Sync + let last_error_and_retry_count = Ctx::get_last_error_and_retry_count(self, &worker_id).await; - Ok(golem::worker::WorkerMetadata { - worker_id: Some(worker_id.into_proto()), - args: metadata.args.clone(), - env: HashMap::from_iter(metadata.env.iter().cloned()), - account_id: Some(metadata.account_id.into()), - template_version: latest_status.component_version, - status: Into::::into(latest_status.status).into(), - retry_count: last_error_and_retry_count - .map(|last_error| last_error.retry_count) - .unwrap_or_default(), - // TODO: add update info - // TODO: add error details - }) + Ok(Self::create_proto_metadata( + metadata, + latest_status, + last_error_and_retry_count, + )) } async fn get_running_workers_metadata_internal( @@ -746,8 +741,11 @@ impl + UsesAllDeps + Send + Sync + .await?; let result: Vec = workers - .iter() - .map(|worker| self.to_proto_metadata(worker.clone())) + .into_iter() + .map(|worker| { + let status = worker.last_known_status.clone(); + Self::create_proto_metadata(worker, status, None) + }) .collect(); Ok(result) @@ -779,23 +777,238 @@ impl + UsesAllDeps + Send + Sync + .await?; let result: Vec = 
workers - .iter() - .map(|worker| self.to_proto_metadata(worker.clone())) + .into_iter() + .map(|worker| { + let status = worker.last_known_status.clone(); + Self::create_proto_metadata(worker, status, None) + }) .collect(); Ok((new_cursor, result)) } - fn to_proto_metadata(&self, value: WorkerMetadata) -> golem::worker::WorkerMetadata { + async fn update_worker_internal( + &self, + request: golem::workerexecutor::UpdateWorkerRequest, + ) -> Result<(), GolemError> { + let worker_id = request + .worker_id + .clone() + .ok_or(GolemError::invalid_request("worker_id not found"))?; + + let worker_id: WorkerId = worker_id.try_into().map_err(GolemError::invalid_request)?; + + let metadata = self.worker_service().get(&worker_id).await; + let mut worker_status = + Ctx::compute_latest_worker_status(self, &worker_id, &metadata).await?; + let metadata = metadata.ok_or(GolemError::worker_not_found(worker_id.clone()))?; + + if metadata.last_known_status.component_version == request.target_version { + return Err(GolemError::invalid_request( + "Worker is already at the target version", + )); + } + + match request.mode() { + UpdateMode::Automatic => { + let update_description = UpdateDescription::Automatic { + target_version: request.target_version, + }; + + if metadata + .last_known_status + .pending_updates + .iter() + .any(|update| update.description == update_description) + { + return Err(GolemError::invalid_request( + "The same update is already in progress", + )); + } + + match &worker_status.status { + WorkerStatus::Exited => { + warn!("Attempted updating worker {worker_id} which already exited") + } + WorkerStatus::Idle + | WorkerStatus::Interrupted + | WorkerStatus::Suspended + | WorkerStatus::Retrying + | WorkerStatus::Failed => { + // The worker is not active. + // + // We start activating it but block on a signal. + // This way we eliminate the race condition of activating the worker, but have + // time to inject the pending update oplog entry so that by the time the worker + // really gets activated it will see it and perform the update. + let (pending_worker, resume) = Worker::get_or_create_paused_pending( + self, + &worker_id, + metadata.args, + metadata.env, + Some(worker_status.component_version), + metadata.account_id, + ) + .await?; + + pending_worker + .invocation_queue + .enqueue_update(update_description.clone()) + .await; + + if worker_status.status == WorkerStatus::Failed { + // If the worker was previously in a permanently failed state, + // we reset this state to Retrying, so we can fix the failure cause + // with an update. + worker_status.status = WorkerStatus::Retrying; + } + worker_status.pending_updates = + pending_worker.invocation_queue.pending_updates(); + self.worker_service() + .update_status(&worker_id, &worker_status) + .await; + + resume.send(()).unwrap(); + } + WorkerStatus::Running => { + // If the worker is already running we need to write the update + // attempt to its oplog, and then interrupt it and have it restart + // immediately to begin the update. 
+ let worker = Worker::get_or_create_with_config( + self, + &metadata.worker_id, + metadata.args, + metadata.env, + Some(worker_status.component_version), + metadata.account_id, + ) + .await?; + + worker + .public_state + .invocation_queue() + .enqueue_update(update_description.clone()) + .await; + + worker.set_interrupting(InterruptKind::Restart); + } + } + } + + UpdateMode::Manual => { + if metadata.last_known_status.pending_invocations.iter().any(|invocation| + matches!(invocation, TimestampedWorkerInvocation { invocation: WorkerInvocation::ManualUpdate { target_version, .. }, ..} if *target_version == request.target_version) + ) { + return Err(GolemError::invalid_request( + "The same update is already in progress", + )); + } + + // For manual update we need to invoke the worker to save the custom snapshot. + // This races with other worker invocations, so the whole update + // process needs to be initiated through the worker's invocation queue. + + let pending_or_final = Worker::get_or_create_pending( + self, + &metadata.worker_id, + metadata.args, + metadata.env, + Some(worker_status.component_version), + metadata.account_id, + ) + .await?; + let (invocation_queue, _worker_id) = match pending_or_final { + PendingOrFinal::Pending(pending_worker) => ( + pending_worker.invocation_queue.clone(), + pending_worker.worker_id.clone(), + ), + PendingOrFinal::Final(worker) => ( + worker.public_state.invocation_queue(), + worker.metadata.worker_id.clone(), + ), + }; + + invocation_queue + .enqueue_manual_update(request.target_version) + .await; + } + } + + Ok(()) + } + + fn create_proto_metadata( + metadata: WorkerMetadata, + latest_status: WorkerStatusRecord, + last_error_and_retry_count: Option<LastError>, + ) -> golem::worker::WorkerMetadata { + let mut updates = Vec::new(); + + for pending_invocation in &latest_status.pending_invocations { + if let TimestampedWorkerInvocation { + timestamp, + invocation: WorkerInvocation::ManualUpdate { target_version }, + } = pending_invocation + { + updates.push(golem::worker::UpdateRecord { + timestamp: Some((*timestamp).into()), + target_version: *target_version, + update: Some(golem::worker::update_record::Update::Pending( + golem::worker::PendingUpdate {}, + )), + }); + } + } + for pending_update in &latest_status.pending_updates { + updates.push(golem::worker::UpdateRecord { + timestamp: Some(pending_update.timestamp.into()), + target_version: *pending_update.description.target_version(), + update: Some(golem::worker::update_record::Update::Pending( + golem::worker::PendingUpdate {}, + )), + }); + } + for successful_update in &latest_status.successful_updates { + updates.push(golem::worker::UpdateRecord { + timestamp: Some(successful_update.timestamp.into()), + target_version: successful_update.target_version, + update: Some(golem::worker::update_record::Update::Successful( + golem::worker::SuccessfulUpdate {}, + )), + }); + } + for failed_update in &latest_status.failed_updates { + updates.push(golem::worker::UpdateRecord { + timestamp: Some(failed_update.timestamp.into()), + target_version: failed_update.target_version, + update: Some(golem::worker::update_record::Update::Failed( + golem::worker::FailedUpdate { + details: failed_update.details.clone(), + }, + )), + }); + } + updates.sort_by_key(|record| { + record.timestamp.as_ref().unwrap().seconds * 1_000_000_000 + + record.timestamp.as_ref().unwrap().nanos as i64 + }); + golem::worker::WorkerMetadata { - worker_id: Some(value.worker_id.into_proto()), - account_id: 
Some(value.account_id.into()), - args: value.args, - env: HashMap::from_iter(value.env.iter().cloned()), - template_version: value.last_known_status.component_version, - status: Into::::into(value.last_known_status.status) - .into(), - retry_count: 0, + worker_id: Some(metadata.worker_id.into()), + args: metadata.args.clone(), + env: HashMap::from_iter(metadata.env.iter().cloned()), + account_id: Some(metadata.account_id.into()), + template_version: latest_status.component_version, + status: Into::::into(latest_status.status).into(), + retry_count: last_error_and_retry_count + .as_ref() + .map(|last_error| last_error.retry_count) + .unwrap_or_default(), + + pending_invocation_count: latest_status.pending_invocations.len() as u64, + updates, + created_at: Some(metadata.created_at.into()), + last_error: last_error_and_retry_count.map(|last_error| last_error.error.to_string()), } } } @@ -820,7 +1033,7 @@ impl + UsesAllDeps + Send + Sync + ) -> Result, Status> { let request = request.into_inner(); let record = RecordedGrpcRequest::new( - "create_instance", + "create_worker", format!( "worker_id={:?}, template_version={:?}, account_id={:?}", request.worker_id, request.template_version, request.account_id @@ -887,7 +1100,7 @@ impl + UsesAllDeps + Send + Sync + ) -> Result, Status> { let request = request.into_inner(); let record = RecordedGrpcRequest::new( - "invoke_and_await_instance", + "invoke_and_await_worker", format!( "worker_id={:?}, name={:?}, invocation_key={:?}, calling_convention={:?}, account_id={:?}", request.worker_id, request.name, request.invocation_key, request.calling_convention, request.account_id @@ -922,7 +1135,7 @@ impl + UsesAllDeps + Send + Sync + ) -> Result, Status> { let request = request.into_inner(); let record = RecordedGrpcRequest::new( - "invoke_instance", + "invoke_worker", format!( "worker_id={:?}, name={:?}, account_id={:?}", request.worker_id, request.name, request.account_id @@ -959,11 +1172,11 @@ impl + UsesAllDeps + Send + Sync + ) -> ResponseResult { let inner = request.into_inner(); - let worker_id = common_model::WorkerId::from_proto( - inner - .worker_id - .ok_or(GolemError::invalid_request("missing worker_id"))?, - ); + let worker_id: WorkerId = inner + .worker_id + .ok_or(GolemError::invalid_request("missing worker_id"))? + .try_into() + .map_err(GolemError::invalid_request)?; let account_id: AccountId = inner .account_id .ok_or(GolemError::invalid_request("missing account_id"))? 
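Side note on the conversion pattern used throughout this file: the removed WorkerId::into_proto/from_proto helpers are replaced by From/TryFrom instances, so a missing or malformed template_id becomes a recoverable error that the callers above map to GolemError::invalid_request, instead of a panic. A minimal sketch of what such impls can look like (not shown in this patch), assuming the field names from worker_id.proto (template_id, name) and a String-typed fallible conversion on TemplateId:

    use golem_api_grpc::proto::golem::worker::WorkerId as ProtoWorkerId;

    impl From<WorkerId> for ProtoWorkerId {
        fn from(value: WorkerId) -> Self {
            ProtoWorkerId {
                template_id: Some(value.template_id.into()),
                name: value.worker_name,
            }
        }
    }

    impl TryFrom<ProtoWorkerId> for WorkerId {
        type Error = String;

        fn try_from(proto: ProtoWorkerId) -> Result<Self, Self::Error> {
            Ok(Self {
                // The old from_proto unwrapped here; a missing template_id is
                // now reported as an error rather than panicking.
                template_id: proto.template_id.ok_or("Missing template_id")?.try_into()?,
                worker_name: proto.name,
            })
        }
    }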
@@ -1110,8 +1323,7 @@ impl + UsesAllDeps + Send + Sync + request: Request, ) -> Result, Status> { let request = request.into_inner(); - let record = - RecordedGrpcRequest::new("delete_instance", format!("worker_id={:?}", request)); + let record = RecordedGrpcRequest::new("delete_worker", format!("worker_id={:?}", request)); match self.delete_worker_internal(request).await { Ok(_) => record.succeed(Ok(Response::new( golem::workerexecutor::DeleteWorkerResponse { @@ -1173,7 +1385,7 @@ impl + UsesAllDeps + Send + Sync + ) -> Result, Status> { let request = request.into_inner(); let record = RecordedGrpcRequest::new( - "interrupt_instance", + "interrupt_worker", format!("worker_id={:?}", request.worker_id), ); match self.interrupt_worker_internal(request).await { @@ -1313,7 +1525,7 @@ impl + UsesAllDeps + Send + Sync + ) -> Result, Status> { let request = request.into_inner(); let record = RecordedGrpcRequest::new( - "resume_instance", + "resume_worker", format!("worker_id={:?}", request.worker_id), ); match self.resume_worker_internal(request).await { @@ -1413,6 +1625,38 @@ impl + UsesAllDeps + Send + Sync + ), } } + + async fn update_worker( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let record = RecordedGrpcRequest::new( + "update_worker", + format!("worker_id={:?}", request.worker_id), + ); + match self.update_worker_internal(request).await { + Ok(_) => record.succeed(Ok(Response::new( + golem::workerexecutor::UpdateWorkerResponse { + result: Some( + golem::workerexecutor::update_worker_response::Result::Success( + golem::common::Empty {}, + ), + ), + }, + ))), + Err(err) => record.fail( + Ok(Response::new(golem::workerexecutor::UpdateWorkerResponse { + result: Some( + golem::workerexecutor::update_worker_response::Result::Failure( + err.clone().into(), + ), + ), + })), + &err, + ), + } + } } trait GrpcInvokeRequest { @@ -1447,11 +1691,11 @@ impl GrpcInvokeRequest for golem::workerexecutor::InvokeWorkerRequest { } fn worker_id(&self) -> Result { - Ok(common_model::WorkerId::from_proto( - self.worker_id - .clone() - .ok_or(GolemError::invalid_request("worker_id not found"))?, - )) + self.worker_id + .clone() + .ok_or(GolemError::invalid_request("worker_id not found"))? + .try_into() + .map_err(GolemError::invalid_request) } fn invocation_key(&self) -> Result, GolemError> { @@ -1489,11 +1733,11 @@ impl GrpcInvokeRequest for golem::workerexecutor::InvokeAndAwaitWorkerRequest { } fn worker_id(&self) -> Result { - Ok(common_model::WorkerId::from_proto( - self.worker_id - .clone() - .ok_or(GolemError::invalid_request("worker_id not found"))?, - )) + self.worker_id + .clone() + .ok_or(GolemError::invalid_request("worker_id not found"))? 
+ .try_into() + .map_err(GolemError::invalid_request) } fn invocation_key(&self) -> Result, GolemError> { diff --git a/golem-worker-executor-base/src/services/invocation_queue.rs b/golem-worker-executor-base/src/services/invocation_queue.rs index d68ff1615f..ece64473fe 100644 --- a/golem-worker-executor-base/src/services/invocation_queue.rs +++ b/golem-worker-executor-base/src/services/invocation_queue.rs @@ -27,8 +27,11 @@ use crate::services::oplog::Oplog; use crate::services::HasOplog; use crate::worker::Worker; use crate::workerctx::WorkerCtx; -use golem_common::model::oplog::OplogEntry; -use golem_common::model::{CallingConvention, InvocationKey, WorkerId, WorkerInvocation}; +use golem_common::model::oplog::{OplogEntry, TimestampedUpdateDescription, UpdateDescription}; +use golem_common::model::{ + CallingConvention, ComponentVersion, InvocationKey, TimestampedWorkerInvocation, WorkerId, + WorkerInvocation, +}; /// Per-worker invocation queue service /// @@ -42,7 +45,8 @@ use golem_common::model::{CallingConvention, InvocationKey, WorkerId, WorkerInvo pub struct InvocationQueue { worker_id: WorkerId, oplog: Arc, - queue: Arc>>, + queue: Arc>>, + pending_updates: Arc>>, running: Arc>>>, } @@ -50,16 +54,21 @@ impl InvocationQueue { pub fn new( worker_id: WorkerId, oplog: Arc, - initial_pending_invocations: &[WorkerInvocation], + initial_pending_invocations: &[TimestampedWorkerInvocation], + initial_pending_updates: &[TimestampedUpdateDescription], ) -> Self { let queue = Arc::new(RwLock::new(VecDeque::from_iter( initial_pending_invocations.iter().cloned(), ))); + let pending_updates = Arc::new(RwLock::new(VecDeque::from_iter( + initial_pending_updates.iter().cloned(), + ))); InvocationQueue { worker_id, oplog, queue, + pending_updates, running: Arc::new(Mutex::new(None)), } } @@ -70,6 +79,7 @@ impl InvocationQueue { *running = Some(RunningInvocationQueue::new(worker, self.queue.clone())); } + /// Enqueue invocation of an exported function pub async fn enqueue( &self, invocation_key: InvocationKey, @@ -93,35 +103,96 @@ impl InvocationQueue { "Worker {} is initializing, persisting pending invocation", self.worker_id ); - let invocation = WorkerInvocation { + let invocation = WorkerInvocation::ExportedFunction { invocation_key, full_function_name, function_input, calling_convention, }; - self.queue.write().unwrap().push_back(invocation.clone()); - self.oplog - .add(OplogEntry::pending_worker_invocation(invocation)) - .await; + let entry = OplogEntry::pending_worker_invocation(invocation.clone()); + let timestamped_invocation = TimestampedWorkerInvocation { + timestamp: entry.timestamp(), + invocation, + }; + self.queue + .write() + .unwrap() + .push_back(timestamped_invocation); + self.oplog.add(entry).await; self.oplog.commit().await; } } } - pub fn pending_invocations(&self) -> Vec { + /// Enqueue attempting an update. + /// + /// The update itself is not performed by the invocation queue's processing loop, + /// it is going to affect how the worker is recovered next time. + pub async fn enqueue_update(&self, update_description: UpdateDescription) { + let entry = OplogEntry::pending_update(update_description.clone()); + let timestamped_update = TimestampedUpdateDescription { + timestamp: entry.timestamp(), + description: update_description, + }; + self.pending_updates + .write() + .unwrap() + .push_back(timestamped_update); + self.oplog.add(entry).await; + self.oplog.commit().await; + } + + /// Enqueues a manual update. 
+ /// + /// This enqueues a special function invocation that saves the component's state and + /// triggers a restart immediately. + pub async fn enqueue_manual_update(&self, target_version: ComponentVersion) { + match self.running.lock().await.as_ref() { + Some(running) => { + running.enqueue_manual_update(target_version).await; + } + None => { + debug!( + "Worker {} is initializing, persisting manual update request", + self.worker_id + ); + let invocation = WorkerInvocation::ManualUpdate { target_version }; + let entry = OplogEntry::pending_worker_invocation(invocation.clone()); + let timestamped_invocation = TimestampedWorkerInvocation { + timestamp: entry.timestamp(), + invocation, + }; + self.queue + .write() + .unwrap() + .push_back(timestamped_invocation); + self.oplog.add(entry).await; + self.oplog.commit().await; + } + } + } + + pub fn pending_invocations(&self) -> Vec { self.queue.read().unwrap().iter().cloned().collect() } + + pub fn pending_updates(&self) -> VecDeque { + self.pending_updates.read().unwrap().clone() + } } struct RunningInvocationQueue { _handle: Option>, sender: UnboundedSender<()>, - queue: Arc>>, + queue: Arc>>, worker: Weak>, } impl RunningInvocationQueue { - pub fn new(worker: Arc>, queue: Arc>>) -> Self { + pub fn new( + worker: Arc>, + queue: Arc>>, + ) -> Self { let worker_id = worker.metadata.worker_id.clone(); let worker = Arc::downgrade(&worker); @@ -159,12 +230,26 @@ impl RunningInvocationQueue { function_input: Vec, calling_convention: CallingConvention, ) { - let invocation = WorkerInvocation { + let invocation = WorkerInvocation::ExportedFunction { invocation_key, full_function_name, function_input, calling_convention, }; + self.enqueue_worker_invocation(invocation).await; + } + + pub async fn enqueue_manual_update(&self, target_version: ComponentVersion) { + let invocation = WorkerInvocation::ManualUpdate { target_version }; + self.enqueue_worker_invocation(invocation).await; + } + + async fn enqueue_worker_invocation(&self, invocation: WorkerInvocation) { + let entry = OplogEntry::pending_worker_invocation(invocation.clone()); + let timestamped_invocation = TimestampedWorkerInvocation { + timestamp: entry.timestamp(), + invocation, + }; if let Some(worker) = self.worker.upgrade() { if worker.store.try_lock().is_none() { debug!( @@ -172,21 +257,20 @@ impl RunningInvocationQueue { worker.metadata.worker_id ); // The worker is currently busy, so we write the pending worker invocation to the oplog - worker - .public_state - .oplog() - .add(OplogEntry::pending_worker_invocation(invocation.clone())) - .await; + worker.public_state.oplog().add(entry).await; worker.public_state.oplog().commit().await; } } - self.queue.write().unwrap().push_back(invocation); + self.queue + .write() + .unwrap() + .push_back(timestamped_invocation); self.sender.send(()).unwrap() } async fn invocation_loop( mut receiver: UnboundedReceiver<()>, - active: Arc>>, + active: Arc>>, worker: Weak>, worker_id: WorkerId, ) { @@ -206,24 +290,37 @@ impl RunningInvocationQueue { let mut store_mutex = store.lock().await; let store = store_mutex.deref_mut(); - store - .data_mut() - .set_current_invocation_key(message.invocation_key) - .await; + match message.invocation { + WorkerInvocation::ExportedFunction { + invocation_key, + full_function_name, + function_input, + calling_convention, + } => { + store + .data_mut() + .set_current_invocation_key(invocation_key) + .await; + + // Make sure to update the pending invocation queue in the status record before + // the invocation writes the 
invocation start oplog entry + store.data_mut().update_pending_invocations().await; - // Make sure to update the pending invocation queue in the status record before - // the invocation writes the invocation start oplog entry - store.data_mut().update_pending_invocations().await; - - let _ = invoke_worker( - message.full_function_name, - message.function_input, - store, - instance, - message.calling_convention, - true, // Invocation queue is always initialized _after_ the worker recovery - ) - .await; + let _ = invoke_worker( + full_function_name, + function_input, + store, + instance, + calling_convention, + true, // Invocation queue is always initialized _after_ the worker recovery + ) + .await; + } + WorkerInvocation::ManualUpdate { target_version: _ } => { + // TODO: invoke snapshot save function, write pending update oplog entry and deactivate the worker + todo!() + } + } } else { warn!( "Lost invocation message because the worker {worker_id} was dropped: {message:?}" diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index 7e270709cf..6b9a65dabd 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -21,11 +21,12 @@ use async_mutex::Mutex; use bytes::Bytes; use golem_common::cache::PendingOrFinal; use golem_common::config::RetryConfig; -use golem_common::model::oplog::{OplogEntry, UpdateDescription}; +use golem_common::model::oplog::{OplogEntry, TimestampedUpdateDescription, UpdateDescription}; use golem_common::model::regions::{DeletedRegions, DeletedRegionsBuilder}; use golem_common::model::{ AccountId, CallingConvention, FailedUpdateRecord, InvocationKey, SuccessfulUpdateRecord, - Timestamp, WorkerId, WorkerInvocation, WorkerMetadata, WorkerStatus, WorkerStatusRecord, + Timestamp, TimestampedWorkerInvocation, WorkerId, WorkerInvocation, WorkerMetadata, + WorkerStatus, WorkerStatusRecord, }; use golem_wasm_rpc::Value; use tokio::sync::broadcast::Receiver; @@ -316,6 +317,12 @@ impl Worker { .last_known_status .pending_invocations .clone(); + let initial_pending_updates = worker_metadata + .last_known_status + .pending_updates + .iter() + .cloned() + .collect::>(); let worker_details = this .active_workers() @@ -327,6 +334,7 @@ impl Worker { config_clone, oplog, &initial_pending_invocations, + &initial_pending_updates, ) }, |pending_worker| { @@ -356,8 +364,8 @@ impl Worker { /// Gets an already active worker or creates a new one and returns the pending worker object /// - /// The pending worker object holds a reference to the event service of the worker that is getting - /// created, allowing the caller to connect to the worker's event stream even before it is fully + /// The pending worker object holds a reference to the event service, invocation queue and oplog + /// of the worker that is getting created, allowing the caller to connect to the worker's event stream even before it is fully /// initialized. 
pub async fn get_or_create_pending( this: &T, @@ -392,6 +400,12 @@ impl Worker { .last_known_status .pending_invocations .clone(); + let initial_pending_updates = worker_metadata + .last_known_status + .pending_updates + .iter() + .cloned() + .collect::>(); this.active_workers() .get_pending_with( @@ -402,6 +416,7 @@ impl Worker { config_clone, oplog, &initial_pending_invocations, + &initial_pending_updates, ) }, move |pending_worker| { @@ -422,6 +437,95 @@ impl Worker { .await } + /// Creates a new worker and returns the pending worker object, and pauses loading + /// the worker until an explicit call to a oneshot resume channel. + /// + /// If the worker is already active, the function fails. + /// + /// The pending worker object holds a reference to the event service, invocation queue and oplog + /// of the worker that is getting created, allowing the caller to connect to the worker's event stream even before it is fully + /// initialized. + pub async fn get_or_create_paused_pending( + this: &T, + worker_id: &WorkerId, + worker_args: Vec, + worker_env: Vec<(String, String)>, + template_version: Option, + account_id: AccountId, + ) -> Result<(PendingWorker, tokio::sync::oneshot::Sender<()>), GolemError> + where + T: HasAll + Clone + Send + Sync + 'static, + { + let this_clone = this.clone(); + let worker_id_clone_1 = worker_id.clone(); + let worker_id_clone_2 = worker_id.clone(); + let worker_args_clone = worker_args.clone(); + let worker_env_clone = worker_env.clone(); + let config_clone = this.config().clone(); + + let worker_metadata = Self::get_or_create_worker_metadata( + this, + worker_id, + template_version, + worker_args.clone(), + worker_env.clone(), + account_id, + ) + .await?; + + let oplog = this.oplog_service().open(worker_id).await; + let initial_pending_invocations = worker_metadata + .last_known_status + .pending_invocations + .clone(); + let initial_pending_updates = worker_metadata + .last_known_status + .pending_updates + .iter() + .cloned() + .collect::>(); + + let (resume_sender, resume_receiver) = tokio::sync::oneshot::channel(); + + let pending_or_final = this + .active_workers() + .get_pending_with( + worker_id.clone(), + || { + PendingWorker::new( + worker_id_clone_1, + config_clone, + oplog, + &initial_pending_invocations, + &initial_pending_updates, + ) + }, + move |pending_worker| { + let pending_worker_clone = pending_worker.clone(); + Box::pin(async move { + resume_receiver.await.unwrap(); + Worker::new( + &this_clone, + worker_id_clone_2, + worker_args_clone, + worker_env_clone, + worker_metadata, + &pending_worker_clone, + ) + .await + }) + }, + ) + .await?; + + match pending_or_final { + PendingOrFinal::Pending(pending) => Ok((pending, resume_sender)), + PendingOrFinal::Final(_) => Err(GolemError::unknown( + "Worker was unexpectedly already active", + )), + } + } + /// Looks up a given invocation key's current status. /// As the invocation key status is only stored in memory, we need to have an active /// instance (instance_details) to call this function. 
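The oneshot channel returned by get_or_create_paused_pending is what closes the race described in update_worker_internal: worker initialization stays parked on resume_receiver.await until the caller has injected the pending update. A condensed usage sketch, mirroring the automatic-update branch earlier in this patch (error handling elided):

    // Create the worker, but keep its initialization parked on the oneshot.
    let (pending_worker, resume) = Worker::get_or_create_paused_pending(
        self,
        &worker_id,
        metadata.args,
        metadata.env,
        Some(worker_status.component_version),
        metadata.account_id,
    )
    .await?;

    // While it is parked, persist the pending update so that recovery will
    // observe it as soon as the worker actually loads.
    pending_worker
        .invocation_queue
        .enqueue_update(update_description)
        .await;

    // Only now allow initialization (and thus recovery) to proceed.
    resume.send(()).unwrap();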
@@ -572,12 +676,14 @@ impl PendingWorker { worker_id: WorkerId, config: Arc, oplog: Arc, - initial_pending_invocations: &[WorkerInvocation], + initial_pending_invocations: &[TimestampedWorkerInvocation], + initial_pending_updates: &[TimestampedUpdateDescription], ) -> Result, GolemError> { let invocation_queue = Arc::new(InvocationQueue::new( worker_id.clone(), oplog.clone(), initial_pending_invocations, + initial_pending_updates, )); Ok(PendingWorker { @@ -894,7 +1000,11 @@ fn calculate_latest_worker_status( result = WorkerStatus::Running; } OplogEntry::PendingWorkerInvocation { .. } => {} - OplogEntry::PendingUpdate { .. } => {} + OplogEntry::PendingUpdate { .. } => { + if result == WorkerStatus::Failed { + result = WorkerStatus::Retrying; + } + } OplogEntry::FailedUpdate { .. } => {} OplogEntry::SuccessfulUpdate { .. } => {} } @@ -947,18 +1057,49 @@ fn calculate_overridden_retry_policy( } fn calculate_pending_invocations( - initial: Vec, + initial: Vec, entries: &[OplogEntry], -) -> Vec { +) -> Vec { let mut result = initial; for entry in entries { match entry { - OplogEntry::PendingWorkerInvocation { invocation, .. } => { - result.push(invocation.clone()); + OplogEntry::PendingWorkerInvocation { + timestamp, + invocation, + .. + } => { + result.push(TimestampedWorkerInvocation { + timestamp: *timestamp, + invocation: invocation.clone(), + }); } OplogEntry::ExportedFunctionInvoked { invocation_key, .. } => { - result.retain(|invocation| &invocation.invocation_key != invocation_key); + result.retain(|invocation| match invocation { + TimestampedWorkerInvocation { + invocation: + WorkerInvocation::ExportedFunction { + invocation_key: key, + .. + }, + .. + } => key != invocation_key, + _ => true, + }); } + OplogEntry::PendingUpdate { + description: UpdateDescription::SnapshotBased { target_version, .. }, + .. + } => result.retain(|invocation| match invocation { + TimestampedWorkerInvocation { + invocation: + WorkerInvocation::ManualUpdate { + target_version: version, + .. + }, + .. + } => version != target_version, + _ => true, + }), _ => {} } } @@ -966,13 +1107,13 @@ fn calculate_pending_invocations( } fn calculate_update_fields( - initial_pending_updates: VecDeque, + initial_pending_updates: VecDeque, initial_failed_updates: Vec, initial_successful_updates: Vec, initial_version: u64, entries: &Vec, ) -> ( - VecDeque, + VecDeque, Vec, Vec, u64, @@ -988,8 +1129,15 @@ fn calculate_update_fields( } => { version = *component_version; } - OplogEntry::PendingUpdate { description, .. } => { - pending_updates.push_back(description.clone()); + OplogEntry::PendingUpdate { + timestamp, + description, + .. 
+ } => { + pending_updates.push_back(TimestampedUpdateDescription { + timestamp: *timestamp, + description: description.clone(), + }); } OplogEntry::FailedUpdate { timestamp, diff --git a/golem-worker-service-base/src/service/worker/default.rs b/golem-worker-service-base/src/service/worker/default.rs index 9dd1446872..ed25679362 100644 --- a/golem-worker-service-base/src/service/worker/default.rs +++ b/golem-worker-service-base/src/service/worker/default.rs @@ -21,8 +21,8 @@ use golem_api_grpc::proto::golem::workerexecutor::{ }; use golem_common::model::{ - AccountId, CallingConvention, FilterComparator, InvocationKey, TemplateId, WorkerFilter, - WorkerStatus, + AccountId, CallingConvention, FilterComparator, InvocationKey, TemplateId, Timestamp, + WorkerFilter, WorkerStatus, }; use golem_service_base::model::{ GolemErrorUnknown, PromiseId, ResourceLimits, WorkerId, WorkerMetadata, @@ -1418,6 +1418,10 @@ where status: golem_common::model::WorkerStatus::Running, template_version: 0, retry_count: 0, + pending_invocation_count: 0, + updates: vec![], + created_at: Timestamp::now_utc(), + last_error: None, }) } diff --git a/openapi/golem-service.yaml b/openapi/golem-service.yaml index 596c3c966c..b88db655e0 100644 --- a/openapi/golem-service.yaml +++ b/openapi/golem-service.yaml @@ -1844,6 +1844,20 @@ components: type: string required: - errors + FailedUpdate: + type: object + properties: + timestamp: + type: string + format: date-time + targetVersion: + type: integer + format: uint64 + details: + type: string + required: + - timestamp + - targetVersion FilterComparator: type: string enum: @@ -2367,6 +2381,18 @@ components: - Options - Trace - Head + PendingUpdate: + type: object + properties: + timestamp: + type: string + format: date-time + targetVersion: + type: integer + format: uint64 + required: + - timestamp + - targetVersion PromiseId: type: object properties: @@ -2425,6 +2451,60 @@ components: - NotEqual - Like - NotLike + SuccessfulUpdate: + type: object + properties: + timestamp: + type: string + format: date-time + targetVersion: + type: integer + format: uint64 + required: + - timestamp + - targetVersion + UpdateRecord: + discriminator: + propertyName: type + mapping: + pendingUpdate: '#/components/schemas/UpdateRecord_PendingUpdate' + successfulUpdate: '#/components/schemas/UpdateRecord_SuccessfulUpdate' + failedUpdate: '#/components/schemas/UpdateRecord_FailedUpdate' + type: object + oneOf: + - $ref: '#/components/schemas/UpdateRecord_PendingUpdate' + - $ref: '#/components/schemas/UpdateRecord_SuccessfulUpdate' + - $ref: '#/components/schemas/UpdateRecord_FailedUpdate' + UpdateRecord_FailedUpdate: + allOf: + - type: object + properties: + type: + example: failedUpdate + type: string + required: + - type + - $ref: '#/components/schemas/FailedUpdate' + UpdateRecord_PendingUpdate: + allOf: + - type: object + properties: + type: + example: pendingUpdate + type: string + required: + - type + - $ref: '#/components/schemas/PendingUpdate' + UpdateRecord_SuccessfulUpdate: + allOf: + - type: object + properties: + type: + example: successfulUpdate + type: string + required: + - type + - $ref: '#/components/schemas/SuccessfulUpdate' ValidationErrorsBody: type: object properties: @@ -2648,6 +2728,18 @@ components: retryCount: type: integer format: uint64 + pendingInvocationCount: + type: integer + format: uint64 + updates: + type: array + items: + $ref: '#/components/schemas/UpdateRecord' + createdAt: + type: string + format: date-time + lastError: + type: string required: - workerId - args 
@@ -2655,6 +2747,9 @@ components: - status - templateVersion - retryCount + - pendingInvocationCount + - updates + - createdAt WorkerNameFilter: type: object properties:
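End to end, a client can now trigger a hot update through the new UpdateWorker RPC and then poll the enriched WorkerMetadata (updates, pendingInvocationCount, createdAt, lastError) to observe its progress. A rough client-side sketch, assuming the standard tonic-generated module layout for this proto package (worker_executor_client plus the update_worker_response oneof module that the server-side code in this patch also uses):

    use golem_api_grpc::proto::golem::worker::WorkerId;
    use golem_api_grpc::proto::golem::workerexecutor::{
        update_worker_response, worker_executor_client::WorkerExecutorClient, UpdateMode,
        UpdateWorkerRequest,
    };

    async fn trigger_update(
        addr: &'static str,
        worker_id: WorkerId,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut client = WorkerExecutorClient::connect(addr).await?;

        let response = client
            .update_worker(UpdateWorkerRequest {
                worker_id: Some(worker_id),
                target_version: 2, // hypothetical target component version
                mode: UpdateMode::Automatic as i32, // or UpdateMode::Manual for snapshot-based updates
            })
            .await?
            .into_inner();

        match response.result {
            Some(update_worker_response::Result::Success(_)) => Ok(()),
            Some(update_worker_response::Result::Failure(err)) => Err(format!("{err:?}").into()),
            None => Err("empty UpdateWorkerResponse".into()),
        }
    }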