Skip to content

Commit

Permalink
Implement actor client metadata. (#1165)
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <[email protected]>
Co-authored-by: Cassie Coyle <[email protected]>
  • Loading branch information
artursouza and cicoyle authored Dec 3, 2024
1 parent bd1667b commit fcdf3c3
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 20 deletions.
28 changes: 25 additions & 3 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.Map;

/**
* Holds a client for Dapr sidecar communication. ActorClient should be reused.
*/
Expand Down Expand Up @@ -59,7 +62,7 @@ public ActorClient(ResiliencyOptions resiliencyOptions) {
* @param overrideProperties Override properties.
*/
public ActorClient(Properties overrideProperties) {
this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN));
this(overrideProperties, null);
}

/**
Expand All @@ -69,21 +72,38 @@ public ActorClient(Properties overrideProperties) {
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN));
this(overrideProperties, null, resiliencyOptions);
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param overrideProperties Override properties.
* @param metadata gRPC metadata or HTTP headers for actor invocation.
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties),
metadata,
resiliencyOptions,
overrideProperties.getValue(Properties.API_TOKEN));
}

/**
* Instantiates a new channel for Dapr sidecar communication.
*
* @param grpcManagedChannel gRPC channel.
* @param metadata gRPC metadata or HTTP headers for actor invocation.
* @param resiliencyOptions Client resiliency options.
* @param daprApiToken Dapr API token.
*/
private ActorClient(
ManagedChannel grpcManagedChannel,
Map<String, String> metadata,
ResiliencyOptions resiliencyOptions,
String daprApiToken) {
this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken);
this.daprClient = buildDaprClient(grpcManagedChannel, metadata, resiliencyOptions, daprApiToken);
}

