Skip to content

Commit

Permalink
Merge pull request #450 from grdryn/ic-per-az
Browse files Browse the repository at this point in the history
Use an IngressController per cloud availability zone
  • Loading branch information
grdryn authored Aug 23, 2021
2 parents b5e2708 + 5d0a4cf commit aac437d
Show file tree
Hide file tree
Showing 15 changed files with 625 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.bf2.operator.resources.v1alpha1;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Buildable(
builderPackage = "io.fabric8.kubernetes.api.builder",
editableEnabled = false
)
@ToString
@EqualsAndHashCode
@JsonInclude(value = JsonInclude.Include.NON_NULL)
public class ManagedKafkaRoute {

private String name;
private String prefix;
private String router;

public ManagedKafkaRoute(String name, String prefix, String router) {
this.name = name;
this.prefix = prefix;
this.router = router;
}

public ManagedKafkaRoute() {
this(null, null, null);
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getRouter() {
return router;
}

public void setRouter(String router) {
this.router = router;
}

public String getPrefix() {
return prefix;
}

public void setPrefix(String prefix) {
this.prefix = prefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
public class ManagedKafkaStatus {

private List<ManagedKafkaCondition> conditions;
private List<ManagedKafkaRoute> routes;
private ManagedKafkaCapacity capacity;
private Versions versions;
private String adminServerURI;
Expand All @@ -32,6 +33,14 @@ public void setConditions(List<ManagedKafkaCondition> conditions) {
this.conditions = conditions;
}

public List<ManagedKafkaRoute> getRoutes() {
return routes;
}

public void setRoutes(List<ManagedKafkaRoute> routes) {
this.routes = routes;
}

public ManagedKafkaCapacity getCapacity() {
return capacity;
}
Expand Down
5 changes: 4 additions & 1 deletion common/src/main/java/org/bf2/common/OperandUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

public class OperandUtils {

public static final String K8S_NAME_LABEL = "app.kubernetes.io/name";
public static final String MANAGED_BY_LABEL = "app.kubernetes.io/managed-by";
public static final String STRIMZI_OPERATOR_NAME = "strimzi-cluster-operator";
public static final String FLEETSHARD_OPERATOR_NAME = "kas-fleetshard-operator";

/**
Expand All @@ -33,7 +36,7 @@ public static void setAsOwner(HasMetadata owner, HasMetadata resource) {

public static Map<String, String> getDefaultLabels() {
LinkedHashMap<String, String> result = new LinkedHashMap<>(1);
result.put("app.kubernetes.io/managed-by", FLEETSHARD_OPERATOR_NAME);
result.put(MANAGED_BY_LABEL, FLEETSHARD_OPERATOR_NAME);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
import org.bf2.common.ConditionUtils;
import org.bf2.common.ManagedKafkaResourceClient;
import org.bf2.operator.events.ResourceEventSource;
import org.bf2.operator.managers.IngressControllerManager;
import org.bf2.operator.operands.KafkaInstance;
import org.bf2.operator.operands.OperandReadiness;
import org.bf2.operator.resources.v1alpha1.ManagedKafka;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaCapacityBuilder;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaCondition;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaCondition.Reason;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaCondition.Status;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaRoute;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaStatus;
import org.bf2.operator.resources.v1alpha1.ManagedKafkaStatusBuilder;
import org.bf2.operator.resources.v1alpha1.VersionsBuilder;
import org.jboss.logging.Logger;
import org.jboss.logging.NDC;

import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import java.util.ArrayList;
Expand All @@ -43,6 +46,9 @@ public class ManagedKafkaController implements ResourceController<ManagedKafka>
@Inject
KafkaInstance kafkaInstance;

@Inject
Instance<IngressControllerManager> ingressControllerManagerInstance;

@Override
@Timed(value = "controller.delete", extraTags = {"resource", "ManagedKafka"}, description = "Time spent processing delete events")
@Counted(value = "controller.delete", extraTags = {"resource", "ManagedKafka"}, description = "The number of delete events")
Expand Down Expand Up @@ -131,6 +137,9 @@ private void updateManagedKafkaStatus(ManagedKafka managedKafka) {

ConditionUtils.updateConditionStatus(ready, readiness.getStatus(), readiness.getReason(), readiness.getMessage());

// routes should always be set on the CR status, even if it's just an empty list
status.setRoutes(List.of());

if (Status.True.equals(readiness.getStatus())) {
status.setCapacity(new ManagedKafkaCapacityBuilder(managedKafka.getSpec().getCapacity()).build());
if (!Reason.StrimziUpdating.equals(readiness.getReason())) {
Expand All @@ -140,6 +149,12 @@ private void updateManagedKafkaStatus(ManagedKafka managedKafka) {
}
status.setAdminServerURI(kafkaInstance.getAdminServer().uri(managedKafka));
status.setServiceAccounts(managedKafka.getSpec().getServiceAccounts());

if (ingressControllerManagerInstance.isResolvable()) {
IngressControllerManager ingressControllerManager = ingressControllerManagerInstance.get();
List<ManagedKafkaRoute> routes = ingressControllerManager.getManagedKafkaRoutesFor(managedKafka);
status.setRoutes(routes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.inject.Inject;

import java.util.List;
import java.util.stream.Stream;

@Startup
@ApplicationScoped
Expand Down Expand Up @@ -59,14 +60,14 @@ boolean isOpenShift() {
protected void onStart() {
deploymentInformer = resourceInformerFactory.create(Deployment.class, filter(kubernetesClient.apps().deployments()), eventSource);

serviceInformer = resourceInformerFactory.create(Service.class, filter(kubernetesClient.services()), eventSource);
serviceInformer = resourceInformerFactory.create(Service.class, filterManagedByFleetshardOrStrimzi(kubernetesClient.services()), eventSource);

configMapInformer = resourceInformerFactory.create(ConfigMap.class, filter(kubernetesClient.configMaps()), eventSource);

secretInformer = resourceInformerFactory.create(Secret.class, filter(kubernetesClient.secrets()), eventSource);

if (isOpenShift()) {
routeInformer = resourceInformerFactory.create(Route.class, filter(kubernetesClient.adapt(OpenShiftClient.class).routes()), eventSource);
routeInformer = resourceInformerFactory.create(Route.class, filterManagedByFleetshardOrStrimzi(kubernetesClient.adapt(OpenShiftClient.class).routes()), eventSource);
}
}

Expand All @@ -75,6 +76,11 @@ protected void onStart() {
return mixedOperation.inAnyNamespace().withLabels(OperandUtils.getDefaultLabels());
}

static <T extends HasMetadata> FilterWatchListDeletable<T, ? extends KubernetesResourceList<T>> filterManagedByFleetshardOrStrimzi(
MixedOperation<T, ? extends KubernetesResourceList<T>, ?> mixedOperation) {
return mixedOperation.inAnyNamespace().withLabelIn(OperandUtils.MANAGED_BY_LABEL, OperandUtils.FLEETSHARD_OPERATOR_NAME, OperandUtils.STRIMZI_OPERATOR_NAME);
}

public Kafka getLocalKafka(String namespace, String name) {
return kafkaInformer != null ? kafkaInformer.getByKey(Cache.namespaceKeyFunc(namespace, name)) : null;
}
Expand Down Expand Up @@ -104,6 +110,16 @@ public Route getLocalRoute(String namespace, String name) {
}
}

protected Stream<Route> getRoutesInNamespace(String namespace) {
if (isOpenShift()) {
return routeInformer.getList().stream()
.filter(r -> r.getMetadata().getNamespace().equals(namespace));
} else {
log.warn("Not running on OpenShift cluster, Routes are not available");
return Stream.empty();
}
}

/**
* Create the Kafka informer
* NOTE: it's called when a Strimzi bundle is installed and Kafka related CRDs are available to be listed/watched
Expand Down
Loading

0 comments on commit aac437d

Please sign in to comment.