From c20a0ff71b1c30d59ac46238e24a287b3fc76704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20G=C3=B6k?= Date: Fri, 18 Oct 2024 23:30:22 +0300 Subject: [PATCH] Refactor KubernetesClient [HZG-225] (#3455) Refactors KubernetesClient and related classes as a preparation for https://hazelcast.atlassian.net/browse/HZG-225. KubernetesClient 1. Improve Javadoc (mainly line wrapping). 2. Decrease the number of constructors from 3 to 2: one taking KubernetesConfig and used in production, and one taking all variables and used in tests. 3. Simplify `buildKubernetesApiUrlProvider()`. 4. Make `extractStsName()` static. 5. Move `extractSts(JsonObject)` to `RuntimeContext#from(JsonObject, String)`. 6. Replace simple `String#format` usages with string concatenation. 7. Remove `StsMonitorThread#latestResourceVersion` since it is already accessible through `latestRuntimeContext.getResourceVersion()`. 8. Refactor `StsMonitorThread#onMessage(String)` and rename it to `onWatchEventReceived`. 9. Refactor and inline `parseStsList(JsonObject)`. RuntimeContext 1. Remove UNKNOWN since it is already defined in ClusterTopologyIntentTracker. StsMonitorTest 1. Refactor `buildSts(...)`. GitOrigin-RevId: 0efb501bd95947faf96678403f92f437e3f2321a --- .../KubernetesApiEndpointResolver.java | 9 +- .../kubernetes/KubernetesClient.java | 320 ++++++++---------- .../hazelcast/kubernetes/RuntimeContext.java | 40 ++- .../kubernetes/KubernetesClientTest.java | 10 +- .../hazelcast/kubernetes/StsMonitorTest.java | 79 ++--- 5 files changed, 203 insertions(+), 255 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesApiEndpointResolver.java b/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesApiEndpointResolver.java index fbfa039e9b73..c33e78b101f4 100644 --- a/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesApiEndpointResolver.java +++ b/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesApiEndpointResolver.java @@ -46,7 +46,7 @@ class KubernetesApiEndpointResolver KubernetesApiEndpointResolver(ILogger logger, KubernetesConfig config, ClusterTopologyIntentTracker tracker) { this(logger, config.getServiceName(), config.getServicePort(), config.getServiceLabelName(), config.getServiceLabelValue(), config.getPodLabelName(), config.getPodLabelValue(), - config.isResolveNotReadyAddresses(), buildKubernetesClient(config, tracker)); + config.isResolveNotReadyAddresses(), new KubernetesClient(config, tracker)); } /** @@ -66,13 +66,6 @@ class KubernetesApiEndpointResolver this.client = client; } - private static KubernetesClient buildKubernetesClient(KubernetesConfig config, ClusterTopologyIntentTracker tracker) { - return new KubernetesClient(config.getNamespace(), config.getKubernetesMasterUrl(), config.getTokenProvider(), - config.getKubernetesCaCertificate(), config.getKubernetesApiRetries(), config.getExposeExternallyMode(), - config.isUseNodeNameAsExternalAddress(), config.getServicePerPodLabelName(), - config.getServicePerPodLabelValue(), tracker); - } - @Override List resolveNodes() { if (serviceName != null && !serviceName.isEmpty()) { diff --git a/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesClient.java b/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesClient.java index c3cc68c5c574..188150c9baef 100644 --- a/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesClient.java +++ b/hazelcast/src/main/java/com/hazelcast/kubernetes/KubernetesClient.java @@ -29,6 +29,7 @@ import com.hazelcast.logging.Logger; import com.hazelcast.spi.exception.RestClientException; import com.hazelcast.spi.utils.RestClient; +import com.hazelcast.spi.utils.RestClient.WatchResponse; import com.hazelcast.spi.utils.RetryUtils; import javax.annotation.Nonnull; @@ -37,7 +38,6 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -45,7 +45,6 @@ import java.util.Objects; import static com.hazelcast.instance.impl.ClusterTopologyIntentTracker.UNKNOWN; -import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -56,9 +55,9 @@ /** * Responsible for connecting to the Kubernetes API. * - * @see Kubernetes API + * @see + * Kubernetes API */ -@SuppressWarnings("checkstyle:methodcount") class KubernetesClient { static final String SERVICE_TYPE_LOADBALANCER = "LoadBalancer"; static final String SERVICE_TYPE_NODEPORT = "NodePort"; @@ -68,7 +67,7 @@ class KubernetesClient { private static final int CONNECTION_TIMEOUT_SECONDS = 10; private static final int READ_TIMEOUT_SECONDS = 10; - private static final List NON_RETRYABLE_KEYWORDS = asList( + private static final List NON_RETRYABLE_KEYWORDS = List.of( "\"reason\":\"Forbidden\"", "\"reason\":\"NotFound\"", "Failure in generating SSLSocketFactory", @@ -98,36 +97,20 @@ class KubernetesClient { private boolean isKnownExceptionAlreadyLogged; private boolean isNodePortWarningAlreadyLogged; - KubernetesClient(String namespace, String kubernetesMaster, KubernetesTokenProvider tokenProvider, - String caCertificate, int retries, ExposeExternallyMode exposeExternallyMode, - boolean useNodeNameAsExternalAddress, String servicePerPodLabelName, - String servicePerPodLabelValue, @Nullable ClusterTopologyIntentTracker clusterTopologyIntentTracker) { - this.namespace = namespace; - this.kubernetesMaster = kubernetesMaster; - this.tokenProvider = tokenProvider; - this.caCertificate = caCertificate; - this.retries = retries; - this.exposeExternallyMode = exposeExternallyMode; - this.useNodeNameAsExternalAddress = useNodeNameAsExternalAddress; - this.servicePerPodLabelName = servicePerPodLabelName; - this.servicePerPodLabelValue = servicePerPodLabelValue; - this.clusterTopologyIntentTracker = clusterTopologyIntentTracker; - if (clusterTopologyIntentTracker != null) { - clusterTopologyIntentTracker.initialize(); - } - this.apiProvider = buildKubernetesApiUrlProvider(); - this.stsName = extractStsName(); - this.stsMonitorThread = (clusterTopologyIntentTracker != null && clusterTopologyIntentTracker.isEnabled()) - ? new StsMonitorThread() : null; + KubernetesClient(KubernetesConfig config, @Nullable ClusterTopologyIntentTracker clusterTopologyIntentTracker) { + this(config.getNamespace(), config.getKubernetesMasterUrl(), config.getTokenProvider(), + config.getKubernetesCaCertificate(), config.getKubernetesApiRetries(), config.getExposeExternallyMode(), + config.isUseNodeNameAsExternalAddress(), config.getServicePerPodLabelName(), + config.getServicePerPodLabelValue(), clusterTopologyIntentTracker, null, null); } - // constructor that allows overriding detected statefulset name for usage in tests - @SuppressWarnings("checkstyle:parameternumber") + // test usage only + @SuppressWarnings("ParameterNumber") KubernetesClient(String namespace, String kubernetesMaster, KubernetesTokenProvider tokenProvider, String caCertificate, int retries, ExposeExternallyMode exposeExternallyMode, - boolean useNodeNameAsExternalAddress, String servicePerPodLabelName, - String servicePerPodLabelValue, @Nullable ClusterTopologyIntentTracker clusterTopologyIntentTracker, - String stsName) { + boolean useNodeNameAsExternalAddress, String servicePerPodLabelName, String servicePerPodLabelValue, + @Nullable ClusterTopologyIntentTracker clusterTopologyIntentTracker, + @Nullable KubernetesApiProvider apiProvider, @Nullable String stsName) { this.namespace = namespace; this.kubernetesMaster = kubernetesMaster; this.tokenProvider = tokenProvider; @@ -141,32 +124,12 @@ class KubernetesClient { if (clusterTopologyIntentTracker != null) { clusterTopologyIntentTracker.initialize(); } - this.apiProvider = buildKubernetesApiUrlProvider(); - this.stsName = stsName; - this.stsMonitorThread = (clusterTopologyIntentTracker != null && clusterTopologyIntentTracker.isEnabled()) + this.apiProvider = apiProvider != null ? apiProvider : buildKubernetesApiUrlProvider(); + this.stsName = stsName != null ? stsName : extractStsName(); + this.stsMonitorThread = clusterTopologyIntentTracker != null && clusterTopologyIntentTracker.isEnabled() ? new StsMonitorThread() : null; } - // test usage only - KubernetesClient(String namespace, String kubernetesMaster, KubernetesTokenProvider tokenProvider, - String caCertificate, int retries, ExposeExternallyMode exposeExternallyMode, - boolean useNodeNameAsExternalAddress, String servicePerPodLabelName, - String servicePerPodLabelValue, KubernetesApiProvider apiProvider) { - this.namespace = namespace; - this.kubernetesMaster = kubernetesMaster; - this.tokenProvider = tokenProvider; - this.caCertificate = caCertificate; - this.retries = retries; - this.exposeExternallyMode = exposeExternallyMode; - this.useNodeNameAsExternalAddress = useNodeNameAsExternalAddress; - this.servicePerPodLabelName = servicePerPodLabelName; - this.servicePerPodLabelValue = servicePerPodLabelValue; - this.apiProvider = apiProvider; - this.stsMonitorThread = null; - this.stsName = extractStsName(); - this.clusterTopologyIntentTracker = null; - } - public void start() { if (stsMonitorThread != null) { stsMonitorThread.start(); @@ -203,18 +166,19 @@ KubernetesApiProvider buildKubernetesApiUrlProvider() { String.format("%s/apis/discovery.k8s.io/v1/namespaces/%s/endpointslices", kubernetesMaster, namespace); callGet(endpointSlicesUrlString); LOGGER.finest("Using EndpointSlices API to discover endpoints."); + return new KubernetesApiEndpointSlicesProvider(); } catch (Exception e) { LOGGER.finest("EndpointSlices are not available, using Endpoints API to discover endpoints."); return new KubernetesApiEndpointProvider(); } - return new KubernetesApiEndpointSlicesProvider(); } /** * Retrieves POD addresses in the specified {@code namespace}. - * * @return all POD addresses - * @see Kubernetes Endpoint API + * + * @see + * Kubernetes Endpoint API */ List endpoints() { try { @@ -226,13 +190,15 @@ List endpoints() { } /** - * Retrieves POD addresses for all services in the specified {@code namespace} filtered by {@code serviceLabels} - * and {@code serviceLabelValues}. + * Retrieves POD addresses for all services in the specified {@code namespace} filtered by + * {@code serviceLabels} and {@code serviceLabelValues}. * * @param serviceLabels comma separated labels used to filter responses * @param serviceLabelValues comma separated label values used to filter responses * @return all POD addresses from the specified {@code namespace} filtered by the labels - * @see Kubernetes Endpoint API + * + * @see + * Kubernetes Endpoint API */ List endpointsByServiceLabel(String serviceLabels, String serviceLabelValues) { try { @@ -250,7 +216,9 @@ List endpointsByServiceLabel(String serviceLabels, String serviceLabel * * @param endpointName endpoint name * @return all POD addresses from the specified {@code namespace} and the given {@code endpointName} - * @see Kubernetes Endpoint API + * + * @see + * Kubernetes Endpoint API */ List endpointsByName(String endpointName) { try { @@ -263,13 +231,15 @@ List endpointsByName(String endpointName) { } /** - * Retrieves POD addresses for all services in the specified {@code namespace} filtered by {@code podLabels} - * and {@code podLabelValues}. + * Retrieves POD addresses for all services in the specified {@code namespace} filtered by + * {@code podLabels} and {@code podLabelValues}. * * @param podLabels comma separated labels used to filter responses * @param podLabelValues comma separated label values used to filter responses * @return all POD addresses from the specified {@code namespace} filtered by the labels - * @see Kubernetes Endpoint API + * + * @see + * Kubernetes Endpoint API */ List endpointsByPodLabel(String podLabels, String podLabelValues) { try { @@ -289,7 +259,9 @@ List endpointsByPodLabel(String podLabels, String podLabelValues) { * * @param podName POD name * @return zone name - * @see Kubernetes Endpoint API + * + * @see + * Kubernetes Endpoint API */ String zone(String podName) { String nodeUrlString = String.format("%s/api/v1/nodes/%s", kubernetesMaster, nodeName(podName)); @@ -301,7 +273,9 @@ String zone(String podName) { * * @param podName POD name * @return Node name - * @see Kubernetes Endpoint API + * + * @see + * Kubernetes Endpoint API */ String nodeName(String podName) { String podUrlString = String.format("%s/api/v1/namespaces/%s/pods/%s", kubernetesMaster, namespace, podName); @@ -323,7 +297,7 @@ boolean isNodePortWarningAlreadyLogged() { return isNodePortWarningAlreadyLogged; } - private String extractStsName() { + private static String extractStsName() { String stsName = HostnameUtil.getLocalHostname(); int dashIndex = stsName.lastIndexOf('-'); if (dashIndex > 0) { @@ -332,14 +306,6 @@ private String extractStsName() { return stsName; } - private RuntimeContext extractSts(JsonObject jsonObject) { - int specReplicas = jsonObject.get("spec").asObject().getInt("replicas", UNKNOWN); - int readyReplicas = jsonObject.get("status").asObject().getInt("readyReplicas", UNKNOWN); - String resourceVersion = jsonObject.get("metadata").asObject().getString("resourceVersion", null); - int replicas = jsonObject.get("status").asObject().getInt("currentReplicas", UNKNOWN); - return new RuntimeContext(specReplicas, readyReplicas, replicas, resourceVersion); - } - @Nullable private String extractNodeName(EndpointAddress endpointAddress, Map nodes) { String nodeName = nodes.get(endpointAddress); @@ -356,22 +322,15 @@ private String extractNodeName(EndpointAddress endpointAddress, Map * If it's not possible, then returns the input parameter. *

- * Assigning public IPs must meet one of the following requirements: - *

    - *
  • Each POD must be exposed with a separate LoadBalancer service OR
  • - *
  • Each POD must be exposed with a separate NodePort service and Kubernetes nodes must have external IPs
  • - *
- *

- * The algorithm to fetch public IPs is as follows: - *

    - *
  1. Use Kubernetes API (/endpoints) to find dedicated services for each POD
  2. - *
  3. For each POD: - *
      - *
    • If the corresponding service type is LoadBalancer, It extracts the External IP and Service Port
    • - *
    • If the service type is NodePort, It uses the Kubernetes API (/nodes) to find the External IP of the Node
    • - *
    - *
  4. - *
+ * Assigning public IPs must meet one of the following requirements:
    + *
  • Each POD must be exposed with a separate LoadBalancer service OR + *
  • Each POD must be exposed with a separate NodePort service and Kubernetes nodes must have external IPs + *

+ * The algorithm to fetch public IPs is as follows:

    + *
  1. Use Kubernetes API (/endpoints) to find dedicated services for each POD + *
  2. For each POD: