From ff525b60c4bb4dd2498c5a89f14529fa512379d8 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 18 Sep 2024 15:03:06 +0200 Subject: [PATCH] Add tests for cancel invocation/get invocation id feature --- .../dev/restate/sdktesting/contracts/Proxy.kt | 3 +- .../restate/sdktesting/contracts/TestUtils.kt | 3 ++ .../sdktesting/tests/CancelInvocation.kt | 48 +++++++++++++++++-- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/dev/restate/sdktesting/contracts/Proxy.kt b/src/main/kotlin/dev/restate/sdktesting/contracts/Proxy.kt index ec9a1c2..56ccf8f 100644 --- a/src/main/kotlin/dev/restate/sdktesting/contracts/Proxy.kt +++ b/src/main/kotlin/dev/restate/sdktesting/contracts/Proxy.kt @@ -40,7 +40,8 @@ interface Proxy { // Bytes are encoded as array of numbers @Handler suspend fun call(context: Context, request: ProxyRequest): ByteArray - @Handler suspend fun oneWayCall(context: Context, request: ProxyRequest) + // Returns the invocation id of the call + @Handler suspend fun oneWayCall(context: Context, request: ProxyRequest): String @Handler suspend fun manyCalls(context: Context, requests: List) } diff --git a/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtils.kt b/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtils.kt index 3c3c40b..9519e67 100644 --- a/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtils.kt +++ b/src/main/kotlin/dev/restate/sdktesting/contracts/TestUtils.kt @@ -75,6 +75,9 @@ interface TestUtilsService { */ @Handler suspend fun countExecutedSideEffects(context: Context, increments: Int): Int + /** Cancel invocation using the context. */ + @Handler suspend fun cancelInvocation(context: Context, invocationId: String) + /** Read an environment variable */ @Handler suspend fun getEnvVariable(context: Context, env: String): String diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/CancelInvocation.kt b/src/main/kotlin/dev/restate/sdktesting/tests/CancelInvocation.kt index 5851fab..a1b2242 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/CancelInvocation.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/CancelInvocation.kt @@ -12,6 +12,7 @@ import dev.restate.admin.api.InvocationApi import dev.restate.admin.client.ApiClient import dev.restate.admin.model.TerminationMode import dev.restate.sdk.client.Client +import dev.restate.sdk.kotlin.KtSerdes import dev.restate.sdktesting.contracts.* import dev.restate.sdktesting.infra.* import java.net.URL @@ -34,13 +35,15 @@ class CancelInvocation { .withServices( CancelTestRunnerDefinitions.SERVICE_NAME, CancelTestBlockingServiceDefinitions.SERVICE_NAME, - AwakeableHolderDefinitions.SERVICE_NAME)) + AwakeableHolderDefinitions.SERVICE_NAME, + ProxyDefinitions.SERVICE_NAME, + TestUtilsServiceDefinitions.SERVICE_NAME)) } } - @ParameterizedTest(name = "cancel blocked invocation on {0}") + @ParameterizedTest(name = "cancel blocked invocation on {0} from Admin API") @EnumSource(value = BlockingOperation::class) - fun cancelInvocation( + fun cancelInvocationFromAdminAPI( blockingOperation: BlockingOperation, @InjectClient ingressClient: Client, @InjectMetaURL metaURL: URL, @@ -69,4 +72,43 @@ class CancelInvocation { // Check that the singleton service is unlocked blockingServiceClient.isUnlocked() } + + @ParameterizedTest(name = "cancel blocked invocation on {0} from Context") + @EnumSource(value = BlockingOperation::class) + fun cancelInvocationFromContext( + blockingOperation: BlockingOperation, + @InjectClient ingressClient: Client, + ) = runTest { + val key = UUID.randomUUID().toString() + val cancelTestClient = CancelTestRunnerClient.fromClient(ingressClient, key) + val blockingServiceClient = CancelTestBlockingServiceClient.fromClient(ingressClient, key) + val proxyClient = ProxyClient.fromClient(ingressClient) + val testUtilsClient = TestUtilsServiceClient.fromClient(ingressClient) + + val id = + proxyClient.oneWayCall( + ProxyRequest( + serviceName = CancelTestRunnerDefinitions.SERVICE_NAME, + virtualObjectKey = key, + handlerName = "startTest", + message = KtSerdes.json().serialize(blockingOperation))) + + val awakeableHolderClient = AwakeableHolderClient.fromClient(ingressClient, "cancel") + + await until { runBlocking { awakeableHolderClient.hasAwakeable() } } + + awakeableHolderClient.unlock("cancel") + + // The termination signal might arrive before the blocking call to the cancel singleton was + // made, so we need to retry. + await.ignoreException(TimeoutCancellationException::class.java).until { + runBlocking { + testUtilsClient.cancelInvocation(id) + withTimeout(1.seconds) { cancelTestClient.verifyTest() } + } + } + + // Check that the singleton service is unlocked + blockingServiceClient.isUnlocked() + } }