diff --git a/pom.xml b/pom.xml index 1e5fdcb9d..169563d2d 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ 3.2.5 2 3.2.5 + 6.7.2 ${surefire.rerunFailingTestsCount} @@ -217,6 +218,11 @@ ${com.google.java-format.version} provided + + io.fabric8 + kubernetes-client + ${io.fabric8.client.version} + io.quarkus @@ -249,7 +255,6 @@ junit-jupiter test - diff --git a/src/main/java/io/cryostat/discovery/ContainerDiscovery.java b/src/main/java/io/cryostat/discovery/ContainerDiscovery.java index 48dffc944..ff8085c31 100644 --- a/src/main/java/io/cryostat/discovery/ContainerDiscovery.java +++ b/src/main/java/io/cryostat/discovery/ContainerDiscovery.java @@ -164,19 +164,21 @@ void onStart(@Observes StartupEvent evt) { getClass().getName(), socketPath); return; } - logger.infov("{0} started", getClass().getName()); DiscoveryNode universe = DiscoveryNode.getUniverse(); if (DiscoveryNode.getRealm(getRealm()).isEmpty()) { DiscoveryPlugin plugin = new DiscoveryPlugin(); - DiscoveryNode node = DiscoveryNode.environment(getRealm(), DiscoveryNode.REALM); + DiscoveryNode node = DiscoveryNode.environment(getRealm(), BaseNodeType.REALM); plugin.realm = node; plugin.builtin = true; universe.children.add(node); + node.parent = universe; plugin.persist(); universe.persist(); } + logger.infov("Starting {0} client", getRealm()); + queryContainers(); this.timerId = vertx.setPeriodic(pollPeriod.toMillis(), unused -> queryContainers()); } @@ -185,7 +187,7 @@ void onStop(@Observes ShutdownEvent evt) { if (!enabled()) { return; } - logger.info(String.format("Shutting down %s client", getRealm())); + logger.infov("Shutting down {0} client", getRealm()); vertx.cancelTimer(timerId); } @@ -352,27 +354,34 @@ public void handleContainerEvent(ContainerSpec desc, EventKind evtKind) { "PORT", // "AnnotationKey.PORT, Integer.toString(jmxPort))); - DiscoveryNode node = DiscoveryNode.target(target); + DiscoveryNode node = DiscoveryNode.target(target, BaseNodeType.JVM); target.discoveryNode = node; String podName = desc.PodName; if (StringUtils.isNotBlank(podName)) { - DiscoveryNode pod = DiscoveryNode.environment(podName, DiscoveryNode.POD); + DiscoveryNode pod = + DiscoveryNode.environment(podName, ContainerDiscoveryNodeType.POD); if (!realm.children.contains(pod)) { pod.children.add(node); + node.parent = pod; realm.children.add(pod); + pod.parent = realm; } else { pod = DiscoveryNode.getChild( realm, n -> podName.equals(n.name) - && DiscoveryNode.POD.equals(n.nodeType)) + && ContainerDiscoveryNodeType.POD + .getKind() + .equals(n.nodeType)) .orElseThrow(); pod.children.add(node); + node.parent = pod; } pod.persist(); } else { realm.children.add(node); + node.parent = realm; } target.persist(); node.persist(); @@ -381,11 +390,13 @@ public void handleContainerEvent(ContainerSpec desc, EventKind evtKind) { Target t = Target.getTargetByConnectUrl(connectUrl); String podName = desc.PodName; if (StringUtils.isNotBlank(podName)) { - DiscoveryNode pod = DiscoveryNode.environment(podName, DiscoveryNode.POD); + DiscoveryNode pod = + DiscoveryNode.environment(podName, ContainerDiscoveryNodeType.POD); pod.children.remove(t.discoveryNode); } else { realm.children.remove(t.discoveryNode); } + t.discoveryNode.parent = null; realm.persist(); t.delete(); } @@ -420,3 +431,25 @@ static record ContainerDetails(Config Config) {} static record Config(String Hostname) {} } + +enum ContainerDiscoveryNodeType implements NodeType { + // represents a container pod managed by Podman + POD("Pod"), + ; + + private final String kind; + + ContainerDiscoveryNodeType(String kind) { + this.kind = kind; + } + + @Override + public String getKind() { + return kind; + } + + @Override + public String toString() { + return getKind(); + } +} diff --git a/src/main/java/io/cryostat/discovery/CustomDiscovery.java b/src/main/java/io/cryostat/discovery/CustomDiscovery.java index 3d3428426..2f7055cdb 100644 --- a/src/main/java/io/cryostat/discovery/CustomDiscovery.java +++ b/src/main/java/io/cryostat/discovery/CustomDiscovery.java @@ -78,10 +78,11 @@ void onStart(@Observes StartupEvent evt) { DiscoveryNode universe = DiscoveryNode.getUniverse(); if (DiscoveryNode.getRealm(REALM).isEmpty()) { DiscoveryPlugin plugin = new DiscoveryPlugin(); - DiscoveryNode node = DiscoveryNode.environment(REALM, DiscoveryNode.REALM); + DiscoveryNode node = DiscoveryNode.environment(REALM, BaseNodeType.REALM); plugin.realm = node; plugin.builtin = true; universe.children.add(node); + node.parent = universe; plugin.persist(); universe.persist(); } @@ -167,11 +168,12 @@ Response doV2Create( target.annotations = new Annotations(); target.annotations.cryostat().putAll(Map.of("REALM", REALM)); - DiscoveryNode node = DiscoveryNode.target(target); + DiscoveryNode node = DiscoveryNode.target(target, BaseNodeType.JVM); target.discoveryNode = node; DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); realm.children.add(node); + node.parent = realm; target.persist(); node.persist(); realm.persist(); @@ -212,6 +214,7 @@ public Response delete(@RestPath long id) throws URISyntaxException { Target target = Target.find("id", id).singleResult(); DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); realm.children.remove(target.discoveryNode); + target.discoveryNode.parent = null; realm.persist(); target.delete(); return Response.noContent().build(); diff --git a/src/main/java/io/cryostat/discovery/Discovery.java b/src/main/java/io/cryostat/discovery/Discovery.java index 2b2aa99a6..f9310ef6f 100644 --- a/src/main/java/io/cryostat/discovery/Discovery.java +++ b/src/main/java/io/cryostat/discovery/Discovery.java @@ -195,7 +195,6 @@ public Response register(@Context RoutingContext ctx, JsonObject body) // TODO apply URI range validation to the remote address InetAddress remoteAddress = getRemoteAddress(ctx); - URI location; DiscoveryPlugin plugin; if (StringUtils.isNotBlank(pluginId) && StringUtils.isNotBlank(priorToken)) { @@ -217,7 +216,7 @@ public Response register(@Context RoutingContext ctx, JsonObject body) plugin.callback = callbackUri; plugin.realm = DiscoveryNode.environment( - requireNonBlank(realmName, "realm"), DiscoveryNode.REALM); + requireNonBlank(realmName, "realm"), BaseNodeType.REALM); plugin.builtin = false; plugin.persist(); diff --git a/src/main/java/io/cryostat/discovery/DiscoveryNode.java b/src/main/java/io/cryostat/discovery/DiscoveryNode.java index 46ccf2456..828eae17e 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryNode.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryNode.java @@ -25,6 +25,7 @@ import io.cryostat.targets.Target; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonView; @@ -38,6 +39,8 @@ import jakarta.persistence.Entity; import jakarta.persistence.EntityListeners; import jakarta.persistence.FetchType; +import jakarta.persistence.JoinColumn; +import jakarta.persistence.ManyToOne; import jakarta.persistence.OneToMany; import jakarta.persistence.OneToOne; import jakarta.persistence.PostPersist; @@ -55,9 +58,6 @@ public class DiscoveryNode extends PanacheEntity { public static final String NODE_TYPE = "nodeType"; - public static final String UNIVERSE = "Universe"; - public static final String REALM = "Realm"; - public static final String POD = "Pod"; @Column(unique = false, nullable = false, updatable = false) @JsonView(Views.Flat.class) @@ -74,11 +74,17 @@ public class DiscoveryNode extends PanacheEntity { @JsonView(Views.Flat.class) public Map labels = new HashMap<>(); - @OneToMany(fetch = FetchType.LAZY, orphanRemoval = true) + @OneToMany(fetch = FetchType.LAZY, orphanRemoval = true, mappedBy = "parent") @JsonView(Views.Nested.class) @Nullable public List children = new ArrayList<>(); + @Nullable + @ManyToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "parentNode") + @JsonIgnore + public DiscoveryNode parent; + @OneToOne( mappedBy = "discoveryNode", cascade = {CascadeType.ALL}, @@ -94,10 +100,15 @@ public int hashCode() { return Objects.hash(id, name, nodeType, labels, children, target); } + public boolean hasChildren() { + return !children.isEmpty(); + } + public static DiscoveryNode getUniverse() { - return DiscoveryNode.find(NODE_TYPE, UNIVERSE) + return DiscoveryNode.find(NODE_TYPE, BaseNodeType.UNIVERSE.getKind()) .singleResultOptional() - .orElseGet(() -> environment(UNIVERSE, UNIVERSE)); + .orElseGet( + () -> environment(BaseNodeType.UNIVERSE.toString(), BaseNodeType.UNIVERSE)); } public static Optional getRealm(String name) { @@ -109,10 +120,19 @@ public static Optional getChild( return node.children.stream().filter(predicate).findFirst(); } - public static DiscoveryNode environment(String name, String nodeType) { + public static Optional getNode(Predicate predicate) { + List nodes = listAll(); + return nodes.stream().filter(predicate).findFirst(); + } + + public static List findAllByNodeType(NodeType nodeType) { + return DiscoveryNode.find(DiscoveryNode.NODE_TYPE, nodeType.getKind()).list(); + } + + public static DiscoveryNode environment(String name, NodeType nodeType) { DiscoveryNode node = new DiscoveryNode(); node.name = name; - node.nodeType = nodeType; + node.nodeType = nodeType.getKind(); node.labels = new HashMap<>(); node.children = new ArrayList<>(); node.target = null; @@ -120,10 +140,10 @@ public static DiscoveryNode environment(String name, String nodeType) { return node; } - public static DiscoveryNode target(Target target) { + public static DiscoveryNode target(Target target, NodeType nodeType) { DiscoveryNode node = new DiscoveryNode(); node.name = target.connectUrl.toString(); - node.nodeType = "JVM"; + node.nodeType = nodeType.getKind(); node.labels = new HashMap<>(target.labels); node.children = null; node.target = target; diff --git a/src/main/java/io/cryostat/discovery/JDPDiscovery.java b/src/main/java/io/cryostat/discovery/JDPDiscovery.java index fd203863f..4f057d959 100644 --- a/src/main/java/io/cryostat/discovery/JDPDiscovery.java +++ b/src/main/java/io/cryostat/discovery/JDPDiscovery.java @@ -70,10 +70,11 @@ void onStart(@Observes StartupEvent evt) { DiscoveryNode universe = DiscoveryNode.getUniverse(); if (DiscoveryNode.getRealm(REALM).isEmpty()) { DiscoveryPlugin plugin = new DiscoveryPlugin(); - DiscoveryNode node = DiscoveryNode.environment(REALM, DiscoveryNode.REALM); + DiscoveryNode node = DiscoveryNode.environment(REALM, BaseNodeType.REALM); plugin.realm = node; plugin.builtin = true; universe.children.add(node); + node.parent = universe; plugin.persist(); universe.persist(); } @@ -139,10 +140,11 @@ public synchronized void handleJdpEvent(JvmDiscoveryEvent evt) { "PORT", // "AnnotationKey.PORT, Integer.toString(rmiTarget.getPort()))); - DiscoveryNode node = DiscoveryNode.target(target); + DiscoveryNode node = DiscoveryNode.target(target, BaseNodeType.JVM); target.discoveryNode = node; realm.children.add(node); + node.parent = realm; target.persist(); node.persist(); realm.persist(); @@ -150,6 +152,7 @@ public synchronized void handleJdpEvent(JvmDiscoveryEvent evt) { case LOST: Target t = Target.getTargetByConnectUrl(connectUrl); realm.children.remove(t.discoveryNode); + t.discoveryNode.parent = null; realm.persist(); t.delete(); break; diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java new file mode 100644 index 000000000..87ecfd356 --- /dev/null +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -0,0 +1,695 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.discovery; + +import java.net.URI; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.management.remote.JMXServiceURL; + +import io.cryostat.core.sys.FileSystem; +import io.cryostat.targets.Target; +import io.cryostat.targets.Target.Annotations; + +import io.fabric8.kubernetes.api.model.EndpointAddress; +import io.fabric8.kubernetes.api.model.EndpointPort; +import io.fabric8.kubernetes.api.model.EndpointSubset; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectReference; +import io.fabric8.kubernetes.api.model.OwnerReference; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.persistence.NoResultException; +import jakarta.transaction.Transactional; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.concurrent.ConcurrentException; +import org.apache.commons.lang3.concurrent.LazyInitializer; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; + +@ApplicationScoped +public class KubeApiDiscovery { + public static final String REALM = "KubernetesApi"; + + public static final String DISCOVERY_NAMESPACE_LABEL_KEY = "discovery.cryostat.io/namespace"; + + @Inject Logger logger; + + @Inject KubeConfig kubeConfig; + + @ConfigProperty(name = "cryostat.discovery.kubernetes.enabled") + boolean enabled; + + @ConfigProperty(name = "cryostat.discovery.kubernetes.port-names") + Optional> jmxPortNames; + + @ConfigProperty(name = "cryostat.discovery.kubernetes.port-numbers") + Optional> jmxPortNumbers; + + @ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period") + Duration informerResyncPeriod; + + private final Map, Pair> + discoveryNodeCache = new ConcurrentHashMap<>(); + private final Map, Object> queryLocks = + new ConcurrentHashMap<>(); + + private final LazyInitializer>> nsInformers = + new LazyInitializer>>() { + @Override + protected HashMap> initialize() + throws ConcurrentException { + // TODO: add support for some wildcard indicating a single Informer for any + // namespace that Cryostat has permissions to. This will need some restructuring + // of how the namespaces within the discovery tree are mapped. + var result = new HashMap>(); + kubeConfig + .getWatchNamespaces() + .forEach( + ns -> { + result.put( + ns, + client().endpoints() + .inNamespace(ns) + .inform( + new EndpointsHandler(), + informerResyncPeriod.toMillis())); + logger.infov( + "Started Endpoints SharedInformer for" + + " namespace \"{0}\"", + ns); + }); + return result; + } + }; + + // Priority is set higher than default 0 such that onStart is called first before onAfterStart + // This ensures realm node is persisted before initializing informers + @Transactional + void onStart(@Observes @Priority(1) StartupEvent evt) { + if (!enabled()) { + return; + } + + if (!available()) { + logger.errorv("{0} enabled but environment is not Kubernetes!", getClass().getName()); + return; + } + + DiscoveryNode universe = DiscoveryNode.getUniverse(); + if (DiscoveryNode.getRealm(REALM).isEmpty()) { + DiscoveryPlugin plugin = new DiscoveryPlugin(); + DiscoveryNode node = DiscoveryNode.environment(REALM, BaseNodeType.REALM); + plugin.realm = node; + plugin.builtin = true; + universe.children.add(node); + node.parent = universe; + plugin.persist(); + universe.persist(); + } + + logger.infov("Starting {0} client", REALM); + } + + @Transactional + void onAfterStart(@Observes StartupEvent evt) { + safeGetInformers(); + } + + void onStop(@Observes ShutdownEvent evt) { + if (!(enabled() && available())) { + return; + } + + logger.infov("Shutting down {0} client", REALM); + safeGetInformers() + .forEach( + (ns, informer) -> { + informer.close(); + logger.infov( + "Closed Endpoints SharedInformer for namespace \"{0}\"", ns); + }); + } + + boolean enabled() { + return enabled; + } + + boolean available() { + try { + boolean hasNamespace = StringUtils.isNotBlank(kubeConfig.getOwnNamespace()); + return kubeConfig.kubeApiAvailable() && hasNamespace; + } catch (Exception e) { + logger.info(e); + } + return false; + } + + KubernetesClient client() { + return kubeConfig.kubeClient(); + } + + private boolean isCompatiblePort(EndpointPort port) { + return jmxPortNames.orElse(List.of()).contains(port.getName()) + || jmxPortNumbers.orElse(List.of()).contains(port.getPort()); + } + + private List getTargetTuplesFrom(Endpoints endpoints) { + return tuplesFromEndpoints(endpoints).stream() + .filter( + (ref) -> { + return Objects.nonNull(ref) && isCompatiblePort(ref.port); + }) + .collect(Collectors.toList()); + } + + private Map> safeGetInformers() { + Map> informers; + try { + informers = nsInformers.get(); + } catch (ConcurrentException e) { + throw new IllegalStateException(e); + } + return informers; + } + + public void handleEndpointEvent(String namespace) { + QuarkusTransaction.joiningExisting() + .run( + () -> { + DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); + DiscoveryNode nsNode = + DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) + .orElse( + DiscoveryNode.environment( + namespace, + KubeDiscoveryNodeType.NAMESPACE)); + + try { + List targetNodes = + DiscoveryNode.findAllByNodeType( + KubeDiscoveryNodeType.ENDPOINT) + .stream() + .filter( + (n) -> + namespace.equals( + n.labels.get( + DISCOVERY_NAMESPACE_LABEL_KEY))) + .collect(Collectors.toList()); + Map targetRefMap = new HashMap<>(); + safeGetInformers().get(namespace).getStore().list().stream() + .map((endpoint) -> getTargetTuplesFrom(endpoint)) + .flatMap(List::stream) + .filter((tuple) -> Objects.nonNull(tuple.objRef)) + .collect(Collectors.toList()) + .forEach( + (tuple) -> + targetRefMap.put( + tuple.toTarget(), tuple.objRef)); + + Set persistedTargets = + targetNodes.stream() + .map(node -> node.target) + .collect(Collectors.toSet()); + Set observedTargets = targetRefMap.keySet(); + + // Add new targets + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + buildOwnerChain( + nsNode, t, targetRefMap.get(t))); + + // Prune deleted targets + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach((t) -> pruneOwnerChain(nsNode, t)); + + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; + } + realm.persist(); + } catch (Exception e) { + logger.warn("Endpoint handler exception", e); + } finally { + discoveryNodeCache.clear(); + queryLocks.clear(); + } + }); + } + + private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { + if (!target.isPersistent()) { + logger.infov( + "Target with serviceURL {0} does not exist in discovery tree. Skipped deleting", + target.connectUrl); + return; + } + + DiscoveryNode child = target.discoveryNode; + while (true) { + DiscoveryNode parent = child.parent; + + if (parent == null) { + break; + } + + parent.children.remove(child); + child.parent = null; + parent.persist(); + + if (parent.hasChildren() + || parent.nodeType.equals(KubeDiscoveryNodeType.NAMESPACE.getKind())) { + break; + } + + child = parent; + } + + nsNode.persist(); + target.delete(); + } + + private void buildOwnerChain(DiscoveryNode nsNode, Target target, ObjectReference targetRef) { + if (target.isPersistent()) { + logger.infov( + "Target with serviceURL {0} already exist in discovery tree. Skipped adding", + target.connectUrl); + return; + } + String targetKind = targetRef.getKind(); + KubeDiscoveryNodeType targetType = KubeDiscoveryNodeType.fromKubernetesKind(targetKind); + + DiscoveryNode targetNode = DiscoveryNode.target(target, KubeDiscoveryNodeType.ENDPOINT); + target.discoveryNode = targetNode; + target.persist(); + + if (targetType == KubeDiscoveryNodeType.POD) { + // if the Endpoint points to a Pod, chase the owner chain up as far as possible, then + // add that to the Namespace + + Pair pod = + discoveryNodeCache.computeIfAbsent( + cacheKey(targetRef.getNamespace(), targetRef), this::queryForNode); + + pod.getRight().children.add(targetNode); + targetNode.parent = pod.getRight(); + pod.getRight().persist(); + + Pair child = pod; + while (true) { + Pair owner = getOwnerNode(child); + if (owner == null) { + break; + } + + DiscoveryNode ownerNode = owner.getRight(); + DiscoveryNode childNode = child.getRight(); + + if (!ownerNode.children.contains(childNode)) { + ownerNode.children.add(childNode); + } + childNode.parent = ownerNode; + + ownerNode.persist(); + childNode.persist(); + + child = owner; + } + + nsNode.children.add(child.getRight()); + child.getRight().parent = nsNode; + } else { + // if the Endpoint points to something else(?) than a Pod, just add the target straight + // to the Namespace + nsNode.children.add(targetNode); + targetNode.parent = nsNode; + targetNode.persist(); + } + + nsNode.persist(); + } + + private Pair getOwnerNode(Pair child) { + HasMetadata childRef = child.getLeft(); + if (childRef == null) { + return null; + } + List owners = childRef.getMetadata().getOwnerReferences(); + // Take first "expected" owner Kind from NodeTypes, or if none, simply use the first owner. + // If there are no owners then return null to signify this and break the chain + if (owners.isEmpty()) { + return null; + } + String namespace = childRef.getMetadata().getNamespace(); + OwnerReference owner = + owners.stream() + .filter(o -> KubeDiscoveryNodeType.fromKubernetesKind(o.getKind()) != null) + .findFirst() + .orElse(owners.get(0)); + return discoveryNodeCache.computeIfAbsent(cacheKey(namespace, owner), this::queryForNode); + } + + private Triple cacheKey(String ns, OwnerReference resource) { + return Triple.of(ns, resource.getKind(), resource.getName()); + } + + // Unfortunately, ObjectReference and OwnerReference both independently implement getKind and + // getName - they don't come from a common base class. + private Triple cacheKey(String ns, ObjectReference resource) { + return Triple.of(ns, resource.getKind(), resource.getName()); + } + + private Pair queryForNode( + Triple lookupKey) { + + String namespace = lookupKey.getLeft(); + String name = lookupKey.getRight(); + KubeDiscoveryNodeType nodeType = + KubeDiscoveryNodeType.fromKubernetesKind(lookupKey.getMiddle()); + if (nodeType == null) { + return null; + } + + synchronized (queryLocks.computeIfAbsent(lookupKey, k -> new Object())) { + HasMetadata kubeObj = + nodeType.getQueryFunction().apply(client()).apply(namespace).apply(name); + + DiscoveryNode node = + DiscoveryNode.getNode( + n -> { + return nodeType.getKind().equals(n.nodeType) + && name.equals(n.name) + && namespace.equals( + n.labels.get( + DISCOVERY_NAMESPACE_LABEL_KEY)); + }) + .orElseGet( + () -> { + DiscoveryNode newNode = new DiscoveryNode(); + newNode.name = name; + newNode.nodeType = nodeType.getKind(); + newNode.children = new ArrayList<>(); + newNode.target = null; + newNode.labels = + kubeObj != null + ? kubeObj.getMetadata().getLabels() + : new HashMap<>(); + // Add namespace to label to retrieve node later + newNode.labels.put( + DISCOVERY_NAMESPACE_LABEL_KEY, namespace); + return newNode; + }); + return Pair.of(kubeObj, node); + } + } + + @ApplicationScoped + static final class KubeConfig { + private static final String OWN_NAMESPACE = "."; + + @Inject Logger logger; + @Inject FileSystem fs; + + @ConfigProperty(name = "cryostat.discovery.kubernetes.namespaces") + Optional> watchNamespaces; + + @ConfigProperty(name = "kubernetes.service.host") + Optional serviceHost; + + @ConfigProperty(name = "cryostat.discovery.kubernetes.namespace-path") + String namespacePath; + + private final KubernetesClient kubeClient = + new KubernetesClientBuilder() + .withTaskExecutor(Infrastructure.getDefaultWorkerPool()) + .build(); + + Collection getWatchNamespaces() { + return watchNamespaces.orElse(List.of()).stream() + .map( + n -> { + if (OWN_NAMESPACE.equals(n)) { + return getOwnNamespace(); + } + return n; + }) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toSet()); + } + + String getOwnNamespace() { + try { + return fs.readString(Path.of(namespacePath)); + } catch (Exception e) { + logger.trace(e); + return null; + } + } + + boolean kubeApiAvailable() { + return StringUtils.isNotBlank(serviceHost.orElse("")); + } + + KubernetesClient kubeClient() { + return kubeClient; + } + } + + private final class EndpointsHandler implements ResourceEventHandler { + @Override + public void onAdd(Endpoints endpoints) { + logger.debugv( + "Endpoint {0} created in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + handleEndpointEvent(endpoints.getMetadata().getNamespace()); + } + + @Override + public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { + logger.debugv( + "Endpoint {0} modified in namespace {1}", + newEndpoints.getMetadata().getName(), + newEndpoints.getMetadata().getNamespace()); + handleEndpointEvent(newEndpoints.getMetadata().getNamespace()); + } + + @Override + public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { + logger.debugv( + "Endpoint {0} deleted in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + if (deletedFinalStateUnknown) { + logger.warnv("Deleted final state unknown: {0}", endpoints); + return; + } + handleEndpointEvent(endpoints.getMetadata().getNamespace()); + } + } + + List tuplesFromEndpoints(Endpoints endpoints) { + List tts = new ArrayList<>(); + for (EndpointSubset subset : endpoints.getSubsets()) { + for (EndpointPort port : subset.getPorts()) { + for (EndpointAddress addr : subset.getAddresses()) { + tts.add(new TargetTuple(addr.getTargetRef(), addr, port)); + } + } + } + return tts; + } + + private class TargetTuple { + ObjectReference objRef; + EndpointAddress addr; + EndpointPort port; + + TargetTuple(ObjectReference objRef, EndpointAddress addr, EndpointPort port) { + this.objRef = objRef; + this.addr = addr; + this.port = port; + } + + public Target toTarget() { + try { + String ip = addr.getIp().replaceAll("\\.", "-"); + String namespace = objRef.getNamespace(); + + boolean isPod = objRef.getKind().equals(KubeDiscoveryNodeType.POD.getKind()); + + String host = String.format("%s.%s", ip, namespace); + if (isPod) { + host = String.format("%s.pod", host); + } + + JMXServiceURL jmxUrl = + new JMXServiceURL( + "rmi", + "", + 0, + "/jndi/rmi://" + host + ':' + port.getPort() + "/jmxrmi"); + URI connectUrl = URI.create(jmxUrl.toString()); + + try { + Target persistedTarget = Target.getTargetByConnectUrl(connectUrl); + DiscoveryNode targetNode = persistedTarget.discoveryNode; + if (!targetNode.nodeType.equals(KubeDiscoveryNodeType.ENDPOINT.getKind())) { + logger.warnv( + "Expected persisted target with serviceURL {0} to have node type" + + " {1} but found {2} ", + persistedTarget.connectUrl, + KubeDiscoveryNodeType.ENDPOINT.getKind(), + targetNode.nodeType); + throw new IllegalStateException(); + } + return persistedTarget; + } catch (NoResultException e) { + } + + Pair pair = + discoveryNodeCache.computeIfAbsent( + cacheKey(namespace, objRef), KubeApiDiscovery.this::queryForNode); + + HasMetadata obj = pair.getLeft(); + + Target target = new Target(); + target.activeRecordings = new ArrayList<>(); + target.connectUrl = connectUrl; + target.alias = objRef.getName(); + target.labels = obj != null ? obj.getMetadata().getLabels() : new HashMap<>(); + target.annotations = new Annotations(); + target.annotations + .platform() + .putAll(obj != null ? obj.getMetadata().getAnnotations() : Map.of()); + target.annotations + .cryostat() + .putAll( + Map.of( + "REALM", + REALM, + "HOST", + addr.getIp(), + "PORT", + Integer.toString(port.getPort()), + "NAMESPACE", + objRef.getNamespace(), + isPod ? "POD_NAME" : "OBJECT_NAME", + objRef.getName())); + + return target; + } catch (Exception e) { + logger.warn("Target conversion exception", e); + return null; + } + } + } +} + +enum KubeDiscoveryNodeType implements NodeType { + NAMESPACE("Namespace"), + STATEFULSET( + "StatefulSet", + c -> ns -> n -> c.apps().statefulSets().inNamespace(ns).withName(n).get()), + DAEMONSET("DaemonSet", c -> ns -> n -> c.apps().daemonSets().inNamespace(ns).withName(n).get()), + DEPLOYMENT( + "Deployment", c -> ns -> n -> c.apps().deployments().inNamespace(ns).withName(n).get()), + REPLICASET( + "ReplicaSet", c -> ns -> n -> c.apps().replicaSets().inNamespace(ns).withName(n).get()), + REPLICATIONCONTROLLER( + "ReplicationController", + c -> ns -> n -> c.replicationControllers().inNamespace(ns).withName(n).get()), + POD("Pod", c -> ns -> n -> c.pods().inNamespace(ns).withName(n).get()), + ENDPOINT("Endpoint", c -> ns -> n -> c.endpoints().inNamespace(ns).withName(n).get()), + // OpenShift resources + DEPLOYMENTCONFIG("DeploymentConfig"), + ; + + private final String kubernetesKind; + private final transient Function< + KubernetesClient, Function>> + getFn; + + KubeDiscoveryNodeType(String kubernetesKind) { + this(kubernetesKind, client -> namespace -> name -> null); + } + + KubeDiscoveryNodeType( + String kubernetesKind, + Function>> + getFn) { + this.kubernetesKind = kubernetesKind; + this.getFn = getFn; + } + + @Override + public String getKind() { + return kubernetesKind; + } + + public Function>> + getQueryFunction() { + return getFn; + } + + public static KubeDiscoveryNodeType fromKubernetesKind(String kubernetesKind) { + if (kubernetesKind == null) { + return null; + } + for (KubeDiscoveryNodeType nt : values()) { + if (kubernetesKind.equalsIgnoreCase(nt.kubernetesKind)) { + return nt; + } + } + return null; + } + + @Override + public String toString() { + return getKind(); + } +} diff --git a/src/main/java/io/cryostat/discovery/NodeType.java b/src/main/java/io/cryostat/discovery/NodeType.java new file mode 100644 index 000000000..cd96d729e --- /dev/null +++ b/src/main/java/io/cryostat/discovery/NodeType.java @@ -0,0 +1,52 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.discovery; + +public interface NodeType { + String getKind(); + + int ordinal(); +} + +enum BaseNodeType implements NodeType { + // represents the entire deployment scenario Cryostat finds itself in + UNIVERSE("Universe"), + // represents a division of the deployment scenario - the universe may consist of a + // Kubernetes Realm and a JDP Realm, for example + REALM("Realm"), + // represents a plain target JVM, connectable over JMX + JVM("JVM"), + // represents a target JVM using the Cryostat Agent, *not* connectable over JMX. Agent instances + // that do publish a JMX Service URL should publish themselves with the JVM NodeType. + AGENT("CryostatAgent"), + ; + + private final String kind; + + BaseNodeType(String kind) { + this.kind = kind; + } + + @Override + public String getKind() { + return kind; + } + + @Override + public String toString() { + return getKind(); + } +} diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index 2abb66f2e..130ebcc93 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -20,7 +20,9 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -185,6 +187,73 @@ public boolean equals(Object obj) { && Objects.equals(labels, other.labels); } + public static Compare compare(Collection src) { + return new Compare(src); + } + + public static class Compare { + private Collection previous, current; + + public Compare(Collection previous) { + this.previous = new HashSet<>(previous); + } + + public Compare to(Collection current) { + this.current = new HashSet<>(current); + return this; + } + + public Collection added() { + return removeAllUpdatedRefs(addedOrUpdatedRefs(), updated(false)); + } + + public Collection removed() { + return removeAllUpdatedRefs(removedOrUpdatedRefs(), updated(true)); + } + + public Collection updated(boolean keepOld) { + Collection updated = new HashSet<>(); + intersection(removedOrUpdatedRefs(), addedOrUpdatedRefs(), keepOld) + .forEach((ref) -> updated.add(ref)); + return updated; + } + + private Collection addedOrUpdatedRefs() { + Collection added = new HashSet<>(current); + added.removeAll(previous); + return added; + } + + private Collection removedOrUpdatedRefs() { + Collection removed = new HashSet<>(previous); + removed.removeAll(current); + return removed; + } + + private Collection removeAllUpdatedRefs( + Collection src, Collection updated) { + Collection tnSet = new HashSet<>(src); + intersection(src, updated, true).stream().forEach((ref) -> tnSet.remove(ref)); + return tnSet; + } + + private Collection intersection( + Collection src, Collection other, boolean keepOld) { + final Collection intersection = new HashSet<>(); + + // Manual removal since Target also compares jvmId + for (Target srcTarget : src) { + for (Target otherTarget : other) { + if (Objects.equals(srcTarget.connectUrl, otherTarget.connectUrl)) { + intersection.add(keepOld ? srcTarget : otherTarget); + } + } + } + + return intersection; + } + } + public enum EventKind { FOUND, MODIFIED, diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 0467ceee7..d08f95bab 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -17,6 +17,7 @@ quarkus.log.category."org.jboss.resteasy.reactive.common.core.AbstractResteasyRe cryostat.discovery.jdp.enabled=true cryostat.discovery.podman.enabled=true cryostat.discovery.docker.enabled=true +cryostat.discovery.kubernetes.enabled=true quarkus.datasource.devservices.enabled=true quarkus.datasource.devservices.image-name=quay.io/cryostat/cryostat-db diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index 70b0d40ef..28d180f32 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -3,6 +3,7 @@ quarkus.smallrye-openapi.info-title=Cryostat API (test) cryostat.discovery.jdp.enabled=true cryostat.discovery.podman.enabled=true cryostat.discovery.docker.enabled=true +cryostat.discovery.kubernetes.enabled=true quarkus.test.env.JAVA_OPTS_APPEND=-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9091 -Dcom.sun.management.jmxremote.rmi.port=9091 -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f27a26dcd..745458ec6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -10,6 +10,14 @@ cryostat.discovery.plugins.jwt.secret.keysize=256 cryostat.discovery.plugins.jwt.signature.algorithm=HS256 cryostat.discovery.plugins.jwt.encryption.algorithm=dir cryostat.discovery.plugins.jwt.encryption.method=A256GCM +cryostat.discovery.kubernetes.enabled=false +cryostat.discovery.kubernetes.port-names= +cryostat.discovery.kubernetes.port-numbers= +cryostat.discovery.kubernetes.namespaces= +cryostat.discovery.kubernetes.namespace-path=/var/run/secrets/kubernetes.io/serviceaccount/namespace +cryostat.discovery.kubernetes.resync-period=30s +kubernetes.service.host= + quarkus.test.integration-test-profile=test cryostat.connections.max-open=0