Skip to content

Commit

Permalink
Support remote endpoint. (#877)
Browse files Browse the repository at this point in the history
* Support remote endpoint.

Signed-off-by: Artur Souza <[email protected]>

* Use GRPC_ENDPOINT and HTTP_ENDPOINT in integration tests.

Signed-off-by: Artur Souza <[email protected]>

* Fix happy path for waiting for sidecar test.

Signed-off-by: Artur Souza <[email protected]>

---------

Signed-off-by: Artur Souza <[email protected]>
Co-authored-by: Mukundan Sundararajan <[email protected]>
  • Loading branch information
artursouza and mukundansundar authored Jun 19, 2023
1 parent d8dbb0e commit 201dbc5
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 126 deletions.
4 changes: 4 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/DaprRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public void use() {
System.getProperties().setProperty(
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort());
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort());
}

public void switchToGRPC() {
Expand Down
32 changes: 21 additions & 11 deletions sdk/src/main/java/io/dapr/client/DaprClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.net.URI;

/**
* A builder for the DaprClient,
Expand Down Expand Up @@ -162,19 +163,28 @@ private DaprClient buildDaprClient(DaprApiProtocol protocol) {
* @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
*/
private DaprClient buildDaprClientGrpc() {
final ManagedChannel channel = buildGrpcManagedChanel();
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer);
}

private ManagedChannel buildGrpcManagedChanel() {
String host = Properties.SIDECAR_IP.get();
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalArgumentException("Invalid port.");
boolean insecure = true;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
URI uri = URI.create(grpcEndpoint);
insecure = uri.getScheme().equalsIgnoreCase("http");
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
}
ManagedChannel channel = ManagedChannelBuilder.forAddress(
Properties.SIDECAR_IP.get(), port).usePlaintext().userAgent(Version.getSdkVersion()).build();
Closeable closeableChannel = () -> {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
};
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientGrpc(closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer);
ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(host, port)
.userAgent(Version.getSdkVersion());
if (insecure) {
builder = builder.usePlaintext();
}
return builder.build();
}

/**
Expand Down
119 changes: 55 additions & 64 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
import io.dapr.v1.DaprGrpc;
Expand All @@ -69,7 +68,6 @@
import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -92,7 +90,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
/**
* The GRPC managed channel to be used.
*/
private Closeable channel;
private final GrpcChannelFacade channel;

/**
* The async gRPC stub.
Expand All @@ -102,19 +100,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param closeableChannel A closeable for a Managed GRPC channel
* @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @see DaprClientBuilder
*/
DaprClientGrpc(
Closeable closeableChannel,
GrpcChannelFacade channel,
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer);
this.channel = closeableChannel;
this.channel = channel;
this.asyncStub = intercept(asyncStub);
}

Expand Down Expand Up @@ -145,13 +143,7 @@ private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(State
*/
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
return this.channel.waitForChannelReady(timeoutInMilliseconds);
}

/**
Expand Down Expand Up @@ -193,7 +185,6 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
}

/**
*
* {@inheritDoc}
*/
@Override
Expand All @@ -209,7 +200,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
throw new IllegalArgumentException("pubsubName and topic name cannot be null or empty");
}

