Skip to content

Commit

Permalink
Explicit commit (#320)
Browse files Browse the repository at this point in the history
* New host functions, unimplemented

* Implementation of oplog_commit

* Test and fix

* golem-wit 0.2.1

* Fix after wit update

* Format
  • Loading branch information
vigoo authored Mar 26, 2024
1 parent 7b5a90d commit f8c975c
Show file tree
Hide file tree
Showing 34 changed files with 246 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 28 additions & 3 deletions golem-common/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{atomic, Arc};
use std::time::Instant;
Expand All @@ -21,11 +22,11 @@ use bytes::Bytes;
use fred::clients::Transaction;
use fred::prelude::{RedisPool as FredRedisPool, *};
use fred::types::{
Limit, MultipleKeys, MultipleOrderedPairs, MultipleValues, MultipleZaddValues, Ordering,
RedisKey, RedisMap, XCap, ZRange, ZSort, XID,
InfoKind, Limit, MultipleKeys, MultipleOrderedPairs, MultipleValues, MultipleZaddValues,
Ordering, RedisKey, RedisMap, XCap, ZRange, ZSort, XID,
};
use serde::de::DeserializeOwned;
use tracing::Level;
use tracing::{debug, Level};

use crate::metrics::redis::{record_redis_failure, record_redis_success};
use crate::serialization::{deserialize, serialize};
Expand Down Expand Up @@ -609,6 +610,30 @@ impl<'a> RedisLabelledApi<'a> {

self.record(start, "MULTI", trx.trx.exec(true).await)
}

pub async fn wait(&self, replicas: i64, timeout: i64) -> RedisResult<i64> {
self.ensure_connected().await?;
let start = Instant::now();
self.record(start, "WAIT", self.pool.wait(replicas, timeout).await)
}

pub async fn info_connected_slaves(&self) -> RedisResult<u8> {
self.ensure_connected().await?;
let start = Instant::now();
let info: String = self.record(
start,
"INFO",
self.pool.info(Some(InfoKind::Replication)).await,
)?;
let info: HashMap<&str, &str> =
HashMap::from_iter(info.lines().filter_map(|line| line.trim().split_once(':')));
debug!("Redis replication info: {:?}", info);
let connected_slaves = info
.get("connected_slaves")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
Ok(connected_slaves)
}
}

pub struct RedisTransaction {
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fs-set-times = "0.20.1"
futures = { workspace = true }
futures-util = { workspace = true }
gethostname = "0.4.3"
golem-wit = "0.2.0"
golem-wit = { version = "0.2.1" }
http = { workspace = true }
http_02 = { package = "http", version = "0.2.11" }
http-body = "1.0.0" # keep in sync with wasmtime
Expand Down
69 changes: 68 additions & 1 deletion golem-worker-executor-base/src/durable_host/golem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::anyhow;
use async_trait::async_trait;
use std::time::Duration;
use tracing::debug;
use uuid::Uuid;

Expand All @@ -22,7 +23,7 @@ use crate::durable_host::DurableWorkerCtx;
use crate::metrics::wasm::record_host_function_call;
use crate::model::InterruptKind;
use crate::preview2::golem;
use crate::preview2::golem::api::host::OplogIndex;
use crate::preview2::golem::api::host::{OplogIndex, PersistenceLevel, RetryPolicy};
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::OplogEntry;
use golem_common::model::regions::OplogRegion;
Expand Down Expand Up @@ -138,6 +139,72 @@ impl<Ctx: WorkerCtx> golem::api::host::Host for DurableWorkerCtx<Ctx> {
Ok(())
}
}

async fn oplog_commit(&mut self, replicas: u8) -> anyhow::Result<()> {
if self.is_live() {
let timeout = Duration::from_secs(1);
debug!(
"Worker {} committing oplog to {} replicas",
self.worker_id, replicas
);
loop {
// Applying a timeout to make sure the worker remains interruptible
if self.commit_oplog_to_replicas(replicas, timeout).await {
debug!(
"Worker {} committed oplog to {} replicas",
self.worker_id, replicas
);
return Ok(());
} else {
debug!(
"Worker {} failed to commit oplog to {} replicas, retrying",
self.worker_id, replicas
);
}

if let Some(kind) = self.check_interrupt() {
return Err(kind.into());
}
}
} else {
Ok(())
}
}

