From c3ee63b83ce27a902da5d77c53b250b6d7903ad3 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 28 Nov 2024 15:07:43 -0500 Subject: [PATCH] fix(discovery): discovery synchronization for stale lost targets (#689) --- src/main/java/io/cryostat/StorageBuckets.java | 42 ++- .../io/cryostat/discovery/DiscoveryNode.java | 71 ++-- .../cryostat/discovery/KubeApiDiscovery.java | 309 +++++++++--------- .../cryostat/recordings/ActiveRecordings.java | 2 + .../java/io/cryostat/rules/RuleService.java | 11 + src/main/java/io/cryostat/targets/Target.java | 82 ----- .../io/cryostat/targets/TargetUpdateJob.java | 84 +++++ .../cryostat/targets/TargetUpdateService.java | 131 ++++++++ src/main/resources/application.properties | 1 + 9 files changed, 480 insertions(+), 253 deletions(-) create mode 100644 src/main/java/io/cryostat/targets/TargetUpdateJob.java create mode 100644 src/main/java/io/cryostat/targets/TargetUpdateService.java diff --git a/src/main/java/io/cryostat/StorageBuckets.java b/src/main/java/io/cryostat/StorageBuckets.java index 3688668f9..87b9fa9bd 100644 --- a/src/main/java/io/cryostat/StorageBuckets.java +++ b/src/main/java/io/cryostat/StorageBuckets.java @@ -15,10 +15,21 @@ */ package io.cryostat; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import io.cryostat.util.HttpStatusCodeIdentifier; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -30,7 +41,17 @@ public class StorageBuckets { @Inject S3Client storage; @Inject Logger logger; + @ConfigProperty(name = "storage.buckets.creation-retry.period") + Duration creationRetryPeriod; + + private final Set buckets = ConcurrentHashMap.newKeySet(); + private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor(); + public void createIfNecessary(String bucket) { + buckets.add(bucket); + } + + private boolean tryCreate(String bucket) { boolean exists = false; logger.debugv("Checking if storage bucket \"{0}\" exists ...", bucket); try { @@ -49,8 +70,27 @@ public void createIfNecessary(String bucket) { storage.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); logger.debugv("Storage bucket \"{0}\" created", bucket); } catch (Exception e) { - logger.error(e); + logger.warn(e); + return false; } } + return true; + } + + void onStart(@Observes StartupEvent evt) { + worker.scheduleAtFixedRate( + () -> { + var it = buckets.iterator(); + while (it.hasNext()) { + if (tryCreate(it.next())) it.remove(); + } + }, + 0, + creationRetryPeriod.toMillis(), + TimeUnit.MILLISECONDS); + } + + void onStop(@Observes ShutdownEvent evt) { + worker.shutdown(); } } diff --git a/src/main/java/io/cryostat/discovery/DiscoveryNode.java b/src/main/java/io/cryostat/discovery/DiscoveryNode.java index 3446ad51b..6bf524349 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryNode.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryNode.java @@ -31,6 +31,7 @@ import com.fasterxml.jackson.annotation.JsonView; import io.quarkus.hibernate.orm.panache.PanacheEntity; import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.panache.common.Parameters; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; @@ -42,6 +43,8 @@ import jakarta.persistence.FetchType; import jakarta.persistence.JoinColumn; import jakarta.persistence.ManyToOne; +import jakarta.persistence.NamedQueries; +import jakarta.persistence.NamedQuery; import jakarta.persistence.OneToMany; import jakarta.persistence.OneToOne; import jakarta.persistence.PostPersist; @@ -56,6 +59,11 @@ @Entity @EntityListeners(DiscoveryNode.Listener.class) +@NamedQueries({ + @NamedQuery( + name = "DiscoveryNode.byTypeWithName", + query = "from DiscoveryNode where nodeType = :nodeType and name = :name") +}) public class DiscoveryNode extends PanacheEntity { public static final String NODE_TYPE = "nodeType"; @@ -129,33 +137,48 @@ public static List findAllByNodeType(NodeType nodeType) { } public static DiscoveryNode environment(String name, NodeType nodeType) { - return QuarkusTransaction.joiningExisting() - .call( - () -> { - DiscoveryNode node = new DiscoveryNode(); - node.name = name; - node.nodeType = nodeType.getKind(); - node.labels = new HashMap<>(); - node.children = new ArrayList<>(); - node.target = null; - node.persist(); - return node; - }); + var kind = nodeType.getKind(); + return DiscoveryNode.find( + "#DiscoveryNode.byTypeWithName", + Parameters.with("nodeType", kind).and("name", name)) + .firstResultOptional() + .orElseGet( + () -> + QuarkusTransaction.joiningExisting() + .call( + () -> { + DiscoveryNode node = new DiscoveryNode(); + node.name = name; + node.nodeType = kind; + node.labels = new HashMap<>(); + node.children = new ArrayList<>(); + node.target = null; + node.persist(); + return node; + })); } public static DiscoveryNode target(Target target, NodeType nodeType) { - return QuarkusTransaction.joiningExisting() - .call( - () -> { - DiscoveryNode node = new DiscoveryNode(); - node.name = target.connectUrl.toString(); - node.nodeType = nodeType.getKind(); - node.labels = new HashMap<>(target.labels); - node.children = null; - node.target = target; - node.persist(); - return node; - }); + var kind = nodeType.getKind(); + var connectUrl = target.connectUrl.toString(); + return DiscoveryNode.find( + "#DiscoveryNode.byTypeWithName", + Parameters.with("nodeType", kind).and("name", connectUrl)) + .firstResultOptional() + .orElseGet( + () -> + QuarkusTransaction.joiningExisting() + .call( + () -> { + DiscoveryNode node = new DiscoveryNode(); + node.name = connectUrl; + node.nodeType = kind; + node.labels = new HashMap<>(target.labels); + node.children = null; + node.target = target; + node.persist(); + return node; + })); } @Override diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index e9ccd2abe..685dbd92c 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -27,6 +27,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -45,16 +48,12 @@ import io.fabric8.kubernetes.api.model.ObjectReference; import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; -import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.mutiny.core.eventbus.EventBus; -import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; @@ -69,7 +68,11 @@ import org.jboss.logging.Logger; @ApplicationScoped -public class KubeApiDiscovery { +public class KubeApiDiscovery implements ResourceEventHandler { + + private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY"; + private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC"; + public static final String REALM = "KubernetesApi"; public static final String DISCOVERY_NAMESPACE_LABEL_KEY = "discovery.cryostat.io/namespace"; @@ -86,6 +89,8 @@ public class KubeApiDiscovery { @Inject EventBus bus; + ScheduledExecutorService resyncWorker = Executors.newSingleThreadScheduledExecutor(); + @ConfigProperty(name = "cryostat.discovery.kubernetes.enabled") boolean enabled; @@ -116,20 +121,18 @@ protected HashMap> initialize() client.endpoints() .inNamespace(ns) .inform( - new EndpointsHandler(), + KubeApiDiscovery.this, informerResyncPeriod.toMillis())); logger.debugv( - "Started Endpoints SharedInformer for" - + " namespace \"{0}\"", - ns); + "Started Endpoints SharedInformer for namespace" + + " \"{0}\" with resync period {1}", + ns, informerResyncPeriod); }); return result; } }; - // Priority is set higher than default 0 such that onStart is called first before onAfterStart - // This ensures realm node is persisted before initializing informers - void onStart(@Observes @Priority(1) StartupEvent evt) { + void onStart(@Observes StartupEvent evt) { if (!enabled()) { return; } @@ -140,13 +143,19 @@ void onStart(@Observes @Priority(1) StartupEvent evt) { } logger.debugv("Starting {0} client", REALM); - } - - void onAfterStart(@Observes StartupEvent evt) { - if (!enabled() || !available()) { - return; - } safeGetInformers(); + resyncWorker.scheduleAtFixedRate( + () -> { + try { + logger.debug("Resyncing"); + notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces())); + } catch (Exception e) { + logger.warn(e); + } + }, + 0, + informerResyncPeriod.toMillis(), + TimeUnit.MILLISECONDS); } void onStop(@Observes ShutdownEvent evt) { @@ -155,6 +164,7 @@ void onStop(@Observes ShutdownEvent evt) { } logger.debugv("Shutting down {0} client", REALM); + resyncWorker.shutdown(); safeGetInformers() .forEach( (ns, informer) -> { @@ -178,6 +188,33 @@ boolean available() { return false; } + @Override + public void onAdd(Endpoints endpoints) { + logger.debugv( + "Endpoint {0} created in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + notify(NamespaceQueryEvent.from(endpoints.getMetadata().getNamespace())); + } + + @Override + public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { + logger.debugv( + "Endpoint {0} modified in namespace {1}", + newEndpoints.getMetadata().getName(), newEndpoints.getMetadata().getNamespace()); + notify(NamespaceQueryEvent.from(newEndpoints.getMetadata().getNamespace())); + } + + @Override + public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { + logger.debugv( + "Endpoint {0} deleted in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + if (deletedFinalStateUnknown) { + logger.warnv("Deleted final state unknown: {0}", endpoints); + } + notify(NamespaceQueryEvent.from(endpoints.getMetadata().getNamespace())); + } + private boolean isCompatiblePort(EndpointPort port) { return jmxPortNames.orElse(EMPTY_PORT_NAMES).contains(port.getName()) || jmxPortNumbers.orElse(EMPTY_PORT_NUMBERS).contains(port.getPort()); @@ -188,7 +225,14 @@ List tuplesFromEndpoints(Endpoints endpoints) { for (EndpointSubset subset : endpoints.getSubsets()) { for (EndpointPort port : subset.getPorts()) { for (EndpointAddress addr : subset.getAddresses()) { - tts.add(new TargetTuple(addr.getTargetRef(), addr, port)); + var ref = addr.getTargetRef(); + tts.add( + new TargetTuple( + ref, + queryForNode(ref.getNamespace(), ref.getName(), ref.getKind()) + .getLeft(), + addr, + port)); } } } @@ -205,19 +249,17 @@ private List getTargetTuplesFrom(Endpoints endpoints) { } private Map> safeGetInformers() { - Map> informers; try { - informers = nsInformers.get(); + return nsInformers.get(); } catch (ConcurrentException e) { throw new IllegalStateException(e); } - return informers; } - private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException { + private boolean isTargetUnderRealm(Target target) throws IllegalStateException { // Check for any targets with the same connectUrl in other realms try { - Target persistedTarget = Target.getTargetByConnectUrl(connectUrl); + Target persistedTarget = Target.getTargetByConnectUrl(target.connectUrl); String realmOfTarget = persistedTarget.annotations.cryostat().get("REALM"); if (!REALM.equals(realmOfTarget)) { logger.warnv( @@ -232,8 +274,75 @@ private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException return false; } - @ConsumeEvent(blocking = true, ordered = true) + @ConsumeEvent(value = NAMESPACE_QUERY_ADDR, blocking = true, ordered = true) @Transactional(TxType.REQUIRES_NEW) + public void handleQueryEvent(NamespaceQueryEvent evt) { + Map targetRefMap = new HashMap<>(); + + for (var namespace : evt.namespaces) { + try { + List targetNodes = + DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() + .filter( + (n) -> + namespace.equals( + n.labels.get( + DISCOVERY_NAMESPACE_LABEL_KEY))) + .collect(Collectors.toList()); + + Set persistedTargets = new HashSet<>(); + for (DiscoveryNode node : targetNodes) { + persistedTargets.add(node.target); + } + + Set observedTargets = + safeGetInformers().get(namespace).getStore().list().stream() + .map((endpoint) -> getTargetTuplesFrom(endpoint)) + .flatMap(List::stream) + .filter((tuple) -> Objects.nonNull(tuple.objRef)) + .map( + (tuple) -> { + Target t = tuple.toTarget(); + if (t != null) { + targetRefMap.put(t.connectUrl, tuple.objRef); + } + return t; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Prune deleted targets + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, t, null, EventKind.LOST))); + + // Add new targets + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, + t, + targetRefMap.get(t.connectUrl), + EventKind.FOUND))); + } catch (Exception e) { + logger.error( + String.format("Failed to syncronize Endpoints in namespace %s", namespace), + e); + } + } + } + + @ConsumeEvent(value = ENDPOINTS_DISCOVERY_ADDR, blocking = true, ordered = true) + @Transactional(TxType.REQUIRED) public void handleEndpointEvent(EndpointDiscoveryEvent evt) { String namespace = evt.namespace; DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); @@ -243,89 +352,33 @@ public void handleEndpointEvent(EndpointDiscoveryEvent evt) { DiscoveryNode.environment( namespace, KubeDiscoveryNodeType.NAMESPACE)); - try { - if (evt.eventKind == EventKind.FOUND) { - buildOwnerChain(nsNode, evt.target, evt.objRef); - } else { - pruneOwnerChain(nsNode, evt.target); - } - - if (!nsNode.hasChildren()) { - realm.children.remove(nsNode); - nsNode.parent = null; - } else if (!realm.children.contains(nsNode)) { - realm.children.add(nsNode); - nsNode.parent = realm; - } - realm.persist(); - } catch (Exception e) { - logger.warn("Endpoint handler exception", e); + if (evt.eventKind == EventKind.FOUND) { + buildOwnerChain(nsNode, evt.target, evt.objRef); + } else { + pruneOwnerChain(nsNode, evt.target); } - } - private void handleObservedEndpoints(String namespace) { - List targetNodes = - DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() - .filter( - (n) -> - namespace.equals( - n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) - .collect(Collectors.toList()); - - Map targetRefMap = new HashMap<>(); - - Set persistedTargets = new HashSet<>(); - for (DiscoveryNode node : targetNodes) { - persistedTargets.add(node.target); + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; } + realm.persist(); + } - Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() - .map((endpoint) -> getTargetTuplesFrom(endpoint)) - .flatMap(List::stream) - .filter((tuple) -> Objects.nonNull(tuple.objRef)) - .map( - (tuple) -> { - Target t = tuple.toTarget(); - if (t != null) { - targetRefMap.put(t.connectUrl, tuple.objRef); - } - return t; - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - // Add new targets - Target.compare(persistedTargets) - .to(observedTargets) - .added() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, - t, - targetRefMap.get(t.connectUrl), - EventKind.FOUND))); - - // Prune deleted targets - Target.compare(persistedTargets) - .to(observedTargets) - .removed() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, t, null, EventKind.LOST))); + private void notify(NamespaceQueryEvent evt) { + bus.publish(NAMESPACE_QUERY_ADDR, evt); } private void notify(EndpointDiscoveryEvent evt) { - bus.publish(KubeApiDiscovery.class.getName(), evt); + bus.publish(ENDPOINTS_DISCOVERY_ADDR, evt); } private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { - if (!isTargetUnderRealm(target.connectUrl)) { - logger.infov( + if (!isTargetUnderRealm(target)) { + logger.debugv( "Target with serviceURL {0} does not exist in discovery tree. Skipped deleting", target.connectUrl); return; @@ -361,9 +414,9 @@ private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { } private void buildOwnerChain(DiscoveryNode nsNode, Target target, ObjectReference targetRef) { - if (isTargetUnderRealm(target.connectUrl)) { - logger.infov( - "Target with serviceURL {0} already exist in discovery tree. Skipped adding", + if (isTargetUnderRealm(target)) { + logger.debugv( + "Target with serviceURL {0} already exists in discovery tree. Skipped adding", target.connectUrl); return; } @@ -494,16 +547,6 @@ static final class KubeConfig { @ConfigProperty(name = "cryostat.discovery.kubernetes.namespace-path") String namespacePath; - private final LazyInitializer kubeClient = - new LazyInitializer() { - @Override - protected KubernetesClient initialize() throws ConcurrentException { - return new KubernetesClientBuilder() - .withTaskExecutor(Infrastructure.getDefaultWorkerPool()) - .build(); - } - }; - Collection getWatchNamespaces() { return watchNamespaces.orElse(List.of()).stream() .map( @@ -531,37 +574,13 @@ boolean kubeApiAvailable() { } } - private final class EndpointsHandler implements ResourceEventHandler { - @Override - public void onAdd(Endpoints endpoints) { - logger.debugv( - "Endpoint {0} created in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + private static record NamespaceQueryEvent(Collection namespaces) { + static NamespaceQueryEvent from(Collection namespaces) { + return new NamespaceQueryEvent(namespaces); } - @Override - public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { - logger.debugv( - "Endpoint {0} modified in namespace {1}", - newEndpoints.getMetadata().getName(), - newEndpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); - } - - @Override - public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { - logger.debugv( - "Endpoint {0} deleted in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - if (deletedFinalStateUnknown) { - logger.warnv("Deleted final state unknown: {0}", endpoints); - return; - } - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + static NamespaceQueryEvent from(String namespace) { + return new NamespaceQueryEvent(List.of(namespace)); } } @@ -575,11 +594,14 @@ static EndpointDiscoveryEvent from( private class TargetTuple { ObjectReference objRef; + HasMetadata obj; EndpointAddress addr; EndpointPort port; - TargetTuple(ObjectReference objRef, EndpointAddress addr, EndpointPort port) { + TargetTuple( + ObjectReference objRef, HasMetadata obj, EndpointAddress addr, EndpointPort port) { this.objRef = objRef; + this.obj = obj; this.addr = addr; this.port = port; } @@ -604,11 +626,6 @@ public Target toTarget() { "/jndi/rmi://" + host + ':' + port.getPort() + "/jmxrmi"); URI connectUrl = URI.create(jmxUrl.toString()); - Pair pair = - queryForNode(namespace, objRef.getName(), objRef.getKind()); - - HasMetadata obj = pair.getLeft(); - Target target = new Target(); target.activeRecordings = new ArrayList<>(); target.connectUrl = connectUrl; diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 99520b4bc..2d7ea05ca 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -70,6 +70,8 @@ public class ActiveRecordings { Duration connectionFailedTimeout; @GET + @Blocking + @Transactional @RolesAllowed("read") public List list(@RestPath long targetId) throws Exception { Target target = Target.find("id", targetId).singleResult(); diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 164c53cdb..58df86a9f 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -40,6 +40,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -48,6 +49,7 @@ import jakarta.inject.Inject; import jakarta.persistence.EntityManager; import jakarta.transaction.Transactional; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; @@ -86,10 +88,19 @@ void onStart(@Observes StartupEvent ev) { .forEach(this::applyRuleToMatchingTargets)); } + void onStop(@Observes ShutdownEvent evt) throws SchedulerException { + quartz.shutdown(); + } + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { + case MODIFIED: + // fall-through case FOUND: + if (StringUtils.isBlank(event.serviceRef().jvmId)) { + break; + } applyRulesToTarget(event.serviceRef()); break; case LOST: diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index 9bc733033..1389199b0 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -18,7 +18,6 @@ import java.net.URI; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -31,14 +30,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import io.cryostat.ConfigProperties; -import io.cryostat.core.net.JFRConnection; -import io.cryostat.credentials.Credential; import io.cryostat.discovery.DiscoveryNode; -import io.cryostat.expressions.MatchExpressionEvaluator; -import io.cryostat.libcryostat.JvmIdentifier; import io.cryostat.recordings.ActiveRecording; -import io.cryostat.recordings.RecordingHelper; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; @@ -46,7 +39,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.hibernate.orm.panache.PanacheEntity; -import io.quarkus.vertx.ConsumeEvent; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -63,15 +55,12 @@ import jakarta.persistence.PostRemove; import jakarta.persistence.PostUpdate; import jakarta.persistence.PrePersist; -import jakarta.transaction.Transactional; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import org.apache.commons.lang3.StringUtils; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.hibernate.annotations.JdbcTypeCode; import org.hibernate.type.SqlTypes; import org.jboss.logging.Logger; -import org.projectnessie.cel.tools.ScriptException; @Entity @EntityListeners(Target.Listener.class) @@ -291,53 +280,6 @@ static class Listener { @Inject Logger logger; @Inject EventBus bus; - @Inject TargetConnectionManager connectionManager; - @Inject RecordingHelper recordingHelper; - @Inject MatchExpressionEvaluator matchExpressionEvaluator; - - @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) - Duration timeout; - - @Transactional - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) - void onMessage(TargetDiscovery event) { - var target = Target.find("id", event.serviceRef().id).singleResultOptional(); - switch (event.kind()) { - case LOST: - // this should already be handled by the cascading deletion of the Target - // TODO verify this - break; - case FOUND: - target.ifPresent(recordingHelper::listActiveRecordings); - break; - case MODIFIED: - target.ifPresent(recordingHelper::listActiveRecordings); - break; - default: - // no-op - break; - } - } - - @ConsumeEvent(value = Credential.CREDENTIALS_STORED, blocking = true) - @Transactional - void updateCredential(Credential credential) { - Target.stream("#Target.unconnected") - .forEach( - t -> { - try { - if (matchExpressionEvaluator.applies( - credential.matchExpression, t)) { - updateTargetJvmId(t, credential); - t.persist(); - } - } catch (ScriptException e) { - logger.error(e); - } catch (Exception e) { - logger.warn(e); - } - }); - } @PrePersist void prePersist(Target target) { @@ -358,30 +300,6 @@ void prePersist(Target target) { if (target.activeRecordings == null) { target.activeRecordings = new ArrayList<>(); } - - try { - if (StringUtils.isBlank(target.jvmId)) { - updateTargetJvmId(target, null); - } - } catch (Exception e) { - logger.warn(e); - } - } - - private void updateTargetJvmId(Target t, Credential credential) { - try { - t.jvmId = - connectionManager - .executeDirect( - t, - Optional.ofNullable(credential), - JFRConnection::getJvmIdentifier) - .map(JvmIdentifier::getHash) - .await() - .atMost(timeout); - } catch (Exception e) { - logger.error(e); - } } @PostPersist diff --git a/src/main/java/io/cryostat/targets/TargetUpdateJob.java b/src/main/java/io/cryostat/targets/TargetUpdateJob.java new file mode 100644 index 000000000..7a67cd15a --- /dev/null +++ b/src/main/java/io/cryostat/targets/TargetUpdateJob.java @@ -0,0 +1,84 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.targets; + +import java.time.Duration; +import java.util.List; + +import io.cryostat.ConfigProperties; +import io.cryostat.core.net.JFRConnection; +import io.cryostat.libcryostat.JvmIdentifier; +import io.cryostat.recordings.RecordingHelper; + +import io.quarkus.narayana.jta.QuarkusTransaction; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +public class TargetUpdateJob implements Job { + + @Inject Logger logger; + @Inject TargetConnectionManager connectionManager; + @Inject RecordingHelper recordingHelper; + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration connectionTimeout; + + @Override + @Transactional + public void execute(JobExecutionContext context) throws JobExecutionException { + List targets; + Long targetId = (Long) context.getJobDetail().getJobDataMap().get("targetId"); + if (targetId != null) { + targets = List.of(Target.getTargetById(targetId)); + } else { + targets = Target.find("#Target.unconnected").list(); + } + + if (targets.size() == 1) { + updateTarget(targets.get(0)); + } else { + targets.forEach( + t -> Infrastructure.getDefaultExecutor().execute(() -> updateTargetTx(t.id))); + } + } + + private void updateTargetTx(long id) { + QuarkusTransaction.requiringNew().run(() -> updateTarget(Target.getTargetById(id))); + } + + private void updateTarget(Target target) { + try { + target.jvmId = + connectionManager + .executeConnectedTaskUni(target, JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + } catch (Exception e) { + target.jvmId = null; + target.persist(); + throw e; + } + target.activeRecordings = recordingHelper.listActiveRecordings(target); + target.persist(); + } +} diff --git a/src/main/java/io/cryostat/targets/TargetUpdateService.java b/src/main/java/io/cryostat/targets/TargetUpdateService.java new file mode 100644 index 000000000..54a6ac268 --- /dev/null +++ b/src/main/java/io/cryostat/targets/TargetUpdateService.java @@ -0,0 +1,131 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.targets; + +import java.time.Duration; +import java.time.Instant; +import java.util.Date; +import java.util.Map; + +import io.cryostat.ConfigProperties; +import io.cryostat.credentials.Credential; +import io.cryostat.expressions.MatchExpressionEvaluator; +import io.cryostat.targets.Target.TargetDiscovery; + +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.vertx.ConsumeEvent; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +@ApplicationScoped +public class TargetUpdateService { + + @Inject Logger logger; + @Inject Scheduler scheduler; + @Inject MatchExpressionEvaluator matchExpressionEvaluator; + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration connectionTimeout; + + void onStart(@Observes StartupEvent evt) throws SchedulerException { + logger.tracev("{0} started", getClass().getName()); + + JobDetail jobDetail = JobBuilder.newJob(TargetUpdateJob.class).build(); + + Trigger trigger = + TriggerBuilder.newTrigger() + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds( + (int) (connectionTimeout.toSeconds() * 2)) + .repeatForever() + .withMisfireHandlingInstructionNowWithExistingCount()) + .startAt( + Date.from( + Instant.now() + .plusSeconds( + (int) (connectionTimeout.toSeconds() * 2)))) + .build(); + scheduler.scheduleJob(jobDetail, trigger); + } + + void onStop(@Observes ShutdownEvent evt) throws SchedulerException { + scheduler.shutdown(); + } + + @ConsumeEvent(Credential.CREDENTIALS_STORED) + void onCredentialsStored(Credential credential) { + updateTargetsForExpression(credential); + } + + @ConsumeEvent(Credential.CREDENTIALS_UPDATED) + void onCredentialsUpdated(Credential credential) { + updateTargetsForExpression(credential); + } + + @ConsumeEvent(Credential.CREDENTIALS_DELETED) + void onCredentialsDeleted(Credential credential) { + updateTargetsForExpression(credential); + } + + private void updateTargetsForExpression(Credential credential) { + for (Target target : + matchExpressionEvaluator.getMatchedTargets(credential.matchExpression)) { + try { + fireTargetUpdate(target); + } catch (SchedulerException se) { + logger.warn(se); + } + } + } + + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY) + void onMessage(TargetDiscovery event) throws SchedulerException { + switch (event.kind()) { + case MODIFIED: + // fall-through + case FOUND: + fireTargetUpdate(event.serviceRef()); + break; + default: + // no-op + break; + } + } + + private void fireTargetUpdate(Target target) throws SchedulerException { + JobDetail jobDetail = JobBuilder.newJob(TargetUpdateJob.class).build(); + Map data = jobDetail.getJobDataMap(); + data.put("targetId", target.id); + Trigger trigger = + TriggerBuilder.newTrigger() + .startAt(Date.from(Instant.now().plusSeconds(1))) + .usingJobData(jobDetail.getJobDataMap()) + .build(); + scheduler.scheduleJob(jobDetail, trigger); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f068086b5..ba4ecb4f8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -100,6 +100,7 @@ storage-ext.url= storage.presigned-downloads.enabled=false storage.transient-archives.enabled=false storage.transient-archives.ttl=60s +storage.buckets.creation-retry.period=10s storage.buckets.archives.name=archivedrecordings storage.buckets.archives.expiration-label=expiration storage.buckets.event-templates.name=eventtemplates