for (BulkPublishEntry<?> entry: request.getEntries()) {
for (BulkPublishEntry<?> entry : request.getEntries()) {
Object event = entry.getEvent();
byte[] data;
String contentType = entry.getContentType();
Expand Down Expand Up @@ -251,7 +242,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
}

Map<String, BulkPublishEntry<T>> entryMap = new HashMap<>();
for (BulkPublishEntry<T> entry: request.getEntries()) {
for (BulkPublishEntry<T> entry : request.getEntries()) {
entryMap.put(entry.getEntryId(), entry);
}
return Mono.deferContextual(
Expand Down Expand Up @@ -299,17 +290,17 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.deferContextual(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -346,17 +337,17 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -442,12 +433,12 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe
DaprProtos.GetBulkStateRequest envelope = builder.build();

return Mono.deferContextual(
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
.getBulkState(envelope, it)
)
).map(
it ->
it
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
.getBulkState(envelope, it)
)
).map(
it ->
it
.getItemsList()
.stream()
.map(b -> {
Expand Down Expand Up @@ -705,8 +696,8 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
}

DaprProtos.GetSecretRequest.Builder requestBuilder = DaprProtos.GetSecretRequest.newBuilder()
.setStoreName(secretStoreName)
.setKey(key);
.setStoreName(secretStoreName)
.setKey(key);

if (metadata != null) {
requestBuilder.putAllMetadata(metadata);
Expand Down Expand Up @@ -740,18 +731,18 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest

return Mono.deferContextual(
context ->
this.<DaprProtos.GetBulkSecretResponse>createMono(
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
)
this.<DaprProtos.GetBulkSecretResponse>createMono(
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
)
).map(it -> {
Map<String, DaprProtos.SecretResponse> secretsMap = it.getDataMap();
if (secretsMap == null) {
return Collections.emptyMap();
}
return secretsMap
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().getSecretsMap()));
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().getSecretsMap()));
});
} catch (Exception ex) {
return DaprException.wrapMono(ex);
Expand Down Expand Up @@ -805,7 +796,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
try {
return buildQueryStateKeyValue(v, type);
} catch (Exception e) {
throw DaprException.propagate(e);
throw DaprException.propagate(e);
}
})
.collect(Collectors.toList());
Expand Down Expand Up @@ -900,7 +891,7 @@ private Mono<Map<String, ConfigurationItem>> getConfiguration(DaprProtos.GetConf
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItems().entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
}
return Collections.unmodifiableMap(configMap);
}
Expand Down Expand Up @@ -934,15 +925,15 @@ public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConf
return this.<DaprProtos.SubscribeConfigurationResponse>createFlux(
it -> intercept(asyncStub).subscribeConfiguration(envelope, it)
).map(
it -> {
Map<String, ConfigurationItem> configMap = new HashMap<>();
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItemsMap().entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
it -> {
Map<String, ConfigurationItem> configMap = new HashMap<>();
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItemsMap().entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
}
return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap));
}
return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap));
}
);
} catch (Exception ex) {
return DaprException.wrapFlux(ex);
Expand Down Expand Up @@ -990,8 +981,8 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri
private ConfigurationItem buildConfigurationItem(
CommonProtos.ConfigurationItem configurationItem, String key) {
return new ConfigurationItem(
key,
configurationItem.getValue(),
key,
configurationItem.getValue(),
configurationItem.getVersion(),
configurationItem.getMetadataMap()
);
Expand Down
35 changes: 23 additions & 12 deletions sdk/src/main/java/io/dapr/client/DaprHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import reactor.util.context.ContextView;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -141,14 +142,9 @@ public int getStatusCode() {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Hostname used to communicate to Dapr's HTTP endpoint.
* Endpoint used to communicate to Dapr's HTTP endpoint.
*/
private final String hostname;

/**
* Port used to communicate to Dapr's HTTP endpoint.
*/
private final int port;
private final URI uri;

/**
* Http client used for all API calls.
Expand All @@ -163,8 +159,18 @@ public int getStatusCode() {
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String hostname, int port, OkHttpClient httpClient) {
this.hostname = hostname;
this.port = port;
this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port);
this.httpClient = httpClient;
}

/**
* Creates a new instance of {@link DaprHttp}.
*
* @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com")
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String uri, OkHttpClient httpClient) {
this.uri = URI.create(uri);
this.httpClient = httpClient;
}

Expand Down Expand Up @@ -273,9 +279,14 @@ private CompletableFuture<Response> doInvokeApi(String method,
body = RequestBody.Companion.create(content, mediaType);
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme(DEFAULT_HTTP_SCHEME)
.host(this.hostname)
.port(this.port);
urlBuilder.scheme(uri.getScheme())
.host(uri.getHost());
if (uri.getPort() > 0) {
urlBuilder.port(uri.getPort());
}
if (uri.getPath() != null) {
urlBuilder.addPathSegments(uri.getPath());
}
for (String pathSegment : pathSegments) {
urlBuilder.addPathSegment(pathSegment);
}
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ private DaprHttp buildDaprHttp() {
}
}

String endpoint = Properties.HTTP_ENDPOINT.get();
if ((endpoint != null) && !endpoint.isEmpty()) {
return new DaprHttp(endpoint, OK_HTTP_CLIENT);
}

return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
}
}
Loading

0 comments on commit 201dbc5

Please sign in to comment.