async fn get_retry_policy(&mut self) -> anyhow::Result<RetryPolicy> {
unimplemented!()
}

async fn set_retry_policy(&mut self, _new_retry_policy: RetryPolicy) -> anyhow::Result<()> {
unimplemented!()
}

async fn mark_begin_operation(&mut self) -> anyhow::Result<OplogIndex> {
unimplemented!()
}

async fn mark_end_operation(&mut self, _begin: OplogIndex) -> anyhow::Result<()> {
unimplemented!()
}

async fn get_oplog_persistence_level(&mut self) -> anyhow::Result<PersistenceLevel> {
unimplemented!()
}

async fn set_oplog_persistence_level(
&mut self,
_new_persistence_level: PersistenceLevel,
) -> anyhow::Result<()> {
unimplemented!()
}

async fn get_idempotence_mode(&mut self) -> anyhow::Result<bool> {
unimplemented!()
}

async fn set_idempotence_mode(&mut self, _idempotent: bool) -> anyhow::Result<()> {
unimplemented!()
}
}

impl From<WorkerId> for golem::api::host::WorkerId {
Expand Down
13 changes: 13 additions & 0 deletions golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
self.private_state.commit_oplog().await
}

pub async fn commit_oplog_to_replicas(&mut self, replicas: u8, timeout: Duration) -> bool {
self.private_state
.commit_oplog_to_replicas(replicas, timeout)
.await
}

async fn get_oplog_entry_marker(&mut self) -> Result<(), GolemError> {
self.private_state.get_oplog_entry_marker().await
}
Expand Down Expand Up @@ -1089,6 +1095,13 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
self.oplog_service.append(worker_id, &arrays).await
}

pub async fn commit_oplog_to_replicas(&mut self, replicas: u8, timeout: Duration) -> bool {
self.commit_oplog().await;
self.oplog_service
.wait_for_replicas(replicas, timeout)
.await
}

