diff --git a/functions/collections/impl/src/main/java/dev/restate/e2e/functions/collections/Main.java b/functions/collections/impl/src/main/java/dev/restate/e2e/functions/collections/Main.java index 1ebe8348..16596c66 100644 --- a/functions/collections/impl/src/main/java/dev/restate/e2e/functions/collections/Main.java +++ b/functions/collections/impl/src/main/java/dev/restate/e2e/functions/collections/Main.java @@ -1,7 +1,6 @@ package dev.restate.e2e.functions.collections; import dev.restate.e2e.functions.utils.ServiceRunner; - import java.io.IOException; public class Main { diff --git a/functions/coordinator/contract/src/main/proto/coordinator.proto b/functions/coordinator/contract/src/main/proto/coordinator.proto index 818a3c9b..e20ca4ff 100644 --- a/functions/coordinator/contract/src/main/proto/coordinator.proto +++ b/functions/coordinator/contract/src/main/proto/coordinator.proto @@ -17,6 +17,7 @@ service Coordinator { rpc Complex (ComplexRequest) returns (ComplexResponse); rpc Timeout (Duration) returns (TimeoutResponse); rpc InvokeSequentially(InvokeSequentiallyRequest) returns (google.protobuf.Empty); + rpc InvokeSideEffects(InvokeSideEffectsRequest) returns (InvokeSideEffectsResult); } message Duration { @@ -43,3 +44,11 @@ message TimeoutResponse { message InvokeSequentiallyRequest { repeated bool execute_as_background_call = 1; } + +message InvokeSideEffectsRequest { + bool last_is_callback = 1; +} + +message InvokeSideEffectsResult { + int32 invoked_times = 1; +} \ No newline at end of file diff --git a/functions/coordinator/impl/src/main/java/dev/restate/e2e/functions/coordinator/CoordinatorService.java b/functions/coordinator/impl/src/main/java/dev/restate/e2e/functions/coordinator/CoordinatorService.java index 5ac37bee..f7d40d13 100644 --- a/functions/coordinator/impl/src/main/java/dev/restate/e2e/functions/coordinator/CoordinatorService.java +++ b/functions/coordinator/impl/src/main/java/dev/restate/e2e/functions/coordinator/CoordinatorService.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -135,4 +136,26 @@ public void invokeSequentially( responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } + + @Override + public void invokeSideEffects( + InvokeSideEffectsRequest request, StreamObserver responseObserver) { + RestateContext ctx = RestateContext.current(); + + AtomicInteger invokedSideEffects = new AtomicInteger(0); + + ctx.sideEffect(() -> invokedSideEffects.incrementAndGet()); + ctx.sideEffect(() -> invokedSideEffects.incrementAndGet()); + if (request.getLastIsCallback()) { + ctx.callback(Void.TYPE, ignored -> invokedSideEffects.incrementAndGet()); + } else { + ctx.sideEffect(() -> invokedSideEffects.incrementAndGet()); + } + + responseObserver.onNext( + InvokeSideEffectsResult.newBuilder() + .setInvokedTimes(invokedSideEffects.intValue()) + .build()); + responseObserver.onCompleted(); + } } diff --git a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt index e252d678..7c55f961 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/Containers.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/Containers.kt @@ -14,10 +14,8 @@ import org.testcontainers.containers.GenericContainer object Containers { val COLLECTIONS_FUNCTION_SPEC = - FunctionSpec.builder( - "restatedev/e2e-collections", - ListServiceGrpc.getServiceDescriptor()) - .build() + FunctionSpec.builder("restatedev/e2e-collections", ListServiceGrpc.getServiceDescriptor()) + .build() val COUNTER_FUNCTION_SPEC = FunctionSpec.builder( diff --git a/tests/src/test/kotlin/dev/restate/e2e/OrderingTest.kt b/tests/src/test/kotlin/dev/restate/e2e/OrderingTest.kt index 75962cfd..f9636729 100644 --- a/tests/src/test/kotlin/dev/restate/e2e/OrderingTest.kt +++ b/tests/src/test/kotlin/dev/restate/e2e/OrderingTest.kt @@ -7,41 +7,45 @@ import dev.restate.e2e.functions.coordinator.InvokeSequentiallyRequest import dev.restate.e2e.utils.InjectBlockingStub import dev.restate.e2e.utils.RestateDeployer import dev.restate.e2e.utils.RestateDeployerExtension +import java.util.stream.Stream import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.extension.RegisterExtension import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource -import java.util.stream.Stream class OrderingTest { companion object { @RegisterExtension val deployerExt: RestateDeployerExtension = RestateDeployerExtension( - RestateDeployer.Builder().withFunction(Containers.COORDINATOR_FUNCTION_SPEC).withFunction(Containers.COLLECTIONS_FUNCTION_SPEC).build()) + RestateDeployer.Builder() + .withFunction(Containers.COORDINATOR_FUNCTION_SPEC) + .withFunction(Containers.COLLECTIONS_FUNCTION_SPEC) + .build()) @JvmStatic fun ordering(): Stream { return Stream.of( - Arguments.of(booleanArrayOf(true, false, true)), - Arguments.of(booleanArrayOf(false, true, false)) - ) + Arguments.of(booleanArrayOf(true, false, true)), + Arguments.of(booleanArrayOf(false, true, false))) } } @ParameterizedTest @MethodSource - fun ordering(ordering: BooleanArray, @InjectBlockingStub coordinatorClient: CoordinatorBlockingStub, @InjectBlockingStub listClient: ListServiceBlockingStub) { + fun ordering( + ordering: BooleanArray, + @InjectBlockingStub coordinatorClient: CoordinatorBlockingStub, + @InjectBlockingStub listClient: ListServiceBlockingStub + ) { coordinatorClient.invokeSequentially( - InvokeSequentiallyRequest.newBuilder() - .addAllExecuteAsBackgroundCall(ordering.asIterable()) - .build() - ) + InvokeSequentiallyRequest.newBuilder() + .addAllExecuteAsBackgroundCall(ordering.asIterable()) + .build()) val listClientRequest = Request.newBuilder().setListName("invokeSequentially").build() - assertThat(listClient.clear(listClientRequest).valuesList) - .containsExactly("0", "1", "2") + assertThat(listClient.clear(listClientRequest).valuesList).containsExactly("0", "1", "2") } } diff --git a/tests/src/test/kotlin/dev/restate/e2e/SideEffectTest.kt b/tests/src/test/kotlin/dev/restate/e2e/SideEffectTest.kt new file mode 100644 index 00000000..d8d1ac84 --- /dev/null +++ b/tests/src/test/kotlin/dev/restate/e2e/SideEffectTest.kt @@ -0,0 +1,37 @@ +package dev.restate.e2e + +import dev.restate.e2e.functions.coordinator.CoordinatorGrpc.CoordinatorBlockingStub +import dev.restate.e2e.functions.coordinator.InvokeSideEffectsRequest +import dev.restate.e2e.utils.InjectBlockingStub +import dev.restate.e2e.utils.RestateDeployer +import dev.restate.e2e.utils.RestateDeployerExtension +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension + +class SideEffectTest { + companion object { + @RegisterExtension + val deployerExt: RestateDeployerExtension = + RestateDeployerExtension( + RestateDeployer.Builder().withFunction(Containers.COORDINATOR_FUNCTION_SPEC).build()) + } + + @Test + fun sideEffectFlush(@InjectBlockingStub coordinatorClient: CoordinatorBlockingStub) { + assertThat( + coordinatorClient.invokeSideEffects( + InvokeSideEffectsRequest.newBuilder().setLastIsCallback(false).build())) + .extracting { it.invokedTimes } + .isEqualTo(1) + } + + @Test + fun sideEffectThenCallbackFlush(@InjectBlockingStub coordinatorClient: CoordinatorBlockingStub) { + assertThat( + coordinatorClient.invokeSideEffects( + InvokeSideEffectsRequest.newBuilder().setLastIsCallback(true).build())) + .extracting { it.invokedTimes } + .isEqualTo(1) + } +}