Skip to content

Commit

Permalink
Side effects flush behaviour test (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Aug 4, 2022
1 parent 985e1dc commit 598db23
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.e2e.functions.collections;

import dev.restate.e2e.functions.utils.ServiceRunner;

import java.io.IOException;

public class Main {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ service Coordinator {
rpc Complex (ComplexRequest) returns (ComplexResponse);
rpc Timeout (Duration) returns (TimeoutResponse);
rpc InvokeSequentially(InvokeSequentiallyRequest) returns (google.protobuf.Empty);
rpc InvokeSideEffects(InvokeSideEffectsRequest) returns (InvokeSideEffectsResult);
}

message Duration {
Expand All @@ -43,3 +44,11 @@ message TimeoutResponse {
message InvokeSequentiallyRequest {
repeated bool execute_as_background_call = 1;
}

message InvokeSideEffectsRequest {
bool last_is_callback = 1;
}

message InvokeSideEffectsResult {
int32 invoked_times = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -135,4 +136,26 @@ public void invokeSequentially(
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}

@Override
public void invokeSideEffects(
InvokeSideEffectsRequest request, StreamObserver<InvokeSideEffectsResult> responseObserver) {
RestateContext ctx = RestateContext.current();

AtomicInteger invokedSideEffects = new AtomicInteger(0);

ctx.sideEffect(() -> invokedSideEffects.incrementAndGet());
ctx.sideEffect(() -> invokedSideEffects.incrementAndGet());
if (request.getLastIsCallback()) {
ctx.callback(Void.TYPE, ignored -> invokedSideEffects.incrementAndGet());
} else {
ctx.sideEffect(() -> invokedSideEffects.incrementAndGet());
}

responseObserver.onNext(
InvokeSideEffectsResult.newBuilder()
.setInvokedTimes(invokedSideEffects.intValue())
.build());
responseObserver.onCompleted();
}
}
6 changes: 2 additions & 4 deletions tests/src/test/kotlin/dev/restate/e2e/Containers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ import org.testcontainers.containers.GenericContainer

object Containers {
val COLLECTIONS_FUNCTION_SPEC =
FunctionSpec.builder(
"restatedev/e2e-collections",
ListServiceGrpc.getServiceDescriptor())
.build()
FunctionSpec.builder("restatedev/e2e-collections", ListServiceGrpc.getServiceDescriptor())
.build()

val COUNTER_FUNCTION_SPEC =
FunctionSpec.builder(
Expand Down
28 changes: 16 additions & 12 deletions tests/src/test/kotlin/dev/restate/e2e/OrderingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,45 @@ import dev.restate.e2e.functions.coordinator.InvokeSequentiallyRequest
import dev.restate.e2e.utils.InjectBlockingStub
import dev.restate.e2e.utils.RestateDeployer
import dev.restate.e2e.utils.RestateDeployerExtension
import java.util.stream.Stream
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import java.util.stream.Stream

class OrderingTest {
companion object {
@RegisterExtension
val deployerExt: RestateDeployerExtension =
RestateDeployerExtension(
RestateDeployer.Builder().withFunction(Containers.COORDINATOR_FUNCTION_SPEC).withFunction(Containers.COLLECTIONS_FUNCTION_SPEC).build())
RestateDeployer.Builder()
.withFunction(Containers.COORDINATOR_FUNCTION_SPEC)
.withFunction(Containers.COLLECTIONS_FUNCTION_SPEC)
.build())

@JvmStatic
fun ordering(): Stream<Arguments> {
return Stream.of(
Arguments.of(booleanArrayOf(true, false, true)),
Arguments.of(booleanArrayOf(false, true, false))
)
Arguments.of(booleanArrayOf(true, false, true)),
Arguments.of(booleanArrayOf(false, true, false)))
}
}

@ParameterizedTest
@MethodSource
fun ordering(ordering: BooleanArray, @InjectBlockingStub coordinatorClient: CoordinatorBlockingStub, @InjectBlockingStub listClient: ListServiceBlockingStub) {
fun ordering(
ordering: BooleanArray,
@InjectBlockingStub coordinatorClient: CoordinatorBlockingStub,
@InjectBlockingStub listClient: ListServiceBlockingStub
) {
coordinatorClient.invokeSequentially(
InvokeSequentiallyRequest.newBuilder()
.addAllExecuteAsBackgroundCall(ordering.asIterable())
.build()
)
InvokeSequentiallyRequest.newBuilder()
.addAllExecuteAsBackgroundCall(ordering.asIterable())
.build())

val listClientRequest = Request.newBuilder().setListName("invokeSequentially").build()

assertThat(listClient.clear(listClientRequest).valuesList)
.containsExactly("0", "1", "2")
assertThat(listClient.clear(listClientRequest).valuesList).containsExactly("0", "1", "2")
}
}
37 changes: 37 additions & 0 deletions tests/src/test/kotlin/dev/restate/e2e/SideEffectTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package dev.restate.e2e

import dev.restate.e2e.functions.coordinator.CoordinatorGrpc.CoordinatorBlockingStub
import dev.restate.e2e.functions.coordinator.InvokeSideEffectsRequest
import dev.restate.e2e.utils.InjectBlockingStub
import dev.restate.e2e.utils.RestateDeployer
import dev.restate.e2e.utils.RestateDeployerExtension
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension

class SideEffectTest {
companion object {
@RegisterExtension
val deployerExt: RestateDeployerExtension =
RestateDeployerExtension(
RestateDeployer.Builder().withFunction(Containers.COORDINATOR_FUNCTION_SPEC).build())
}

@Test
fun sideEffectFlush(@InjectBlockingStub coordinatorClient: CoordinatorBlockingStub) {
assertThat(
coordinatorClient.invokeSideEffects(
InvokeSideEffectsRequest.newBuilder().setLastIsCallback(false).build()))
.extracting { it.invokedTimes }
.isEqualTo(1)
}

@Test
fun sideEffectThenCallbackFlush(@InjectBlockingStub coordinatorClient: CoordinatorBlockingStub) {
assertThat(
coordinatorClient.invokeSideEffects(
InvokeSideEffectsRequest.newBuilder().setLastIsCallback(true).build()))
.extracting { it.invokedTimes }
.isEqualTo(1)
}
}

0 comments on commit 598db23

Please sign in to comment.