Skip to content

Commit

Permalink
Derived idempotency keys (#965)
Browse files Browse the repository at this point in the history
* Derived idempotency keys

* Test

* Fix sizes
  • Loading branch information
vigoo authored Sep 24, 2024
1 parent 010b665 commit b15e2ee
Show file tree
Hide file tree
Showing 40 changed files with 362 additions and 115 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ tracing-subscriber = { version = "0.3.18", features = [
] }
tracing-test = "0.2.5"
url = "2.5.0"
uuid = { version = "1.7.0", features = ["serde", "v4"] }
uuid = { version = "1.7.0", features = ["serde", "v4", "v5"] }
warp = "0.3.6"
wasm-wave = "=0.6.0"
wasmtime = { version = "=21.0.1", features = ["component-model"] }
Expand Down
2 changes: 1 addition & 1 deletion golem-cli/tests/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ fn text_component_list(
+----------------------------------------------------+-------------------------------+---------+-------+---------------+
| URN | Name | Version | Size | Exports count |
+----------------------------------------------------+-------------------------------+---------+-------+---------------+
| {} | {} | 0 | 71828 | 2 |
| {} | {} | 0 | 71228 | 2 |
+----------------------------------------------------+-------------------------------+---------+-------+---------------+
",
component.component_urn,
Expand Down
74 changes: 71 additions & 3 deletions golem-common/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use poem_openapi::{Enum, Object, Union};
use rand::prelude::IteratorRandom;
use serde::{Deserialize, Serialize, Serializer};
use serde_json::Value;
use uuid::Uuid;
use uuid::{uuid, Uuid};

pub mod component_metadata;
pub mod exports;
Expand Down Expand Up @@ -788,6 +788,8 @@ pub struct IdempotencyKey {
}

impl IdempotencyKey {
const ROOT_NS: Uuid = uuid!("9C19B15A-C83D-46F7-9BC3-EAD7923733F4");

pub fn new(value: String) -> Self {
Self { value }
}
Expand All @@ -801,6 +803,25 @@ impl IdempotencyKey {
pub fn fresh() -> Self {
Self::from_uuid(Uuid::new_v4())
}

/// Generates a deterministic new idempotency key using a base idempotency key and an oplog index.
///
/// The base idempotency key determines the "namespace" of the generated key UUIDv5. If
/// the base idempotency key is already an UUID, it is directly used as the namespace of the v5 algorithm,
/// while the name part is derived from the given oplog index.
///
/// If the base idempotency key is not an UUID (as it can be an arbitrary user-provided string), then first
/// we generate a UUIDv5 in the ROOT_NS namespace and use that as unique namespace for generating
/// the new idempotency key.
pub fn derived(base: &IdempotencyKey, oplog_index: OplogIndex) -> Self {
let namespace = if let Ok(base_uuid) = Uuid::parse_str(&base.value) {
base_uuid
} else {
Uuid::new_v5(&Self::ROOT_NS, base.value.as_bytes())
};
let name = format!("oplog-index-{}", oplog_index);
Self::from_uuid(Uuid::new_v5(&namespace, name.as_bytes()))
}
}

impl From<golem_api_grpc::proto::golem::worker::IdempotencyKey> for IdempotencyKey {
Expand Down Expand Up @@ -2342,9 +2363,11 @@ mod tests {
use std::time::SystemTime;
use std::vec;

use crate::model::oplog::OplogIndex;
use crate::model::{
AccountId, ComponentId, FilterComparator, ShardId, StringFilterComparator, TargetWorkerId,
Timestamp, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus, WorkerStatusRecord,
AccountId, ComponentId, FilterComparator, IdempotencyKey, ShardId, StringFilterComparator,
TargetWorkerId, Timestamp, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus,
WorkerStatusRecord,
};
use bincode::{Decode, Encode};
use rand::{thread_rng, Rng};
Expand Down Expand Up @@ -2620,4 +2643,49 @@ mod tests {
}
}
}

#[test]
fn derived_idempotency_key() {
let base1 = IdempotencyKey::fresh();
let base2 = IdempotencyKey::fresh();
let base3 = IdempotencyKey {
value: "base3".to_string(),
};

assert_ne!(base1, base2);

let idx1 = OplogIndex::from_u64(2);
let idx2 = OplogIndex::from_u64(11);

let derived11a = IdempotencyKey::derived(&base1, idx1);
let derived12a = IdempotencyKey::derived(&base1, idx2);
let derived21a = IdempotencyKey::derived(&base2, idx1);
let derived22a = IdempotencyKey::derived(&base2, idx2);

let derived11b = IdempotencyKey::derived(&base1, idx1);
let derived12b = IdempotencyKey::derived(&base1, idx2);
let derived21b = IdempotencyKey::derived(&base2, idx1);
let derived22b = IdempotencyKey::derived(&base2, idx2);

let derived31 = IdempotencyKey::derived(&base3, idx1);
let derived32 = IdempotencyKey::derived(&base3, idx2);

assert_eq!(derived11a, derived11b);
assert_eq!(derived12a, derived12b);
assert_eq!(derived21a, derived21b);
assert_eq!(derived22a, derived22b);

assert_ne!(derived11a, derived12a);
assert_ne!(derived11a, derived21a);
assert_ne!(derived11a, derived22a);
assert_ne!(derived12a, derived21a);
assert_ne!(derived12a, derived22a);
assert_ne!(derived21a, derived22a);

assert_ne!(derived11a, derived31);
assert_ne!(derived21a, derived31);
assert_ne!(derived12a, derived32);
assert_ne!(derived22a, derived32);
assert_ne!(derived31, derived32);
}
}
17 changes: 14 additions & 3 deletions golem-worker-executor-base/src/durable_host/golem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ use crate::preview2::golem::api::host::{
WorkerMetadata,
};
use crate::services::HasWorker;
use crate::workerctx::{StatusManagement, WorkerCtx};
use crate::workerctx::{InvocationManagement, StatusManagement, WorkerCtx};
use golem_common::model::oplog::{OplogEntry, OplogIndex, WrappedFunctionType};
use golem_common::model::regions::OplogRegion;
use golem_common::model::{ComponentId, OwnedWorkerId, PromiseId, ScanCursor, WorkerId};
use golem_common::model::{
ComponentId, IdempotencyKey, OwnedWorkerId, PromiseId, ScanCursor, WorkerId,
};

#[async_trait]
impl<Ctx: WorkerCtx> HostGetWorkers for DurableWorkerCtx<Ctx> {
Expand Down Expand Up @@ -446,13 +448,22 @@ impl<Ctx: WorkerCtx> golem::api::host::Host for DurableWorkerCtx<Ctx> {
async fn generate_idempotency_key(&mut self) -> anyhow::Result<golem::api::host::Uuid> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("golem::api", "generate_idempotency_key");

let current_idempotency_key = self
.get_current_idempotency_key()
.await
.unwrap_or(IdempotencyKey::fresh());
let oplog_index = self.state.current_oplog_index().await;

// NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs
let uuid = Durability::<Ctx, (u64, u64), SerializableError>::custom_wrap(
self,
WrappedFunctionType::WriteRemote,
"golem api::generate_idempotency_key",
|_ctx| {
Box::pin(async move {
let uuid = Uuid::new_v4();
let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index);
let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid
Ok::<Uuid, GolemError>(uuid)
})
},
Expand Down
32 changes: 28 additions & 4 deletions golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::metrics::wasm::record_host_function_call;
use crate::model::PersistenceLevel;
use crate::services::oplog::OplogOps;
use crate::services::rpc::{RpcDemand, RpcError};
use crate::workerctx::WorkerCtx;
use crate::workerctx::{InvocationManagement, WorkerCtx};
use anyhow::anyhow;
use async_trait::async_trait;
use golem_common::model::oplog::{OplogEntry, WrappedFunctionType};
Expand Down Expand Up @@ -87,13 +87,21 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();

let current_idempotency_key = self
.get_current_idempotency_key()
.await
.unwrap_or(IdempotencyKey::fresh());
let oplog_index = self.state.current_oplog_index().await;

// NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs
let uuid = Durability::<Ctx, (u64, u64), SerializableError>::custom_wrap(
self,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::invoke-and-await idempotency key",
|_ctx| {
Box::pin(async move {
let uuid = Uuid::new_v4();
let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index);
let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid
Ok::<Uuid, GolemError>(uuid)
})
},
Expand Down Expand Up @@ -152,13 +160,21 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();

let current_idempotency_key = self
.get_current_idempotency_key()
.await
.unwrap_or(IdempotencyKey::fresh());
let oplog_index = self.state.current_oplog_index().await;

// NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs
let uuid = Durability::<Ctx, (u64, u64), SerializableError>::custom_wrap(
self,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::invoke-and-await idempotency key",
|_ctx| {
Box::pin(async move {
let uuid = Uuid::new_v4();
let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index);
let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid
Ok::<Uuid, GolemError>(uuid)
})
},
Expand Down Expand Up @@ -221,13 +237,21 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();

let current_idempotency_key = self
.get_current_idempotency_key()
.await
.unwrap_or(IdempotencyKey::fresh());
let oplog_index = self.state.current_oplog_index().await;

// NOTE: Now that IdempotencyKey::derived is used, we no longer need to persist this, but we do to avoid breaking existing oplogs
let uuid = Durability::<Ctx, (u64, u64), SerializableError>::custom_wrap(
self,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::invoke-and-await idempotency key",
|_ctx| {
Box::pin(async move {
let uuid = Uuid::new_v4();
let key = IdempotencyKey::derived(&current_idempotency_key, oplog_index);
let uuid = Uuid::parse_str(&key.value.to_string()).unwrap(); // this is guaranteed to be a uuid
Ok::<Uuid, GolemError>(uuid)
})
},
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/src/services/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ impl<Ctx: WorkerCtx> Rpc for DirectWorkerInvocationRpc<Ctx> {
self_args: &[String],
self_env: &[(String, String)],
) -> Result<(), RpcError> {
let idempotency_key = idempotency_key.unwrap_or(IdempotencyKey::fresh());
let idempotency_key = idempotency_key.unwrap_or(IdempotencyKey::fresh()); // TODO

if self
.shard_service()
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ async fn get_worker_metadata() {
value: "test-account".to_string()
}
);
check!(metadata2.last_known_status.component_size == 60756);
check!(metadata2.last_known_status.component_size == 60157);
check!(metadata2.last_known_status.total_linear_memory_size == 1245184);
}

Expand Down
102 changes: 102 additions & 0 deletions golem-worker-executor-base/tests/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::common::{start, TestContext};
use assert2::check;
use bytes::Bytes;
use golem_common::model::{IdempotencyKey, TargetWorkerId};
use golem_test_framework::dsl::{
drain_connection, stdout_event_starting_with, stdout_events, worker_error_message,
TestDslUnsafe,
Expand Down Expand Up @@ -720,3 +721,104 @@ async fn golem_rust_infallible_transaction() {
]
);
}

#[tokio::test]
#[tracing::instrument]
async fn idempotency_keys_in_ephemeral_workers() {
let context = TestContext::new();
let executor = start(&context).await.unwrap();

let component_id = executor.store_ephemeral_component("runtime-service").await;

let target_worker_id = TargetWorkerId {
component_id,
worker_name: None,
};

let idempotency_key1 = IdempotencyKey::fresh();
let idempotency_key2 = IdempotencyKey::fresh();

let result11 = executor
.invoke_and_await(
target_worker_id.clone(),
"golem:it/api.{generate-idempotency-keys}",
vec![],
)
.await
.unwrap();
let result21 = executor
.invoke_and_await_with_key(
target_worker_id.clone(),
&idempotency_key1,
"golem:it/api.{generate-idempotency-keys}",
vec![],
)
.await
.unwrap();
let result31 = executor
.invoke_and_await_with_key(
target_worker_id.clone(),
&idempotency_key2,
"golem:it/api.{generate-idempotency-keys}",
vec![],
)
.await
.unwrap();
let result12 = executor
.invoke_and_await(
target_worker_id.clone(),
"golem:it/api.{generate-idempotency-keys}",
vec![],
)
.await
.unwrap();
let result22 = executor
.invoke_and_await_with_key(
target_worker_id.clone(),
&idempotency_key1,
"golem:it/api.{generate-idempotency-keys}",
vec![],
)
.await
.unwrap();
let result32 = executor
.invoke_and_await_with_key(
target_worker_id.clone(),
&idempotency_key2,
"golem:it/api.{generate-idempotency-keys}",
vec![],
)
.await
.unwrap();

drop(executor);

fn returned_keys_are_different(value: &[Value]) -> bool {
if value.len() == 1 {
if let Value::Tuple(items) = &value[0] {
if items.len() == 2 {
items[0] != items[1]
} else {
false
}
} else {
false
}
} else {
false
}
}

check!(returned_keys_are_different(&result11));
check!(returned_keys_are_different(&result21));
check!(returned_keys_are_different(&result31));
check!(returned_keys_are_different(&result12));
check!(returned_keys_are_different(&result22));
check!(returned_keys_are_different(&result32));

check!(result11 != result12); // when not providing idempotency key it should return different keys
check!(result11 != result21);
check!(result11 != result31);
check!(result21 == result22); // same idempotency key should lead to the same result
check!(result31 == result32);
}
Binary file modified test-components/blob-store-service.wasm
Binary file not shown.
Binary file modified test-components/clock-service.wasm
Binary file not shown.
Binary file modified test-components/clocks.wasm
Binary file not shown.
Binary file modified test-components/directories.wasm
Binary file not shown.
Binary file modified test-components/durability-overhead.wasm
Binary file not shown.
Binary file modified test-components/environment-service.wasm
Binary file not shown.
Binary file modified test-components/failing-component.wasm
Binary file not shown.
Binary file modified test-components/file-service.wasm
Binary file not shown.
Binary file modified test-components/file-write-read-delete.wasm
Binary file not shown.
Binary file modified test-components/flags-service.wasm
Binary file not shown.
Binary file modified test-components/golem-rust-tests.wasm
Binary file not shown.
Binary file modified test-components/http-client-2.wasm
Binary file not shown.
Binary file modified test-components/http-client.wasm
Binary file not shown.
Binary file modified test-components/interruption.wasm
Binary file not shown.
Binary file modified test-components/key-value-service.wasm
Binary file not shown.
Binary file modified test-components/networking.wasm
Binary file not shown.
Binary file modified test-components/option-service.wasm
Binary file not shown.
Binary file modified test-components/read-stdin.wasm
Binary file not shown.
Binary file modified test-components/runtime-service.wasm
Binary file not shown.
Loading

0 comments on commit b15e2ee

Please sign in to comment.