From 39489653ae3a0dfd4b0011e396c338b31186eb99 Mon Sep 17 00:00:00 2001 From: Gerard Ryan Date: Sat, 10 Jul 2021 23:36:09 +0100 Subject: [PATCH 1/3] Add IngressController creation/reconciliation There are 3 main parts to this change: - Creation/Management of IngressControllers (multi-AZ & per-AZ) - Matching of Routes to IngressControllers - Setting of route mapping in ManagedKafka CR status -- Creation/Management of IngressControllers (multi-AZ & per-AZ) -- This adds an IngressController per AZ, and an additional multi-AZ one. The multi-AZ one will always be created on OpenShift, even if it's a single-AZ cluster. If the "kafka" property is set to "dev", then the IngressControllerManager will not be started, so no IngressControllers will be created/managed. We typically set "kafka=dev" for vanilla Kubernetes. The number of replicas for each IngressController will be 3. If there are fewer than 3 nodes available in the zone (or across zones, for the multi-AZ one), then the number of replicas will be whatever the number of nodes is. NOTE: There seems to be some strangeness where, when the IngressControllers get created initially, the informer cache seems to be always behind, and keeps getting "onUpdate" events. If the operator gets a fresh IngressController list (or is restarted), then this stops and it goes back to normal. -- Matching of Routes to IngressControllers -- All of the Routes created as a result of the ManagedKafka (there are currently 5: admin server route, bootstrap route, and 3 broker routes), will be labelled such that they are matched to all of the IngressControllers (including the old one that was assumed to exist before this change). This can be seen in the status of the Route objects at run time, where there's an entry for each IngressController. This means that all IngressControllers are able to forward traffic for each Route. 
We'll create DNS CNAME records so that connections for each Route will go through the optimal IngressController that the operator chooses (see next section). -- Setting of route mapping in ManagedKafka CR status -- This new section in the status is to indicate which IngressController domains should be used in the CNAME DNS records for each Route. The operator will set the mapping as follows: - bootstrap will be handled by the multi-AZ IngressController - admin server will be handled by the multi-AZ IngressController - broker-0 will be handled by the IngressController in the same AZ as the broker-0 Pod - broker-1 will be handled by the IngressController in the same AZ as the broker-1 Pod - broker-2 will be handled by the IngressController in the same AZ as the broker-2 Pod --- .../resources/v1alpha1/ManagedKafkaRoute.java | 54 +++ .../v1alpha1/ManagedKafkaStatus.java | 9 + .../java/org/bf2/common/OperandUtils.java | 5 +- .../controllers/ManagedKafkaController.java | 15 + .../operator/managers/InformerManager.java | 20 +- .../managers/IngressControllerManager.java | 323 ++++++++++++++++++ .../operands/AbstractKafkaCluster.java | 5 +- .../bf2/operator/operands/AdminServer.java | 7 +- .../bf2/operator/operands/KafkaCluster.java | 28 +- .../operands/KafkaInstanceConfiguration.java | 10 + .../org/bf2/operator/operands/Labels.java | 24 ++ operator/src/main/kubernetes/kubernetes.yml | 24 ++ ...IngressControllerManagerPredicateTest.java | 39 +++ .../IngressControllerManagerTest.java | 58 ++++ .../operator/operands/KafkaClusterTest.java | 5 + .../expected/custom-config-strimzi.yml | 11 + .../src/test/resources/expected/strimzi.yml | 11 + 17 files changed, 634 insertions(+), 14 deletions(-) create mode 100644 api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaRoute.java create mode 100644 operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java create mode 100644 operator/src/main/java/org/bf2/operator/operands/Labels.java create mode 
100644 operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerPredicateTest.java create mode 100644 operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java diff --git a/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaRoute.java b/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaRoute.java new file mode 100644 index 000000000..3f72ba024 --- /dev/null +++ b/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaRoute.java @@ -0,0 +1,54 @@ +package org.bf2.operator.resources.v1alpha1; + +import com.fasterxml.jackson.annotation.JsonInclude; +import io.sundr.builder.annotations.Buildable; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Buildable( + builderPackage = "io.fabric8.kubernetes.api.builder", + editableEnabled = false +) +@ToString +@EqualsAndHashCode +@JsonInclude(value = JsonInclude.Include.NON_NULL) +public class ManagedKafkaRoute { + + private String name; + private String prefix; + private String router; + + public ManagedKafkaRoute(String name, String prefix, String router) { + this.name = name; + this.prefix = prefix; + this.router = router; + } + + public ManagedKafkaRoute() { + this(null, null, null); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getRouter() { + return router; + } + + public void setRouter(String router) { + this.router = router; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } +} diff --git a/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaStatus.java b/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaStatus.java index 7f5111745..d807709c3 100644 --- a/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaStatus.java +++ b/api/src/main/java/org/bf2/operator/resources/v1alpha1/ManagedKafkaStatus.java @@ -18,6 +18,7 @@ 
public class ManagedKafkaStatus { private List conditions; + private List routes; private ManagedKafkaCapacity capacity; private Versions versions; private String adminServerURI; @@ -32,6 +33,14 @@ public void setConditions(List conditions) { this.conditions = conditions; } + public List getRoutes() { + return routes; + } + + public void setRoutes(List routes) { + this.routes = routes; + } + public ManagedKafkaCapacity getCapacity() { return capacity; } diff --git a/common/src/main/java/org/bf2/common/OperandUtils.java b/common/src/main/java/org/bf2/common/OperandUtils.java index 77a1b991e..34d1d33da 100644 --- a/common/src/main/java/org/bf2/common/OperandUtils.java +++ b/common/src/main/java/org/bf2/common/OperandUtils.java @@ -13,6 +13,9 @@ public class OperandUtils { + public static final String K8S_NAME_LABEL = "app.kubernetes.io/name"; + public static final String MANAGED_BY_LABEL = "app.kubernetes.io/managed-by"; + public static final String STRIMZI_OPERATOR_NAME = "strimzi-cluster-operator"; public static final String FLEETSHARD_OPERATOR_NAME = "kas-fleetshard-operator"; /** @@ -33,7 +36,7 @@ public static void setAsOwner(HasMetadata owner, HasMetadata resource) { public static Map getDefaultLabels() { LinkedHashMap result = new LinkedHashMap<>(1); - result.put("app.kubernetes.io/managed-by", FLEETSHARD_OPERATOR_NAME); + result.put(MANAGED_BY_LABEL, FLEETSHARD_OPERATOR_NAME); return result; } diff --git a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java index 5a3d272ff..c709e696a 100644 --- a/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java +++ b/operator/src/main/java/org/bf2/operator/controllers/ManagedKafkaController.java @@ -11,6 +11,7 @@ import org.bf2.common.ConditionUtils; import org.bf2.common.ManagedKafkaResourceClient; import org.bf2.operator.events.ResourceEventSource; +import 
org.bf2.operator.managers.IngressControllerManager; import org.bf2.operator.operands.KafkaInstance; import org.bf2.operator.operands.OperandReadiness; import org.bf2.operator.resources.v1alpha1.ManagedKafka; @@ -18,12 +19,14 @@ import org.bf2.operator.resources.v1alpha1.ManagedKafkaCondition; import org.bf2.operator.resources.v1alpha1.ManagedKafkaCondition.Reason; import org.bf2.operator.resources.v1alpha1.ManagedKafkaCondition.Status; +import org.bf2.operator.resources.v1alpha1.ManagedKafkaRoute; import org.bf2.operator.resources.v1alpha1.ManagedKafkaStatus; import org.bf2.operator.resources.v1alpha1.ManagedKafkaStatusBuilder; import org.bf2.operator.resources.v1alpha1.VersionsBuilder; import org.jboss.logging.Logger; import org.jboss.logging.NDC; +import javax.enterprise.inject.Instance; import javax.inject.Inject; import java.util.ArrayList; @@ -43,6 +46,9 @@ public class ManagedKafkaController implements ResourceController @Inject KafkaInstance kafkaInstance; + @Inject + Instance ingressControllerManagerInstance; + @Override @Timed(value = "controller.delete", extraTags = {"resource", "ManagedKafka"}, description = "Time spent processing delete events") @Counted(value = "controller.delete", extraTags = {"resource", "ManagedKafka"}, description = "The number of delete events") @@ -131,6 +137,9 @@ private void updateManagedKafkaStatus(ManagedKafka managedKafka) { ConditionUtils.updateConditionStatus(ready, readiness.getStatus(), readiness.getReason(), readiness.getMessage()); + // routes should always be set on the CR status, even if it's just an empty list + status.setRoutes(List.of()); + if (Status.True.equals(readiness.getStatus())) { status.setCapacity(new ManagedKafkaCapacityBuilder(managedKafka.getSpec().getCapacity()).build()); if (!Reason.StrimziUpdating.equals(readiness.getReason())) { @@ -140,6 +149,12 @@ private void updateManagedKafkaStatus(ManagedKafka managedKafka) { } status.setAdminServerURI(kafkaInstance.getAdminServer().uri(managedKafka)); 
status.setServiceAccounts(managedKafka.getSpec().getServiceAccounts()); + + if (ingressControllerManagerInstance.isResolvable()) { + IngressControllerManager ingressControllerManager = ingressControllerManagerInstance.get(); + List routes = ingressControllerManager.getManagedKafkaRoutesFor(managedKafka); + status.setRoutes(routes); + } } } } diff --git a/operator/src/main/java/org/bf2/operator/managers/InformerManager.java b/operator/src/main/java/org/bf2/operator/managers/InformerManager.java index b3f631eeb..89cf2d5cc 100644 --- a/operator/src/main/java/org/bf2/operator/managers/InformerManager.java +++ b/operator/src/main/java/org/bf2/operator/managers/InformerManager.java @@ -26,6 +26,7 @@ import javax.inject.Inject; import java.util.List; +import java.util.stream.Stream; @Startup @ApplicationScoped @@ -59,14 +60,14 @@ boolean isOpenShift() { protected void onStart() { deploymentInformer = resourceInformerFactory.create(Deployment.class, filter(kubernetesClient.apps().deployments()), eventSource); - serviceInformer = resourceInformerFactory.create(Service.class, filter(kubernetesClient.services()), eventSource); + serviceInformer = resourceInformerFactory.create(Service.class, filterManagedByFleetshardOrStrimzi(kubernetesClient.services()), eventSource); configMapInformer = resourceInformerFactory.create(ConfigMap.class, filter(kubernetesClient.configMaps()), eventSource); secretInformer = resourceInformerFactory.create(Secret.class, filter(kubernetesClient.secrets()), eventSource); if (isOpenShift()) { - routeInformer = resourceInformerFactory.create(Route.class, filter(kubernetesClient.adapt(OpenShiftClient.class).routes()), eventSource); + routeInformer = resourceInformerFactory.create(Route.class, filterManagedByFleetshardOrStrimzi(kubernetesClient.adapt(OpenShiftClient.class).routes()), eventSource); } } @@ -75,6 +76,11 @@ protected void onStart() { return mixedOperation.inAnyNamespace().withLabels(OperandUtils.getDefaultLabels()); } + static 
FilterWatchListDeletable> filterManagedByFleetshardOrStrimzi( + MixedOperation, ?> mixedOperation) { + return mixedOperation.inAnyNamespace().withLabelIn(OperandUtils.MANAGED_BY_LABEL, OperandUtils.FLEETSHARD_OPERATOR_NAME, OperandUtils.STRIMZI_OPERATOR_NAME); + } + public Kafka getLocalKafka(String namespace, String name) { return kafkaInformer != null ? kafkaInformer.getByKey(Cache.namespaceKeyFunc(namespace, name)) : null; } @@ -104,6 +110,16 @@ public Route getLocalRoute(String namespace, String name) { } } + protected Stream getRoutesInNamespace(String namespace) { + if (isOpenShift()) { + return routeInformer.getList().stream() + .filter(r -> r.getMetadata().getNamespace().equals(namespace)); + } else { + log.warn("Not running on OpenShift cluster, Routes are not available"); + return Stream.empty(); + } + } + /** * Create the Kafka informer * NOTE: it's called when a Strimzi bundle is installed and Kafka related CRDs are available to be listed/watched diff --git a/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java b/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java new file mode 100644 index 000000000..91c9ce39a --- /dev/null +++ b/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java @@ -0,0 +1,323 @@ +package org.bf2.operator.managers; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.LabelSelector; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.NodeList; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import 
io.fabric8.kubernetes.client.informers.cache.Cache; +import io.fabric8.openshift.api.model.Route; +import io.fabric8.openshift.api.model.operator.v1.IngressController; +import io.fabric8.openshift.api.model.operator.v1.IngressControllerBuilder; +import io.fabric8.openshift.api.model.operator.v1.IngressControllerList; +import io.fabric8.openshift.client.OpenShiftClient; +import io.quarkus.arc.properties.UnlessBuildProperty; +import io.quarkus.runtime.Startup; +import io.quarkus.scheduler.Scheduled; +import io.strimzi.api.kafka.model.Kafka; +import org.bf2.common.OperandUtils; +import org.bf2.common.ResourceInformer; +import org.bf2.common.ResourceInformerFactory; +import org.bf2.operator.operands.AbstractKafkaCluster; +import org.bf2.operator.operands.Labels; +import org.bf2.operator.resources.v1alpha1.ManagedKafka; +import org.bf2.operator.resources.v1alpha1.ManagedKafkaRoute; +import org.jboss.logging.Logger; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Startup +@ApplicationScoped +//excluding during smoke tests (when kafka=dev is set) running on Kubernetes +@UnlessBuildProperty(name = "kafka", stringValue = "dev", enableIfMissing = true) +public class IngressControllerManager { + + /** + * The label key for the multi-AZ IngressController + */ + public static final String KAS_MULTI_ZONE = "managedkafka.bf2.org/kas-multi-zone"; + + /** + * The node label identifying the AZ in which the node resides + */ + public static final String TOPOLOGY_KEY = "topology.kubernetes.io/zone"; + + protected static final String INGRESS_OPERATOR_NAMESPACE = "openshift-ingress-operator"; + + protected static 
final String INFRA_NODE_LABEL = "node-role.kubernetes.io/infra"; + + protected static final String WORKER_NODE_LABEL = "node-role.kubernetes.io/worker"; + + /** + * Domain part prefixed to domain reported on IngressController status. The CNAME DNS records + * need to point to a sub-domain on the IngressController domain, so we just add this. + */ + private static final String ROUTER_SUBDOMAIN = "ingresscontroller."; + + /** + * Predicate that will return true if the input string looks like a broker resource name. + */ + protected static final Predicate IS_BROKER = Pattern.compile(".+-kafka-\\d+$").asMatchPredicate(); + private static final Predicate IS_BROKER_ROUTE = r -> IS_BROKER.test(r.getMetadata().getName()); + + @Inject + Logger log; + + @Inject + OpenShiftClient openShiftClient; + + @Inject + InformerManager informerManager; + + @Inject + ResourceInformerFactory resourceInformerFactory; + + @Inject + Labels routeMatchLabels; + + ResourceInformer brokerPodInformer; + ResourceInformer nodeInformer; + ResourceInformer ingressControllerInformer; + + public List getManagedKafkaRoutesFor(ManagedKafka mk) { + String multiZoneRoute = getIngressControllerDomain("kas"); + + return Stream.concat( + Stream.of( + new ManagedKafkaRoute("bootstrap", "", multiZoneRoute), + new ManagedKafkaRoute("admin-server", "admin-server", multiZoneRoute)), + routesFor(mk) + .filter(IS_BROKER_ROUTE) + .map(r -> { + String namePrefix = mk.getMetadata().getName() + "-"; + String brokerName = r.getMetadata().getName().replaceFirst(namePrefix, ""); + String router = getIngressControllerDomain("kas-" + getZoneForBrokerRoute(r)); + + return new ManagedKafkaRoute(brokerName, brokerName, router); + })) + .sorted(Comparator.comparing(ManagedKafkaRoute::getName)) + .collect(Collectors.toList()); + } + + @PostConstruct + protected void onStart() { + NonNamespaceOperation> ingressControllers = + openShiftClient.operator().ingressControllers().inNamespace(INGRESS_OPERATOR_NAMESPACE); + + final 
FilterWatchListDeletable workerNodeFilter = openShiftClient.nodes() + .withLabel(WORKER_NODE_LABEL) + .withoutLabel(INFRA_NODE_LABEL); + nodeInformer = resourceInformerFactory.create(Node.class, workerNodeFilter, null); + + FilterWatchListDeletable brokerPodFilter = openShiftClient.pods().inAnyNamespace().withLabels(Map.of( + OperandUtils.MANAGED_BY_LABEL, OperandUtils.STRIMZI_OPERATOR_NAME, + OperandUtils.K8S_NAME_LABEL, "kafka")); + brokerPodInformer = resourceInformerFactory.create(Pod.class, brokerPodFilter, null); + + ingressControllerInformer = resourceInformerFactory.create(IngressController.class, ingressControllers, new ResourceEventHandler() { + + @Override + public void onAdd(IngressController obj) { + reconcileIngressControllers(); + } + + @Override + public void onUpdate(IngressController oldObj, IngressController newObj) { + reconcileIngressControllers(); + } + + @Override + public void onDelete(IngressController obj, boolean deletedFinalStateUnknown) { + reconcileIngressControllers(); + } + }); + + reconcileIngressControllers(); + } + + @Scheduled(every = "3m", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + void reconcileIngressControllers() { + if (nodeInformer == null || ingressControllerInformer == null) { + log.warn("One or more informers are not yet ready"); + return; + } + + String defaultDomain = ingressControllerInformer.getList().stream() + .filter(ic -> "default".equals(ic.getMetadata().getName())) + .map(ic -> ic.getStatus().getDomain()) + .findFirst() + .orElse("apps.testing.domain.tld"); + + + List zones = nodeInformer.getList().stream() + .filter(node -> node != null && node.getMetadata().getLabels() != null) + .map(node -> node.getMetadata().getLabels().get(TOPOLOGY_KEY)) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + + Map zoneToIngressController = new HashMap<>(); + zones.stream().forEach(z -> zoneToIngressController.put(z, 
ingressControllerInformer.getByKey(Cache.namespaceKeyFunc(INGRESS_OPERATOR_NAMESPACE, "kas-" + z)))); + + List ingressControllers = ingressControllersFrom(zoneToIngressController, defaultDomain); + ingressControllers.add(buildDefaultIngressController(zones, defaultDomain)); + + ingressControllers.stream().forEach(expected -> { + String name = expected.getMetadata().getName(); + + if (ingressControllerInformer.getList().stream().anyMatch(i -> name.equals(i.getMetadata().getName()))) { + openShiftClient.operator().ingressControllers() + .inNamespace(expected.getMetadata().getNamespace()) + .withName(name) + .edit(i -> new IngressControllerBuilder(i) + .editMetadata().withLabels(expected.getMetadata().getLabels()).endMetadata() + .withSpec(expected.getSpec()) + .build()); + } else { + OperandUtils.createOrUpdate(openShiftClient.operator().ingressControllers(), expected); + } + + }); + } + + private List ingressControllersFrom(Map ingressControllers, String defaultDomain) { + return ingressControllers.entrySet().stream().map(e -> { + String zone = e.getKey(); + String kasZone = "kas-" + zone; + String domain = defaultDomain.replaceFirst("apps", kasZone); + int replicas = numReplicasForZone(zone); + + Map routeMatchLabel = Map.of("managedkafka.bf2.org/" + kasZone, "true"); + LabelSelector routeSelector = new LabelSelector(null, routeMatchLabel); + routeMatchLabels.putAll(routeMatchLabel); + + return buildIngressController(kasZone, domain, e.getValue(), replicas, routeSelector, zone); + }).collect(Collectors.toList()); + } + + private IngressController buildDefaultIngressController(List zones, String defaultDomain) { + IngressController existing = ingressControllerInformer.getByKey(Cache.namespaceKeyFunc(INGRESS_OPERATOR_NAMESPACE, "kas")); + int replicas = Math.min(3, Math.toIntExact(zones.stream().map(this::numReplicasForZone).count())); + + final Map routeMatchLabel = Map.of(KAS_MULTI_ZONE, "true"); + LabelSelector routeSelector = new LabelSelector(null, 
routeMatchLabel); + routeMatchLabels.putAll(routeMatchLabel); + + return buildIngressController("kas", defaultDomain.replaceFirst("apps", "kas"), existing, replicas, routeSelector, null); + } + + private IngressController buildIngressController(String name, String domain, + IngressController existing, int replicas, LabelSelector routeSelector, String topologyValue) { + + IngressControllerBuilder builder = Optional.ofNullable(existing).map(IngressControllerBuilder::new).orElse(new IngressControllerBuilder()); + + builder + .editOrNewMetadata() + .withName(name) + .withNamespace(INGRESS_OPERATOR_NAMESPACE) + .withLabels(OperandUtils.getDefaultLabels()) + .endMetadata() + .editOrNewSpec() + .withDomain(domain) + .withRouteSelector(routeSelector) + .withReplicas(replicas) + .withNewEndpointPublishingStrategy() + .withType("LoadBalancerService") + .withNewLoadBalancer() + .withScope("External") + .withNewProviderParameters() + .withType("AWS") + .withNewAws() + .withType("NLB") + .endAws() + .endProviderParameters() + .endLoadBalancer() + .endEndpointPublishingStrategy() + .endSpec(); + + if (topologyValue != null && !topologyValue.isEmpty()) { + builder + .editSpec() + .withNewNodePlacement() + .editOrNewNodeSelector() + .addToMatchLabels(TOPOLOGY_KEY, topologyValue) + .addToMatchLabels(WORKER_NODE_LABEL, "") + .endNodeSelector() + .endNodePlacement() + .endSpec(); + } + + return builder.build(); + } + + private int numReplicasForZone(String zone) { + int nodesInZone = Math.toIntExact(nodeInformer.getList().stream() + .filter(node -> zone.equals(node.getMetadata().getLabels().get(TOPOLOGY_KEY))) + .filter(Objects::nonNull) + .count()); + + // If there are fewer than 3 worker nodes in a zone we should use that number for the + // replica count, otherwise the IngressController won't be able to get to a healthy state. 
+ return Math.min(3, nodesInZone); + } + + private String getIngressControllerDomain(String ingressControllerName) { + return ingressControllerInformer.getList().stream() + .filter(ic -> ic.getMetadata().getName().equals(ingressControllerName)) + .map(ic -> ROUTER_SUBDOMAIN + (ic.getStatus() != null ? ic.getStatus().getDomain() : ic.getSpec().getDomain())) + .findFirst() + .orElse(""); + } + + private Stream routesFor(ManagedKafka managedKafka) { + return informerManager.getRoutesInNamespace(managedKafka.getMetadata().getNamespace()) + .filter(route -> isOwnedBy(route, Kafka.RESOURCE_KIND, AbstractKafkaCluster.kafkaClusterName(managedKafka), AbstractKafkaCluster.kafkaClusterNamespace(managedKafka)) + || isOwnedBy(route, managedKafka.getKind(), managedKafka.getMetadata().getName(), managedKafka.getMetadata().getNamespace())); + } + + private String getZoneForBrokerRoute(Route route) { + Stream nodes = nodeInformer.getList().stream(); + + String serviceName = route.getSpec().getTo().getName(); + String namespace = route.getMetadata().getNamespace(); + Service svc = informerManager.getLocalService(namespace, serviceName); + + Map labels = svc.getSpec().getSelector(); + Stream pods = brokerPodInformer.getList().stream() + .filter(p -> p.getMetadata().getNamespace().equals(namespace) + && p.getMetadata().getLabels().entrySet().containsAll(labels.entrySet())); + + return pods + .findFirst() + .map(p -> p.getSpec().getNodeName()) + .flatMap(nodeName -> nodes.filter(n -> n.getMetadata().getName().equals(nodeName)).findFirst()) + .map(n -> n.getMetadata().getLabels().get(IngressControllerManager.TOPOLOGY_KEY)) + .orElse(""); + } + + private boolean isOwnedBy(HasMetadata owned, String ownerKind, String ownerName, String ownerNamespace) { + boolean sameNamespace = ownerNamespace.equals(owned.getMetadata().getNamespace()); + return sameNamespace && + owned.getMetadata().getOwnerReferences().stream() + .anyMatch(ref -> ref.getKind().equals(ownerKind) && 
ref.getName().equals(ownerName)); + } +} diff --git a/operator/src/main/java/org/bf2/operator/operands/AbstractKafkaCluster.java b/operator/src/main/java/org/bf2/operator/operands/AbstractKafkaCluster.java index 6674fbea6..146fa8637 100644 --- a/operator/src/main/java/org/bf2/operator/operands/AbstractKafkaCluster.java +++ b/operator/src/main/java/org/bf2/operator/operands/AbstractKafkaCluster.java @@ -118,8 +118,8 @@ public OperandReadiness getReadiness(ManagedKafka managedKafka) { public boolean isReconciliationPaused(ManagedKafka managedKafka) { Kafka kafka = cachedKafka(managedKafka); boolean isReconciliationPaused = kafka != null && kafka.getStatus() != null - && hasKafkaCondition(kafka, c -> c.getType() != null && c.getType().equals("ReconciliationPaused") - && c.getStatus().equals("True")); + && hasKafkaCondition(kafka, c -> c.getType() != null && "ReconciliationPaused".equals(c.getType()) + && "True".equals(c.getStatus())); log.tracef("KafkaCluster isReconciliationPaused = %s", isReconciliationPaused); return isReconciliationPaused; } @@ -294,5 +294,4 @@ protected CertSecretSource buildSsoTlsCertSecretSource(ManagedKafka managedKafka .withCertificate("keycloak.crt") .build(); } - } diff --git a/operator/src/main/java/org/bf2/operator/operands/AdminServer.java b/operator/src/main/java/org/bf2/operator/operands/AdminServer.java index e3da311b6..eee0f8dcd 100644 --- a/operator/src/main/java/org/bf2/operator/operands/AdminServer.java +++ b/operator/src/main/java/org/bf2/operator/operands/AdminServer.java @@ -92,6 +92,9 @@ public class AdminServer extends AbstractAdminServer { @Inject protected KafkaInstanceConfiguration config; + @Inject + protected Labels extraLabels; + void onStart(@Observes StartupEvent ev) { if (kubernetesClient.isAdaptable(OpenShiftClient.class)) { openShiftClient = kubernetesClient.adapt(OpenShiftClient.class); @@ -288,6 +291,7 @@ private Map buildLabels(String adminServerName) { private Map buildRouteLabels() { Map labels = 
OperandUtils.getDefaultLabels(); labels.put("ingressType", "sharded"); + labels.putAll(extraLabels.get()); return labels; } @@ -386,13 +390,12 @@ private List buildServicePorts(ManagedKafka managedKafka) { } private ResourceRequirements buildResources() { - ResourceRequirements resources = new ResourceRequirementsBuilder() + return new ResourceRequirementsBuilder() .addToRequests("memory", CONTAINER_MEMORY_REQUEST) .addToRequests("cpu", CONTAINER_CPU_REQUEST) .addToLimits("memory", CONTAINER_MEMORY_LIMIT) .addToLimits("cpu", CONTAINER_CPU_LIMIT) .build(); - return resources; } @Override diff --git a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java index 31e9563f0..3cc675af4 100644 --- a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java +++ b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java @@ -11,6 +11,7 @@ import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; +import io.fabric8.kubernetes.api.model.TopologySpreadConstraintBuilder; import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.api.Context; import io.quarkus.arc.DefaultBean; @@ -46,6 +47,7 @@ import io.strimzi.api.kafka.model.template.ZookeeperClusterTemplateBuilder; import org.bf2.common.OperandUtils; import org.bf2.operator.managers.DrainCleanerManager; +import org.bf2.operator.managers.IngressControllerManager; import org.bf2.operator.managers.StrimziManager; import org.bf2.operator.resources.v1alpha1.ManagedKafka; import org.bf2.operator.resources.v1alpha1.Versions; @@ -107,6 +109,9 @@ public class KafkaCluster extends AbstractKafkaCluster { @Inject protected StrimziManager strimziManager; + @Inject + protected Labels extraLabels; + @Override public void createOrUpdate(ManagedKafka managedKafka) { 
secretManager.createOrUpdate(managedKafka); @@ -305,13 +310,23 @@ private KafkaClusterTemplate buildKafkaTemplate(ManagedKafka managedKafka) { new PodAffinityTermBuilder().withTopologyKey("kubernetes.io/hostname").build() ).build(); + PodTemplateBuilder podTemplateBuilder = new PodTemplateBuilder() + .withAffinity(new AffinityBuilder().withPodAntiAffinity(podAntiAffinity).build()) + .withImagePullSecrets(imagePullSecretManager.getOperatorImagePullSecrets(managedKafka)); + + if (this.config.getKafka().isConstrainSpread()) { + podTemplateBuilder.withTopologySpreadConstraints(new TopologySpreadConstraintBuilder() + .withMaxSkew(1) + .withTopologyKey(IngressControllerManager.TOPOLOGY_KEY) + .withNewLabelSelector() + .withMatchLabels(Map.of("strimzi.io/name", managedKafka.getMetadata().getName() + "-kafka")) + .endLabelSelector() + .withWhenUnsatisfiable("ScheduleAnyway") + .build()); + } + KafkaClusterTemplateBuilder templateBuilder = new KafkaClusterTemplateBuilder() - .withPod(new PodTemplateBuilder() - .withAffinity(new AffinityBuilder() - .withPodAntiAffinity(podAntiAffinity) - .build()) - .withImagePullSecrets(imagePullSecretManager.getOperatorImagePullSecrets(managedKafka)) - .build()); + .withPod(podTemplateBuilder.build()); if (drainCleanerManager.isDrainCleanerWebhookFound()) { templateBuilder.withPodDisruptionBudget( @@ -623,6 +638,7 @@ private Map buildKafkaLabels(ManagedKafka managedKafka) { Map labels = OperandUtils.getDefaultLabels(); this.strimziManager.changeStrimziVersion(managedKafka, this, labels); labels.put("ingressType", "sharded"); + labels.putAll(extraLabels.get()); log.debugf("Kafka %s/%s labels: %s", managedKafka.getMetadata().getNamespace(), managedKafka.getMetadata().getName(), labels); return labels; diff --git a/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java b/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java index 2dd57aca7..9ee88aa0e 100644 --- 
a/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java +++ b/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java @@ -74,6 +74,8 @@ public static class Kafka { protected String jvmXx = JVM_OPTIONS_XX; @JsonProperty("enable-quota") protected boolean enableQuota = true; + @JsonProperty("constrain-spread") + protected boolean constrainSpread = true; @JsonUnwrapped(prefix = "acl.") AccessControl acl = new AccessControl(); @@ -133,6 +135,14 @@ public void setJvmXx(String jvmXx) { this.jvmXx = jvmXx; } + public boolean isConstrainSpread() { + return constrainSpread; + } + + public void setConstrainSpread(boolean constrainSpread) { + this.constrainSpread = constrainSpread; + } + public int getConnectionAttemptsPerSec() { return connectionAttemptsPerSec; } diff --git a/operator/src/main/java/org/bf2/operator/operands/Labels.java b/operator/src/main/java/org/bf2/operator/operands/Labels.java new file mode 100644 index 000000000..80cfcc3e0 --- /dev/null +++ b/operator/src/main/java/org/bf2/operator/operands/Labels.java @@ -0,0 +1,24 @@ +package org.bf2.operator.operands; + +import javax.enterprise.context.ApplicationScoped; + +import java.util.HashMap; +import java.util.Map; + +@ApplicationScoped +public class Labels { + + private volatile Map labels; + + public Labels() { + labels = new HashMap<>(); + } + + public Map get() { + return labels; + } + + public void putAll(Map labels) { + this.labels.putAll(labels); + } +} diff --git a/operator/src/main/kubernetes/kubernetes.yml b/operator/src/main/kubernetes/kubernetes.yml index d82dc3f0d..e65103540 100644 --- a/operator/src/main/kubernetes/kubernetes.yml +++ b/operator/src/main/kubernetes/kubernetes.yml @@ -144,6 +144,30 @@ rules: - watch - patch - update + - apiGroups: + - "" + resources: + # the operator reads pods to get assigned node to get AZ information + - pods + # the operator reads nodes to get AZ information + - nodes + verbs: + - get + - list + - watch + - 
apiGroups: + - operator.openshift.io + resources: + # the operator creates IngressControllers in the openshift-ingress-operator namespace + - ingresscontrollers + verbs: + - get + - list + - watch + - create + - delete + - patch + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerPredicateTest.java b/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerPredicateTest.java new file mode 100644 index 000000000..e836a3ce8 --- /dev/null +++ b/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerPredicateTest.java @@ -0,0 +1,39 @@ +package org.bf2.operator.managers; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** + * This is a separate test class than IngressControllerManagerTest so that no + * needless setup is run for each test. 
+ */ +public class IngressControllerManagerPredicateTest { + + @ParameterizedTest + @ValueSource(strings = { + "my-cluster-kafka-0", + "my-cluster-kafka-1", + "my-cluster-kafka-10", + }) + public void testIsBrokerMatches(String goodInput) { + assertTrue(IngressControllerManager.IS_BROKER.test(goodInput)); + } + + @ParameterizedTest + @ValueSource(strings = { + "kafka-1", + "my-cluster-kafka-", + "my-cluster-kafka-a", + "my-cluster-kafka-1a", + "my-cluster-kafka-a1", + "my-cluster-kafka-bootstrap", + "my-cluster-admin-server", + }) + public void testIsBrokerDoesntMatch(String badInput) { + assertFalse(IngressControllerManager.IS_BROKER.test(badInput)); + } + +} diff --git a/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java b/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java new file mode 100644 index 000000000..64acf0eee --- /dev/null +++ b/operator/src/test/java/org/bf2/operator/managers/IngressControllerManagerTest.java @@ -0,0 +1,58 @@ +package org.bf2.operator.managers; + +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.NodeBuilder; +import io.fabric8.openshift.api.model.operator.v1.IngressController; +import io.fabric8.openshift.client.OpenShiftClient; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.kubernetes.client.KubernetesServerTestResource; +import org.junit.jupiter.api.Test; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@QuarkusTestResource(KubernetesServerTestResource.class) +@QuarkusTest +public class IngressControllerManagerTest { + + @Inject + IngressControllerManager ingressControllerManager; + + @Inject + OpenShiftClient openShiftClient; + + @Test + public void testIngressControllerCreationWithNoZones() { + 
ingressControllerManager.reconcileIngressControllers(); + + List ingressControllers = openShiftClient.operator().ingressControllers().inNamespace(IngressControllerManager.INGRESS_OPERATOR_NAMESPACE).list().getItems(); + assertEquals(1, ingressControllers.size(), "Expected only one IngressController"); + assertEquals("kas", ingressControllers.get(0).getMetadata().getName(), "Expected the IngressController to be named kas"); + assertEquals(0, ingressControllers.get(0).getSpec().getReplicas(), "Expected 0 replicas because there are 0 nodes"); + } + + @Test + public void testIngressControllerCreationWith3Zones() { + + IntStream.range(0, 3).forEach(i -> { + Node node = new NodeBuilder() + .editOrNewMetadata() + .withName("z"+i) + .withLabels(Map.of(IngressControllerManager.WORKER_NODE_LABEL, "", IngressControllerManager.TOPOLOGY_KEY, "zone"+i)) + .endMetadata() + .build(); + openShiftClient.nodes().create(node); + }); + + ingressControllerManager.reconcileIngressControllers(); + + List ingressControllers = openShiftClient.operator().ingressControllers().inNamespace(IngressControllerManager.INGRESS_OPERATOR_NAMESPACE).list().getItems(); + assertEquals(4, ingressControllers.size(), "Expected 4 IngressControllers: one per zone, and one multi-zone"); + } +} diff --git a/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java b/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java index ee6fbc869..10666ec1b 100644 --- a/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java +++ b/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -52,9 +53,13 @@ class KafkaClusterTest { @Inject InformerManager informerManager; + @Inject + Labels labels; + @BeforeEach void beforeEach() { informerManager.createKafkaInformer(); + 
labels.putAll(Map.of("managedkafka.bf2.org/kas-zone0", "true", "managedkafka.bf2.org/kas-zone1", "true", "managedkafka.bf2.org/kas-zone2", "true")); } @Test diff --git a/operator/src/test/resources/expected/custom-config-strimzi.yml b/operator/src/test/resources/expected/custom-config-strimzi.yml index 603935892..3df5aba8a 100644 --- a/operator/src/test/resources/expected/custom-config-strimzi.yml +++ b/operator/src/test/resources/expected/custom-config-strimzi.yml @@ -6,6 +6,10 @@ metadata: app.kubernetes.io/managed-by: "kas-fleetshard-operator" ingressType: "sharded" managedkafka.bf2.org/strimziVersion: "0.22.1" + managedkafka.bf2.org/kas-multi-zone: "true" + managedkafka.bf2.org/kas-zone0: "true" + managedkafka.bf2.org/kas-zone1: "true" + managedkafka.bf2.org/kas-zone2: "true" name: "test-mk" namespace: "test" ownerReferences: @@ -171,6 +175,13 @@ spec: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - topologyKey: "kubernetes.io/hostname" + topologySpreadConstraints: + - labelSelector: + matchLabels: + strimzi.io/name: test-mk-kafka + maxSkew: 1 + topologyKey: topology.kubernetes.io/zone + whenUnsatisfiable: ScheduleAnyway zookeeper: replicas: 5 storage: ! 
diff --git a/operator/src/test/resources/expected/strimzi.yml b/operator/src/test/resources/expected/strimzi.yml index da01bfb9c..5929256bd 100644 --- a/operator/src/test/resources/expected/strimzi.yml +++ b/operator/src/test/resources/expected/strimzi.yml @@ -6,6 +6,10 @@ metadata: app.kubernetes.io/managed-by: "kas-fleetshard-operator" ingressType: "sharded" managedkafka.bf2.org/strimziVersion: "0.22.1" + managedkafka.bf2.org/kas-multi-zone: "true" + managedkafka.bf2.org/kas-zone0: "true" + managedkafka.bf2.org/kas-zone1: "true" + managedkafka.bf2.org/kas-zone2: "true" name: "test-mk" namespace: "test" ownerReferences: @@ -167,6 +171,13 @@ spec: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - topologyKey: "kubernetes.io/hostname" + topologySpreadConstraints: + - labelSelector: + matchLabels: + strimzi.io/name: test-mk-kafka + maxSkew: 1 + topologyKey: topology.kubernetes.io/zone + whenUnsatisfiable: ScheduleAnyway zookeeper: replicas: 3 storage: ! From bfa920d71f158406ef49938ac2bb31502a8017e9 Mon Sep 17 00:00:00 2001 From: Gerard Ryan Date: Tue, 17 Aug 2021 01:01:56 +0100 Subject: [PATCH 2/3] Add getRouteMatchLabels instead of Labels class --- .../managers/IngressControllerManager.java | 14 ++++++++--- .../bf2/operator/operands/AdminServer.java | 9 +++++-- .../bf2/operator/operands/KafkaCluster.java | 9 +++++-- .../org/bf2/operator/operands/Labels.java | 24 ------------------- .../operator/operands/KafkaClusterTest.java | 11 ++++++--- 5 files changed, 33 insertions(+), 34 deletions(-) delete mode 100644 operator/src/main/java/org/bf2/operator/operands/Labels.java diff --git a/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java b/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java index 91c9ce39a..e27bd5865 100644 --- a/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java +++ b/operator/src/main/java/org/bf2/operator/managers/IngressControllerManager.java @@ -25,7 
+25,6 @@ import org.bf2.common.ResourceInformer; import org.bf2.common.ResourceInformerFactory; import org.bf2.operator.operands.AbstractKafkaCluster; -import org.bf2.operator.operands.Labels; import org.bf2.operator.resources.v1alpha1.ManagedKafka; import org.bf2.operator.resources.v1alpha1.ManagedKafkaRoute; import org.jboss.logging.Logger; @@ -91,13 +90,20 @@ public class IngressControllerManager { @Inject ResourceInformerFactory resourceInformerFactory; - @Inject - Labels routeMatchLabels; + private volatile Map routeMatchLabels; ResourceInformer brokerPodInformer; ResourceInformer nodeInformer; ResourceInformer ingressControllerInformer; + public Map getRouteMatchLabels() { + return routeMatchLabels; + } + + public void addToRouteMatchLabels(String key, String value) { + routeMatchLabels.put(key, value); + } + public List getManagedKafkaRoutesFor(ManagedKafka mk) { String multiZoneRoute = getIngressControllerDomain("kas"); @@ -120,6 +126,8 @@ public List getManagedKafkaRoutesFor(ManagedKafka mk) { @PostConstruct protected void onStart() { + routeMatchLabels = new HashMap<>(4); + NonNamespaceOperation> ingressControllers = openShiftClient.operator().ingressControllers().inNamespace(INGRESS_OPERATOR_NAMESPACE); diff --git a/operator/src/main/java/org/bf2/operator/operands/AdminServer.java b/operator/src/main/java/org/bf2/operator/operands/AdminServer.java index eee0f8dcd..0b088744b 100644 --- a/operator/src/main/java/org/bf2/operator/operands/AdminServer.java +++ b/operator/src/main/java/org/bf2/operator/operands/AdminServer.java @@ -33,6 +33,7 @@ import io.quarkus.arc.DefaultBean; import io.quarkus.runtime.StartupEvent; import org.bf2.common.OperandUtils; +import org.bf2.operator.managers.IngressControllerManager; import org.bf2.operator.resources.v1alpha1.ManagedKafka; import org.bf2.operator.resources.v1alpha1.ManagedKafkaAuthenticationOAuth; import org.bf2.operator.secrets.ImagePullSecretManager; @@ -42,6 +43,7 @@ import 
javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Observes; +import javax.enterprise.inject.Instance; import javax.inject.Inject; import java.util.ArrayList; @@ -93,7 +95,7 @@ public class AdminServer extends AbstractAdminServer { protected KafkaInstanceConfiguration config; @Inject - protected Labels extraLabels; + protected Instance ingressControllerManagerInstance; void onStart(@Observes StartupEvent ev) { if (kubernetesClient.isAdaptable(OpenShiftClient.class)) { @@ -291,7 +293,10 @@ private Map buildLabels(String adminServerName) { private Map buildRouteLabels() { Map labels = OperandUtils.getDefaultLabels(); labels.put("ingressType", "sharded"); - labels.putAll(extraLabels.get()); + + if (ingressControllerManagerInstance.isResolvable()) { + labels.putAll(ingressControllerManagerInstance.get().getRouteMatchLabels()); + } return labels; } diff --git a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java index 3cc675af4..bb3dbeee7 100644 --- a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java +++ b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java @@ -56,6 +56,7 @@ import org.jboss.logging.Logger; import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Instance; import javax.inject.Inject; import javax.xml.bind.DatatypeConverter; @@ -110,7 +111,7 @@ public class KafkaCluster extends AbstractKafkaCluster { protected StrimziManager strimziManager; @Inject - protected Labels extraLabels; + protected Instance ingressControllerManagerInstance; @Override public void createOrUpdate(ManagedKafka managedKafka) { @@ -638,7 +639,11 @@ private Map buildKafkaLabels(ManagedKafka managedKafka) { Map labels = OperandUtils.getDefaultLabels(); this.strimziManager.changeStrimziVersion(managedKafka, this, labels); labels.put("ingressType", "sharded"); - labels.putAll(extraLabels.get()); + + if 
(ingressControllerManagerInstance.isResolvable()) { + labels.putAll(ingressControllerManagerInstance.get().getRouteMatchLabels()); + } + log.debugf("Kafka %s/%s labels: %s", managedKafka.getMetadata().getNamespace(), managedKafka.getMetadata().getName(), labels); return labels; diff --git a/operator/src/main/java/org/bf2/operator/operands/Labels.java b/operator/src/main/java/org/bf2/operator/operands/Labels.java deleted file mode 100644 index 80cfcc3e0..000000000 --- a/operator/src/main/java/org/bf2/operator/operands/Labels.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.bf2.operator.operands; - -import javax.enterprise.context.ApplicationScoped; - -import java.util.HashMap; -import java.util.Map; - -@ApplicationScoped -public class Labels { - - private volatile Map labels; - - public Labels() { - labels = new HashMap<>(); - } - - public Map get() { - return labels; - } - - public void putAll(Map labels) { - this.labels.putAll(labels); - } -} diff --git a/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java b/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java index 10666ec1b..556fb3fc7 100644 --- a/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java +++ b/operator/src/test/java/org/bf2/operator/operands/KafkaClusterTest.java @@ -22,6 +22,7 @@ import io.strimzi.api.kafka.model.storage.PersistentClaimStorageOverrideBuilder; import org.bf2.operator.managers.DrainCleanerManager; import org.bf2.operator.managers.InformerManager; +import org.bf2.operator.managers.IngressControllerManager; import org.bf2.operator.resources.v1alpha1.ManagedKafka; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,7 +33,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -54,12 +54,17 @@ class KafkaClusterTest { InformerManager informerManager; @Inject - Labels labels; + 
IngressControllerManager ingressControllerManager; @BeforeEach void beforeEach() { informerManager.createKafkaInformer(); - labels.putAll(Map.of("managedkafka.bf2.org/kas-zone0", "true", "managedkafka.bf2.org/kas-zone1", "true", "managedkafka.bf2.org/kas-zone2", "true")); + + // Make label set more stable + ingressControllerManager.addToRouteMatchLabels("managedkafka.bf2.org/kas-multi-zone", "true"); + ingressControllerManager.addToRouteMatchLabels("managedkafka.bf2.org/kas-zone0", "true"); + ingressControllerManager.addToRouteMatchLabels("managedkafka.bf2.org/kas-zone1", "true"); + ingressControllerManager.addToRouteMatchLabels("managedkafka.bf2.org/kas-zone2", "true"); } @Test From 5d0a4cfd2257d3e6d9b9341b5048aca7f668d699 Mon Sep 17 00:00:00 2001 From: Gerard Ryan Date: Tue, 17 Aug 2021 01:21:54 +0100 Subject: [PATCH 3/3] Remove TopologySpreadConstraint toggle; set DoNotSchedule The property defaults to true, and there currently isn't a scenario where we need to set it to false. Given that there's also no clear consensus on what it should be called, I'm removing it. If it's needed in future, someone else can decide what to call it. :) To explain why there is currently no case where we need to omit the TopologySpreadConstraint: - In the default case of a cluster with 3 zones, we always want to set it so that the brokers are evenly spread across the 3 zones. - In the case of a single-zone cluster, it evenly spreads the 3 brokers across the 1 available zone, which means that all 3 brokers get scheduled to the same zone. Note that this commit also changes the "whenUnsatisfiable" value from "ScheduleAnyway" to "DoNotSchedule". This prevents 2 brokers from being scheduled to the same zone in a 3+ zone cluster, if there's some transient reason that the pods can't be spread across all zones when they're being created (like if there's a node down in one zone, and not enough capacity left among the other nodes in the zone).
This is important because it can't be re-balanced across zones easily after initial scheduling, because storage volumes aren't portable across zones (at least in the clouds that we work with so far). --- .../bf2/operator/operands/KafkaCluster.java | 21 ++++++++----------- .../operands/KafkaInstanceConfiguration.java | 10 --------- .../expected/custom-config-strimzi.yml | 8 +++---- .../src/test/resources/expected/strimzi.yml | 6 +++--- 4 files changed, 16 insertions(+), 29 deletions(-) diff --git a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java index bb3dbeee7..7c852e17c 100644 --- a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java +++ b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java @@ -313,18 +313,15 @@ private KafkaClusterTemplate buildKafkaTemplate(ManagedKafka managedKafka) { PodTemplateBuilder podTemplateBuilder = new PodTemplateBuilder() .withAffinity(new AffinityBuilder().withPodAntiAffinity(podAntiAffinity).build()) - .withImagePullSecrets(imagePullSecretManager.getOperatorImagePullSecrets(managedKafka)); - - if (this.config.getKafka().isConstrainSpread()) { - podTemplateBuilder.withTopologySpreadConstraints(new TopologySpreadConstraintBuilder() - .withMaxSkew(1) - .withTopologyKey(IngressControllerManager.TOPOLOGY_KEY) - .withNewLabelSelector() - .withMatchLabels(Map.of("strimzi.io/name", managedKafka.getMetadata().getName() + "-kafka")) - .endLabelSelector() - .withWhenUnsatisfiable("ScheduleAnyway") - .build()); - } + .withImagePullSecrets(imagePullSecretManager.getOperatorImagePullSecrets(managedKafka)) + .withTopologySpreadConstraints(new TopologySpreadConstraintBuilder() + .withMaxSkew(1) + .withTopologyKey(IngressControllerManager.TOPOLOGY_KEY) + .withNewLabelSelector() + .withMatchLabels(Map.of("strimzi.io/name", managedKafka.getMetadata().getName() + "-kafka")) + .endLabelSelector() + .withWhenUnsatisfiable("DoNotSchedule") 
+ .build()); KafkaClusterTemplateBuilder templateBuilder = new KafkaClusterTemplateBuilder() .withPod(podTemplateBuilder.build()); diff --git a/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java b/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java index 9ee88aa0e..2dd57aca7 100644 --- a/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java +++ b/operator/src/main/java/org/bf2/operator/operands/KafkaInstanceConfiguration.java @@ -74,8 +74,6 @@ public static class Kafka { protected String jvmXx = JVM_OPTIONS_XX; @JsonProperty("enable-quota") protected boolean enableQuota = true; - @JsonProperty("constrain-spread") - protected boolean constrainSpread = true; @JsonUnwrapped(prefix = "acl.") AccessControl acl = new AccessControl(); @@ -135,14 +133,6 @@ public void setJvmXx(String jvmXx) { this.jvmXx = jvmXx; } - public boolean isConstrainSpread() { - return constrainSpread; - } - - public void setConstrainSpread(boolean constrainSpread) { - this.constrainSpread = constrainSpread; - } - public int getConnectionAttemptsPerSec() { return connectionAttemptsPerSec; } diff --git a/operator/src/test/resources/expected/custom-config-strimzi.yml b/operator/src/test/resources/expected/custom-config-strimzi.yml index 3df5aba8a..126ddd6fb 100644 --- a/operator/src/test/resources/expected/custom-config-strimzi.yml +++ b/operator/src/test/resources/expected/custom-config-strimzi.yml @@ -76,7 +76,7 @@ spec: - broker: 2 host: "broker-2-xxx.yyy.zzz" - broker: 3 - host: "broker-3-xxx.yyy.zzz" + host: "broker-3-xxx.yyy.zzz" - name: "oauth" port: 9095 type: "internal" @@ -100,7 +100,7 @@ spec: - name: "sre" port: 9096 type: "internal" - tls: false + tls: false authorization: type: custom authorizerClass: io.bf2.kafka.authorizer.GlobalAclAuthorizer @@ -181,7 +181,7 @@ spec: strimzi.io/name: test-mk-kafka maxSkew: 1 topologyKey: topology.kubernetes.io/zone - whenUnsatisfiable: ScheduleAnyway + 
whenUnsatisfiable: DoNotSchedule zookeeper: replicas: 5 storage: ! @@ -215,7 +215,7 @@ spec: configMapKeyRef: key: log4j.properties name: test-mk-zookeeper-logging - optional: false + optional: false template: pod: affinity: diff --git a/operator/src/test/resources/expected/strimzi.yml b/operator/src/test/resources/expected/strimzi.yml index 5929256bd..ff2ddad32 100644 --- a/operator/src/test/resources/expected/strimzi.yml +++ b/operator/src/test/resources/expected/strimzi.yml @@ -98,7 +98,7 @@ spec: - name: "sre" port: 9096 type: "internal" - tls: false + tls: false authorization: type: custom authorizerClass: io.bf2.kafka.authorizer.GlobalAclAuthorizer @@ -177,7 +177,7 @@ spec: strimzi.io/name: test-mk-kafka maxSkew: 1 topologyKey: topology.kubernetes.io/zone - whenUnsatisfiable: ScheduleAnyway + whenUnsatisfiable: DoNotSchedule zookeeper: replicas: 3 storage: ! @@ -209,7 +209,7 @@ spec: configMapKeyRef: key: log4j.properties name: test-mk-zookeeper-logging - optional: false + optional: false template: pod: affinity: