Skip to content

Commit

Permalink
Add test for attach/getOutput
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 13, 2024
1 parent 2890d37 commit ee59ea7
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 6 deletions.
12 changes: 12 additions & 0 deletions contracts/src/main/java/my/restate/e2e/services/Echo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package my.restate.e2e.services;

import dev.restate.sdk.Context;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.Service;

@Service(name = "Echo")
public interface Echo {

@Handler
String blockThenEcho(Context ctx, String awakeableKey);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package my.restate.e2e.services;

import dev.restate.sdk.Context;
import dev.restate.sdk.common.CoreSerdes;

public class EchoImpl implements Echo {
@Override
public String blockThenEcho(Context ctx, String awakeableKey) {
var a = ctx.awakeable(CoreSerdes.JSON_STRING);
AwakeableHolderClient.fromContext(ctx, awakeableKey).hold(a.id());
return a.await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public static void main(String[] args) {
case HeadersPassThroughTestDefinitions.SERVICE_NAME:
restateHttpEndpointBuilder.bind(new HeadersPassThroughTestImpl());
break;
case EchoDefinitions.SERVICE_NAME:
restateHttpEndpointBuilder.bind(new EchoImpl());
break;
}
}

Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ include(

dependencyResolutionManagement {
repositories {
mavenLocal()
mavenCentral()
// OSSRH Snapshots repo
maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
Expand Down
62 changes: 56 additions & 6 deletions tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ import dev.restate.e2e.utils.InjectIngressClient
import dev.restate.e2e.utils.InjectMetaURL
import dev.restate.e2e.utils.RestateDeployer
import dev.restate.e2e.utils.RestateDeployerExtension
import dev.restate.sdk.client.CallRequestOptions
import dev.restate.sdk.client.IngressClient
import dev.restate.sdk.client.RequestOptions
import dev.restate.sdk.client.IngressException
import dev.restate.sdk.common.CoreSerdes
import java.net.URL
import java.util.*
import java.util.concurrent.TimeUnit
import my.restate.e2e.services.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.InstanceOfAssertFactories.type
import org.awaitility.kotlin.await
import org.awaitility.kotlin.until
import org.awaitility.kotlin.untilAsserted
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
Expand All @@ -45,7 +50,9 @@ class IngressTest {
"java-counter",
CounterDefinitions.SERVICE_NAME,
ProxyCounterDefinitions.SERVICE_NAME,
HeadersPassThroughTestDefinitions.SERVICE_NAME))
HeadersPassThroughTestDefinitions.SERVICE_NAME,
AwakeableHolderDefinitions.SERVICE_NAME,
EchoDefinitions.SERVICE_NAME))
.build())
}

Expand All @@ -64,7 +71,7 @@ class IngressTest {

val counterRandomName = UUID.randomUUID().toString()
val myIdempotencyId = UUID.randomUUID().toString()
val requestOptions = RequestOptions().withIdempotency(myIdempotencyId)
val requestOptions = CallRequestOptions().withIdempotency(myIdempotencyId)

val counterClient = CounterClient.fromIngress(ingressClient, counterRandomName)

Expand Down Expand Up @@ -100,7 +107,7 @@ class IngressTest {
fun idempotentInvokeService(@InjectIngressClient ingressClient: IngressClient) {
val counterRandomName = UUID.randomUUID().toString()
val myIdempotencyId = UUID.randomUUID().toString()
val requestOptions = RequestOptions().withIdempotency(myIdempotencyId)
val requestOptions = CallRequestOptions().withIdempotency(myIdempotencyId)

val counterClient = CounterClient.fromIngress(ingressClient, counterRandomName)
val proxyCounterClient = ProxyCounterClient.fromIngress(ingressClient)
Expand All @@ -127,7 +134,7 @@ class IngressTest {
fun idempotentInvokeSend(@InjectIngressClient ingressClient: IngressClient) {
val counterRandomName = UUID.randomUUID().toString()
val myIdempotencyId = UUID.randomUUID().toString()
val requestOptions = RequestOptions().withIdempotency(myIdempotencyId)
val requestOptions = CallRequestOptions().withIdempotency(myIdempotencyId)

val counterClient = CounterClient.fromIngress(ingressClient, counterRandomName)

Expand All @@ -147,6 +154,49 @@ class IngressTest {
.returns(4, CounterUpdateResponse::getNewValue)
}

@Test
@Execution(ExecutionMode.CONCURRENT)
@Timeout(value = 15, unit = TimeUnit.SECONDS)
@DisplayName("Idempotent invocation to a virtual object using send")
fun idempotentSendThenAttach(@InjectIngressClient ingressClient: IngressClient) {
val awakeableKey = UUID.randomUUID().toString()
val myIdempotencyId = UUID.randomUUID().toString()
val response = "response"

// Send request
val echoClient = EchoClient.fromIngress(ingressClient)
val invocationId =
echoClient
.send()
.blockThenEcho(awakeableKey, CallRequestOptions().withIdempotency(myIdempotencyId))

// Attach to request
val blockedFut =
ingressClient.invocationHandle(invocationId).attachAsync(CoreSerdes.JSON_STRING)

// Get output throws exception
assertThatThrownBy {
ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING)
}
.asInstanceOf(type(IngressException::class.java))
.returns(470, IngressException::getStatusCode)

// Blocked fut should still be blocked
assertThat(blockedFut).isNotDone

// Unblock
val awakeableHolderClient = AwakeableHolderClient.fromIngress(ingressClient, awakeableKey)
await until { awakeableHolderClient.hasAwakeable() }
awakeableHolderClient.unlock(response)

// Attach should be completed
assertThat(blockedFut.get()).isEqualTo(response)

// Invoke get output
assertThat(ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING))
.isEqualTo(response)
}

@Test
@Execution(ExecutionMode.CONCURRENT)
@Timeout(value = 15, unit = TimeUnit.SECONDS)
Expand All @@ -156,7 +206,7 @@ class IngressTest {

assertThat(
HeadersPassThroughTestClient.fromIngress(ingressClient)
.echoHeaders(RequestOptions().withHeader(headerName, headerValue)))
.echoHeaders(CallRequestOptions().withHeader(headerName, headerValue)))
.containsEntry(headerName, headerValue)
}
}

0 comments on commit ee59ea7

Please sign in to comment.