From 626604e1301edd989d34e7d4fdd4d4a894b6abd5 Mon Sep 17 00:00:00 2001 From: Thuan Vo Date: Fri, 3 May 2024 14:45:22 -0400 Subject: [PATCH] fix(discovery): observed containers should be checked with persisted nodes (#423) --- compose/db.yml | 2 +- smoketest.bash | 2 + .../discovery/ContainerDiscovery.java | 311 +++++++++++------- src/main/java/io/cryostat/targets/Target.java | 9 + 4 files changed, 210 insertions(+), 114 deletions(-) diff --git a/compose/db.yml b/compose/db.yml index 8c5fc19a6..aea89af84 100644 --- a/compose/db.yml +++ b/compose/db.yml @@ -2,7 +2,7 @@ version: "3" services: cryostat: environment: - QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION: drop-and-create + QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION: ${DATABASE_GENERATION:-drop-and-create} QUARKUS_DATASOURCE_USERNAME: cryostat3 QUARKUS_DATASOURCE_PASSWORD: cryostat3 QUARKUS_DATASOURCE_JDBC_URL: jdbc:postgresql://db:5432/cryostat3 diff --git a/smoketest.bash b/smoketest.bash index 2c1e69478..03c22b812 100755 --- a/smoketest.bash +++ b/smoketest.bash @@ -65,6 +65,7 @@ while getopts "hs:prGtOVXcbn" opt; do ;; V) KEEP_VOLUMES=true + DATABASE_GENERATION=update ;; X) FILES+=("${DIR}/compose/db-viewer.yml") @@ -111,6 +112,7 @@ fi export CRYOSTAT_HTTP_HOST export CRYOSTAT_HTTP_PORT export GRAFANA_DASHBOARD_EXT_URL +export DATABASE_GENERATION s3Manifest="${DIR}/compose/s3-${s3}.yml" STORAGE_PORT="$(yq '.services.*.expose[0]' "${s3Manifest}" | grep -v null)" diff --git a/src/main/java/io/cryostat/discovery/ContainerDiscovery.java b/src/main/java/io/cryostat/discovery/ContainerDiscovery.java index 3ad5b7bb1..eba8dfc9c 100644 --- a/src/main/java/io/cryostat/discovery/ContainerDiscovery.java +++ b/src/main/java/io/cryostat/discovery/ContainerDiscovery.java @@ -21,18 +21,18 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.stream.Collectors; import javax.management.remote.JMXServiceURL; @@ -48,18 +48,24 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.sun.security.auth.module.UnixSystem; +import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; +import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.http.HttpMethod; import io.vertx.mutiny.core.Vertx; +import io.vertx.mutiny.core.eventbus.EventBus; import io.vertx.mutiny.core.net.SocketAddress; import io.vertx.mutiny.ext.web.client.WebClient; import io.vertx.mutiny.ext.web.codec.BodyCodec; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import jakarta.persistence.NoResultException; +import jakarta.resource.spi.IllegalStateException; import jakarta.transaction.Transactional; +import jakarta.transaction.Transactional.TxType; import org.apache.commons.lang3.StringUtils; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; @@ -95,6 +101,21 @@ protected String getContainerQueryURL(ContainerSpec spec) { protected boolean enabled() { return enabled; } + + @ConsumeEvent(blocking = true, ordered = true) + @Transactional(TxType.REQUIRES_NEW) + public void handleContainerEvent(ContainerDiscoveryEvent evt) { + try { + updateDiscoveryTree(evt); + } catch (IllegalStateException e) { + logger.warn(e); + } + } + + @Override + protected String notificationAddress() { + return PodmanDiscovery.class.getName(); + } } @ApplicationScoped @@ -126,6 +147,21 @@ protected String getContainerQueryURL(ContainerSpec spec) { protected boolean enabled() { return enabled; } + + @ConsumeEvent(blocking = true, ordered = true) + @Transactional(TxType.REQUIRES_NEW) + public void handleContainerEvent(ContainerDiscoveryEvent evt) { + try { + updateDiscoveryTree(evt); + } catch (IllegalStateException e) { + logger.warn(e); + } + } + + @Override + protected String notificationAddress() { + return DockerDiscovery.class.getName(); + } } public abstract class ContainerDiscovery { @@ -133,7 +169,6 @@ public abstract class ContainerDiscovery { public static final String JMX_URL_LABEL = "io.cryostat.jmxUrl"; public static final String JMX_HOST_LABEL = "io.cryostat.jmxHost"; public static final String JMX_PORT_LABEL = "io.cryostat.jmxPort"; - public static final String CONTAINER_ID_LABEL = "io.cryostat.containerId"; @Inject Logger logger; @Inject FileSystem fs; @@ -141,6 +176,7 @@ public abstract class ContainerDiscovery { @Inject WebClient webClient; @Inject JFRConnectionToolkit connectionToolkit; @Inject ObjectMapper mapper; + @Inject EventBus bus; @ConfigProperty(name = ConfigProperties.CONTAINERS_POLL_PERIOD) Duration pollPeriod; @@ -150,19 +186,16 @@ public abstract class ContainerDiscovery { protected long timerId; - protected final CopyOnWriteArrayList containers = new CopyOnWriteArrayList<>(); - @Transactional void onStart(@Observes StartupEvent evt) { if (!enabled()) { return; } - Path socketPath = Path.of(getSocket().path()); - if (!(fs.exists(socketPath) && fs.isReadable(socketPath))) { + if (!available()) { logger.errorv( "{0} enabled but socket {1} is not accessible!", - getClass().getName(), socketPath); + getClass().getName(), Path.of(getSocket().path())); return; } @@ -185,56 +218,110 @@ void onStart(@Observes StartupEvent evt) { } void onStop(@Observes ShutdownEvent evt) { - if (!enabled()) { + if (!(enabled() && available())) { return; } logger.infov("Shutting down {0} client", getRealm()); vertx.cancelTimer(timerId); } - private void queryContainers() { - doContainerListRequest( - current -> { - Set previous = new HashSet<>(containers); - Set updated = new HashSet<>(current); + boolean available() { + Path socketPath = Path.of(getSocket().path()); + return fs.exists(socketPath) && fs.isReadable(socketPath); + } - Set intersection = new HashSet<>(containers); - intersection.retainAll(updated); + // Construct a target representation (non-persistent) of the container spec + private Target toTarget(ContainerSpec desc) { + URI connectUrl; + String hostname; + int jmxPort; + try { + JMXServiceURL serviceUrl; + URI rmiTarget; + if (desc.Labels.containsKey(JMX_URL_LABEL)) { + serviceUrl = new JMXServiceURL(desc.Labels.get(JMX_URL_LABEL)); + connectUrl = URI.create(serviceUrl.toString()); + try { + rmiTarget = URIUtil.getRmiTarget(serviceUrl); + hostname = rmiTarget.getHost(); + jmxPort = rmiTarget.getPort(); + } catch (IllegalArgumentException e) { + hostname = serviceUrl.getHost(); + jmxPort = serviceUrl.getPort(); + } + } else { + jmxPort = Integer.parseInt(desc.Labels.get(JMX_PORT_LABEL)); + hostname = desc.Labels.get(JMX_HOST_LABEL); + if (hostname == null) { + try { + hostname = + doContainerInspectRequest(desc) + .get(2, TimeUnit.SECONDS) + .Config + .Hostname; + } catch (InterruptedException | TimeoutException | ExecutionException e) { + logger.warnv(e, "Invalid {0} target observed", getRealm()); + return null; + } + } + } + serviceUrl = connectionToolkit.createServiceURL(hostname, jmxPort); + connectUrl = URI.create(serviceUrl.toString()); + } catch (MalformedURLException | URISyntaxException e) { + logger.warnv(e, "Invalid {0} target observed", getRealm()); + return null; + } - Set removed = new HashSet<>(previous); - removed.removeAll(intersection); + Target target = new Target(); + target.activeRecordings = new ArrayList<>(); + target.connectUrl = connectUrl; + target.alias = Optional.ofNullable(desc.Names.get(0)).orElse(desc.Id); + target.labels = desc.Labels; + target.annotations = new Annotations(); + target.annotations + .cryostat() + .putAll( + Map.of( + "REALM", // AnnotationKey.REALM, + getRealm(), + "HOST", // AnnotationKey.HOST, + hostname, + "PORT", // "AnnotationKey.PORT, + Integer.toString(jmxPort))); + + return target; + } - Set added = new HashSet<>(updated); - added.removeAll(intersection); + private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException { + // Check for any targets with the same connectUrl in other realms + try { + Target persistedTarget = Target.getTargetByConnectUrl(connectUrl); + String realmOfTarget = persistedTarget.annotations.cryostat().get("REALM"); + if (!getRealm().equals(realmOfTarget)) { + logger.warnv( + "Expected persisted target with serviceURL {0} to be under realm" + + " {1} but found under {2} ", + persistedTarget.connectUrl, getRealm(), realmOfTarget); + throw new IllegalStateException(); + } + return true; + } catch (NoResultException e) { + } + return false; + } - containers.removeAll(removed); - Infrastructure.getDefaultWorkerPool() - .execute( - () -> - removed.stream() - .filter(Objects::nonNull) - .forEach( - container -> - handleContainerEvent( - container, - EventKind.LOST))); - - containers.addAll(added); + private void queryContainers() { + doContainerListRequest( + current -> { Infrastructure.getDefaultWorkerPool() .execute( () -> - added.stream() - .filter(Objects::nonNull) - .forEach( - container -> - handleContainerEvent( - container, - EventKind.FOUND))); + QuarkusTransaction.requiringNew() + .run(() -> handleObservedContainers(current))); }); } private void doContainerListRequest(Consumer> successHandler) { - logger.trace(String.format("Shutting down %s client", getRealm())); URI requestPath = URI.create(getContainersQueryURL()); try { webClient @@ -291,73 +378,56 @@ private CompletableFuture doContainerInspectRequest(ContainerS return result; } - @Transactional - public void handleContainerEvent(ContainerSpec desc, EventKind evtKind) { + private void handleObservedContainers(List current) { + Map containerRefMap = new HashMap<>(); + + Set persistedTargets = + Target.findByRealm(getRealm()).stream().collect(Collectors.toSet()); + Set observedTargets = + current.stream() + .map( + (desc) -> { + Target t = toTarget(desc); + if (Objects.nonNull(t)) { + containerRefMap.put(t.connectUrl, desc); + } + return t; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + notify( + ContainerDiscoveryEvent.from( + containerRefMap.get(t.connectUrl), + t, + EventKind.FOUND))); + + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach((t) -> notify(ContainerDiscoveryEvent.from(null, t, EventKind.LOST))); + } + + public void updateDiscoveryTree(ContainerDiscoveryEvent evt) throws IllegalStateException { + EventKind evtKind = evt.eventKind; + ContainerSpec desc = evt.desc; + Target target = evt.target; + DiscoveryNode realm = DiscoveryNode.getRealm(getRealm()).orElseThrow(); if (evtKind == EventKind.FOUND) { - URI connectUrl; - String hostname; - int jmxPort; - try { - JMXServiceURL serviceUrl; - URI rmiTarget; - if (desc.Labels.containsKey(JMX_URL_LABEL)) { - serviceUrl = new JMXServiceURL(desc.Labels.get(JMX_URL_LABEL)); - connectUrl = URI.create(serviceUrl.toString()); - try { - rmiTarget = URIUtil.getRmiTarget(serviceUrl); - hostname = rmiTarget.getHost(); - jmxPort = rmiTarget.getPort(); - } catch (IllegalArgumentException e) { - hostname = serviceUrl.getHost(); - jmxPort = serviceUrl.getPort(); - } - } else { - jmxPort = Integer.parseInt(desc.Labels.get(JMX_PORT_LABEL)); - hostname = desc.Labels.get(JMX_HOST_LABEL); - if (hostname == null) { - try { - hostname = - doContainerInspectRequest(desc) - .get(2, TimeUnit.SECONDS) - .Config - .Hostname; - } catch (InterruptedException | TimeoutException | ExecutionException e) { - containers.remove(desc); - logger.warnv(e, "Invalid {0} target observed", getRealm()); - return; - } - } - } - serviceUrl = connectionToolkit.createServiceURL(hostname, jmxPort); - connectUrl = URI.create(serviceUrl.toString()); - } catch (MalformedURLException | URISyntaxException e) { - containers.remove(desc); - logger.warnv(e, "Invalid {0} target observed", getRealm()); + if (isTargetUnderRealm(target.connectUrl)) { + logger.infov( + "Target with serviceURL {0} already exist in discovery tree. Skip adding", + target.connectUrl); return; } - - Target target = new Target(); - target.activeRecordings = new ArrayList<>(); - target.connectUrl = connectUrl; - target.alias = Optional.ofNullable(desc.Names.get(0)).orElse(desc.Id); - target.labels = desc.Labels; - target.annotations = new Annotations(); - target.annotations - .cryostat() - .putAll( - Map.of( - "REALM", // AnnotationKey.REALM, - getRealm(), - "HOST", // AnnotationKey.HOST, - hostname, - "PORT", // "AnnotationKey.PORT, - Integer.toString(jmxPort))); - DiscoveryNode node = DiscoveryNode.target(target, BaseNodeType.JVM); - node.labels.put( - CONTAINER_ID_LABEL, desc.Id); // Add container Id retrieve node during deletion target.discoveryNode = node; String podName = desc.PodName; @@ -394,18 +464,18 @@ public void handleContainerEvent(ContainerSpec desc, EventKind evtKind) { node.persist(); realm.persist(); } else { - Optional target = - Target.getTarget( - (t) -> - realm.name.equals(t.annotations.cryostat().get("REALM")) - && desc.Id.equals( - t.discoveryNode.labels.get( - CONTAINER_ID_LABEL))); - if (target.isEmpty()) { + if (!isTargetUnderRealm(target.connectUrl)) { + logger.infov( + "Target with serviceURL {0} does not exist in discovery tree. Skip" + + " deleting", + target.connectUrl); return; } - - DiscoveryNode node = target.get().discoveryNode; + // Retrieve the latest snapshot of the target + // The target received from event message is outdated as it belongs to the previous + // transaction + target = Target.getTargetByConnectUrl(target.connectUrl); + DiscoveryNode node = target.discoveryNode; while (true) { DiscoveryNode parent = node.parent; @@ -425,10 +495,14 @@ public void handleContainerEvent(ContainerSpec desc, EventKind evtKind) { } realm.persist(); - target.get().delete(); + target.delete(); } } + protected void notify(ContainerDiscoveryEvent evt) { + bus.publish(notificationAddress(), evt); + } + protected abstract SocketAddress getSocket(); protected abstract String getRealm(); @@ -439,6 +513,10 @@ public void handleContainerEvent(ContainerSpec desc, EventKind evtKind) { protected abstract boolean enabled(); + protected abstract String notificationAddress(); + + public abstract void handleContainerEvent(ContainerDiscoveryEvent evt); + static record PortSpec( long container_port, String host_ip, long host_port, String protocol, long range) {} @@ -457,6 +535,13 @@ static record ContainerSpec( static record ContainerDetails(Config Config) {} static record Config(String Hostname) {} + + static record ContainerDiscoveryEvent(ContainerSpec desc, Target target, EventKind eventKind) { + static ContainerDiscoveryEvent from( + ContainerSpec spec, Target target, EventKind eventKind) { + return new ContainerDiscoveryEvent(spec, target, eventKind); + } + } } enum ContainerDiscoveryNodeType implements NodeType { diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index 0cdcd6908..6b6d4c14d 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -28,6 +28,7 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; +import java.util.stream.Collectors; import io.cryostat.ConfigProperties; import io.cryostat.core.JvmIdentifier; @@ -139,6 +140,14 @@ public static boolean deleteByConnectUrl(URI connectUrl) { return delete("connectUrl", connectUrl) > 0; } + public static List findByRealm(String realm) { + List targets = findAll().list(); + + return targets.stream() + .filter((t) -> realm.equals(t.annotations.cryostat().get("REALM"))) + .collect(Collectors.toList()); + } + public ActiveRecording getRecordingById(long remoteId) { return activeRecordings.stream() .filter(rec -> rec.remoteId == remoteId)