diff --git a/contracts/build.gradle.kts b/contracts/build.gradle.kts index d002daa0..c950fdad 100644 --- a/contracts/build.gradle.kts +++ b/contracts/build.gradle.kts @@ -13,10 +13,11 @@ dependencies { annotationProcessor(libs.restate.sdk.api.gen) api(libs.restate.sdk.api) - api(libs.restate.sdk.workflow.api) api(libs.restate.sdk.jackson) implementation(libs.jackson.parameter.names) + implementation(libs.jackson.java8) + implementation(libs.jackson.datetime) } tasks.withType { diff --git a/contracts/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWait.java b/contracts/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWait.java index bc093e53..b3a2b8a9 100644 --- a/contracts/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWait.java +++ b/contracts/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWait.java @@ -9,11 +9,11 @@ package my.restate.e2e.services; +import dev.restate.sdk.SharedWorkflowContext; +import dev.restate.sdk.WorkflowContext; import dev.restate.sdk.annotation.Shared; import dev.restate.sdk.annotation.Workflow; -import dev.restate.sdk.common.StateKey; -import dev.restate.sdk.workflow.WorkflowContext; -import dev.restate.sdk.workflow.WorkflowSharedContext; +import java.util.Optional; @Workflow public interface WorkflowAPIBlockAndWait { @@ -22,7 +22,8 @@ public interface WorkflowAPIBlockAndWait { String blockAndWait(WorkflowContext context, String input); @Shared - void unblock(WorkflowSharedContext context, String output); + void unblock(SharedWorkflowContext context, String output); - StateKey MY_STATE = StateKey.string("my-state"); + @Shared + Optional getState(SharedWorkflowContext context); } diff --git a/services/java-services/src/main/java/my/restate/e2e/services/Main.java b/services/java-services/src/main/java/my/restate/e2e/services/Main.java index c9681b81..415206fc 100644 --- a/services/java-services/src/main/java/my/restate/e2e/services/Main.java +++ b/services/java-services/src/main/java/my/restate/e2e/services/Main.java @@ -61,7 +61,7 @@ public static void main(String[] args) { case MapObjectDefinitions.SERVICE_NAME: restateHttpEndpointBuilder.bind(new MapObjectImpl()); break; - case WorkflowAPIBlockAndWaitClient.WORKFLOW_NAME: + case WorkflowAPIBlockAndWaitDefinitions.SERVICE_NAME: restateHttpEndpointBuilder.bind(new WorkflowAPIBlockAndWaitImpl()); break; case CoordinatorDefinitions.SERVICE_NAME: diff --git a/services/java-services/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWaitImpl.java b/services/java-services/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWaitImpl.java index afc57c68..999e6d20 100644 --- a/services/java-services/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWaitImpl.java +++ b/services/java-services/src/main/java/my/restate/e2e/services/WorkflowAPIBlockAndWaitImpl.java @@ -9,15 +9,18 @@ package my.restate.e2e.services; +import dev.restate.sdk.SharedWorkflowContext; +import dev.restate.sdk.WorkflowContext; +import dev.restate.sdk.common.DurablePromiseKey; +import dev.restate.sdk.common.StateKey; import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.workflow.DurablePromiseKey; -import dev.restate.sdk.workflow.WorkflowContext; -import dev.restate.sdk.workflow.WorkflowSharedContext; +import java.util.Optional; public class WorkflowAPIBlockAndWaitImpl implements WorkflowAPIBlockAndWait { private static final DurablePromiseKey MY_DURABLE_PROMISE = DurablePromiseKey.string("durable-promise"); + private static final StateKey MY_STATE = StateKey.string("my-state"); @Override public String blockAndWait(WorkflowContext context, String input) { @@ -37,7 +40,12 @@ public String blockAndWait(WorkflowContext context, String input) { } @Override - public void unblock(WorkflowSharedContext context, String output) { + public void unblock(SharedWorkflowContext context, String output) { context.durablePromiseHandle(MY_DURABLE_PROMISE).resolve(output); } + + @Override + public Optional getState(SharedWorkflowContext context) { + return context.get(MY_STATE); + } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 1a6616e4..5d482564 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -23,7 +23,6 @@ include( dependencyResolutionManagement { repositories { - mavenLocal() mavenCentral() // OSSRH Snapshots repo maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") } @@ -40,7 +39,6 @@ dependencyResolutionManagement { .versionRef("restate") library("restate-sdk-jackson", "dev.restate", "sdk-serde-jackson").versionRef("restate") library("restate-sdk-http-vertx", "dev.restate", "sdk-http-vertx").versionRef("restate") - library("restate-sdk-workflow-api", "dev.restate", "sdk-workflow-api").versionRef("restate") library("restate-sdk-request-identity", "dev.restate", "sdk-request-identity") .versionRef("restate") @@ -58,6 +56,10 @@ dependencyResolutionManagement { "com.fasterxml.jackson.module", "jackson-module-parameter-names") .versionRef("jackson") + library("jackson-java8", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8") + .versionRef("jackson") + library("jackson-datetime", "com.fasterxml.jackson.datatype", "jackson-datatype-jsr310") + .versionRef("jackson") library("jackson-kotlin", "com.fasterxml.jackson.module", "jackson-module-kotlin") .versionRef("jackson") library("jackson-toml", "com.fasterxml.jackson.dataformat", "jackson-dataformat-toml") diff --git a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt index 2bc49f2b..dcc7bbea 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt @@ -62,7 +62,8 @@ object Containers { .build() val JAVA_WORKFLOW_SERVICE_SPEC = - javaServicesContainer("java-workflow", WorkflowAPIBlockAndWaitClient.WORKFLOW_NAME).build() + javaServicesContainer("java-workflow", WorkflowAPIBlockAndWaitDefinitions.SERVICE_NAME) + .build() // -- Node containers diff --git a/tests/src/test/kotlin/dev/restate/e2e/WorkflowAPITest.kt b/tests/src/test/kotlin/dev/restate/e2e/WorkflowAPITest.kt index eaf70bae..048888f4 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/WorkflowAPITest.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/WorkflowAPITest.kt @@ -16,10 +16,8 @@ import dev.restate.e2e.utils.InjectIngressURL import dev.restate.e2e.utils.RestateDeployer import dev.restate.e2e.utils.RestateDeployerExtension import dev.restate.sdk.client.IngressClient -import dev.restate.sdk.workflow.WorkflowExecutionState import java.net.URL import java.util.* -import my.restate.e2e.services.WorkflowAPIBlockAndWait import my.restate.e2e.services.WorkflowAPIBlockAndWaitClient import org.assertj.core.api.Assertions.assertThat import org.awaitility.kotlin.await @@ -51,36 +49,20 @@ class JavaWorkflowAPITest { fun setAndResolve(@InjectIngressClient ingressClient: IngressClient) { val client = WorkflowAPIBlockAndWaitClient.fromIngress(ingressClient, UUID.randomUUID().toString()) - assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.STARTED) + val handle = client.submit("Francesco") // Wait state is set - await untilCallTo - { - client.getState(WorkflowAPIBlockAndWait.MY_STATE) - } matches - { - it!!.isPresent - } + await untilCallTo { client.getState() } matches { it!!.isPresent } client.unblock("Till") - await untilCallTo { client.output } matches { it!!.orElse("") == "Till" } + assertThat(handle.attach()).isEqualTo("Till") // Can call get output again - assertThat(client.output).get().isEqualTo("Till") + assertThat(handle.output).isEqualTo("Till") - // Re-submit returns completed - assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.ALREADY_COMPLETED) - } - - @Test - @DisplayName("Workflow cannot be submitted more than once") - @Execution(ExecutionMode.CONCURRENT) - fun manySubmit(@InjectIngressClient ingressClient: IngressClient) { - val client = - WorkflowAPIBlockAndWaitClient.fromIngress(ingressClient, UUID.randomUUID().toString()) - assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.STARTED) - assertThat(client.submit("Francesco")).isEqualTo(WorkflowExecutionState.ALREADY_STARTED) + // Re-submit should have no effect + assertThat(client.submit("Francesco").output).isEqualTo("Till") } } diff --git a/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt b/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt index 8173c817..8862bca0 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/runtime/IngressTest.kt @@ -169,15 +169,13 @@ class IngressTest { echoClient .send() .blockThenEcho(awakeableKey, CallRequestOptions().withIdempotency(myIdempotencyId)) + val invocationHandle = ingressClient.invocationHandle(invocationId, CoreSerdes.JSON_STRING) // Attach to request - val blockedFut = - ingressClient.invocationHandle(invocationId).attachAsync(CoreSerdes.JSON_STRING) + val blockedFut = invocationHandle.attachAsync() // Get output throws exception - assertThatThrownBy { - ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING) - } + assertThatThrownBy { invocationHandle.getOutput() } .asInstanceOf(type(IngressException::class.java)) .returns(470, IngressException::getStatusCode) @@ -193,8 +191,7 @@ class IngressTest { assertThat(blockedFut.get()).isEqualTo(response) // Invoke get output - assertThat(ingressClient.invocationHandle(invocationId).getOutput(CoreSerdes.JSON_STRING)) - .isEqualTo(response) + assertThat(invocationHandle.getOutput()).isEqualTo(response) } @Test