Skip to content

Commit

Permalink
Fix workflow API test, adapt to latest changes in SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 21, 2024
1 parent c9f3c76 commit 798304a
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 45 deletions.
3 changes: 2 additions & 1 deletion contracts/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ dependencies {
annotationProcessor(libs.restate.sdk.api.gen)

api(libs.restate.sdk.api)
api(libs.restate.sdk.workflow.api)
api(libs.restate.sdk.jackson)

implementation(libs.jackson.parameter.names)
implementation(libs.jackson.java8)
implementation(libs.jackson.datetime)
}

tasks.withType<JavaCompile> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

package my.restate.e2e.services;

import dev.restate.sdk.SharedWorkflowContext;
import dev.restate.sdk.WorkflowContext;
import dev.restate.sdk.annotation.Shared;
import dev.restate.sdk.annotation.Workflow;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.workflow.WorkflowContext;
import dev.restate.sdk.workflow.WorkflowSharedContext;
import java.util.Optional;

@Workflow
public interface WorkflowAPIBlockAndWait {
Expand All @@ -22,7 +22,8 @@ public interface WorkflowAPIBlockAndWait {
String blockAndWait(WorkflowContext context, String input);

@Shared
void unblock(WorkflowSharedContext context, String output);
void unblock(SharedWorkflowContext context, String output);

StateKey<String> MY_STATE = StateKey.string("my-state");
@Shared
Optional<String> getState(SharedWorkflowContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void main(String[] args) {
case MapObjectDefinitions.SERVICE_NAME:
restateHttpEndpointBuilder.bind(new MapObjectImpl());
break;
case WorkflowAPIBlockAndWaitClient.WORKFLOW_NAME:
case WorkflowAPIBlockAndWaitDefinitions.SERVICE_NAME:
restateHttpEndpointBuilder.bind(new WorkflowAPIBlockAndWaitImpl());
break;
case CoordinatorDefinitions.SERVICE_NAME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@

package my.restate.e2e.services;

import dev.restate.sdk.SharedWorkflowContext;
import dev.restate.sdk.WorkflowContext;
import dev.restate.sdk.common.DurablePromiseKey;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.workflow.DurablePromiseKey;
import dev.restate.sdk.workflow.WorkflowContext;
import dev.restate.sdk.workflow.WorkflowSharedContext;
import java.util.Optional;

public class WorkflowAPIBlockAndWaitImpl implements WorkflowAPIBlockAndWait {

private static final DurablePromiseKey<String> MY_DURABLE_PROMISE =
DurablePromiseKey.string("durable-promise");
private static final StateKey<String> MY_STATE = StateKey.string("my-state");

@Override
public String blockAndWait(WorkflowContext context, String input) {
Expand All @@ -37,7 +40,12 @@ public String blockAndWait(WorkflowContext context, String input) {
}

@Override
public void unblock(WorkflowSharedContext context, String output) {
public void unblock(SharedWorkflowContext context, String output) {
context.durablePromiseHandle(MY_DURABLE_PROMISE).resolve(output);
}

@Override
public Optional<String> getState(SharedWorkflowContext context) {
return context.get(MY_STATE);
}
}
6 changes: 4 additions & 2 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ include(

dependencyResolutionManagement {
repositories {
mavenLocal()
mavenCentral()
// OSSRH Snapshots repo
maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
Expand All @@ -40,7 +39,6 @@ dependencyResolutionManagement {
.versionRef("restate")
library("restate-sdk-jackson", "dev.restate", "sdk-serde-jackson").versionRef("restate")
library("restate-sdk-http-vertx", "dev.restate", "sdk-http-vertx").versionRef("restate")
library("restate-sdk-workflow-api", "dev.restate", "sdk-workflow-api").versionRef("restate")
library("restate-sdk-request-identity", "dev.restate", "sdk-request-identity")
.versionRef("restate")

Expand All @@ -58,6 +56,10 @@ dependencyResolutionManagement {
"com.fasterxml.jackson.module",
"jackson-module-parameter-names")
.versionRef("jackson")
library("jackson-java8", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8")
.versionRef("jackson")
library("jackson-datetime", "com.fasterxml.jackson.datatype", "jackson-datatype-jsr310")
.versionRef("jackson")
library("jackson-kotlin", "com.fasterxml.jackson.module", "jackson-module-kotlin")
.versionRef("jackson")
library("jackson-toml", "com.fasterxml.jackson.dataformat", "jackson-dataformat-toml")
Expand Down
3 changes: 2 additions & 1 deletion tests/src/test/kotlin/dev/restate/e2e/Containers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ object Containers {
.build()

val JAVA_WORKFLOW_SERVICE_SPEC =
javaServicesContainer("java-workflow", WorkflowAPIBlockAndWaitClient.WORKFLOW_NAME).build()
javaServicesContainer("java-workflow", WorkflowAPIBlockAndWaitDefinitions.SERVICE_NAME)
.build()

// -- Node containers

Expand Down
30 changes: 6 additions & 24 deletions tests/src/test/kotlin/dev/restate/e2e/WorkflowAPITest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ import dev.restate.e2e.utils.InjectIngressURL
import dev.restate.e2e.utils.RestateDeployer
import dev.restate.e2e.utils.RestateDeployerExtension
import dev.restate.sdk.client.IngressClient
import dev.restate.sdk.workflow.WorkflowExecutionState
import java.net.URL
import java.util.*
import my.restate.e2e.services.WorkflowAPIBlockAndWait
import my.restate.e2e.services.WorkflowAPIBlockAndWaitClient
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
Expand Down Expand Up @@ -51,36 +49,20 @@ class JavaWorkflowAPITest {
fun setAndResolve(@InjectIngressClient ingressClient: IngressClient) {
val client =
WorkflowAPIBlockAndWaitClient.fromIngress(ingressClient, UUID.randomUUID().toString())
assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.STARTED)
val handle = client.submit("Francesco")

// Wait state is set
await untilCallTo
{
client.getState(WorkflowAPIBlockAndWait.MY_STATE)
} matches
{
it!!.isPresent
}
await untilCallTo { client.getState() } matches { it!!.isPresent }

client.unblock("Till")

await untilCallTo { client.output } matches { it!!.orElse("") == "Till" }
assertThat(handle.attach()).isEqualTo("Till")

// Can call get output again
assertThat(client.output).get().isEqualTo("Till")
assertThat(handle.output).isEqualTo("Till")

// Re-submit returns completed
assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.ALREADY_COMPLETED)
}

@Test
@DisplayName("Workflow cannot be submitted more than once")
@Execution(ExecutionMode.CONCURRENT)
fun manySubmit(@InjectIngressClient ingressClient: IngressClient) {
val client =
WorkflowAPIBlockAndWaitClient.fromIngress(ingressClient, UUID.randomUUID().toString())
assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.STARTED)
assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.ALREADY_STARTED)
// Re-submit should have no effect
assertThat(client.submit("Francesco").output).isEqualTo("Till")
}
}

Expand Down
11 changes: 4 additions & 7 deletions tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,13 @@ class IngressTest {
echoClient
.send()
.blockThenEcho(awakeableKey, CallRequestOptions().withIdempotency(myIdempotencyId))
val invocationHandle = ingressClient.invocationHandle(invocationId, CoreSerdes.JSON_STRING)

// Attach to request
val blockedFut =
ingressClient.invocationHandle(invocationId).attachAsync(CoreSerdes.JSON_STRING)
val blockedFut = invocationHandle.attachAsync()

// Get output throws exception
assertThatThrownBy {
ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING)
}
assertThatThrownBy { invocationHandle.getOutput() }
.asInstanceOf(type(IngressException::class.java))
.returns(470, IngressException::getStatusCode)

Expand All @@ -193,8 +191,7 @@ class IngressTest {
assertThat(blockedFut.get()).isEqualTo(response)

// Invoke get output
assertThat(ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING))
.isEqualTo(response)
assertThat(invocationHandle.getOutput()).isEqualTo(response)
}

@Test
Expand Down

0 comments on commit 798304a

Please sign in to comment.