Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement retry and timeout policy for gRPC client. #889

Merged
merged 2 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down Expand Up @@ -101,14 +102,20 @@ jobs:
docker stop dapr_placement
cd dapr
./dist/linux_amd64/release/placement &
- name: Install Local kafka using docker-compose
- name: Install local Kafka using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d
docker ps
- name: Install Local mongo database using docker-compose
- name: Install local Mongo database using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-mongo.yml up -d
docker ps
- name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar
run: |
mkdir -p /home/runner/.local/bin
wget -q ${{ env.TOXIPROXY_URL }} -O /home/runner/.local/bin/toxiproxy-server
chmod +x /home/runner/.local/bin/toxiproxy-server
/home/runner/.local/bin/toxiproxy-server --version
- name: Clean up files
run: mvn clean -B
- name: Build sdk
Expand Down
33 changes: 26 additions & 7 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.client.DaprApiProtocol;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
Expand Down Expand Up @@ -46,26 +47,41 @@ public class ActorClient implements AutoCloseable {
* Instantiates a new channel for Dapr sidecar communication.
*/
public ActorClient() {
this(Properties.API_PROTOCOL.get());
this(null);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(ResiliencyOptions resiliencyOptions) {
this(Properties.API_PROTOCOL.get(), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol) {
this(apiProtocol, buildManagedChannel(apiProtocol));
private ActorClient(DaprApiProtocol apiProtocol, ResiliencyOptions resiliencyOptions) {
this(apiProtocol, buildManagedChannel(apiProtocol), resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param apiProtocol Dapr's API protocol.
* @param grpcManagedChannel gRPC channel.
* @param resiliencyOptions Client resiliency options.
*/
private ActorClient(DaprApiProtocol apiProtocol, ManagedChannel grpcManagedChannel) {
private ActorClient(
DaprApiProtocol apiProtocol,
ManagedChannel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel);
this.daprClient = buildDaprClient(apiProtocol, grpcManagedChannel, resiliencyOptions);
}

/**
Expand Down Expand Up @@ -119,9 +135,12 @@ private static ManagedChannel buildManagedChannel(DaprApiProtocol apiProtocol) {
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(DaprApiProtocol apiProtocol, Channel grpcManagedChannel) {
private static DaprClient buildDaprClient(
DaprApiProtocol apiProtocol,
Channel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) {
switch (apiProtocol) {
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel));
case GRPC: return new DaprGrpcClient(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions);
case HTTP: {
LOGGER.warn("HTTP client protocol is deprecated and will be removed in Dapr's Java SDK version 1.10.");
return new DaprHttpClient(new DaprHttpBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package io.dapr.actors.client;

import com.google.protobuf.ByteString;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.CallOptions;
Expand All @@ -39,18 +42,33 @@
*/
class DaprGrpcClient implements DaprClient {

/**
* Timeout policy for SDK calls to Dapr API.
*/
private final TimeoutPolicy timeoutPolicy;

/**
* Retry policy for SDK calls to Dapr API.
*/
private final RetryPolicy retryPolicy;

/**
* The async gRPC stub.
*/
private DaprGrpc.DaprStub client;
private final DaprGrpc.DaprStub client;

/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param resiliencyOptions Client resiliency options (optional)
*/
DaprGrpcClient(DaprGrpc.DaprStub grpcClient) {
DaprGrpcClient(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) {
this.client = intercept(grpcClient);
this.timeoutPolicy = new TimeoutPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getTimeout());
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
}

/**
Expand Down Expand Up @@ -78,14 +96,14 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
Expand Down Expand Up @@ -114,7 +132,8 @@ private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStu
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Represents the base class for actors.
Expand All @@ -28,8 +29,6 @@
*/
public abstract class AbstractActor {

private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer();

/**
* Type of tracing messages.
*/
Expand Down Expand Up @@ -58,7 +57,7 @@ public abstract class AbstractActor {
/**
* Internal control to assert method invocation on start and finish in this SDK.
*/
private boolean started;
private final AtomicBoolean started;

/**
* Instantiates a new Actor.
Expand All @@ -74,7 +73,7 @@ protected AbstractActor(ActorRuntimeContext runtimeContext, ActorId id) {
runtimeContext.getActorTypeInformation().getName(),
id);
this.actorTrace = runtimeContext.getActorTrace();
this.started = false;
this.started = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -250,14 +249,16 @@ protected Mono<Void> saveState() {

/**
* Resets the cached state of this Actor.
*
* @param force Forces the rollback, even if not in a call.
*/
void rollback() {
if (!this.started) {
void rollback(boolean force) {
if (!force && !this.started.get()) {
throw new IllegalStateException("Cannot reset state before starting call.");
}

this.resetState();
this.started = false;
this.started.set(false);
}

/**
Expand Down Expand Up @@ -302,11 +303,12 @@ Mono<Void> onDeactivateInternal() {
*/
Mono<Void> onPreActorMethodInternal(ActorMethodContext actorMethodContext) {
return Mono.fromRunnable(() -> {
if (this.started) {
throw new IllegalStateException("Cannot invoke a method before completing previous call.");
if (this.started.get()) {
throw new IllegalStateException(
"Cannot invoke a method before completing previous call. " + getId().toString());
}

this.started = true;
this.started.set(true);
}).then(this.onPreActorMethod(actorMethodContext));
}

Expand All @@ -318,14 +320,13 @@ Mono<Void> onPreActorMethodInternal(ActorMethodContext actorMethodContext) {
*/
Mono<Void> onPostActorMethodInternal(ActorMethodContext actorMethodContext) {
return Mono.fromRunnable(() -> {
if (!this.started) {
if (!this.started.get()) {
throw new IllegalStateException("Cannot complete a method before starting a call.");
}
}).then(this.onPostActorMethod(actorMethodContext))
.then(this.saveState())
.then(Mono.fromRunnable(() -> {
this.started = false;
}));
})
.then(this.onPostActorMethod(actorMethodContext))
.then(this.saveState())
.then(Mono.fromRunnable(() -> this.started.set(false)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,16 @@ private <T> Mono<T> invoke(ActorId actorId, ActorMethodContext context, Function
this.runtimeContext.getActorTypeInformation().getName()));
}

return actor.onPreActorMethodInternal(context)
return Mono.fromRunnable(() -> actor.rollback(true))
.onErrorMap(throwable -> {
actor.rollback(false);
return throwable;
})
.then(actor.onPreActorMethodInternal(context))
.then((Mono<Object>) func.apply(actor))
.switchIfEmpty(
actor.onPostActorMethodInternal(context))
.flatMap(r -> actor.onPostActorMethodInternal(context).thenReturn(r))
.onErrorMap(throwable -> {
actor.rollback();
return throwable;
})
.map(o -> (T) o);
} catch (Exception e) {
return Mono.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void setup() throws IOException {
InProcessChannelBuilder.forName(serverName).directExecutor().build());

// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
client = new DaprGrpcClient(DaprGrpc.newStub(channel), null);
}

@Test
Expand Down
5 changes: 4 additions & 1 deletion sdk-tests/components/mongo-statestore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ spec:
- name: databaseName
value: local
- name: collectionName
value: testCollection
value: testCollection
scopes:
- grpcstateclientit
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to make it easier to run individual ITs without requiring MongoDB

- httpstateclientit
5 changes: 5 additions & 0 deletions sdk-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@
<version>1.3.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>eu.rekawek.toxiproxy</groupId>
<artifactId>toxiproxy-java</artifactId>
<version>2.1.7</version>
</dependency>
</dependencies>

<build>
Expand Down
9 changes: 7 additions & 2 deletions sdk-tests/src/test/java/io/dapr/it/BaseIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.actors.client.ActorClient;
import io.dapr.client.DaprApiProtocol;
import io.dapr.client.resiliency.ResiliencyOptions;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.AfterClass;

Expand Down Expand Up @@ -194,8 +195,12 @@ public static void cleanUp() throws Exception {
}
}

protected ActorClient newActorClient() {
ActorClient client = new ActorClient();
protected static ActorClient newActorClient() {
return newActorClient(null);
}

protected static ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
ActorClient client = new ActorClient(resiliencyOptions);
TO_BE_CLOSED.add(client);
return client;
}
Expand Down
18 changes: 18 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,25 @@ public void run() throws InterruptedException, IOException {
}
});

final Thread stderrReader = new Thread(() -> {
try {
try (InputStream stderr = this.process.getErrorStream()) {
try (InputStreamReader isr = new InputStreamReader(stderr)) {
try (BufferedReader br = new BufferedReader(isr)) {
String line;
while ((line = br.readLine()) != null) {
System.err.println(line);
}
}
}
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});

stdoutReader.start();
stderrReader.start();
// Waits for success to happen within 1 minute.
finished.tryAcquire(SUCCESS_WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!success.get()) {
Expand Down
Loading