diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index fb69c9687..36ce77182 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -140,6 +140,10 @@ public void use() { System.getProperties().setProperty( Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); + System.getProperties().setProperty( + Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort()); + System.getProperties().setProperty( + Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort()); } public void switchToGRPC() { diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index 717e3309a..1f7c485fc 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.net.URI; /** * A builder for the DaprClient, @@ -162,19 +163,28 @@ private DaprClient buildDaprClient(DaprApiProtocol protocol) { * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number. */ private DaprClient buildDaprClientGrpc() { + final ManagedChannel channel = buildGrpcManagedChanel(); + final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); + DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); + return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer); + } + + private ManagedChannel buildGrpcManagedChanel() { + String host = Properties.SIDECAR_IP.get(); int port = Properties.GRPC_PORT.get(); - if (port <= 0) { - throw new IllegalArgumentException("Invalid port."); + boolean insecure = true; + String grpcEndpoint = Properties.GRPC_ENDPOINT.get(); + if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) { + URI uri = URI.create(grpcEndpoint); + insecure = uri.getScheme().equalsIgnoreCase("http"); + port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443); } - ManagedChannel channel = ManagedChannelBuilder.forAddress( - Properties.SIDECAR_IP.get(), port).usePlaintext().userAgent(Version.getSdkVersion()).build(); - Closeable closeableChannel = () -> { - if (channel != null && !channel.isShutdown()) { - channel.shutdown(); - } - }; - DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); - return new DaprClientGrpc(closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer); + ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(host, port) + .userAgent(Version.getSdkVersion()); + if (insecure) { + builder = builder.usePlaintext(); + } + return builder.build(); } /** diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 9b795c7d7..49cf4aca9 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -50,7 +50,6 @@ import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.DefaultContentTypeConverter; -import io.dapr.utils.NetworkUtils; import io.dapr.utils.TypeRef; import io.dapr.v1.CommonProtos; import io.dapr.v1.DaprGrpc; @@ -69,7 +68,6 @@ import reactor.core.publisher.MonoSink; import reactor.util.context.ContextView; -import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -92,7 +90,7 @@ public class DaprClientGrpc extends AbstractDaprClient { /** * The GRPC managed channel to be used. */ - private Closeable channel; + private final GrpcChannelFacade channel; /** * The async gRPC stub. @@ -102,19 +100,19 @@ public class DaprClientGrpc extends AbstractDaprClient { /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * - * @param closeableChannel A closeable for a Managed GRPC channel + * @param channel Facade for the managed GRPC channel * @param asyncStub async gRPC stub * @param objectSerializer Serializer for transient request/response objects. * @param stateSerializer Serializer for state objects. * @see DaprClientBuilder */ DaprClientGrpc( - Closeable closeableChannel, + GrpcChannelFacade channel, DaprGrpc.DaprStub asyncStub, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { super(objectSerializer, stateSerializer); - this.channel = closeableChannel; + this.channel = channel; this.asyncStub = intercept(asyncStub); } @@ -145,13 +143,7 @@ private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(State */ @Override public Mono waitForSidecar(int timeoutInMilliseconds) { - return Mono.fromRunnable(() -> { - try { - NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); + return this.channel.waitForChannelReady(timeoutInMilliseconds); } /** @@ -193,7 +185,6 @@ public Mono publishEvent(PublishEventRequest request) { } /** - * * {@inheritDoc} */ @Override @@ -209,7 +200,7 @@ public Mono> publishEvents(BulkPublishRequest requ throw new IllegalArgumentException("pubsubName and topic name cannot be null or empty"); } - for (BulkPublishEntry entry: request.getEntries()) { + for (BulkPublishEntry entry : request.getEntries()) { Object event = entry.getEvent(); byte[] data; String contentType = entry.getContentType(); @@ -251,7 +242,7 @@ public Mono> publishEvents(BulkPublishRequest requ } Map> entryMap = new HashMap<>(); - for (BulkPublishEntry entry: request.getEntries()) { + for (BulkPublishEntry entry : request.getEntries()) { entryMap.put(entry.getEntryId(), entry); } return Mono.deferContextual( @@ -299,17 +290,17 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342 return Mono.deferContextual( - context -> this.createMono( - it -> intercept(context, asyncStub).invokeService(envelope, it) - ) - ).flatMap( - it -> { - try { - return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type)); - } catch (IOException e) { - throw DaprException.propagate(e); - } - } + context -> this.createMono( + it -> intercept(context, asyncStub).invokeService(envelope, it) + ) + ).flatMap( + it -> { + try { + return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type)); + } catch (IOException e) { + throw DaprException.propagate(e); + } + } ); } catch (Exception ex) { return DaprException.wrapMono(ex); @@ -346,17 +337,17 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) DaprProtos.InvokeBindingRequest envelope = builder.build(); return Mono.deferContextual( - context -> this.createMono( - it -> intercept(context, asyncStub).invokeBinding(envelope, it) - ) - ).flatMap( - it -> { - try { - return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type)); - } catch (IOException e) { - throw DaprException.propagate(e); - } - } + context -> this.createMono( + it -> intercept(context, asyncStub).invokeBinding(envelope, it) + ) + ).flatMap( + it -> { + try { + return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type)); + } catch (IOException e) { + throw DaprException.propagate(e); + } + } ); } catch (Exception ex) { return DaprException.wrapMono(ex); @@ -442,12 +433,12 @@ public Mono>> getBulkState(GetBulkStateRequest request, TypeRe DaprProtos.GetBulkStateRequest envelope = builder.build(); return Mono.deferContextual( - context -> this.createMono(it -> intercept(context, asyncStub) - .getBulkState(envelope, it) - ) - ).map( - it -> - it + context -> this.createMono(it -> intercept(context, asyncStub) + .getBulkState(envelope, it) + ) + ).map( + it -> + it .getItemsList() .stream() .map(b -> { @@ -705,8 +696,8 @@ public Mono> getSecret(GetSecretRequest request) { } DaprProtos.GetSecretRequest.Builder requestBuilder = DaprProtos.GetSecretRequest.newBuilder() - .setStoreName(secretStoreName) - .setKey(key); + .setStoreName(secretStoreName) + .setKey(key); if (metadata != null) { requestBuilder.putAllMetadata(metadata); @@ -740,18 +731,18 @@ public Mono>> getBulkSecret(GetBulkSecretRequest return Mono.deferContextual( context -> - this.createMono( - it -> intercept(context, asyncStub).getBulkSecret(envelope, it) - ) + this.createMono( + it -> intercept(context, asyncStub).getBulkSecret(envelope, it) + ) ).map(it -> { Map secretsMap = it.getDataMap(); if (secretsMap == null) { return Collections.emptyMap(); } return secretsMap - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().getSecretsMap())); + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, s -> s.getValue().getSecretsMap())); }); } catch (Exception ex) { return DaprException.wrapMono(ex); @@ -805,7 +796,7 @@ public Mono> queryState(QueryStateRequest request, Typ try { return buildQueryStateKeyValue(v, type); } catch (Exception e) { - throw DaprException.propagate(e); + throw DaprException.propagate(e); } }) .collect(Collectors.toList()); @@ -900,7 +891,7 @@ private Mono> getConfiguration(DaprProtos.GetConf Iterator> itr = it.getItems().entrySet().iterator(); while (itr.hasNext()) { Map.Entry entry = itr.next(); - configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey())); + configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey())); } return Collections.unmodifiableMap(configMap); } @@ -934,15 +925,15 @@ public Flux subscribeConfiguration(SubscribeConf return this.createFlux( it -> intercept(asyncStub).subscribeConfiguration(envelope, it) ).map( - it -> { - Map configMap = new HashMap<>(); - Iterator> itr = it.getItemsMap().entrySet().iterator(); - while (itr.hasNext()) { - Map.Entry entry = itr.next(); - configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey())); + it -> { + Map configMap = new HashMap<>(); + Iterator> itr = it.getItemsMap().entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey())); + } + return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap)); } - return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap)); - } ); } catch (Exception ex) { return DaprException.wrapFlux(ex); @@ -990,8 +981,8 @@ public Mono unsubscribeConfiguration(Unsubscri private ConfigurationItem buildConfigurationItem( CommonProtos.ConfigurationItem configurationItem, String key) { return new ConfigurationItem( - key, - configurationItem.getValue(), + key, + configurationItem.getValue(), configurationItem.getVersion(), configurationItem.getMetadataMap() ); diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 830bfad61..9b1fc374a 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -32,6 +32,7 @@ import reactor.util.context.ContextView; import java.io.IOException; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -141,14 +142,9 @@ public int getStatusCode() { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); /** - * Hostname used to communicate to Dapr's HTTP endpoint. + * Endpoint used to communicate to Dapr's HTTP endpoint. */ - private final String hostname; - - /** - * Port used to communicate to Dapr's HTTP endpoint. - */ - private final int port; + private final URI uri; /** * Http client used for all API calls. @@ -163,8 +159,18 @@ public int getStatusCode() { * @param httpClient RestClient used for all API calls in this new instance. */ DaprHttp(String hostname, int port, OkHttpClient httpClient) { - this.hostname = hostname; - this.port = port; + this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port); + this.httpClient = httpClient; + } + + /** + * Creates a new instance of {@link DaprHttp}. + * + * @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com") + * @param httpClient RestClient used for all API calls in this new instance. + */ + DaprHttp(String uri, OkHttpClient httpClient) { + this.uri = URI.create(uri); this.httpClient = httpClient; } @@ -273,9 +279,14 @@ private CompletableFuture doInvokeApi(String method, body = RequestBody.Companion.create(content, mediaType); } HttpUrl.Builder urlBuilder = new HttpUrl.Builder(); - urlBuilder.scheme(DEFAULT_HTTP_SCHEME) - .host(this.hostname) - .port(this.port); + urlBuilder.scheme(uri.getScheme()) + .host(uri.getHost()); + if (uri.getPort() > 0) { + urlBuilder.port(uri.getPort()); + } + if (uri.getPath() != null) { + urlBuilder.addPathSegments(uri.getPath()); + } for (String pathSegment : pathSegments) { urlBuilder.addPathSegment(pathSegment); } diff --git a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java index 3c4c34820..73842b546 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java @@ -85,6 +85,11 @@ private DaprHttp buildDaprHttp() { } } + String endpoint = Properties.HTTP_ENDPOINT.get(); + if ((endpoint != null) && !endpoint.isEmpty()) { + return new DaprHttp(endpoint, OK_HTTP_CLIENT); + } + return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT); } } diff --git a/sdk/src/main/java/io/dapr/client/GrpcChannelFacade.java b/sdk/src/main/java/io/dapr/client/GrpcChannelFacade.java new file mode 100644 index 000000000..d7570f6d7 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/GrpcChannelFacade.java @@ -0,0 +1,76 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client; + +import io.dapr.v1.DaprGrpc; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import reactor.core.publisher.Mono; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Facade for common operations on gRPC channel. + * + * @see DaprGrpc + * @see DaprClient + */ +class GrpcChannelFacade implements Closeable { + + /** + * The GRPC managed channel to be used. + */ + private final ManagedChannel channel; + + + /** + * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder + * + * @param channel A Managed GRPC channel + * @see DaprClientBuilder + */ + GrpcChannelFacade(ManagedChannel channel) { + this.channel = channel; + } + + @Override + public void close() throws IOException { + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); + } + } + + public Mono waitForChannelReady(int timeoutInMilliseconds) { + return Mono.fromRunnable(() -> { + boolean isReady = false; + long startTime = System.currentTimeMillis(); + while (!isReady && System.currentTimeMillis() - startTime < timeoutInMilliseconds) { + isReady = this.channel.getState(true) == ConnectivityState.READY; + if (!isReady) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Waiting for gRPC channel ready interrupted.", e); + } + } + } + + if (!isReady) { + throw new RuntimeException("Timeout waiting for gRPC channel to be ready."); + } + }); + } +} diff --git a/sdk/src/main/java/io/dapr/config/Properties.java b/sdk/src/main/java/io/dapr/config/Properties.java index d0886f9d3..08bd911d6 100644 --- a/sdk/src/main/java/io/dapr/config/Properties.java +++ b/sdk/src/main/java/io/dapr/config/Properties.java @@ -99,6 +99,22 @@ public class Properties { "DAPR_GRPC_PORT", DEFAULT_GRPC_PORT); + /** + * GRPC endpoint for remote sidecar connectivity. + */ + public static final Property GRPC_ENDPOINT = new StringProperty( + "dapr.grpc.endpoint", + "DAPR_GRPC_ENDPOINT", + null); + + /** + * GRPC endpoint for remote sidecar connectivity. + */ + public static final Property HTTP_ENDPOINT = new StringProperty( + "dapr.http.endpoint", + "DAPR_HTTP_ENDPOINT", + null); + /** * Determines if Dapr client will use gRPC or HTTP to talk to Dapr's side car. * @deprecated This attribute will be deleted at SDK version 1.10. diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java index 2b747a3bf..fedae450d 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -163,14 +163,9 @@ public ServerCall.Listener interceptCall(ServerCall { - if (channel != null && !channel.isShutdown()) { - channel.shutdown(); - } - }; DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); client = new DaprClientGrpc( - closeableChannel, asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + new GrpcChannelFacade(channel), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); } @Test diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 434fb6ca2..3e5ced5e1 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -45,6 +45,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import org.mockito.stubbing.Answer; import reactor.core.publisher.Mono; @@ -91,52 +92,39 @@ public class DaprClientGrpcTest { private static final String SECRET_STORE_NAME = "MySecretStore"; - private Closeable closeable; + private GrpcChannelFacade channel; private DaprGrpc.DaprStub daprStub; private DaprClient client; private ObjectSerializer serializer; @Before public void setup() throws IOException { - closeable = mock(Closeable.class); + channel = mock(GrpcChannelFacade.class); daprStub = mock(DaprGrpc.DaprStub.class); when(daprStub.withInterceptors(any())).thenReturn(daprStub); DaprClient grpcClient = new DaprClientGrpc( - closeable, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + channel, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); client = new DaprClientProxy(grpcClient); serializer = new ObjectSerializer(); - doNothing().when(closeable).close(); + doNothing().when(channel).close(); } @After public void tearDown() throws Exception { client.close(); - verify(closeable).close(); - verifyNoMoreInteractions(closeable); + verify(channel).close(); } @Test - public void waitForSidecarTimeout() throws Exception { - int port = findFreePort(); - System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(port)); + public void waitForSidecarTimeout() { + Mockito.doReturn(Mono.error(new RuntimeException())).when(channel).waitForChannelReady(1); assertThrows(RuntimeException.class, () -> client.waitForSidecar(1).block()); } @Test - public void waitForSidecarTimeoutOK() throws Exception { - try (ServerSocket serverSocket = new ServerSocket(0)) { - final int port = serverSocket.getLocalPort(); - System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(port)); - Thread t = new Thread(() -> { - try { - try (Socket socket = serverSocket.accept()) { - } - } catch (IOException e) { - } - }); - t.start(); - client.waitForSidecar(10000).block(); - } + public void waitForSidecarOK() { + Mockito.doReturn(Mono.empty()).when(channel).waitForChannelReady(10000); + client.waitForSidecar(10000).block(); } @Test @@ -172,7 +160,7 @@ public void publishEventCallbackExceptionThrownTest() { @Test public void publishEventSerializeException() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - client = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer()); + client = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer()); doAnswer((Answer) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onNext(Empty.getDefaultInstance()); @@ -290,7 +278,7 @@ public void invokeBindingIllegalArgumentExceptionTest() { @Test public void invokeBindingSerializeException() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - client = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer()); + client = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer()); doAnswer((Answer) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; observer.onNext(Empty.getDefaultInstance()); @@ -1451,7 +1439,7 @@ public void executeTransactionIllegalArgumentExceptionTest() { @Test public void executeTransactionSerializerExceptionTest() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - client = new DaprClientGrpc(closeable, daprStub, mockSerializer, mockSerializer); + client = new DaprClientGrpc(channel, daprStub, mockSerializer, mockSerializer); String etag = "ETag1"; String key = "key1"; String data = "my data"; diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index dcf6051e4..77fb9ef4b 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -69,25 +69,25 @@ public class DaprPreviewClientGrpcTest { private static final String TOPIC_NAME = "testTopic"; - private Closeable closeable; + private GrpcChannelFacade channel; private DaprGrpc.DaprStub daprStub; private DaprPreviewClient previewClient; @Before public void setup() throws IOException { - closeable = mock(Closeable.class); + channel = mock(GrpcChannelFacade.class); daprStub = mock(DaprGrpc.DaprStub.class); when(daprStub.withInterceptors(any())).thenReturn(daprStub); previewClient = new DaprClientGrpc( - closeable, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); - doNothing().when(closeable).close(); + channel, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + doNothing().when(channel).close(); } @After public void tearDown() throws Exception { previewClient.close(); - verify(closeable).close(); - verifyNoMoreInteractions(closeable); + verify(channel).close(); + verifyNoMoreInteractions(channel); } @Test @@ -143,7 +143,7 @@ public void publishEventsContentTypeMismatchException() throws IOException { @Test public void publishEventsSerializeException() throws IOException { DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - previewClient = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer()); + previewClient = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer()); doAnswer((Answer) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; diff --git a/sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java b/sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java new file mode 100644 index 000000000..6eb5ed4b7 --- /dev/null +++ b/sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java @@ -0,0 +1,70 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client; + +import io.dapr.v1.DaprGrpc; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static io.dapr.utils.TestUtils.findFreePort; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class GrpcChannelFacadeTest { + + private static int port; + + public static Server server; + + @BeforeAll + public static void setup() throws IOException { + port = findFreePort(); + server = ServerBuilder.forPort(port) + .addService(new DaprGrpc.DaprImplBase() { + }) + .build(); + server.start(); + } + + @AfterAll + public static void teardown() throws InterruptedException { + server.shutdown(); + server.awaitTermination(); + } + + @Test + public void waitForSidecarTimeout() throws Exception { + int unusedPort = findFreePort(); + ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", unusedPort) + .usePlaintext().build(); + final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); + + assertThrows(RuntimeException.class, () -> channelFacade.waitForChannelReady(1).block()); + } + + @Test + public void waitForSidecarOK() { + ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port) + .usePlaintext().build(); + final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); + channelFacade.waitForChannelReady(10000).block(); + } + +}