/**
Expand Down Expand Up @@ -137,10 +157,12 @@ private static ManagedChannel buildManagedChannel(Properties overrideProperties)
*/
private static DaprClient buildDaprClient(
Channel grpcManagedChannel,
Map<String, String> metadata,
ResiliencyOptions resiliencyOptions,
String daprApiToken) {
return new DaprClientImpl(
DaprGrpc.newStub(grpcManagedChannel),
metadata == null ? null : Collections.unmodifiableMap(metadata),
resiliencyOptions,
daprApiToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

Expand All @@ -57,19 +58,30 @@ class DaprClientImpl implements DaprClient {
*/
private final DaprClientGrpcInterceptors grpcInterceptors;

/**
* Metadata for actor invocation requests.
*/
private final Map<String, String> metadata;

/**
* Internal constructor.
*
* @param grpcClient Dapr's GRPC client.
* @param metadata gRPC metadata or HTTP headers for actor server to receive.
* @param resiliencyOptions Client resiliency options (optional).
* @param daprApiToken Dapr API token (optional).
*/
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) {
DaprClientImpl(
DaprGrpc.DaprStub grpcClient,
Map<String, String> metadata,
ResiliencyOptions resiliencyOptions,
String daprApiToken) {
this.client = grpcClient;
this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken,
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()));
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
this.metadata = metadata == null ? Map.of() : metadata;
}

/**
Expand All @@ -82,6 +94,7 @@ public Mono<byte[]> invoke(String actorType, String actorId, String methodName,
.setActorType(actorType)
.setActorId(actorId)
.setMethod(methodName)
.putAllMetadata(this.metadata)
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
.build();
return Mono.deferContextual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void setup() throws IOException {
InProcessChannelBuilder.forName(serverName).directExecutor().build());

// Create a HelloWorldClient using the in-process channel;
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null);
client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null, null);
}

@Test
Expand Down
12 changes: 10 additions & 2 deletions sdk-tests/src/test/java/io/dapr/it/DaprRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,19 @@ public DaprClientBuilder newDaprClientBuilder() {
}

public ActorClient newActorClient() {
return this.newActorClient(null);
return this.newActorClient(null, null);
}

public ActorClient newActorClient(Map<String, String> metadata) {
return this.newActorClient(metadata, null);
}

public ActorClient newActorClient(ResiliencyOptions resiliencyOptions) {
return new ActorClient(new Properties(this.getPropertyOverrides()), resiliencyOptions);
return this.newActorClient(null, resiliencyOptions);
}

public ActorClient newActorClient(Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
return new ActorClient(new Properties(this.getPropertyOverrides()), metadata, resiliencyOptions);
}

public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedException {
Expand Down
38 changes: 30 additions & 8 deletions sdk-tests/src/test/java/io/dapr/it/actors/ActorExceptionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.actors.app.MyActor;
import io.dapr.it.actors.app.MyActorService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprExceptionSubstring;

Expand All @@ -30,23 +35,24 @@ public class ActorExceptionIT extends BaseIT {

private static Logger logger = LoggerFactory.getLogger(ActorExceptionIT.class);

@Test
public void exceptionTest() throws Exception {
private static DaprRun run;

@BeforeAll
public static void start() throws Exception {
// The call below will fail if service cannot start successfully.
var run = startDaprApp(
run = startDaprApp(
ActorExceptionIT.class.getSimpleName(),
MyActorService.SUCCESS_MESSAGE,
MyActorService.class,
true,
60000);
}

logger.debug("Creating proxy builder");
@Test
public void exceptionTest() throws Exception {
ActorProxyBuilder<MyActor> proxyBuilder =
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient()));
logger.debug("Creating actorId");
ActorId actorId1 = new ActorId("1");
logger.debug("Building proxy");
MyActor proxy = proxyBuilder.build(actorId1);
MyActor proxy = proxyBuilder.build(new ActorId("1"));

callWithRetry(() -> {
assertThrowsDaprExceptionSubstring(
Expand All @@ -55,4 +61,20 @@ public void exceptionTest() throws Exception {
() -> proxy.throwException());
}, 10000);
}

@Test
public void exceptionDueToMetadataTest() throws Exception {
// Setting this HTTP header via actor metadata will cause the Actor HTTP server to error.
Map<String, String> metadata = Map.of("Content-Length", "9999");
ActorProxyBuilder<MyActor> proxyBuilderMetadataOverride =
new ActorProxyBuilder("MyActorTest", MyActor.class, deferClose(run.newActorClient(metadata)));

MyActor proxyWithMetadata = proxyBuilderMetadataOverride.build(new ActorId("2"));
callWithRetry(() -> {
assertThrowsDaprExceptionSubstring(
"INTERNAL",
"ContentLength=9999 with Body length 13",
() -> proxyWithMetadata.say("hello world"));
}, 10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import io.dapr.internal.grpc.interceptors.DaprApiTokenInterceptor;
import io.dapr.internal.grpc.interceptors.DaprAppIdInterceptor;
import io.dapr.internal.grpc.interceptors.DaprMetadataInterceptor;
import io.dapr.internal.grpc.interceptors.DaprMetadataReceiverInterceptor;
import io.dapr.internal.grpc.interceptors.DaprTimeoutInterceptor;
import io.dapr.internal.grpc.interceptors.DaprTracingInterceptor;
import io.dapr.internal.resiliency.TimeoutPolicy;
Expand All @@ -35,10 +35,18 @@ public class DaprClientGrpcInterceptors {

private final TimeoutPolicy timeoutPolicy;

/**
* Instantiates a holder of all gRPC interceptors.
*/
public DaprClientGrpcInterceptors() {
this(null, null);
}

/**
* Instantiates a holder of all gRPC interceptors.
* @param daprApiToken Dapr API token.
* @param timeoutPolicy Timeout Policy.
*/
public DaprClientGrpcInterceptors(String daprApiToken, TimeoutPolicy timeoutPolicy) {
this.daprApiToken = daprApiToken;
this.timeoutPolicy = timeoutPolicy;
Expand Down Expand Up @@ -118,7 +126,7 @@ public <T extends AbstractStub<T>> T intercept(
new DaprApiTokenInterceptor(this.daprApiToken),
new DaprTimeoutInterceptor(this.timeoutPolicy),
new DaprTracingInterceptor(context),
new DaprMetadataInterceptor(metadataConsumer));
new DaprMetadataReceiverInterceptor(metadataConsumer));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.dapr.internal.grpc.interceptors;

import io.dapr.client.Headers;
import io.dapr.config.Properties;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
/**
* Consumes gRPC metadata.
*/
public class DaprMetadataInterceptor implements ClientInterceptor {
public class DaprMetadataReceiverInterceptor implements ClientInterceptor {

private final Consumer<Metadata> metadataConsumer;

/**
* Creates an instance of the consumer for gRPC metadata.
* @param metadataConsumer gRPC metadata consumer
*/
public DaprMetadataInterceptor(Consumer<Metadata> metadataConsumer) {
public DaprMetadataReceiverInterceptor(Consumer<Metadata> metadataConsumer) {
this.metadataConsumer = metadataConsumer;
}

Expand Down

0 comments on commit fcdf3c3

Please sign in to comment.