pub async fn get_oplog_size(&mut self) -> u64 {
self.oplog_service.get_size(&self.worker_id).await
}
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub trait Bootstrap<Ctx: WorkerCtx> {

let blob_store_service = blob_store::configured(&golem_config.blob_store_service).await;

let oplog_service = Arc::new(OplogServiceDefault::new(pool.clone()));
let oplog_service = Arc::new(OplogServiceDefault::new(pool.clone()).await);

let scheduler_service = SchedulerServiceDefault::new(
pool.clone(),
Expand Down
38 changes: 36 additions & 2 deletions golem-worker-executor-base/src/services/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -22,6 +23,7 @@ use golem_common::metrics::redis::record_redis_serialized_size;
use golem_common::model::oplog::OplogEntry;
use golem_common::model::WorkerId;
use golem_common::redis::RedisPool;
use tracing::error;

use crate::metrics::oplog::record_oplog_call;

Expand All @@ -34,16 +36,28 @@ pub trait OplogService {
async fn delete(&self, worker_id: &WorkerId);

async fn read(&self, worker_id: &WorkerId, idx: u64, n: u64) -> Vec<OplogEntry>;

/// Waits until Redis writes all changes into at least `replicas` replicas (or the maximum
/// available).
/// Returns true if the maximum possible number of replicas is reached within the timeout,
/// otherwise false.
async fn wait_for_replicas(&self, replicas: u8, timeout: Duration) -> bool;
}

#[derive(Clone, Debug)]
pub struct OplogServiceDefault {
redis: RedisPool,
replicas: u8,
}

impl OplogServiceDefault {
pub fn new(redis: RedisPool) -> Self {
Self { redis }
pub async fn new(redis: RedisPool) -> Self {
let replicas = redis
.with("oplog", "new")
.info_connected_slaves()
.await
.unwrap_or_else(|err| panic!("failed to get the number of replicas from Redis: {err}"));
Self { redis, replicas }
}
}

Expand Down Expand Up @@ -170,6 +184,22 @@ impl OplogService for OplogServiceDefault {

entries
}

async fn wait_for_replicas(&self, replicas: u8, timeout: Duration) -> bool {
let replicas = replicas.min(self.replicas);
match self
.redis
.with("oplog", "wait_for_replicas")
.wait(replicas as i64, timeout.as_millis() as i64)
.await
{
Ok(n) => n as u8 == replicas,
Err(err) => {
error!("Failed to execute WAIT command: {:?}", err);
false
}
}
}
}

fn get_oplog_redis_key(worker_id: &WorkerId) -> String {
Expand Down Expand Up @@ -211,4 +241,8 @@ impl OplogService for OplogServiceMock {
async fn read(&self, _worker_id: &WorkerId, _idx: u64, _n: u64) -> Vec<OplogEntry> {
unimplemented!()
}

async fn wait_for_replicas(&self, _replicas: u8, _timeout: Duration) -> bool {
unimplemented!()
}
}
26 changes: 26 additions & 0 deletions golem-worker-executor-base/tests/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,29 @@ async fn jump() {
]
);
}

#[tokio::test]
async fn explicit_oplog_commit() {
let context = common::TestContext::new();
let mut executor = common::start(&context).await.unwrap();

let template_id = executor.store_template(Path::new("../test-templates/runtime-service.wasm"));

let worker_id = executor
.start_worker(&template_id, "runtime-service-explicit-oplog-commit")
.await;

executor.log_output(&worker_id).await;

// Note: we can only test with replicas=0 because we don't have redis slaves in the test environment currently
let result = executor
.invoke_and_await(
&worker_id,
"golem:it/api/explicit-commit",
vec![common::val_u8(0)],
)
.await;

drop(executor);
check!(result.is_ok());
}
Binary file modified test-templates/blob-store-service.wasm
Binary file not shown.
Binary file modified test-templates/clock-service.wasm
Binary file not shown.
Binary file modified test-templates/clocks.wasm
Binary file not shown.
Binary file modified test-templates/directories.wasm
Binary file not shown.
Binary file modified test-templates/environment-service.wasm
Binary file not shown.
Binary file modified test-templates/failing-component.wasm
Binary file not shown.
Binary file modified test-templates/file-service.wasm
Binary file not shown.
Binary file modified test-templates/file-write-read-delete.wasm
Binary file not shown.
Binary file modified test-templates/flags-service.wasm
Binary file not shown.
Binary file modified test-templates/http-client-2.wasm
Binary file not shown.
Binary file modified test-templates/http-client.wasm
Binary file not shown.
Binary file modified test-templates/interruption.wasm
Binary file not shown.
Binary file modified test-templates/key-value-service.wasm
Binary file not shown.
Binary file modified test-templates/networking.wasm
Binary file not shown.
Binary file modified test-templates/option-service.wasm
Binary file not shown.
Binary file modified test-templates/read-stdin.wasm
Binary file not shown.
Binary file modified test-templates/runtime-service.wasm
Binary file not shown.
4 changes: 3 additions & 1 deletion test-templates/runtime-service/cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ path = "wit"

[package.metadata.component.target.dependencies]
"golem:api" = { path = "wit/deps/golem" }
"golem:rpc" = { path = "wit/deps/wasm-rpc" }
"golem:rpc" = { path = "wit/deps/wasm-rpc" }
"wasi:clocks" = { path = "wit/deps/clocks" }
"wasi:io" = { path = "wit/deps/io" }
7 changes: 7 additions & 0 deletions test-templates/runtime-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ impl Guest for Component {

state // final value is 5
}

fn explicit_commit(replicas: u8) {
let now = std::time::SystemTime::now();
println!("Starting commit with {replicas} replicas at {now:?}");
oplog_commit(replicas);
println!("Finished commit");
}
}

fn remote_call(param: u64) -> bool {
Expand Down
Loading

0 comments on commit f8c975c

Please sign in to comment.