From ee59ea75ac3e5feae84e432db42edb4dba480a1d Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 13 May 2024 15:50:04 +0200 Subject: [PATCH] Add test for attach/getOutput --- .../java/my/restate/e2e/services/Echo.java | 12 ++++ .../my/restate/e2e/services/EchoImpl.java | 13 ++++ .../java/my/restate/e2e/services/Main.java | 3 + settings.gradle.kts | 1 + .../dev/restate/e2e/runtime/IngressTest.kt | 62 +++++++++++++++++-- 5 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 contracts/src/main/java/my/restate/e2e/services/Echo.java create mode 100644 services/java-services/src/main/java/my/restate/e2e/services/EchoImpl.java diff --git a/contracts/src/main/java/my/restate/e2e/services/Echo.java b/contracts/src/main/java/my/restate/e2e/services/Echo.java new file mode 100644 index 00000000..5e3c3975 --- /dev/null +++ b/contracts/src/main/java/my/restate/e2e/services/Echo.java @@ -0,0 +1,12 @@ +package my.restate.e2e.services; + +import dev.restate.sdk.Context; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Service; + +@Service(name = "Echo") +public interface Echo { + + @Handler + String blockThenEcho(Context ctx, String awakeableKey); +} diff --git a/services/java-services/src/main/java/my/restate/e2e/services/EchoImpl.java b/services/java-services/src/main/java/my/restate/e2e/services/EchoImpl.java new file mode 100644 index 00000000..d454e58c --- /dev/null +++ b/services/java-services/src/main/java/my/restate/e2e/services/EchoImpl.java @@ -0,0 +1,13 @@ +package my.restate.e2e.services; + +import dev.restate.sdk.Context; +import dev.restate.sdk.common.CoreSerdes; + +public class EchoImpl implements Echo { + @Override + public String blockThenEcho(Context ctx, String awakeableKey) { + var a = ctx.awakeable(CoreSerdes.JSON_STRING); + AwakeableHolderClient.fromContext(ctx, awakeableKey).hold(a.id()); + return a.await(); + } +} diff --git a/services/java-services/src/main/java/my/restate/e2e/services/Main.java b/services/java-services/src/main/java/my/restate/e2e/services/Main.java index 8e7136a7..c9681b81 100644 --- a/services/java-services/src/main/java/my/restate/e2e/services/Main.java +++ b/services/java-services/src/main/java/my/restate/e2e/services/Main.java @@ -85,6 +85,9 @@ public static void main(String[] args) { case HeadersPassThroughTestDefinitions.SERVICE_NAME: restateHttpEndpointBuilder.bind(new HeadersPassThroughTestImpl()); break; + case EchoDefinitions.SERVICE_NAME: + restateHttpEndpointBuilder.bind(new EchoImpl()); + break; } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 2fa2b91c..1a6616e4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -23,6 +23,7 @@ include( dependencyResolutionManagement { repositories { + mavenLocal() mavenCentral() // OSSRH Snapshots repo maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") } diff --git a/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt b/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt index f2511829..29ea2927 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt @@ -17,14 +17,19 @@ import dev.restate.e2e.utils.InjectIngressClient import dev.restate.e2e.utils.InjectMetaURL import dev.restate.e2e.utils.RestateDeployer import dev.restate.e2e.utils.RestateDeployerExtension +import dev.restate.sdk.client.CallRequestOptions import dev.restate.sdk.client.IngressClient -import dev.restate.sdk.client.RequestOptions +import dev.restate.sdk.client.IngressException +import dev.restate.sdk.common.CoreSerdes import java.net.URL import java.util.* import java.util.concurrent.TimeUnit import my.restate.e2e.services.* import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.assertj.core.api.InstanceOfAssertFactories.type import org.awaitility.kotlin.await +import org.awaitility.kotlin.until import org.awaitility.kotlin.untilAsserted import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -45,7 +50,9 @@ class IngressTest { "java-counter", CounterDefinitions.SERVICE_NAME, ProxyCounterDefinitions.SERVICE_NAME, - HeadersPassThroughTestDefinitions.SERVICE_NAME)) + HeadersPassThroughTestDefinitions.SERVICE_NAME, + AwakeableHolderDefinitions.SERVICE_NAME, + EchoDefinitions.SERVICE_NAME)) .build()) } @@ -64,7 +71,7 @@ class IngressTest { val counterRandomName = UUID.randomUUID().toString() val myIdempotencyId = UUID.randomUUID().toString() - val requestOptions = RequestOptions().withIdempotency(myIdempotencyId) + val requestOptions = CallRequestOptions().withIdempotency(myIdempotencyId) val counterClient = CounterClient.fromIngress(ingressClient, counterRandomName) @@ -100,7 +107,7 @@ class IngressTest { fun idempotentInvokeService(@InjectIngressClient ingressClient: IngressClient) { val counterRandomName = UUID.randomUUID().toString() val myIdempotencyId = UUID.randomUUID().toString() - val requestOptions = RequestOptions().withIdempotency(myIdempotencyId) + val requestOptions = CallRequestOptions().withIdempotency(myIdempotencyId) val counterClient = CounterClient.fromIngress(ingressClient, counterRandomName) val proxyCounterClient = ProxyCounterClient.fromIngress(ingressClient) @@ -127,7 +134,7 @@ class IngressTest { fun idempotentInvokeSend(@InjectIngressClient ingressClient: IngressClient) { val counterRandomName = UUID.randomUUID().toString() val myIdempotencyId = UUID.randomUUID().toString() - val requestOptions = RequestOptions().withIdempotency(myIdempotencyId) + val requestOptions = CallRequestOptions().withIdempotency(myIdempotencyId) val counterClient = CounterClient.fromIngress(ingressClient, counterRandomName) @@ -147,6 +154,49 @@ class IngressTest { .returns(4, CounterUpdateResponse::getNewValue) } + @Test + @Execution(ExecutionMode.CONCURRENT) + @Timeout(value = 15, unit = TimeUnit.SECONDS) + @DisplayName("Idempotent invocation to a virtual object using send") + fun idempotentSendThenAttach(@InjectIngressClient ingressClient: IngressClient) { + val awakeableKey = UUID.randomUUID().toString() + val myIdempotencyId = UUID.randomUUID().toString() + val response = "response" + + // Send request + val echoClient = EchoClient.fromIngress(ingressClient) + val invocationId = + echoClient + .send() + .blockThenEcho(awakeableKey, CallRequestOptions().withIdempotency(myIdempotencyId)) + + // Attach to request + val blockedFut = + ingressClient.invocationHandle(invocationId).attachAsync(CoreSerdes.JSON_STRING) + + // Get output throws exception + assertThatThrownBy { + ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING) + } + .asInstanceOf(type(IngressException::class.java)) + .returns(470, IngressException::getStatusCode) + + // Blocked fut should still be blocked + assertThat(blockedFut).isNotDone + + // Unblock + val awakeableHolderClient = AwakeableHolderClient.fromIngress(ingressClient, awakeableKey) + await until { awakeableHolderClient.hasAwakeable() } + awakeableHolderClient.unlock(response) + + // Attach should be completed + assertThat(blockedFut.get()).isEqualTo(response) + + // Invoke get output + assertThat(ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING)) + .isEqualTo(response) + } + @Test @Execution(ExecutionMode.CONCURRENT) @Timeout(value = 15, unit = TimeUnit.SECONDS) @@ -156,7 +206,7 @@ class IngressTest { assertThat( HeadersPassThroughTestClient.fromIngress(ingressClient) - .echoHeaders(RequestOptions().withHeader(headerName, headerValue))) + .echoHeaders(CallRequestOptions().withHeader(headerName, headerValue))) .containsEntry(headerName, headerValue) } }