Skip to content

Commit

Permalink
Special oplog implementation for ephemeral workers (#972)
Browse files Browse the repository at this point in the history
* Special oplog implementation for ephemeral workers

* Fixes and perf improvement
  • Loading branch information
vigoo authored Sep 25, 2024
1 parent 15ca91a commit 36c92d2
Show file tree
Hide file tree
Showing 19 changed files with 459 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use wasmtime::component::Resource;
use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::services::oplog::CommitLevel;
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime_wasi::bindings::clocks::monotonic_clock::{Duration, Host, Instant, Pollable};
Expand Down Expand Up @@ -64,7 +65,7 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
|ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
)
.await?;
self.state.oplog.commit().await;
self.state.oplog.commit(CommitLevel::DurableOnly).await;
let when = now.saturating_add(when);
Host::subscribe_instant(&mut self.as_wasi_view(), when).await
}
Expand Down
4 changes: 2 additions & 2 deletions golem-worker-executor-base/src/durable_host/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use crate::durable_host::DurableWorkerCtx;
use crate::error::GolemError;
use crate::model::PersistenceLevel;
use crate::services::oplog::OplogOps;
use crate::services::oplog::{CommitLevel, OplogOps};
use crate::workerctx::WorkerCtx;
use async_trait::async_trait;
use bincode::{Decode, Encode};
Expand Down Expand Up @@ -342,7 +342,7 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
WrappedFunctionType::WriteRemoteBatched(_)
)
{
self.state.oplog.commit().await;
self.state.oplog.commit(CommitLevel::DurableOnly).await;
}
}
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion golem-worker-executor-base/src/durable_host/golem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::preview2::golem::api::host::{
ComponentVersion, HostGetWorkers, PersistenceLevel, RetryPolicy, UpdateMode, Uri,
WorkerMetadata,
};
use crate::services::oplog::CommitLevel;
use crate::services::HasWorker;
use crate::workerctx::{InvocationManagement, StatusManagement, WorkerCtx};
use golem_common::model::oplog::{OplogEntry, OplogIndex, WrappedFunctionType};
Expand Down Expand Up @@ -422,7 +423,7 @@ impl<Ctx: WorkerCtx> golem::api::host::Host for DurableWorkerCtx<Ctx> {
record_host_function_call("golem::api", "set_oplog_persistence_level");
// commit all pending entries and change persistence level
if self.state.is_live() {
self.state.oplog.commit().await;
self.state.oplog.commit(CommitLevel::DurableOnly).await;
}
self.state.persistence_level = new_persistence_level.into();
debug!(
Expand Down
4 changes: 2 additions & 2 deletions golem-worker-executor-base/src/durable_host/http/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::durable_host::{Durability, DurableWorkerCtx, HttpRequestCloseOwner};
use crate::get_oplog_entry;
use crate::metrics::wasm::record_host_function_call;
use crate::model::PersistenceLevel;
use crate::services::oplog::OplogOps;
use crate::services::oplog::{CommitLevel, OplogOps};
use crate::workerctx::WorkerCtx;

impl<Ctx: WorkerCtx> HostFields for DurableWorkerCtx<Ctx> {
Expand Down Expand Up @@ -657,7 +657,7 @@ impl<Ctx: WorkerCtx> HostFutureIncomingResponse for DurableWorkerCtx<Ctx> {
)
.await
.unwrap_or_else(|err| panic!("failed to serialize http response: {err}"));
self.state.oplog.commit().await;
self.state.oplog.commit(CommitLevel::DurableOnly).await;
}

if !matches!(serializable_response, SerializableResponse::Pending) {
Expand Down
11 changes: 6 additions & 5 deletions golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use wasmtime_wasi_http::{HttpResult, WasiHttpCtx, WasiHttpView};
use crate::durable_host::io::{ManagedStdErr, ManagedStdIn, ManagedStdOut};
use crate::durable_host::wasm_rpc::UrnExtensions;
use crate::metrics::wasm::{record_number_of_replayed_functions, record_resume_worker};
use crate::services::oplog::{Oplog, OplogOps, OplogService};
use crate::services::oplog::{CommitLevel, Oplog, OplogOps, OplogService};
use crate::services::rpc::Rpc;
use crate::services::scheduler::SchedulerService;
use crate::services::HasOplogService;
Expand Down Expand Up @@ -712,9 +712,10 @@ impl<Ctx: WorkerCtx> StatusManagement for DurableWorkerCtx<Ctx> {
async fn store_worker_status(&self, status: WorkerStatus) {
self.update_worker_status(|s| s.status = status.clone())
.await;
if status == WorkerStatus::Idle
if (status == WorkerStatus::Idle
|| status == WorkerStatus::Failed
|| status == WorkerStatus::Exited
|| status == WorkerStatus::Exited)
&& self.component_metadata().component_type == ComponentType::Durable
{
debug!("Scheduling oplog archive");
let at = Utc::now().add(self.state.config.oplog.archive_interval);
Expand Down Expand Up @@ -770,7 +771,7 @@ impl<Ctx: WorkerCtx> InvocationHooks for DurableWorkerCtx<Ctx> {
self.worker_id()
)
});
self.state.oplog.commit().await;
self.state.oplog.commit(CommitLevel::Always).await;
}
Ok(())
}
Expand Down Expand Up @@ -860,7 +861,7 @@ impl<Ctx: WorkerCtx> InvocationHooks for DurableWorkerCtx<Ctx> {
.unwrap_or_else(|err| {
panic!("could not encode function result for {full_function_name}: {err}")
});
self.state.oplog.commit().await;
self.state.oplog.commit(CommitLevel::Always).await;
let oplog_idx = self.state.oplog.current_oplog_index().await;

if let Some(idempotency_key) = self.state.get_current_idempotency_key() {
Expand Down
4 changes: 2 additions & 2 deletions golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::error::GolemError;
use crate::get_oplog_entry;
use crate::metrics::wasm::record_host_function_call;
use crate::model::PersistenceLevel;
use crate::services::oplog::OplogOps;
use crate::services::oplog::{CommitLevel, OplogOps};
use crate::services::rpc::{RpcDemand, RpcError};
use crate::workerctx::{InvocationManagement, WorkerCtx};
use anyhow::anyhow;
Expand Down Expand Up @@ -503,7 +503,7 @@ impl<Ctx: WorkerCtx> HostFutureInvokeResult for DurableWorkerCtx<Ctx> {
}
}
}
self.state.oplog.commit().await;
self.state.oplog.commit(CommitLevel::DurableOnly).await;
}

result
Expand Down
1 change: 1 addition & 0 deletions golem-worker-executor-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ pub trait Bootstrap<Ctx: WorkerCtx> {
primary,
oplog_archives,
golem_config.oplog.entry_count_limit,
golem_config.oplog.max_operations_before_commit_ephemeral,
))
}
};
Expand Down
2 changes: 2 additions & 0 deletions golem-worker-executor-base/src/services/golem_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ pub struct SchedulerConfig {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OplogConfig {
pub max_operations_before_commit: u64,
pub max_operations_before_commit_ephemeral: u64,
pub max_payload_size: usize,
pub indexed_storage_layers: usize,
pub blob_storage_layers: usize,
Expand Down Expand Up @@ -477,6 +478,7 @@ impl Default for OplogConfig {
fn default() -> Self {
Self {
max_operations_before_commit: 128,
max_operations_before_commit_ephemeral: 512,
max_payload_size: 64 * 1024,
indexed_storage_layers: 2,
blob_storage_layers: 1,
Expand Down
3 changes: 1 addition & 2 deletions golem-worker-executor-base/src/services/oplog/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use std::sync::Arc;

use async_trait::async_trait;
use evicting_cache_map::EvictingCacheMap;
use tokio::sync::RwLock;

use golem_common::model::oplog::{OplogEntry, OplogIndex};
use golem_common::model::{AccountId, ComponentId, OwnedWorkerId, ScanCursor, WorkerId};
use tokio::sync::RwLock;

use crate::error::GolemError;
use crate::services::oplog::multilayer::OplogArchive;
Expand Down
178 changes: 178 additions & 0 deletions golem-worker-executor-base/src/services/oplog/ephemeral.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright 2024 Golem Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::metrics::oplog::record_oplog_call;
use crate::services::oplog::multilayer::OplogArchive;
use crate::services::oplog::{CommitLevel, Oplog};
use async_mutex::Mutex;
use async_trait::async_trait;
use bytes::Bytes;
use golem_common::model::oplog::{OplogEntry, OplogIndex, OplogPayload};
use golem_common::model::OwnedWorkerId;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;

/// Oplog implementation used for ephemeral workers.
///
/// Added entries are collected in an in-memory buffer (see `EphemeralOplogState`) and are
/// appended to the `target` archive in batches; payload upload and download are delegated
/// to the `primary` oplog.
pub struct EphemeralOplog {
/// Identifies the worker owning this oplog; printed as `worker_id` by the `Debug` impl.
owned_worker_id: OwnedWorkerId,
/// Oplog through which payloads are uploaded and downloaded.
primary: Arc<dyn Oplog + Send + Sync>,
/// Archive serving `drop_prefix`, `read` and `length`.
target: Arc<dyn OplogArchive + Send + Sync>,
/// Buffered entries and index bookkeeping, shared with commit tasks spawned in the background.
state: Arc<Mutex<EphemeralOplogState>>,
/// Callback invoked once when this value is dropped; taken out of the `Option` at that point.
close_fn: Option<Box<dyn FnOnce() + Send + Sync>>,
}

/// Mutable part of an `EphemeralOplog`, always accessed through the owning mutex.
struct EphemeralOplogState {
/// Entries that were added but not yet appended to `target`.
buffer: VecDeque<OplogEntry>,
/// Advanced by one on every `add`; this is the value reported as the current oplog index.
last_oplog_idx: OplogIndex,
/// Index assigned to the most recent entry handed over to `target` by `commit`.
last_committed_idx: OplogIndex,
/// Buffer length threshold; `add` triggers a commit once the buffer grows beyond it.
max_operations_before_commit: u64,
/// Archive that receives the buffered entries on commit.
target: Arc<dyn OplogArchive + Send + Sync>,
}

impl EphemeralOplogState {
async fn add(&mut self, entry: OplogEntry) {
self.buffer.push_back(entry);
if self.buffer.len() > self.max_operations_before_commit as usize {
self.commit().await;
}
self.last_oplog_idx = self.last_oplog_idx.next();
}

async fn commit(&mut self) {
let entries = self.buffer.drain(..).collect::<Vec<OplogEntry>>();

let mut pairs = Vec::new();
for entry in entries {
let oplog_idx = self.last_committed_idx.next();
pairs.push((oplog_idx, entry));
self.last_committed_idx = oplog_idx;
}

self.target.append(pairs).await
}
}

impl EphemeralOplog {
    /// Creates an ephemeral oplog with an empty buffer.
    ///
    /// Both the current and the last committed index start at `last_oplog_idx`. Committed
    /// entries go to `target`, payloads go through `primary`, and `close` is stored to be
    /// run when the oplog is dropped.
    pub async fn new(
        owned_worker_id: OwnedWorkerId,
        last_oplog_idx: OplogIndex,
        max_operations_before_commit: u64,
        primary: Arc<dyn Oplog + Send + Sync>,
        target: Arc<dyn OplogArchive + Send + Sync>,
        close: Box<dyn FnOnce() + Send + Sync>,
    ) -> Self {
        // The state keeps its own handle to the archive so it can flush without `self`.
        let initial_state = EphemeralOplogState {
            buffer: VecDeque::new(),
            last_oplog_idx,
            last_committed_idx: last_oplog_idx,
            max_operations_before_commit,
            target: Arc::clone(&target),
        };

        Self {
            owned_worker_id,
            primary,
            target,
            state: Arc::new(Mutex::new(initial_state)),
            close_fn: Some(close),
        }
    }
}

impl Drop for EphemeralOplog {
    /// Runs the close callback, if it is still present, exactly once.
    fn drop(&mut self) {
        let Some(on_close) = self.close_fn.take() else {
            return;
        };
        on_close();
    }
}

impl Debug for EphemeralOplog {
    /// Renders the oplog as an `EphemeralOplog` struct exposing only the owning worker id.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("EphemeralOplog");
        repr.field("worker_id", &self.owned_worker_id);
        repr.finish()
    }
}

#[async_trait]
impl Oplog for EphemeralOplog {
async fn add(&self, entry: OplogEntry) {
record_oplog_call("add");
let mut state = self.state.lock().await;
state.add(entry).await
}

async fn drop_prefix(&self, last_dropped_id: OplogIndex) {
record_oplog_call("drop_prefix");
self.target.drop_prefix(last_dropped_id).await;
}

async fn commit(&self, level: CommitLevel) {
record_oplog_call("commit");
match level {
CommitLevel::Immediate => {
let mut state = self.state.lock().await;
state.commit().await
}
CommitLevel::Always => {
let clone = self.state.clone();
tokio::spawn(async move {
let mut state = clone.lock().await;
state.commit().await
});
}
CommitLevel::DurableOnly => {}
}
}

async fn current_oplog_index(&self) -> OplogIndex {
record_oplog_call("current_oplog_index");
let state = self.state.lock().await;
state.last_oplog_idx
}

async fn wait_for_replicas(&self, _replicas: u8, _timeout: Duration) -> bool {
record_oplog_call("wait_for_replicas");
// Not supported
false
}

async fn read(&self, oplog_index: OplogIndex) -> OplogEntry {
record_oplog_call("read");
let entries = self.target.read(oplog_index, 1).await;
if let Some(entry) = entries.get(&oplog_index) {
entry.clone()
} else {
panic!(
"Missing oplog entry {oplog_index} in {:?} for ephemeral oplog",
self.target
);
}
}

async fn length(&self) -> u64 {
record_oplog_call("length");
self.target.length().await
}

async fn upload_payload(&self, data: &[u8]) -> Result<OplogPayload, String> {
// Storing oplog payloads through the primary layer
self.primary.upload_payload(data).await
}

async fn download_payload(&self, payload: &OplogPayload) -> Result<Bytes, String> {
// Downloading oplog payloads through the primary layer
self.primary.download_payload(payload).await
}
}
Loading

0 comments on commit 36c92d2

Please sign in to comment.