From 388a328c7f6c1a7df745e51586fa101a01261920 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Tue, 9 Apr 2024 17:32:03 +0900 Subject: [PATCH] Add `KubernetesEndpointGroup` (#5001) Motivation: It is tricky to send requests to a Kubernetes cluster from outside servers without ingress. There is no way to send traffic directly to the pod, but we can send traffic to the port of nodes (NodePort) where the pods are located. This PR proposes a new EndpointGroup that can send requests with CSLB using NodeIP and NodePort to pods in Kubernetes. This way is not an ideal CSLB where servers and clients communicate directly, but it will be a safer way to send traffic without going through ingress which can be SPOF. Modifications: - Add `KubernetesEndpointGroup` on top of `KubernetesClient` to dynamically obtain Kubernetes resources. - [Permission](https://kubernetes.io/docs/reference/access-authn-authz/rbac) to watch `services`, `nodes`, `pods` is required to fetch endpoints. - `service.ports[*].nodePort` is used to create the port of `Endpoint`. - [Watch API](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) is used to track changes in Kubernetes with a minimal delay. - `ADDED` and `MODIFIED` events are used to update resouces. - `DELETED` is used to remove the resouce. - `BOOKMARK` event is not used and `ERROR` may be ignorable. - Test `KubernetesEndpointGroup` with both a real Kubernetes cluster and a mock Kubernetes server. Result: - You can use `KubernetesEndpointGroup` to perform client-side load-balancing when sending requests. - Fixes #4497 ```java // Create a KubernetesEndpointGroup that fetches the endpoints of the 'my-service' service in the 'default' // namespace. The Kubernetes client will be created with the default configuration in the $HOME/.kube/config. KubernetesClient kubernetesClient = new KubernetesClientBuilder().build(); KubernetesEndpointGroup .builder(kubernetesClient) .namespace("default") .serviceName("my-service") .build(); // If you want to use a custom configuration, you can create a KubernetesEndpointGroup as follows: // The custom configuration would be useful when you want to access Kubernetes from outside the cluster. Config config = new ConfigBuilder() .withMasterUrl("https://my-k8s-master") .withOauthToken("my-token") .build(); KubernetesEndpointGroup .builder(config) .namespace("my-namespace") .serviceName("my-service") .build(); ``` --- kubernetes/build.gradle.kts | 1 + .../client/kubernetes/ArmeriaHttpClient.java | 2 +- .../kubernetes/ArmeriaHttpResponse.java | 7 +- .../endpoints/KubernetesEndpointGroup.java | 447 ++++++++++++++++++ .../KubernetesEndpointGroupBuilder.java | 141 ++++++ .../kubernetes/endpoints/package-info.java | 23 + .../client/kubernetes/package-info.java | 2 +- .../ArmeriaHttpInterceptorTest.java | 3 + .../KubernetesAvailableCondition.java | 41 ++ ...ubernetesEndpointGroupIntegrationTest.java | 71 +++ ...KubernetesEndpointGroupMockServerTest.java | 365 ++++++++++++++ .../InternalTestingBlockHoundIntegration.java | 1 + 12 files changed, 1100 insertions(+), 4 deletions(-) create mode 100644 kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java create mode 100644 kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java create mode 100644 kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/package-info.java create mode 100644 kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesAvailableCondition.java create mode 100644 kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupIntegrationTest.java create mode 100644 kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java diff --git a/kubernetes/build.gradle.kts b/kubernetes/build.gradle.kts index 8f3d7f2ee15..ea96167e123 100644 --- a/kubernetes/build.gradle.kts +++ b/kubernetes/build.gradle.kts @@ -3,4 +3,5 @@ dependencies { api(libs.kubernetes.client.impl) testImplementation(variantOf(libs.kubernetes.client.api) { classifier("tests") }) testImplementation(libs.kubernetes.server.mock) + testImplementation(libs.kubernetes.junit.jupiter) } diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClient.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClient.java index 9761090593c..29a7e6a88d1 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClient.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpClient.java @@ -93,7 +93,7 @@ public CompletableFuture> consumeBytesDirect( return splitResponse.headers().thenApply(responseHeaders -> { final AsyncBodySubscriber subscriber = new AsyncBodySubscriber(consumer); splitResponse.body().subscribe(subscriber, ctx.eventLoop()); - return new ArmeriaHttpResponse(responseHeaders, subscriber); + return new ArmeriaHttpResponse(request, responseHeaders, subscriber); }); } diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpResponse.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpResponse.java index c50056df8f4..49f1fe38a6a 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpResponse.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpResponse.java @@ -29,17 +29,20 @@ import io.fabric8.kubernetes.client.http.AsyncBody; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; +import io.fabric8.kubernetes.client.http.StandardHttpRequest; import io.netty.util.AsciiString; final class ArmeriaHttpResponse implements HttpResponse { + private final StandardHttpRequest request; private final ResponseHeaders responseHeaders; private final AsyncBody body; @Nullable private Map> headers; - ArmeriaHttpResponse(ResponseHeaders responseHeaders, AsyncBody body) { + ArmeriaHttpResponse(StandardHttpRequest request, ResponseHeaders responseHeaders, AsyncBody body) { + this.request = request; this.responseHeaders = responseHeaders; this.body = body; } @@ -61,7 +64,7 @@ public AsyncBody body() { @Override public HttpRequest request() { - return null; + return request; } @Override diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java new file mode 100644 index 00000000000..cdaeac87c8d --- /dev/null +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java @@ -0,0 +1,447 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; + +import org.jctools.maps.NonBlockingHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup; +import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.util.ShutdownHooks; + +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.NodeAddress; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServicePort; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; + +/** + * A {@link DynamicEndpointGroup} that fetches a node IP and a node port for each Pod from Kubernetes. + * + *

Note that the Kubernetes service must have a type of NodePort + * or 'LoadBalancer' + * to expose a node port for client side load balancing. + * + *

{@link KubernetesEndpointGroup} watches the nodes, services and pods in the Kubernetes cluster and updates + * the endpoints, so the credentials in the {@link Config} used to create {@link KubernetesClient} should + * have permission to watch {@code services}, {@code nodes} and {@code pods}. Otherwise, the + * {@link KubernetesEndpointGroup} will not be able to fetch the endpoints. + * + *

For instance, the following RBAC + * configuration is required: + *

{@code
+ * apiVersion: rbac.authorization.k8s.io/v1
+ * kind: ClusterRole
+ * metadata:
+ *   name: my-cluster-role
+ * rules:
+ * - apiGroups: [""]
+ *   resources: ["pods", "services", "nodes"]
+ *   verbs: ["watch"]
+ * }
+ * + *

Example: + *

{@code
+ * // Create a KubernetesEndpointGroup that fetches the endpoints of the 'my-service' service in the 'default'
+ * // namespace. The Kubernetes client will be created with the default configuration in the $HOME/.kube/config.
+ * KubernetesClient kubernetesClient = new KubernetesClientBuilder().build();
+ * KubernetesEndpointGroup
+ *   .builder(kubernetesClient)
+ *   .namespace("default")
+ *   .serviceName("my-service")
+ *   .build();
+ *
+ * // If you want to use a custom configuration, you can create a KubernetesEndpointGroup as follows:
+ * // The custom configuration would be useful when you want to access Kubernetes from outside the cluster.
+ * Config config =
+ *   new ConfigBuilder()
+ *     .withMasterUrl("https://my-k8s-master")
+ *     .withOauthToken("my-token")
+ *     .build();
+ * KubernetesEndpointGroup
+ *   .builder(config)
+ *   .namespace("my-namespace")
+ *   .serviceName("my-service")
+ *   .build();
+ * }
+ */ +@UnstableApi +public final class KubernetesEndpointGroup extends DynamicEndpointGroup { + + private static final Logger logger = LoggerFactory.getLogger(KubernetesEndpointGroup.class); + + private static final KubernetesClient DEFAULT_CLIENT = new KubernetesClientBuilder().build(); + + static { + ShutdownHooks.addClosingTask(DEFAULT_CLIENT); + } + + /** + * Returns a newly created {@link KubernetesEndpointGroup} with the specified {@link KubernetesClient}, + * {@code namespace} and {@code serviceName}. + * + *

Note that the {@link KubernetesClient} will not be automatically closed when the + * {@link KubernetesEndpointGroup} is closed. + */ + public static KubernetesEndpointGroup of(KubernetesClient kubernetesClient, String namespace, + String serviceName) { + return builder(kubernetesClient) + .namespace(namespace) + .serviceName(serviceName) + .build(); + } + + /** + * Returns a newly created {@link KubernetesEndpointGroup} with the specified {@link Config}, + * {@code namespace} and {@code serviceName}. + */ + public static KubernetesEndpointGroup of(Config config, String namespace, String serviceName) { + requireNonNull(config, "config"); + return builder(new KubernetesClientBuilder().withConfig(config).build(), true) + .namespace(namespace) + .serviceName(serviceName) + .build(); + } + + /** + * Returns a newly created {@link KubernetesEndpointGroup} with the specified {@code serviceName}. + * The default configuration in the {@code $HOME/.kube/config} will be used to create a + * {@link KubernetesClient}. + */ + public static KubernetesEndpointGroup of(String serviceName) { + return builder(DEFAULT_CLIENT, false) + .serviceName(serviceName) + .build(); + } + + /** + * Returns a newly created {@link KubernetesEndpointGroup} with the specified {@code namespace} and + * {@code serviceName}. + * The default configuration in the $HOME/.kube/config will be used to create a {@link KubernetesClient}. + */ + public static KubernetesEndpointGroup of(String namespace, String serviceName) { + return builder(DEFAULT_CLIENT, false) + .namespace(namespace) + .serviceName(serviceName) + .build(); + } + + /** + * Returns a newly created {@link KubernetesEndpointGroupBuilder} with the specified + * {@link KubernetesClient}. + * + *

Note that the {@link KubernetesClient} will not be automatically closed when the + * {@link KubernetesEndpointGroup} is closed. + */ + public static KubernetesEndpointGroupBuilder builder(KubernetesClient kubernetesClient) { + return new KubernetesEndpointGroupBuilder(kubernetesClient, false); + } + + /** + * Returns a newly created {@link KubernetesEndpointGroupBuilder} with the specified + * {@link KubernetesClient}. + * + * @param autoClose whether to close the {@link KubernetesClient} when the {@link KubernetesEndpointGroup} + * is closed. + */ + public static KubernetesEndpointGroupBuilder builder(KubernetesClient kubernetesClient, boolean autoClose) { + return new KubernetesEndpointGroupBuilder(kubernetesClient, autoClose); + } + + /** + * Returns a newly created {@link KubernetesEndpointGroupBuilder} with the specified Kubernetes + * {@link Config}. + */ + public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) { + requireNonNull(kubeConfig, "kubeConfig"); + return builder(new KubernetesClientBuilder().withConfig(kubeConfig).build(), true); + } + + private final KubernetesClient client; + private final boolean autoClose; + @Nullable + private final String namespace; + private final String serviceName; + @Nullable + private final String portName; + private final Predicate nodeAddressFilter; + + private final Watch nodeWatch; + private final Watch serviceWatch; + @Nullable + private volatile Watch podWatch; + + private final Map podToNode = new NonBlockingHashMap<>(); + private final Map nodeToIp = new NonBlockingHashMap<>(); + @Nullable + private Service service; + @Nullable + private Integer nodePort; + + private volatile boolean closed; + + KubernetesEndpointGroup(KubernetesClient client, @Nullable String namespace, String serviceName, + @Nullable String portName, Predicate nodeAddressFilter, + boolean autoClose, EndpointSelectionStrategy selectionStrategy, + boolean allowEmptyEndpoints, long selectionTimeoutMillis) { + super(selectionStrategy, allowEmptyEndpoints, selectionTimeoutMillis); + this.client = client; + this.namespace = namespace; + this.serviceName = serviceName; + this.portName = portName; + this.nodeAddressFilter = nodeAddressFilter; + this.autoClose = autoClose; + nodeWatch = watchNodes(); + serviceWatch = watchService(); + } + + /** + * Watches the service. {@link Watcher} will retry automatically on failures by {@link KubernetesClient}. + */ + private Watch watchService() { + final Watcher watcher = new Watcher() { + @Override + public void eventReceived(Action action, Service service0) { + if (closed) { + return; + } + + switch (action) { + case ADDED: + case MODIFIED: + final List ports = service0.getSpec().getPorts(); + final Integer nodePort0 = + ports.stream() + .filter(p -> portName == null || portName.equals(p.getName())) + .map(ServicePort::getNodePort) + .filter(Objects::nonNull) + .findFirst().orElse(null); + if (nodePort0 == null) { + if (portName != null) { + logger.warn("No node port matching '{}' in the service: {}", portName, + service0); + } else { + logger.warn( + "No node port in the service. Either 'NodePort' or 'LoadBalancer' " + + "should be set as the type for your Kubernetes service to expose " + + "a node port. type:{}, service:{}", service0.getSpec().getType(), + service0); + } + return; + } + service = service0; + nodePort = nodePort0; + + Watch podWatch0 = podWatch; + if (podWatch0 != null) { + podWatch0.close(); + } + podWatch0 = watchPod(service0.getSpec().getSelector()); + if (closed) { + podWatch0.close(); + } else { + podWatch = podWatch0; + } + break; + case DELETED: + logger.warn("{} service is deleted. (namespace: {})", serviceName, namespace); + // This situation should not occur in production. + break; + case ERROR: + case BOOKMARK: + // Do nothing. + break; + } + } + + @Override + public void onClose(WatcherException cause) { + if (closed) { + return; + } + logger.warn("{} service watcher is closed. (namespace: {})", namespace, serviceName, cause); + } + }; + + if (namespace == null) { + return client.services().withName(serviceName).watch(watcher); + } else { + return client.services().inNamespace(namespace).withName(serviceName).watch(watcher); + } + } + + private Watch watchPod(Map selector) { + final Watcher watcher = new Watcher() { + @Override + public void eventReceived(Action action, Pod resource) { + if (closed) { + return; + } + if (action == Action.ERROR || action == Action.BOOKMARK) { + return; + } + final String podName = resource.getMetadata().getName(); + final String nodeName = resource.getSpec().getNodeName(); + if (podName == null || nodeName == null) { + logger.debug("Pod or node name is null. pod: {}, node: {}", podName, nodeName); + return; + } + + switch (action) { + case ADDED: + case MODIFIED: + podToNode.put(podName, nodeName); + break; + case DELETED: + podToNode.remove(podName); + break; + default: + } + maybeUpdateEndpoints(); + } + + @Override + public void onClose(WatcherException cause) { + if (closed) { + return; + } + + logger.warn("Pod watcher for {}/{} is closed.", namespace, serviceName, cause); + } + }; + + if (namespace == null) { + return client.pods().withLabels(selector).watch(watcher); + } else { + return client.pods().inNamespace(namespace).withLabels(selector).watch(watcher); + } + } + + /** + * Fetches the internal IPs of the node. + */ + private Watch watchNodes() { + final Watcher watcher = new Watcher() { + @Override + public void eventReceived(Action action, Node node) { + if (closed) { + return; + } + + if (action == Action.ERROR || action == Action.BOOKMARK) { + return; + } + + final String nodeName = node.getMetadata().getName(); + switch (action) { + case ADDED: + case MODIFIED: + final String nodeIp = node.getStatus().getAddresses().stream() + .filter(nodeAddressFilter) + .map(NodeAddress::getAddress) + .findFirst().orElse(null); + if (nodeIp == null) { + logger.debug("No matching IP address is found in {}. node: {}", nodeName, node); + nodeToIp.remove(nodeName); + return; + } + nodeToIp.put(nodeName, nodeIp); + break; + case DELETED: + nodeToIp.remove(nodeName); + break; + } + maybeUpdateEndpoints(); + } + + @Override + public void onClose(WatcherException cause) { + if (closed) { + return; + } + logger.warn("Node watcher for {}/{} is closed.", namespace, serviceName, cause); + } + }; + + return client.nodes().watch(watcher); + } + + private void maybeUpdateEndpoints() { + if (service == null) { + // No event received for the service yet. + return; + } + + if (nodeToIp.isEmpty()) { + // No event received for the nodes yet. + return; + } + + if (podToNode.isEmpty()) { + // No event received for the pods yet. + return; + } + + assert nodePort != null; + final List endpoints = + podToNode.values().stream() + .map(nodeName -> { + final String nodeIp = nodeToIp.get(nodeName); + if (nodeIp == null) { + return null; + } + return Endpoint.of(nodeIp, nodePort); + }) + .filter(Objects::nonNull) + .collect(toImmutableList()); + setEndpoints(endpoints); + } + + @Override + protected void doCloseAsync(CompletableFuture future) { + closed = true; + serviceWatch.close(); + nodeWatch.close(); + final Watch podWatch = this.podWatch; + if (podWatch != null) { + podWatch.close(); + } + if (autoClose) { + client.close(); + } + super.doCloseAsync(future); + } +} diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java new file mode 100644 index 00000000000..18f7ed174c3 --- /dev/null +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java @@ -0,0 +1,141 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.time.Duration; +import java.util.function.Predicate; + +import com.google.common.base.Strings; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.AbstractDynamicEndpointGroupBuilder; +import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; +import com.linecorp.armeria.common.Flags; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; + +import io.fabric8.kubernetes.api.model.NodeAddress; +import io.fabric8.kubernetes.client.KubernetesClient; + +/** + * A builder for creating a new {@link KubernetesEndpointGroup}. + */ +@UnstableApi +public final class KubernetesEndpointGroupBuilder extends AbstractDynamicEndpointGroupBuilder { + + private final KubernetesClient kubernetesClient; + private final boolean autoClose; + private EndpointSelectionStrategy selectionStrategy = EndpointSelectionStrategy.weightedRoundRobin(); + + @Nullable + private String namespace; + @Nullable + private String serviceName; + @Nullable + private String portName; + + private Predicate nodeAddressFilter = nodeAddress -> + "InternalIP".equals(nodeAddress.getType()) && !Strings.isNullOrEmpty(nodeAddress.getAddress()); + + KubernetesEndpointGroupBuilder(KubernetesClient kubernetesClient, boolean autoClose) { + super(Flags.defaultResponseTimeoutMillis()); + allowEmptyEndpoints(false); + this.kubernetesClient = requireNonNull(kubernetesClient, "kubernetesClient"); + this.autoClose = autoClose; + } + + /** + * Sets the namespace + * of a Kubernetes cluster. + */ + public KubernetesEndpointGroupBuilder namespace(String namespace) { + this.namespace = requireNonNull(namespace, "namespace"); + return this; + } + + /** + * Sets the target service + * name from which {@link Endpoint}s should be fetched. + */ + public KubernetesEndpointGroupBuilder serviceName(String serviceName) { + this.serviceName = requireNonNull(serviceName, "serviceName"); + return this; + } + + /** + * Sets the name of the port + * from which NodePort + * should be fetched from. If not set, the first node port will be used. + */ + public KubernetesEndpointGroupBuilder portName(String portName) { + this.portName = requireNonNull(portName, "portName"); + return this; + } + + /** + * Sets the {@link Predicate} to filter the addresses + * of a Kubernetes node. + * The first selected {@link NodeAddress} of a node will be used to create the {@link Endpoint}. + * If unspecified, the default is to select an {@code InternalIP} address that is not empty. + */ + public KubernetesEndpointGroupBuilder nodeAddressFilter(Predicate nodeAddressFilter) { + requireNonNull(nodeAddressFilter, "nodeAddressFilter"); + this.nodeAddressFilter = nodeAddressFilter; + return this; + } + + /** + * Sets the {@link EndpointSelectionStrategy} of the {@link KubernetesEndpointGroupBuilder}. + */ + public KubernetesEndpointGroupBuilder selectionStrategy(EndpointSelectionStrategy selectionStrategy) { + this.selectionStrategy = requireNonNull(selectionStrategy, "selectionStrategy"); + return this; + } + + /** + * Sets whether to allow an empty {@link Endpoint} list. + * If unspecified, the default is {@code false} that disallows an empty {@link Endpoint} list. + */ + @Override + public KubernetesEndpointGroupBuilder allowEmptyEndpoints(boolean allowEmptyEndpoints) { + return (KubernetesEndpointGroupBuilder) super.allowEmptyEndpoints(allowEmptyEndpoints); + } + + @Override + public KubernetesEndpointGroupBuilder selectionTimeout(Duration selectionTimeout) { + return (KubernetesEndpointGroupBuilder) super.selectionTimeout(selectionTimeout); + } + + @Override + public KubernetesEndpointGroupBuilder selectionTimeoutMillis(long selectionTimeoutMillis) { + return (KubernetesEndpointGroupBuilder) super.selectionTimeoutMillis(selectionTimeoutMillis); + } + + /** + * Returns a newly-created {@link KubernetesEndpointGroup} based on the properties of this builder. + */ + public KubernetesEndpointGroup build() { + checkState(serviceName != null, "serviceName not set"); + return new KubernetesEndpointGroup(kubernetesClient, namespace, serviceName, portName, + nodeAddressFilter, autoClose, + selectionStrategy, shouldAllowEmptyEndpoints(), + selectionTimeoutMillis()); + } +} diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/package-info.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/package-info.java new file mode 100644 index 00000000000..f7efd8a03bb --- /dev/null +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * {@link com.linecorp.armeria.client.endpoint.EndpointGroup} for Kubernetes services. + */ +@NonNullByDefault +package com.linecorp.armeria.client.kubernetes.endpoints; + +import com.linecorp.armeria.common.annotation.NonNullByDefault; diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/package-info.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/package-info.java index c57f9e1a230..9472fe65e4a 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/package-info.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/package-info.java @@ -5,7 +5,7 @@ * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * - * https://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpInterceptorTest.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpInterceptorTest.java index dbddd95120f..354209c7038 100644 --- a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpInterceptorTest.java +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/ArmeriaHttpInterceptorTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -47,6 +48,8 @@ import io.fabric8.kubernetes.client.http.Interceptor; import io.fabric8.mockwebserver.DefaultMockServer; +// Remove @Disabled when https://github.com/fabric8io/kubernetes-client/pull/5852 is merged and released. +@Disabled class ArmeriaHttpInterceptorTest extends AbstractInterceptorTest { private static DefaultMockServer mockServer; diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesAvailableCondition.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesAvailableCondition.java new file mode 100644 index 00000000000..acf61ef598c --- /dev/null +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesAvailableCondition.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +@SuppressWarnings("unused") +final class KubernetesAvailableCondition { + + private static final Logger logger = LoggerFactory.getLogger(KubernetesAvailableCondition.class); + + static boolean isRunning() { + try (KubernetesClient client = new KubernetesClientBuilder().build()) { + client.namespaces().list(); + return true; + } catch (Exception cause) { + logger.trace("Kubernetes is not running", cause); + return false; + } + } + + private KubernetesAvailableCondition() {} +} diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupIntegrationTest.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupIntegrationTest.java new file mode 100644 index 00000000000..9abd5ebe183 --- /dev/null +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupIntegrationTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupMockServerTest.newDeployment; +import static com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupMockServerTest.newService; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; + +import io.fabric8.junit.jupiter.api.KubernetesTest; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.KubernetesClient; + +@EnabledIf("com.linecorp.armeria.client.kubernetes.endpoints.KubernetesAvailableCondition#isRunning") +@KubernetesTest +class KubernetesEndpointGroupIntegrationTest { + + private KubernetesClient client; + + @Test + void createEndpointsWithNodeIpAndPort() { + // Use the node port allocated by the k8s cluster + final Service service = newService(null); + final Deployment deployment = newDeployment(); + client.apps().deployments().resource(deployment).create(); + final Service service0 = client.services().resource(service).create(); + + try (KubernetesEndpointGroup endpointGroup = + KubernetesEndpointGroup.builder(client) + .serviceName(service0.getMetadata().getName()) + .build()) { + await().untilAsserted(() -> { + assertThat(endpointGroup.whenReady()).isDone(); + assertThat(endpointGroup.endpoints()).isNotEmpty(); + final Integer nodePort = service0.getSpec().getPorts().get(0).getNodePort(); + final List nodeIps = client.nodes().list().getItems().stream().map(node -> { + return node.getStatus().getAddresses().stream() + .filter(address -> "InternalIP".equals(address.getType())) + .findFirst().get().getAddress(); + }).collect(toImmutableList()); + + assertThat(endpointGroup.endpoints()) + .allSatisfy(endpoint -> { + assertThat(endpoint.ipAddr()).isIn(nodeIps); + assertThat(endpoint.port()).isEqualTo(nodePort); + }); + }); + } + } +} diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java new file mode 100644 index 00000000000..b1bbae90299 --- /dev/null +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java @@ -0,0 +1,365 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.common.annotation.Nullable; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.ContainerPortBuilder; +import io.fabric8.kubernetes.api.model.LabelSelector; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.NodeAddress; +import io.fabric8.kubernetes.api.model.NodeAddressBuilder; +import io.fabric8.kubernetes.api.model.NodeBuilder; +import io.fabric8.kubernetes.api.model.NodeStatus; +import io.fabric8.kubernetes.api.model.NodeStatusBuilder; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.PodTemplateSpec; +import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.fabric8.kubernetes.api.model.ServicePort; +import io.fabric8.kubernetes.api.model.ServicePortBuilder; +import io.fabric8.kubernetes.api.model.ServiceSpec; +import io.fabric8.kubernetes.api.model.ServiceSpecBuilder; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; + +@EnableKubernetesMockClient(crud = true) +class KubernetesEndpointGroupMockServerTest { + + private static KubernetesClient staticClient; + + private KubernetesClient client; + + @AfterAll + static void afterAll() { + // A workaround for the issue that the static client is leaked. + // Remove once https://github.com/fabric8io/kubernetes-client/pull/5854 is released. + staticClient.close(); + } + + @Test + void createEndpointsWithNodeIpAndPort() throws InterruptedException { + // Prepare Kubernetes resources + final List nodes = ImmutableList.of(newNode("1.1.1.1"), newNode("2.2.2.2"), newNode("3.3.3.3")); + final Deployment deployment = newDeployment(); + final int nodePort = 30000; + final Service service = newService(nodePort); + final List pods = nodes.stream() + .map(node -> node.getMetadata().getName()) + .map(nodeName -> newPod(deployment.getSpec().getTemplate(), nodeName)) + .collect(toImmutableList()); + + // Create Kubernetes resources + for (Node node : nodes) { + client.nodes().resource(node).create(); + } + client.pods().resource(pods.get(0)).create(); + client.pods().resource(pods.get(1)).create(); + client.apps().deployments().resource(deployment).create(); + client.services().resource(service).create(); + + final KubernetesEndpointGroup endpointGroup = KubernetesEndpointGroup.of(client, "test", + "nginx-service"); + endpointGroup.whenReady().join(); + + // Initial state + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + // Wait until all endpoints are ready + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("1.1.1.1", nodePort), + Endpoint.of("2.2.2.2", nodePort) + ); + }); + + // Add a new pod + client.pods().resource(pods.get(2)).create(); + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("1.1.1.1", nodePort), + Endpoint.of("2.2.2.2", nodePort), + Endpoint.of("3.3.3.3", nodePort) + ); + }); + + // Remove a pod + client.pods().resource(pods.get(0)).delete(); + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("2.2.2.2", nodePort), + Endpoint.of("3.3.3.3", nodePort) + ); + }); + + // Add a new node and a new pod + final Node node4 = newNode("4.4.4.4"); + client.nodes().resource(node4).create(); + final Pod pod4 = newPod(deployment.getSpec().getTemplate(), node4.getMetadata().getName()); + client.pods().resource(pod4).create(); + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("2.2.2.2", nodePort), + Endpoint.of("3.3.3.3", nodePort), + Endpoint.of("4.4.4.4", nodePort) + ); + }); + + // Add an empty node + final Node node5 = newNode("5.5.5.5"); + client.nodes().resource(node5).create(); + Thread.sleep(1000); + // A node where no pod is running should not be added to the EndpointGroup + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("2.2.2.2", nodePort), + Endpoint.of("3.3.3.3", nodePort), + Endpoint.of("4.4.4.4", nodePort) + ); + }); + } + + @Test + void shouldUsePortNameToGetNodePort() { + final List nodes = ImmutableList.of(newNode("1.1.1.1"), newNode("2.2.2.2"), newNode("3.3.3.3")); + final Deployment deployment = newDeployment(); + final int httpNodePort = 30000; + final Service service = newService(httpNodePort); + final List pods = nodes.stream() + .map(node -> node.getMetadata().getName()) + .map(nodeName -> newPod(deployment.getSpec().getTemplate(), nodeName)) + .collect(toImmutableList()); + + final int httpsNodePort = httpNodePort + 1; + final ServiceSpec serviceSpec = + service.getSpec() + .toBuilder() + .withPorts(new ServicePortBuilder() + .withPort(80) + .withNodePort(httpNodePort) + .withName("http") + .build(), + new ServicePortBuilder() + .withPort(443) + .withNodePort(httpsNodePort) + .withName("https") + .build()) + .build(); + final Service service0 = service.toBuilder() + .withSpec(serviceSpec) + .build(); + for (Node node : nodes) { + client.nodes().resource(node).create(); + } + for (Pod pod : pods) { + client.pods().resource(pod).create(); + } + client.apps().deployments().resource(deployment).create(); + client.services().resource(service0).create(); + + final String serviceName = service0.getMetadata().getName(); + try (KubernetesEndpointGroup endpointGroup = KubernetesEndpointGroup.builder(client, false) + .serviceName(serviceName) + .portName("https") + .build()) { + await().untilAsserted(() -> { + assertThat(endpointGroup.whenReady()).isDone(); + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("1.1.1.1", httpsNodePort), + Endpoint.of("2.2.2.2", httpsNodePort), + Endpoint.of("3.3.3.3", httpsNodePort)); + }); + } + + try (KubernetesEndpointGroup endpointGroup = KubernetesEndpointGroup.builder(client, false) + .serviceName(serviceName) + .portName("http") + .build()) { + await().untilAsserted(() -> { + assertThat(endpointGroup.whenReady()).isDone(); + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("1.1.1.1", httpNodePort), + Endpoint.of("2.2.2.2", httpNodePort), + Endpoint.of("3.3.3.3", httpNodePort)); + }); + } + } + + @Test + void createEndpointsWithNodeExternalIpAndPort() throws InterruptedException { + final List nodes = ImmutableList.of(newNode("1.1.1.1"), + newNode("2.2.2.2", "ExternalIP"), + newNode("3.3.3.3", "ExternalIP")); + final Deployment deployment = newDeployment(); + final int nodePort = 30000; + final Service service = newService(nodePort); + final List pods = nodes.stream() + .map(node -> node.getMetadata().getName()) + .map(nodeName -> newPod(deployment.getSpec().getTemplate(), nodeName)) + .collect(toImmutableList()); + + // Create Kubernetes resources + for (Node node : nodes) { + client.nodes().resource(node).create(); + } + client.pods().resource(pods.get(0)).create(); + client.pods().resource(pods.get(1)).create(); + client.pods().resource(pods.get(2)).create(); + client.apps().deployments().resource(deployment).create(); + client.services().resource(service).create(); + + final KubernetesEndpointGroup endpointGroup = + KubernetesEndpointGroup.builder(client) + .namespace("test") + .serviceName("nginx-service") + .nodeAddressFilter( + nodeAddress -> "ExternalIP".equals(nodeAddress.getType())) + .build(); + endpointGroup.whenReady().join(); + + await().untilAsserted(() -> { + final List endpoints = endpointGroup.endpoints(); + assertThat(endpoints).containsExactlyInAnyOrder( + Endpoint.of("2.2.2.2", nodePort), + Endpoint.of("3.3.3.3", nodePort) + ); + }); + endpointGroup.close(); + } + + private static Node newNode(String ip, String type) { + final NodeAddress nodeAddress = new NodeAddressBuilder() + .withType(type) + .withAddress(ip) + .build(); + final NodeStatus nodeStatus = new NodeStatusBuilder() + .withAddresses(nodeAddress) + .build(); + final ObjectMeta metadata = new ObjectMetaBuilder() + .withName("node-" + ip) + .build(); + return new NodeBuilder() + .withMetadata(metadata) + .withStatus(nodeStatus) + .build(); + } + + private static Node newNode(String ip) { + return newNode(ip, "InternalIP"); + } + + static Service newService(@Nullable Integer nodePort) { + final ObjectMeta metadata = new ObjectMetaBuilder() + .withName("nginx-service") + .build(); + final ServicePort servicePort = new ServicePortBuilder() + .withPort(80) + .withNodePort(nodePort) + .build(); + final ServiceSpec serviceSpec = new ServiceSpecBuilder() + .withPorts(servicePort) + .withSelector(ImmutableMap.of("app", "nginx")) + .withType("NodePort") + .build(); + return new ServiceBuilder() + .withMetadata(metadata) + .withSpec(serviceSpec) + .build(); + } + + static Deployment newDeployment() { + final ObjectMeta metadata = new ObjectMetaBuilder() + .withName("nginx-deployment") + .build(); + final LabelSelector selector = new LabelSelectorBuilder() + .withMatchLabels(ImmutableMap.of("app", "nginx")) + .build(); + final DeploymentSpec deploymentSpec = new DeploymentSpecBuilder() + .withReplicas(4) + .withSelector(selector) + .withTemplate(newPodTemplate()) + .build(); + return new DeploymentBuilder() + .withMetadata(metadata) + .withSpec(deploymentSpec) + .build(); + } + + private static PodTemplateSpec newPodTemplate() { + final ObjectMeta metadata = new ObjectMetaBuilder() + .withLabels(ImmutableMap.of("app", "nginx")) + .build(); + final Container container = new ContainerBuilder() + .withName("nginx") + .withImage("nginx:1.14.2") + .withPorts(new ContainerPortBuilder() + .withContainerPort(8080) + .build()) + .build(); + final PodSpec spec = new PodSpecBuilder() + .withContainers(container) + .build(); + return new PodTemplateSpecBuilder() + .withMetadata(metadata) + .withSpec(spec) + .build(); + } + + private static Pod newPod(PodTemplateSpec template, String newNodeName) { + final PodSpec spec = template.getSpec() + .toBuilder() + .withNodeName(newNodeName) + .build(); + final ObjectMeta metadata = template.getMetadata() + .toBuilder() + .withName("nginx-pod-" + newNodeName) + .build(); + return new PodBuilder() + .withMetadata(metadata) + .withSpec(spec) + .build(); + } +} diff --git a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java index 753c43428f4..e0361e41a99 100644 --- a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java +++ b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java @@ -74,6 +74,7 @@ public void applyTo(Builder builder) { "com.linecorp.armeria.internal.testing.InternalTestingBlockHoundIntegration", "writeBlockingMethod"); builder.allowBlockingCallsInside("com.linecorp.armeria.client.ClientFactory", "ofDefault"); + builder.allowBlockingCallsInside("io.envoyproxy.controlplane.cache.SimpleCache", "createWatch"); // prints the exception which makes it easier to debug issues builder.blockingMethodCallback(this::writeBlockingMethod);