From 3a12aa0aedc8e1e0b006ed34bd6a88380aca3226 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 9 Oct 2024 10:07:05 -0400 Subject: [PATCH 01/38] fix(discovery): k8s discovery synchronization for stale lost targets --- .../cryostat/discovery/KubeApiDiscovery.java | 83 ++++++++++--------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index e9ccd2abe..9fe573fbb 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -69,7 +69,7 @@ import org.jboss.logging.Logger; @ApplicationScoped -public class KubeApiDiscovery { +public class KubeApiDiscovery implements ResourceEventHandler { public static final String REALM = "KubernetesApi"; public static final String DISCOVERY_NAMESPACE_LABEL_KEY = "discovery.cryostat.io/namespace"; @@ -98,6 +98,8 @@ public class KubeApiDiscovery { @ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period") Duration informerResyncPeriod; + private final Object txLock = new Object(); + private final LazyInitializer>> nsInformers = new LazyInitializer>>() { @Override @@ -116,12 +118,12 @@ protected HashMap> initialize() client.endpoints() .inNamespace(ns) .inform( - new EndpointsHandler(), + KubeApiDiscovery.this, informerResyncPeriod.toMillis())); logger.debugv( - "Started Endpoints SharedInformer for" - + " namespace \"{0}\"", - ns); + "Started Endpoints SharedInformer for namespace" + + " \"{0}\" with resync period {1}", + ns, informerResyncPeriod); }); return result; } @@ -178,6 +180,43 @@ boolean available() { return false; } + @Override + public void onAdd(Endpoints endpoints) { + synchronized (txLock) { + logger.debugv( + "Endpoint {0} created in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + } + } + + @Override + public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { + synchronized (txLock) { + logger.debugv( + "Endpoint {0} modified in namespace {1}", + newEndpoints.getMetadata().getName(), + newEndpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); + } + } + + @Override + public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { + synchronized (txLock) { + logger.debugv( + "Endpoint {0} deleted in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + if (deletedFinalStateUnknown) { + logger.warnv("Deleted final state unknown: {0}", endpoints); + } + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + } + } + private boolean isCompatiblePort(EndpointPort port) { return jmxPortNames.orElse(EMPTY_PORT_NAMES).contains(port.getName()) || jmxPortNumbers.orElse(EMPTY_PORT_NUMBERS).contains(port.getPort()); @@ -531,40 +570,6 @@ boolean kubeApiAvailable() { } } - private final class EndpointsHandler implements ResourceEventHandler { - @Override - public void onAdd(Endpoints endpoints) { - logger.debugv( - "Endpoint {0} created in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); - } - - @Override - public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { - logger.debugv( - "Endpoint {0} modified in namespace {1}", - newEndpoints.getMetadata().getName(), - newEndpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); - } - - @Override - public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { - logger.debugv( - "Endpoint {0} deleted in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - if (deletedFinalStateUnknown) { - logger.warnv("Deleted final state unknown: {0}", endpoints); - return; - } - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); - } - } - private static record EndpointDiscoveryEvent( String namespace, Target target, ObjectReference objRef, EventKind eventKind) { static EndpointDiscoveryEvent from( From ca9c97fdce5f93110a55ffab1b048da0a47a5cb2 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 9 Oct 2024 11:13:33 -0400 Subject: [PATCH 02/38] synchronize on lock while building/pruning owner chains to avoid duplicate entity creation in different transactions --- .../cryostat/discovery/KubeApiDiscovery.java | 152 +++++++++--------- 1 file changed, 78 insertions(+), 74 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 9fe573fbb..08f011b9e 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -274,88 +274,92 @@ private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException @ConsumeEvent(blocking = true, ordered = true) @Transactional(TxType.REQUIRES_NEW) public void handleEndpointEvent(EndpointDiscoveryEvent evt) { - String namespace = evt.namespace; - DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); - DiscoveryNode nsNode = - DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) - .orElse( - DiscoveryNode.environment( - namespace, KubeDiscoveryNodeType.NAMESPACE)); + synchronized (txLock) { + String namespace = evt.namespace; + DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); + DiscoveryNode nsNode = + DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) + .orElse( + DiscoveryNode.environment( + namespace, KubeDiscoveryNodeType.NAMESPACE)); - try { - if (evt.eventKind == EventKind.FOUND) { - buildOwnerChain(nsNode, evt.target, evt.objRef); - } else { - pruneOwnerChain(nsNode, evt.target); - } + try { + if (evt.eventKind == EventKind.FOUND) { + buildOwnerChain(nsNode, evt.target, evt.objRef); + } else { + pruneOwnerChain(nsNode, evt.target); + } - if (!nsNode.hasChildren()) { - realm.children.remove(nsNode); - nsNode.parent = null; - } else if (!realm.children.contains(nsNode)) { - realm.children.add(nsNode); - nsNode.parent = realm; + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; + } + realm.persist(); + } catch (Exception e) { + logger.warn("Endpoint handler exception", e); } - realm.persist(); - } catch (Exception e) { - logger.warn("Endpoint handler exception", e); } } private void handleObservedEndpoints(String namespace) { - List targetNodes = - DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() - .filter( - (n) -> - namespace.equals( - n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) - .collect(Collectors.toList()); - - Map targetRefMap = new HashMap<>(); - - Set persistedTargets = new HashSet<>(); - for (DiscoveryNode node : targetNodes) { - persistedTargets.add(node.target); - } - - Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() - .map((endpoint) -> getTargetTuplesFrom(endpoint)) - .flatMap(List::stream) - .filter((tuple) -> Objects.nonNull(tuple.objRef)) - .map( - (tuple) -> { - Target t = tuple.toTarget(); - if (t != null) { - targetRefMap.put(t.connectUrl, tuple.objRef); - } - return t; - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + synchronized (txLock) { + List targetNodes = + DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() + .filter( + (n) -> + namespace.equals( + n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) + .collect(Collectors.toList()); + + Map targetRefMap = new HashMap<>(); + + Set persistedTargets = new HashSet<>(); + for (DiscoveryNode node : targetNodes) { + persistedTargets.add(node.target); + } - // Add new targets - Target.compare(persistedTargets) - .to(observedTargets) - .added() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, - t, - targetRefMap.get(t.connectUrl), - EventKind.FOUND))); - - // Prune deleted targets - Target.compare(persistedTargets) - .to(observedTargets) - .removed() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, t, null, EventKind.LOST))); + Set observedTargets = + safeGetInformers().get(namespace).getStore().list().stream() + .map((endpoint) -> getTargetTuplesFrom(endpoint)) + .flatMap(List::stream) + .filter((tuple) -> Objects.nonNull(tuple.objRef)) + .map( + (tuple) -> { + Target t = tuple.toTarget(); + if (t != null) { + targetRefMap.put(t.connectUrl, tuple.objRef); + } + return t; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Add new targets + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, + t, + targetRefMap.get(t.connectUrl), + EventKind.FOUND))); + + // Prune deleted targets + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, t, null, EventKind.LOST))); + } } private void notify(EndpointDiscoveryEvent evt) { From 15934f57989cea751fc0d827c222609c4227b84c Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 15 Oct 2024 16:20:19 -0400 Subject: [PATCH 03/38] do not perform node query in toString --- .../cryostat/discovery/KubeApiDiscovery.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 08f011b9e..3f47ffdaf 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -227,7 +227,14 @@ List tuplesFromEndpoints(Endpoints endpoints) { for (EndpointSubset subset : endpoints.getSubsets()) { for (EndpointPort port : subset.getPorts()) { for (EndpointAddress addr : subset.getAddresses()) { - tts.add(new TargetTuple(addr.getTargetRef(), addr, port)); + var ref = addr.getTargetRef(); + tts.add( + new TargetTuple( + ref, + queryForNode(ref.getNamespace(), ref.getName(), ref.getKind()) + .getLeft(), + addr, + port)); } } } @@ -584,11 +591,14 @@ static EndpointDiscoveryEvent from( private class TargetTuple { ObjectReference objRef; + HasMetadata obj; EndpointAddress addr; EndpointPort port; - TargetTuple(ObjectReference objRef, EndpointAddress addr, EndpointPort port) { + TargetTuple( + ObjectReference objRef, HasMetadata obj, EndpointAddress addr, EndpointPort port) { this.objRef = objRef; + this.obj = obj; this.addr = addr; this.port = port; } @@ -613,11 +623,6 @@ public Target toTarget() { "/jndi/rmi://" + host + ':' + port.getPort() + "/jmxrmi"); URI connectUrl = URI.create(jmxUrl.toString()); - Pair pair = - queryForNode(namespace, objRef.getName(), objRef.getKind()); - - HasMetadata obj = pair.getLeft(); - Target target = new Target(); target.activeRecordings = new ArrayList<>(); target.connectUrl = connectUrl; From 88397f8a31c14ae2981e47887cacead507020f61 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 16 Oct 2024 15:29:57 -0400 Subject: [PATCH 04/38] return existing node if already discovered --- .../io/cryostat/discovery/DiscoveryNode.java | 71 ++++++++++++------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/DiscoveryNode.java b/src/main/java/io/cryostat/discovery/DiscoveryNode.java index 3446ad51b..6bf524349 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryNode.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryNode.java @@ -31,6 +31,7 @@ import com.fasterxml.jackson.annotation.JsonView; import io.quarkus.hibernate.orm.panache.PanacheEntity; import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.panache.common.Parameters; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; @@ -42,6 +43,8 @@ import jakarta.persistence.FetchType; import jakarta.persistence.JoinColumn; import jakarta.persistence.ManyToOne; +import jakarta.persistence.NamedQueries; +import jakarta.persistence.NamedQuery; import jakarta.persistence.OneToMany; import jakarta.persistence.OneToOne; import jakarta.persistence.PostPersist; @@ -56,6 +59,11 @@ @Entity @EntityListeners(DiscoveryNode.Listener.class) +@NamedQueries({ + @NamedQuery( + name = "DiscoveryNode.byTypeWithName", + query = "from DiscoveryNode where nodeType = :nodeType and name = :name") +}) public class DiscoveryNode extends PanacheEntity { public static final String NODE_TYPE = "nodeType"; @@ -129,33 +137,48 @@ public static List findAllByNodeType(NodeType nodeType) { } public static DiscoveryNode environment(String name, NodeType nodeType) { - return QuarkusTransaction.joiningExisting() - .call( - () -> { - DiscoveryNode node = new DiscoveryNode(); - node.name = name; - node.nodeType = nodeType.getKind(); - node.labels = new HashMap<>(); - node.children = new ArrayList<>(); - node.target = null; - node.persist(); - return node; - }); + var kind = nodeType.getKind(); + return DiscoveryNode.find( + "#DiscoveryNode.byTypeWithName", + Parameters.with("nodeType", kind).and("name", name)) + .firstResultOptional() + .orElseGet( + () -> + QuarkusTransaction.joiningExisting() + .call( + () -> { + DiscoveryNode node = new DiscoveryNode(); + node.name = name; + node.nodeType = kind; + node.labels = new HashMap<>(); + node.children = new ArrayList<>(); + node.target = null; + node.persist(); + return node; + })); } public static DiscoveryNode target(Target target, NodeType nodeType) { - return QuarkusTransaction.joiningExisting() - .call( - () -> { - DiscoveryNode node = new DiscoveryNode(); - node.name = target.connectUrl.toString(); - node.nodeType = nodeType.getKind(); - node.labels = new HashMap<>(target.labels); - node.children = null; - node.target = target; - node.persist(); - return node; - }); + var kind = nodeType.getKind(); + var connectUrl = target.connectUrl.toString(); + return DiscoveryNode.find( + "#DiscoveryNode.byTypeWithName", + Parameters.with("nodeType", kind).and("name", connectUrl)) + .firstResultOptional() + .orElseGet( + () -> + QuarkusTransaction.joiningExisting() + .call( + () -> { + DiscoveryNode node = new DiscoveryNode(); + node.name = connectUrl; + node.nodeType = kind; + node.labels = new HashMap<>(target.labels); + node.children = null; + node.target = target; + node.persist(); + return node; + })); } @Override From 4eee22c8e17c1a4de4bfac5f4b0367c33ca41df9 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 11:14:59 -0400 Subject: [PATCH 05/38] remove unnecessary locking --- .../cryostat/discovery/KubeApiDiscovery.java | 195 ++++++++---------- 1 file changed, 91 insertions(+), 104 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 3f47ffdaf..141489ea0 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -98,8 +98,6 @@ public class KubeApiDiscovery implements ResourceEventHandler { @ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period") Duration informerResyncPeriod; - private final Object txLock = new Object(); - private final LazyInitializer>> nsInformers = new LazyInitializer>>() { @Override @@ -182,39 +180,32 @@ boolean available() { @Override public void onAdd(Endpoints endpoints) { - synchronized (txLock) { - logger.debugv( - "Endpoint {0} created in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); - } + logger.debugv( + "Endpoint {0} created in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); } @Override public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { - synchronized (txLock) { - logger.debugv( - "Endpoint {0} modified in namespace {1}", - newEndpoints.getMetadata().getName(), - newEndpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); - } + logger.debugv( + "Endpoint {0} modified in namespace {1}", + newEndpoints.getMetadata().getName(), newEndpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); } @Override public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { - synchronized (txLock) { - logger.debugv( - "Endpoint {0} deleted in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - if (deletedFinalStateUnknown) { - logger.warnv("Deleted final state unknown: {0}", endpoints); - } - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + logger.debugv( + "Endpoint {0} deleted in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + if (deletedFinalStateUnknown) { + logger.warnv("Deleted final state unknown: {0}", endpoints); } + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); } private boolean isCompatiblePort(EndpointPort port) { @@ -281,92 +272,88 @@ private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException @ConsumeEvent(blocking = true, ordered = true) @Transactional(TxType.REQUIRES_NEW) public void handleEndpointEvent(EndpointDiscoveryEvent evt) { - synchronized (txLock) { - String namespace = evt.namespace; - DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); - DiscoveryNode nsNode = - DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) - .orElse( - DiscoveryNode.environment( - namespace, KubeDiscoveryNodeType.NAMESPACE)); + String namespace = evt.namespace; + DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); + DiscoveryNode nsNode = + DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) + .orElse( + DiscoveryNode.environment( + namespace, KubeDiscoveryNodeType.NAMESPACE)); - try { - if (evt.eventKind == EventKind.FOUND) { - buildOwnerChain(nsNode, evt.target, evt.objRef); - } else { - pruneOwnerChain(nsNode, evt.target); - } + try { + if (evt.eventKind == EventKind.FOUND) { + buildOwnerChain(nsNode, evt.target, evt.objRef); + } else { + pruneOwnerChain(nsNode, evt.target); + } - if (!nsNode.hasChildren()) { - realm.children.remove(nsNode); - nsNode.parent = null; - } else if (!realm.children.contains(nsNode)) { - realm.children.add(nsNode); - nsNode.parent = realm; - } - realm.persist(); - } catch (Exception e) { - logger.warn("Endpoint handler exception", e); + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; } + realm.persist(); + } catch (Exception e) { + logger.warn("Endpoint handler exception", e); } } private void handleObservedEndpoints(String namespace) { - synchronized (txLock) { - List targetNodes = - DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() - .filter( - (n) -> - namespace.equals( - n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) - .collect(Collectors.toList()); - - Map targetRefMap = new HashMap<>(); - - Set persistedTargets = new HashSet<>(); - for (DiscoveryNode node : targetNodes) { - persistedTargets.add(node.target); - } - - Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() - .map((endpoint) -> getTargetTuplesFrom(endpoint)) - .flatMap(List::stream) - .filter((tuple) -> Objects.nonNull(tuple.objRef)) - .map( - (tuple) -> { - Target t = tuple.toTarget(); - if (t != null) { - targetRefMap.put(t.connectUrl, tuple.objRef); - } - return t; - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - // Add new targets - Target.compare(persistedTargets) - .to(observedTargets) - .added() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, - t, - targetRefMap.get(t.connectUrl), - EventKind.FOUND))); - - // Prune deleted targets - Target.compare(persistedTargets) - .to(observedTargets) - .removed() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, t, null, EventKind.LOST))); + List targetNodes = + DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() + .filter( + (n) -> + namespace.equals( + n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) + .collect(Collectors.toList()); + + Map targetRefMap = new HashMap<>(); + + Set persistedTargets = new HashSet<>(); + for (DiscoveryNode node : targetNodes) { + persistedTargets.add(node.target); } + + Set observedTargets = + safeGetInformers().get(namespace).getStore().list().stream() + .map((endpoint) -> getTargetTuplesFrom(endpoint)) + .flatMap(List::stream) + .filter((tuple) -> Objects.nonNull(tuple.objRef)) + .map( + (tuple) -> { + Target t = tuple.toTarget(); + if (t != null) { + targetRefMap.put(t.connectUrl, tuple.objRef); + } + return t; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Add new targets + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, + t, + targetRefMap.get(t.connectUrl), + EventKind.FOUND))); + + // Prune deleted targets + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, t, null, EventKind.LOST))); } private void notify(EndpointDiscoveryEvent evt) { From c066ddb4f492b171c63376f802810328e967688a Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 11:17:59 -0400 Subject: [PATCH 06/38] Revert "remove unnecessary locking" This reverts commit c3bb2dfdae3528380b775eb1517e7808e96907d8. --- .../cryostat/discovery/KubeApiDiscovery.java | 195 ++++++++++-------- 1 file changed, 104 insertions(+), 91 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 141489ea0..3f47ffdaf 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -98,6 +98,8 @@ public class KubeApiDiscovery implements ResourceEventHandler { @ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period") Duration informerResyncPeriod; + private final Object txLock = new Object(); + private final LazyInitializer>> nsInformers = new LazyInitializer>>() { @Override @@ -180,32 +182,39 @@ boolean available() { @Override public void onAdd(Endpoints endpoints) { - logger.debugv( - "Endpoint {0} created in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + synchronized (txLock) { + logger.debugv( + "Endpoint {0} created in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + } } @Override public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { - logger.debugv( - "Endpoint {0} modified in namespace {1}", - newEndpoints.getMetadata().getName(), newEndpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); + synchronized (txLock) { + logger.debugv( + "Endpoint {0} modified in namespace {1}", + newEndpoints.getMetadata().getName(), + newEndpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); + } } @Override public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { - logger.debugv( - "Endpoint {0} deleted in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - if (deletedFinalStateUnknown) { - logger.warnv("Deleted final state unknown: {0}", endpoints); + synchronized (txLock) { + logger.debugv( + "Endpoint {0} deleted in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + if (deletedFinalStateUnknown) { + logger.warnv("Deleted final state unknown: {0}", endpoints); + } + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); } - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); } private boolean isCompatiblePort(EndpointPort port) { @@ -272,88 +281,92 @@ private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException @ConsumeEvent(blocking = true, ordered = true) @Transactional(TxType.REQUIRES_NEW) public void handleEndpointEvent(EndpointDiscoveryEvent evt) { - String namespace = evt.namespace; - DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); - DiscoveryNode nsNode = - DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) - .orElse( - DiscoveryNode.environment( - namespace, KubeDiscoveryNodeType.NAMESPACE)); + synchronized (txLock) { + String namespace = evt.namespace; + DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); + DiscoveryNode nsNode = + DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) + .orElse( + DiscoveryNode.environment( + namespace, KubeDiscoveryNodeType.NAMESPACE)); - try { - if (evt.eventKind == EventKind.FOUND) { - buildOwnerChain(nsNode, evt.target, evt.objRef); - } else { - pruneOwnerChain(nsNode, evt.target); - } + try { + if (evt.eventKind == EventKind.FOUND) { + buildOwnerChain(nsNode, evt.target, evt.objRef); + } else { + pruneOwnerChain(nsNode, evt.target); + } - if (!nsNode.hasChildren()) { - realm.children.remove(nsNode); - nsNode.parent = null; - } else if (!realm.children.contains(nsNode)) { - realm.children.add(nsNode); - nsNode.parent = realm; + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; + } + realm.persist(); + } catch (Exception e) { + logger.warn("Endpoint handler exception", e); } - realm.persist(); - } catch (Exception e) { - logger.warn("Endpoint handler exception", e); } } private void handleObservedEndpoints(String namespace) { - List targetNodes = - DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() - .filter( - (n) -> - namespace.equals( - n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) - .collect(Collectors.toList()); - - Map targetRefMap = new HashMap<>(); - - Set persistedTargets = new HashSet<>(); - for (DiscoveryNode node : targetNodes) { - persistedTargets.add(node.target); - } - - Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() - .map((endpoint) -> getTargetTuplesFrom(endpoint)) - .flatMap(List::stream) - .filter((tuple) -> Objects.nonNull(tuple.objRef)) - .map( - (tuple) -> { - Target t = tuple.toTarget(); - if (t != null) { - targetRefMap.put(t.connectUrl, tuple.objRef); - } - return t; - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + synchronized (txLock) { + List targetNodes = + DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() + .filter( + (n) -> + namespace.equals( + n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) + .collect(Collectors.toList()); + + Map targetRefMap = new HashMap<>(); + + Set persistedTargets = new HashSet<>(); + for (DiscoveryNode node : targetNodes) { + persistedTargets.add(node.target); + } - // Add new targets - Target.compare(persistedTargets) - .to(observedTargets) - .added() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, - t, - targetRefMap.get(t.connectUrl), - EventKind.FOUND))); - - // Prune deleted targets - Target.compare(persistedTargets) - .to(observedTargets) - .removed() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, t, null, EventKind.LOST))); + Set observedTargets = + safeGetInformers().get(namespace).getStore().list().stream() + .map((endpoint) -> getTargetTuplesFrom(endpoint)) + .flatMap(List::stream) + .filter((tuple) -> Objects.nonNull(tuple.objRef)) + .map( + (tuple) -> { + Target t = tuple.toTarget(); + if (t != null) { + targetRefMap.put(t.connectUrl, tuple.objRef); + } + return t; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Add new targets + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, + t, + targetRefMap.get(t.connectUrl), + EventKind.FOUND))); + + // Prune deleted targets + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, t, null, EventKind.LOST))); + } } private void notify(EndpointDiscoveryEvent evt) { From 9bbf6cdb79165dca02374795bb85433a4663f51b Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 11:32:48 -0400 Subject: [PATCH 07/38] start transaction for active recording list request --- src/main/java/io/cryostat/recordings/ActiveRecordings.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 99520b4bc..2d7ea05ca 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -70,6 +70,8 @@ public class ActiveRecordings { Duration connectionFailedTimeout; @GET + @Blocking + @Transactional @RolesAllowed("read") public List list(@RestPath long targetId) throws Exception { Target target = Target.find("id", targetId).singleResult(); From e23aa7ad71bca00e48d320f084e51d0e998f3b97 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 14:22:22 -0400 Subject: [PATCH 08/38] cleanup --- .../java/io/cryostat/discovery/KubeApiDiscovery.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 3f47ffdaf..251f6bdbd 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -54,7 +54,6 @@ import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.mutiny.core.eventbus.EventBus; -import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; @@ -129,9 +128,7 @@ protected HashMap> initialize() } }; - // Priority is set higher than default 0 such that onStart is called first before onAfterStart - // This ensures realm node is persisted before initializing informers - void onStart(@Observes @Priority(1) StartupEvent evt) { + void onStart(@Observes StartupEvent evt) { if (!enabled()) { return; } @@ -142,12 +139,6 @@ void onStart(@Observes @Priority(1) StartupEvent evt) { } logger.debugv("Starting {0} client", REALM); - } - - void onAfterStart(@Observes StartupEvent evt) { - if (!enabled() || !available()) { - return; - } safeGetInformers(); } From e0473db538648a0736b8315d48e8606a7e020b9c Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 14:23:17 -0400 Subject: [PATCH 09/38] cleanup --- src/main/java/io/cryostat/discovery/KubeApiDiscovery.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 251f6bdbd..7cecf1db6 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -242,13 +242,11 @@ private List getTargetTuplesFrom(Endpoints endpoints) { } private Map> safeGetInformers() { - Map> informers; try { - informers = nsInformers.get(); + return nsInformers.get(); } catch (ConcurrentException e) { throw new IllegalStateException(e); } - return informers; } private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException { From 15b7bb088dc5915fadf2327d3977ae748ff031f8 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 14:25:00 -0400 Subject: [PATCH 10/38] rename --- .../java/io/cryostat/discovery/KubeApiDiscovery.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 7cecf1db6..a3b55feef 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -97,7 +97,7 @@ public class KubeApiDiscovery implements ResourceEventHandler { @ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period") Duration informerResyncPeriod; - private final Object txLock = new Object(); + private final Object updateLock = new Object(); private final LazyInitializer>> nsInformers = new LazyInitializer>>() { @@ -173,7 +173,7 @@ boolean available() { @Override public void onAdd(Endpoints endpoints) { - synchronized (txLock) { + synchronized (updateLock) { logger.debugv( "Endpoint {0} created in namespace {1}", endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); @@ -184,7 +184,7 @@ public void onAdd(Endpoints endpoints) { @Override public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { - synchronized (txLock) { + synchronized (updateLock) { logger.debugv( "Endpoint {0} modified in namespace {1}", newEndpoints.getMetadata().getName(), @@ -196,7 +196,7 @@ public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { @Override public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { - synchronized (txLock) { + synchronized (updateLock) { logger.debugv( "Endpoint {0} deleted in namespace {1}", endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); @@ -270,7 +270,7 @@ private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException @ConsumeEvent(blocking = true, ordered = true) @Transactional(TxType.REQUIRES_NEW) public void handleEndpointEvent(EndpointDiscoveryEvent evt) { - synchronized (txLock) { + synchronized (updateLock) { String namespace = evt.namespace; DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); DiscoveryNode nsNode = @@ -301,7 +301,7 @@ public void handleEndpointEvent(EndpointDiscoveryEvent evt) { } private void handleObservedEndpoints(String namespace) { - synchronized (txLock) { + synchronized (updateLock) { List targetNodes = DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() .filter( From df5af8d125db1f4f072bf091ef63bb28d53b3999 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 14:30:55 -0400 Subject: [PATCH 11/38] simplify transaction ordering and locking --- .../cryostat/discovery/KubeApiDiscovery.java | 206 +++++++++--------- 1 file changed, 100 insertions(+), 106 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index a3b55feef..6410843ba 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -97,8 +97,6 @@ public class KubeApiDiscovery implements ResourceEventHandler { @ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period") Duration informerResyncPeriod; - private final Object updateLock = new Object(); - private final LazyInitializer>> nsInformers = new LazyInitializer>>() { @Override @@ -173,39 +171,29 @@ boolean available() { @Override public void onAdd(Endpoints endpoints) { - synchronized (updateLock) { - logger.debugv( - "Endpoint {0} created in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); - } + logger.debugv( + "Endpoint {0} created in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + handleObservedEndpoints(endpoints.getMetadata().getNamespace()); } @Override public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { - synchronized (updateLock) { - logger.debugv( - "Endpoint {0} modified in namespace {1}", - newEndpoints.getMetadata().getName(), - newEndpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); - } + logger.debugv( + "Endpoint {0} modified in namespace {1}", + newEndpoints.getMetadata().getName(), newEndpoints.getMetadata().getNamespace()); + handleObservedEndpoints(newEndpoints.getMetadata().getNamespace()); } @Override public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { - synchronized (updateLock) { - logger.debugv( - "Endpoint {0} deleted in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - if (deletedFinalStateUnknown) { - logger.warnv("Deleted final state unknown: {0}", endpoints); - } - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + logger.debugv( + "Endpoint {0} deleted in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + if (deletedFinalStateUnknown) { + logger.warnv("Deleted final state unknown: {0}", endpoints); } + handleObservedEndpoints(endpoints.getMetadata().getNamespace()); } private boolean isCompatiblePort(EndpointPort port) { @@ -270,92 +258,98 @@ private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException @ConsumeEvent(blocking = true, ordered = true) @Transactional(TxType.REQUIRES_NEW) public void handleEndpointEvent(EndpointDiscoveryEvent evt) { - synchronized (updateLock) { - String namespace = evt.namespace; - DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); - DiscoveryNode nsNode = - DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) - .orElse( - DiscoveryNode.environment( - namespace, KubeDiscoveryNodeType.NAMESPACE)); + String namespace = evt.namespace; + DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); + DiscoveryNode nsNode = + DiscoveryNode.getChild(realm, n -> n.name.equals(namespace)) + .orElse( + DiscoveryNode.environment( + namespace, KubeDiscoveryNodeType.NAMESPACE)); - try { - if (evt.eventKind == EventKind.FOUND) { - buildOwnerChain(nsNode, evt.target, evt.objRef); - } else { - pruneOwnerChain(nsNode, evt.target); - } + try { + if (evt.eventKind == EventKind.FOUND) { + buildOwnerChain(nsNode, evt.target, evt.objRef); + } else { + pruneOwnerChain(nsNode, evt.target); + } - if (!nsNode.hasChildren()) { - realm.children.remove(nsNode); - nsNode.parent = null; - } else if (!realm.children.contains(nsNode)) { - realm.children.add(nsNode); - nsNode.parent = realm; - } - realm.persist(); - } catch (Exception e) { - logger.warn("Endpoint handler exception", e); + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; } + realm.persist(); + } catch (Exception e) { + logger.warn("Endpoint handler exception", e); } } - private void handleObservedEndpoints(String namespace) { - synchronized (updateLock) { - List targetNodes = - DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() - .filter( - (n) -> - namespace.equals( - n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))) - .collect(Collectors.toList()); - - Map targetRefMap = new HashMap<>(); - - Set persistedTargets = new HashSet<>(); - for (DiscoveryNode node : targetNodes) { - persistedTargets.add(node.target); - } - - Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() - .map((endpoint) -> getTargetTuplesFrom(endpoint)) - .flatMap(List::stream) - .filter((tuple) -> Objects.nonNull(tuple.objRef)) - .map( - (tuple) -> { - Target t = tuple.toTarget(); - if (t != null) { - targetRefMap.put(t.connectUrl, tuple.objRef); - } - return t; - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - // Add new targets - Target.compare(persistedTargets) - .to(observedTargets) - .added() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, - t, - targetRefMap.get(t.connectUrl), - EventKind.FOUND))); - - // Prune deleted targets - Target.compare(persistedTargets) - .to(observedTargets) - .removed() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, t, null, EventKind.LOST))); - } + private synchronized void handleObservedEndpoints(String namespace) { + QuarkusTransaction.joiningExisting() + .run( + () -> { + List targetNodes = + DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT) + .stream() + .filter( + (n) -> + namespace.equals( + n.labels.get( + DISCOVERY_NAMESPACE_LABEL_KEY))) + .collect(Collectors.toList()); + + Map targetRefMap = new HashMap<>(); + + Set persistedTargets = new HashSet<>(); + for (DiscoveryNode node : targetNodes) { + persistedTargets.add(node.target); + } + + Set observedTargets = + safeGetInformers().get(namespace).getStore().list().stream() + .map((endpoint) -> getTargetTuplesFrom(endpoint)) + .flatMap(List::stream) + .filter((tuple) -> Objects.nonNull(tuple.objRef)) + .map( + (tuple) -> { + Target t = tuple.toTarget(); + if (t != null) { + targetRefMap.put( + t.connectUrl, tuple.objRef); + } + return t; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Add new targets + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, + t, + targetRefMap.get(t.connectUrl), + EventKind.FOUND))); + + // Prune deleted targets + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, + t, + null, + EventKind.LOST))); + }); } private void notify(EndpointDiscoveryEvent evt) { From a805cc09b9a5fd206e8abd01b7ce7ca8b5fd3dfa Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 14:59:55 -0400 Subject: [PATCH 12/38] refactor --- .../java/io/cryostat/discovery/KubeApiDiscovery.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 6410843ba..49e8421d6 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -237,10 +237,10 @@ private Map> safeGetInformers() { } } - private boolean isTargetUnderRealm(URI connectUrl) throws IllegalStateException { + private boolean isTargetUnderRealm(Target target) throws IllegalStateException { // Check for any targets with the same connectUrl in other realms try { - Target persistedTarget = Target.getTargetByConnectUrl(connectUrl); + Target persistedTarget = Target.getTargetByConnectUrl(target.connectUrl); String realmOfTarget = persistedTarget.annotations.cryostat().get("REALM"); if (!REALM.equals(realmOfTarget)) { logger.warnv( @@ -357,7 +357,7 @@ private void notify(EndpointDiscoveryEvent evt) { } private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { - if (!isTargetUnderRealm(target.connectUrl)) { + if (!isTargetUnderRealm(target)) { logger.infov( "Target with serviceURL {0} does not exist in discovery tree. Skipped deleting", target.connectUrl); @@ -394,9 +394,9 @@ private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { } private void buildOwnerChain(DiscoveryNode nsNode, Target target, ObjectReference targetRef) { - if (isTargetUnderRealm(target.connectUrl)) { + if (isTargetUnderRealm(target)) { logger.infov( - "Target with serviceURL {0} already exist in discovery tree. Skipped adding", + "Target with serviceURL {0} already exists in discovery tree. Skipped adding", target.connectUrl); return; } From faddfd2cc47264e69e79d38da6acf60d8f7f8767 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 15:00:07 -0400 Subject: [PATCH 13/38] force resync on existing period --- .../java/io/cryostat/discovery/KubeApiDiscovery.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 49e8421d6..bfff5ee1c 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -27,6 +27,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -85,6 +88,8 @@ public class KubeApiDiscovery implements ResourceEventHandler { @Inject EventBus bus; + ScheduledExecutorService resyncWorker = Executors.newSingleThreadScheduledExecutor(); + @ConfigProperty(name = "cryostat.discovery.kubernetes.enabled") boolean enabled; @@ -138,6 +143,11 @@ void onStart(@Observes StartupEvent evt) { logger.debugv("Starting {0} client", REALM); safeGetInformers(); + resyncWorker.scheduleAtFixedRate( + () -> kubeConfig.getWatchNamespaces().forEach(this::handleObservedEndpoints), + informerResyncPeriod.toMillis(), + informerResyncPeriod.toMillis(), + TimeUnit.MILLISECONDS); } void onStop(@Observes ShutdownEvent evt) { @@ -146,6 +156,7 @@ void onStop(@Observes ShutdownEvent evt) { } logger.debugv("Shutting down {0} client", REALM); + resyncWorker.shutdown(); safeGetInformers() .forEach( (ns, informer) -> { From 9be1431abcf5ddf46fce9fd3f308d33b5df31115 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 15:03:03 -0400 Subject: [PATCH 14/38] lower log level --- src/main/java/io/cryostat/discovery/KubeApiDiscovery.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index bfff5ee1c..ecd17f03e 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -369,7 +369,7 @@ private void notify(EndpointDiscoveryEvent evt) { private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { if (!isTargetUnderRealm(target)) { - logger.infov( + logger.debugv( "Target with serviceURL {0} does not exist in discovery tree. Skipped deleting", target.connectUrl); return; @@ -406,7 +406,7 @@ private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { private void buildOwnerChain(DiscoveryNode nsNode, Target target, ObjectReference targetRef) { if (isTargetUnderRealm(target)) { - logger.infov( + logger.debugv( "Target with serviceURL {0} already exists in discovery tree. Skipped adding", target.connectUrl); return; From fccdc0770251d908aa96a8b5638e3e550286d7ee Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 16:09:35 -0400 Subject: [PATCH 15/38] ensure full sync on startup --- src/main/java/io/cryostat/discovery/KubeApiDiscovery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index ecd17f03e..7d7f70d04 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -145,7 +145,7 @@ void onStart(@Observes StartupEvent evt) { safeGetInformers(); resyncWorker.scheduleAtFixedRate( () -> kubeConfig.getWatchNamespaces().forEach(this::handleObservedEndpoints), - informerResyncPeriod.toMillis(), + 0, informerResyncPeriod.toMillis(), TimeUnit.MILLISECONDS); } From bd42f0bf5807ffacb0808f6ab3e9b91874b93800 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 17 Oct 2024 16:30:17 -0400 Subject: [PATCH 16/38] periodically retry storage bucket creation --- src/main/java/io/cryostat/StorageBuckets.java | 42 ++++++++++++++++++- src/main/resources/application.properties | 1 + 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/cryostat/StorageBuckets.java b/src/main/java/io/cryostat/StorageBuckets.java index 3688668f9..87b9fa9bd 100644 --- a/src/main/java/io/cryostat/StorageBuckets.java +++ b/src/main/java/io/cryostat/StorageBuckets.java @@ -15,10 +15,21 @@ */ package io.cryostat; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import io.cryostat.util.HttpStatusCodeIdentifier; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -30,7 +41,17 @@ public class StorageBuckets { @Inject S3Client storage; @Inject Logger logger; + @ConfigProperty(name = "storage.buckets.creation-retry.period") + Duration creationRetryPeriod; + + private final Set buckets = ConcurrentHashMap.newKeySet(); + private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor(); + public void createIfNecessary(String bucket) { + buckets.add(bucket); + } + + private boolean tryCreate(String bucket) { boolean exists = false; logger.debugv("Checking if storage bucket \"{0}\" exists ...", bucket); try { @@ -49,8 +70,27 @@ public void createIfNecessary(String bucket) { storage.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); logger.debugv("Storage bucket \"{0}\" created", bucket); } catch (Exception e) { - logger.error(e); + logger.warn(e); + return false; } } + return true; + } + + void onStart(@Observes StartupEvent evt) { + worker.scheduleAtFixedRate( + () -> { + var it = buckets.iterator(); + while (it.hasNext()) { + if (tryCreate(it.next())) it.remove(); + } + }, + 0, + creationRetryPeriod.toMillis(), + TimeUnit.MILLISECONDS); + } + + void onStop(@Observes ShutdownEvent evt) { + worker.shutdown(); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f068086b5..ba4ecb4f8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -100,6 +100,7 @@ storage-ext.url= storage.presigned-downloads.enabled=false storage.transient-archives.enabled=false storage.transient-archives.ttl=60s +storage.buckets.creation-retry.period=10s storage.buckets.archives.name=archivedrecordings storage.buckets.archives.expiration-label=expiration storage.buckets.event-templates.name=eventtemplates From 966297ef7e13243fb7a57daf0b02fcad2da8c9f7 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 22 Oct 2024 16:08:04 -0400 Subject: [PATCH 17/38] ensure ordered queue processing of transactions, error handling --- .../cryostat/discovery/KubeApiDiscovery.java | 179 ++++++++++-------- 1 file changed, 96 insertions(+), 83 deletions(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 7d7f70d04..54b93b1fb 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -48,14 +48,11 @@ import io.fabric8.kubernetes.api.model.ObjectReference; import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; -import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; @@ -72,6 +69,10 @@ @ApplicationScoped public class KubeApiDiscovery implements ResourceEventHandler { + + private static final String NAMESPACE_QUERY_ADDR = "NS_QUERY"; + private static final String ENDPOINTS_DISCOVERY_ADDR = "ENDPOINTS_DISC"; + public static final String REALM = "KubernetesApi"; public static final String DISCOVERY_NAMESPACE_LABEL_KEY = "discovery.cryostat.io/namespace"; @@ -144,7 +145,14 @@ void onStart(@Observes StartupEvent evt) { logger.debugv("Starting {0} client", REALM); safeGetInformers(); resyncWorker.scheduleAtFixedRate( - () -> kubeConfig.getWatchNamespaces().forEach(this::handleObservedEndpoints), + () -> { + try { + logger.debugv("Resyncing"); + notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces())); + } catch (Exception e) { + logger.warn(e); + } + }, 0, informerResyncPeriod.toMillis(), TimeUnit.MILLISECONDS); @@ -185,7 +193,7 @@ public void onAdd(Endpoints endpoints) { logger.debugv( "Endpoint {0} created in namespace {1}", endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - handleObservedEndpoints(endpoints.getMetadata().getNamespace()); + notify(NamespaceQueryEvent.from(endpoints.getMetadata().getNamespace())); } @Override @@ -193,7 +201,7 @@ public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { logger.debugv( "Endpoint {0} modified in namespace {1}", newEndpoints.getMetadata().getName(), newEndpoints.getMetadata().getNamespace()); - handleObservedEndpoints(newEndpoints.getMetadata().getNamespace()); + notify(NamespaceQueryEvent.from(newEndpoints.getMetadata().getNamespace())); } @Override @@ -204,7 +212,7 @@ public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { if (deletedFinalStateUnknown) { logger.warnv("Deleted final state unknown: {0}", endpoints); } - handleObservedEndpoints(endpoints.getMetadata().getNamespace()); + notify(NamespaceQueryEvent.from(endpoints.getMetadata().getNamespace())); } private boolean isCompatiblePort(EndpointPort port) { @@ -266,8 +274,75 @@ private boolean isTargetUnderRealm(Target target) throws IllegalStateException { return false; } - @ConsumeEvent(blocking = true, ordered = true) + @ConsumeEvent(value = NAMESPACE_QUERY_ADDR, blocking = true, ordered = true) @Transactional(TxType.REQUIRES_NEW) + public void handleQueryEvent(NamespaceQueryEvent evt) { + Map targetRefMap = new HashMap<>(); + + for (var namespace : evt.namespaces) { + try { + List targetNodes = + DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT).stream() + .filter( + (n) -> + namespace.equals( + n.labels.get( + DISCOVERY_NAMESPACE_LABEL_KEY))) + .collect(Collectors.toList()); + + Set persistedTargets = new HashSet<>(); + for (DiscoveryNode node : targetNodes) { + persistedTargets.add(node.target); + } + + Set observedTargets = + safeGetInformers().get(namespace).getStore().list().stream() + .map((endpoint) -> getTargetTuplesFrom(endpoint)) + .flatMap(List::stream) + .filter((tuple) -> Objects.nonNull(tuple.objRef)) + .map( + (tuple) -> { + Target t = tuple.toTarget(); + if (t != null) { + targetRefMap.put(t.connectUrl, tuple.objRef); + } + return t; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // Prune deleted targets + Target.compare(persistedTargets) + .to(observedTargets) + .removed() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, t, null, EventKind.LOST))); + + // Add new targets + Target.compare(persistedTargets) + .to(observedTargets) + .added() + .forEach( + (t) -> + notify( + EndpointDiscoveryEvent.from( + namespace, + t, + targetRefMap.get(t.connectUrl), + EventKind.FOUND))); + } catch (Exception e) { + logger.error( + String.format("Failed to syncronize Endpoints in namespace %s", namespace), + e); + } + } + } + + @ConsumeEvent(value = ENDPOINTS_DISCOVERY_ADDR, blocking = true, ordered = true) + @Transactional(TxType.REQUIRED) public void handleEndpointEvent(EndpointDiscoveryEvent evt) { String namespace = evt.namespace; DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow(); @@ -297,74 +372,12 @@ public void handleEndpointEvent(EndpointDiscoveryEvent evt) { } } - private synchronized void handleObservedEndpoints(String namespace) { - QuarkusTransaction.joiningExisting() - .run( - () -> { - List targetNodes = - DiscoveryNode.findAllByNodeType(KubeDiscoveryNodeType.ENDPOINT) - .stream() - .filter( - (n) -> - namespace.equals( - n.labels.get( - DISCOVERY_NAMESPACE_LABEL_KEY))) - .collect(Collectors.toList()); - - Map targetRefMap = new HashMap<>(); - - Set persistedTargets = new HashSet<>(); - for (DiscoveryNode node : targetNodes) { - persistedTargets.add(node.target); - } - - Set observedTargets = - safeGetInformers().get(namespace).getStore().list().stream() - .map((endpoint) -> getTargetTuplesFrom(endpoint)) - .flatMap(List::stream) - .filter((tuple) -> Objects.nonNull(tuple.objRef)) - .map( - (tuple) -> { - Target t = tuple.toTarget(); - if (t != null) { - targetRefMap.put( - t.connectUrl, tuple.objRef); - } - return t; - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - // Add new targets - Target.compare(persistedTargets) - .to(observedTargets) - .added() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, - t, - targetRefMap.get(t.connectUrl), - EventKind.FOUND))); - - // Prune deleted targets - Target.compare(persistedTargets) - .to(observedTargets) - .removed() - .forEach( - (t) -> - notify( - EndpointDiscoveryEvent.from( - namespace, - t, - null, - EventKind.LOST))); - }); + private void notify(NamespaceQueryEvent evt) { + bus.publish(NAMESPACE_QUERY_ADDR, evt); } private void notify(EndpointDiscoveryEvent evt) { - bus.publish(KubeApiDiscovery.class.getName(), evt); + bus.publish(ENDPOINTS_DISCOVERY_ADDR, evt); } private void pruneOwnerChain(DiscoveryNode nsNode, Target target) { @@ -538,16 +551,6 @@ static final class KubeConfig { @ConfigProperty(name = "cryostat.discovery.kubernetes.namespace-path") String namespacePath; - private final LazyInitializer kubeClient = - new LazyInitializer() { - @Override - protected KubernetesClient initialize() throws ConcurrentException { - return new KubernetesClientBuilder() - .withTaskExecutor(Infrastructure.getDefaultWorkerPool()) - .build(); - } - }; - Collection getWatchNamespaces() { return watchNamespaces.orElse(List.of()).stream() .map( @@ -575,6 +578,16 @@ boolean kubeApiAvailable() { } } + private static record NamespaceQueryEvent(Collection namespaces) { + static NamespaceQueryEvent from(Collection namespaces) { + return new NamespaceQueryEvent(namespaces); + } + + static NamespaceQueryEvent from(String namespace) { + return new NamespaceQueryEvent(List.of(namespace)); + } + } + private static record EndpointDiscoveryEvent( String namespace, Target target, ObjectReference objRef, EventKind eventKind) { static EndpointDiscoveryEvent from( From facfe29b35f925fa5f11bbafaef32dfa6f5fe037 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 22 Oct 2024 16:10:54 -0400 Subject: [PATCH 18/38] fixup! ensure ordered queue processing of transactions, error handling --- src/main/java/io/cryostat/discovery/KubeApiDiscovery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 54b93b1fb..8bbf10cae 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -147,7 +147,7 @@ void onStart(@Observes StartupEvent evt) { resyncWorker.scheduleAtFixedRate( () -> { try { - logger.debugv("Resyncing"); + logger.debug("Resyncing"); notify(NamespaceQueryEvent.from(kubeConfig.getWatchNamespaces())); } catch (Exception e) { logger.warn(e); From 5dc0591ed00e2873e5bf1aeb3293e7ba01e1a322 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 10:13:41 -0400 Subject: [PATCH 19/38] decouple target JVM ID retrieval from persistence, handle in separate worker threads, attempt reconnections periodically --- .../cryostat/discovery/KubeApiDiscovery.java | 28 ++- .../java/io/cryostat/rules/RuleService.java | 5 + src/main/java/io/cryostat/targets/Target.java | 89 +--------- .../targets/TargetJvmIdUpdateJob.java | 83 +++++++++ .../targets/TargetJvmIdUpdateService.java | 165 ++++++++++++++++++ 5 files changed, 271 insertions(+), 99 deletions(-) create mode 100644 src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java create mode 100644 src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index 8bbf10cae..685dbd92c 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -352,24 +352,20 @@ public void handleEndpointEvent(EndpointDiscoveryEvent evt) { DiscoveryNode.environment( namespace, KubeDiscoveryNodeType.NAMESPACE)); - try { - if (evt.eventKind == EventKind.FOUND) { - buildOwnerChain(nsNode, evt.target, evt.objRef); - } else { - pruneOwnerChain(nsNode, evt.target); - } + if (evt.eventKind == EventKind.FOUND) { + buildOwnerChain(nsNode, evt.target, evt.objRef); + } else { + pruneOwnerChain(nsNode, evt.target); + } - if (!nsNode.hasChildren()) { - realm.children.remove(nsNode); - nsNode.parent = null; - } else if (!realm.children.contains(nsNode)) { - realm.children.add(nsNode); - nsNode.parent = realm; - } - realm.persist(); - } catch (Exception e) { - logger.warn("Endpoint handler exception", e); + if (!nsNode.hasChildren()) { + realm.children.remove(nsNode); + nsNode.parent = null; + } else if (!realm.children.contains(nsNode)) { + realm.children.add(nsNode); + nsNode.parent = realm; } + realm.persist(); } private void notify(NamespaceQueryEvent evt) { diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 164c53cdb..d080fe5f7 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -40,6 +40,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.narayana.jta.QuarkusTransaction; +import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -86,6 +87,10 @@ void onStart(@Observes StartupEvent ev) { .forEach(this::applyRuleToMatchingTargets)); } + void onStop(@Observes ShutdownEvent evt) throws SchedulerException { + quartz.shutdown(); + } + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index 9bc733033..d722f7825 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -18,7 +18,6 @@ import java.net.URI; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -28,17 +27,14 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; -import io.cryostat.ConfigProperties; -import io.cryostat.core.net.JFRConnection; -import io.cryostat.credentials.Credential; import io.cryostat.discovery.DiscoveryNode; -import io.cryostat.expressions.MatchExpressionEvaluator; -import io.cryostat.libcryostat.JvmIdentifier; import io.cryostat.recordings.ActiveRecording; -import io.cryostat.recordings.RecordingHelper; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; @@ -46,7 +42,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.hibernate.orm.panache.PanacheEntity; -import io.quarkus.vertx.ConsumeEvent; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -63,15 +58,12 @@ import jakarta.persistence.PostRemove; import jakarta.persistence.PostUpdate; import jakarta.persistence.PrePersist; -import jakarta.transaction.Transactional; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import org.apache.commons.lang3.StringUtils; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.hibernate.annotations.JdbcTypeCode; import org.hibernate.type.SqlTypes; import org.jboss.logging.Logger; -import org.projectnessie.cel.tools.ScriptException; @Entity @EntityListeners(Target.Listener.class) @@ -291,53 +283,8 @@ static class Listener { @Inject Logger logger; @Inject EventBus bus; - @Inject TargetConnectionManager connectionManager; - @Inject RecordingHelper recordingHelper; - @Inject MatchExpressionEvaluator matchExpressionEvaluator; - - @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) - Duration timeout; - - @Transactional - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) - void onMessage(TargetDiscovery event) { - var target = Target.find("id", event.serviceRef().id).singleResultOptional(); - switch (event.kind()) { - case LOST: - // this should already be handled by the cascading deletion of the Target - // TODO verify this - break; - case FOUND: - target.ifPresent(recordingHelper::listActiveRecordings); - break; - case MODIFIED: - target.ifPresent(recordingHelper::listActiveRecordings); - break; - default: - // no-op - break; - } - } - - @ConsumeEvent(value = Credential.CREDENTIALS_STORED, blocking = true) - @Transactional - void updateCredential(Credential credential) { - Target.stream("#Target.unconnected") - .forEach( - t -> { - try { - if (matchExpressionEvaluator.applies( - credential.matchExpression, t)) { - updateTargetJvmId(t, credential); - t.persist(); - } - } catch (ScriptException e) { - logger.error(e); - } catch (Exception e) { - logger.warn(e); - } - }); - } + ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); @PrePersist void prePersist(Target target) { @@ -358,35 +305,11 @@ void prePersist(Target target) { if (target.activeRecordings == null) { target.activeRecordings = new ArrayList<>(); } - - try { - if (StringUtils.isBlank(target.jvmId)) { - updateTargetJvmId(target, null); - } - } catch (Exception e) { - logger.warn(e); - } - } - - private void updateTargetJvmId(Target t, Credential credential) { - try { - t.jvmId = - connectionManager - .executeDirect( - t, - Optional.ofNullable(credential), - JFRConnection::getJvmIdentifier) - .map(JvmIdentifier::getHash) - .await() - .atMost(timeout); - } catch (Exception e) { - logger.error(e); - } } @PostPersist void postPersist(Target target) { - notify(EventKind.FOUND, target); + scheduler.schedule(() -> notify(EventKind.FOUND, target), 1, TimeUnit.SECONDS); } @PostUpdate diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java new file mode 100644 index 000000000..0bac9462c --- /dev/null +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -0,0 +1,83 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.targets; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; + +import io.cryostat.ConfigProperties; +import io.cryostat.core.net.JFRConnection; +import io.cryostat.libcryostat.JvmIdentifier; + +import io.quarkus.narayana.jta.QuarkusTransaction; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +public class TargetJvmIdUpdateJob implements Job { + + @Inject Logger logger; + @Inject TargetConnectionManager connectionManager; + ExecutorService executor = ForkJoinPool.commonPool(); + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration connectionTimeout; + + @Override + @Transactional + public void execute(JobExecutionContext context) throws JobExecutionException { + Target.stream("#Target.unconnected") + .forEach( + t -> { + executor.submit( + () -> { + try { + updateTargetJvmId(t.id); + } catch (Exception e) { + logger.warn(e); + } + }); + }); + } + + private void updateTargetJvmId(long id) { + QuarkusTransaction.requiringNew() + .run( + () -> { + try { + Target target = Target.getTargetById(id); + target.jvmId = + connectionManager + .executeDirect( + target, + Optional.empty(), + JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + target.persist(); + } catch (Exception e) { + logger.error(e); + } + }); + } +} diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java new file mode 100644 index 000000000..ce41141df --- /dev/null +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -0,0 +1,165 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.targets; + +import java.time.Duration; +import java.time.Instant; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +import io.cryostat.ConfigProperties; +import io.cryostat.core.net.JFRConnection; +import io.cryostat.credentials.Credential; +import io.cryostat.expressions.MatchExpressionEvaluator; +import io.cryostat.libcryostat.JvmIdentifier; +import io.cryostat.recordings.RecordingHelper; +import io.cryostat.targets.Target.TargetDiscovery; + +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.vertx.ConsumeEvent; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.transaction.Transactional; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; +import org.projectnessie.cel.tools.ScriptException; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +@ApplicationScoped +public class TargetJvmIdUpdateService { + + @Inject Logger logger; + @Inject TargetConnectionManager connectionManager; + @Inject RecordingHelper recordingHelper; + @Inject EntityManager entityManager; + @Inject MatchExpressionEvaluator matchExpressionEvaluator; + @Inject Scheduler scheduler; + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration connectionTimeout; + + private final List jobs = new CopyOnWriteArrayList<>(); + + void onStart(@Observes StartupEvent evt) { + logger.tracev("{0} started", getClass().getName()); + + JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); + + if (jobs.contains(jobDetail.getKey())) { + return; + } + + Trigger trigger = + TriggerBuilder.newTrigger() + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(30) + .repeatForever() + .withMisfireHandlingInstructionNowWithExistingCount()) + .startAt(Date.from(Instant.now().plusSeconds(30))) + .build(); + try { + scheduler.scheduleJob(jobDetail, trigger); + } catch (SchedulerException e) { + logger.errorv(e, "Failed to schedule JVM ID updater job"); + } + jobs.add(jobDetail.getKey()); + } + + void onStop(@Observes ShutdownEvent evt) throws SchedulerException { + scheduler.shutdown(); + } + + @Transactional + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) + void onMessage(TargetDiscovery event) { + var target = Target.find("id", event.serviceRef().id).singleResultOptional(); + switch (event.kind()) { + case LOST: + // this should already be handled by the cascading deletion of the Target + // TODO verify this + break; + case MODIFIED: + // fall-through + case FOUND: + target.ifPresent( + t -> { + try { + logger.debugv("Updating JVM ID for {0} ({1})", t.connectUrl, t.id); + if (StringUtils.isBlank(t.jvmId)) { + updateTargetJvmId(t, null); + } + } catch (Exception e) { + logger.warn(e); + } + }); + target.ifPresent(recordingHelper::listActiveRecordings); + break; + default: + // no-op + break; + } + } + + @ConsumeEvent(value = Credential.CREDENTIALS_STORED, blocking = true) + @Transactional + void updateCredential(Credential credential) { + Target.stream("#Target.unconnected") + .forEach( + t -> { + try { + if (matchExpressionEvaluator.applies( + credential.matchExpression, t)) { + updateTargetJvmId(t, credential); + } + } catch (ScriptException e) { + logger.error(e); + } catch (Exception e) { + logger.warn(e); + } + }); + } + + private void updateTargetJvmId(Target t, Credential credential) { + try { + t.jvmId = + connectionManager + .executeDirect( + t, + Optional.ofNullable(credential), + JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + t.persist(); + } catch (Exception e) { + logger.error(e); + } + } +} From a18223012abc11385dbb2a54657522a5e087bd25 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 10:20:06 -0400 Subject: [PATCH 20/38] ensure JVM ID is nulled if connection fails --- src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java index 0bac9462c..e28bb2e53 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -63,8 +63,8 @@ private void updateTargetJvmId(long id) { QuarkusTransaction.requiringNew() .run( () -> { + Target target = Target.getTargetById(id); try { - Target target = Target.getTargetById(id); target.jvmId = connectionManager .executeDirect( @@ -74,10 +74,11 @@ private void updateTargetJvmId(long id) { .map(JvmIdentifier::getHash) .await() .atMost(connectionTimeout); - target.persist(); } catch (Exception e) { + target.jvmId = null; logger.error(e); } + target.persist(); }); } } From 4016408df486506042ee9423d0ceed6d4b6b66ea Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 10:36:50 -0400 Subject: [PATCH 21/38] delay MODIFIED events same as FOUND events --- src/main/java/io/cryostat/targets/Target.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index d722f7825..fabbf292a 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -314,7 +314,7 @@ void postPersist(Target target) { @PostUpdate void postUpdate(Target target) { - notify(EventKind.MODIFIED, target); + scheduler.schedule(() -> notify(EventKind.MODIFIED, target), 1, TimeUnit.SECONDS); } @PostRemove From 75568cd80a72e9193e1bed9777038fbf1e0a9a24 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 11:12:19 -0400 Subject: [PATCH 22/38] refactor to use quartz for scheduling delayed connection, and reuse logic for periodic updates vs on-discovery updates --- src/main/java/io/cryostat/targets/Target.java | 9 +- .../targets/TargetJvmIdUpdateJob.java | 49 +++++----- .../targets/TargetJvmIdUpdateService.java | 94 ++++--------------- 3 files changed, 49 insertions(+), 103 deletions(-) diff --git a/src/main/java/io/cryostat/targets/Target.java b/src/main/java/io/cryostat/targets/Target.java index fabbf292a..1389199b0 100644 --- a/src/main/java/io/cryostat/targets/Target.java +++ b/src/main/java/io/cryostat/targets/Target.java @@ -27,9 +27,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -283,8 +280,6 @@ static class Listener { @Inject Logger logger; @Inject EventBus bus; - ScheduledExecutorService scheduler = - Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); @PrePersist void prePersist(Target target) { @@ -309,12 +304,12 @@ void prePersist(Target target) { @PostPersist void postPersist(Target target) { - scheduler.schedule(() -> notify(EventKind.FOUND, target), 1, TimeUnit.SECONDS); + notify(EventKind.FOUND, target); } @PostUpdate void postUpdate(Target target) { - scheduler.schedule(() -> notify(EventKind.MODIFIED, target), 1, TimeUnit.SECONDS); + notify(EventKind.MODIFIED, target); } @PostRemove diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java index e28bb2e53..22d176221 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -16,13 +16,14 @@ package io.cryostat.targets; import java.time.Duration; -import java.util.Optional; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import io.cryostat.ConfigProperties; import io.cryostat.core.net.JFRConnection; import io.cryostat.libcryostat.JvmIdentifier; +import io.cryostat.recordings.RecordingHelper; import io.quarkus.narayana.jta.QuarkusTransaction; import jakarta.inject.Inject; @@ -37,6 +38,7 @@ public class TargetJvmIdUpdateJob implements Job { @Inject Logger logger; @Inject TargetConnectionManager connectionManager; + @Inject RecordingHelper recordingHelper; ExecutorService executor = ForkJoinPool.commonPool(); @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) @@ -45,40 +47,45 @@ public class TargetJvmIdUpdateJob implements Job { @Override @Transactional public void execute(JobExecutionContext context) throws JobExecutionException { - Target.stream("#Target.unconnected") - .forEach( - t -> { - executor.submit( - () -> { - try { - updateTargetJvmId(t.id); - } catch (Exception e) { - logger.warn(e); - } - }); - }); + List targets; + long targetId = (long) context.getJobDetail().getJobDataMap().get("targetId"); + if (targetId > 0) { + targets = List.of(Target.getTargetById(targetId)); + } else { + targets = Target.find("#Target.unconnected").list(); + } + + targets.forEach( + t -> { + executor.submit( + () -> { + try { + updateTarget(t.id); + } catch (Exception e) { + logger.warn(e); + } + }); + }); } - private void updateTargetJvmId(long id) { + private void updateTarget(long id) { QuarkusTransaction.requiringNew() .run( () -> { - Target target = Target.getTargetById(id); try { + Target target = Target.getTargetById(id); target.jvmId = connectionManager - .executeDirect( - target, - Optional.empty(), - JFRConnection::getJvmIdentifier) + .executeConnectedTaskUni( + target, JFRConnection::getJvmIdentifier) .map(JvmIdentifier::getHash) .await() .atMost(connectionTimeout); + recordingHelper.listActiveRecordings(target); + target.persist(); } catch (Exception e) { - target.jvmId = null; logger.error(e); } - target.persist(); }); } } diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index ce41141df..dc3f0ac05 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -18,16 +18,9 @@ import java.time.Duration; import java.time.Instant; import java.util.Date; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Map; import io.cryostat.ConfigProperties; -import io.cryostat.core.net.JFRConnection; -import io.cryostat.credentials.Credential; -import io.cryostat.expressions.MatchExpressionEvaluator; -import io.cryostat.libcryostat.JvmIdentifier; -import io.cryostat.recordings.RecordingHelper; import io.cryostat.targets.Target.TargetDiscovery; import io.quarkus.runtime.ShutdownEvent; @@ -36,15 +29,10 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; -import jakarta.persistence.EntityManager; -import jakarta.transaction.Transactional; -import org.apache.commons.lang3.StringUtils; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; -import org.projectnessie.cel.tools.ScriptException; import org.quartz.JobBuilder; import org.quartz.JobDetail; -import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SimpleScheduleBuilder; @@ -55,26 +43,16 @@ public class TargetJvmIdUpdateService { @Inject Logger logger; - @Inject TargetConnectionManager connectionManager; - @Inject RecordingHelper recordingHelper; - @Inject EntityManager entityManager; - @Inject MatchExpressionEvaluator matchExpressionEvaluator; @Inject Scheduler scheduler; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration connectionTimeout; - private final List jobs = new CopyOnWriteArrayList<>(); - void onStart(@Observes StartupEvent evt) { logger.tracev("{0} started", getClass().getName()); JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); - if (jobs.contains(jobDetail.getKey())) { - return; - } - Trigger trigger = TriggerBuilder.newTrigger() .withSchedule( @@ -89,17 +67,14 @@ void onStart(@Observes StartupEvent evt) { } catch (SchedulerException e) { logger.errorv(e, "Failed to schedule JVM ID updater job"); } - jobs.add(jobDetail.getKey()); } void onStop(@Observes ShutdownEvent evt) throws SchedulerException { scheduler.shutdown(); } - @Transactional - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY) void onMessage(TargetDiscovery event) { - var target = Target.find("id", event.serviceRef().id).singleResultOptional(); switch (event.kind()) { case LOST: // this should already be handled by the cascading deletion of the Target @@ -108,58 +83,27 @@ void onMessage(TargetDiscovery event) { case MODIFIED: // fall-through case FOUND: - target.ifPresent( - t -> { - try { - logger.debugv("Updating JVM ID for {0} ({1})", t.connectUrl, t.id); - if (StringUtils.isBlank(t.jvmId)) { - updateTargetJvmId(t, null); - } - } catch (Exception e) { - logger.warn(e); - } - }); - target.ifPresent(recordingHelper::listActiveRecordings); + JobDetail jobDetail = + JobBuilder.newJob(TargetJvmIdUpdateJob.class) + .withIdentity(event.kind().name(), event.serviceRef().id.toString()) + .build(); + Map data = jobDetail.getJobDataMap(); + data.put("targetId", event.serviceRef().id); + + Trigger trigger = + TriggerBuilder.newTrigger() + .startAt(Date.from(Instant.now().plusSeconds(1))) + .usingJobData(jobDetail.getJobDataMap()) + .build(); + try { + scheduler.scheduleJob(jobDetail, trigger); + } catch (SchedulerException e) { + logger.errorv(e, "Failed to schedule JVM ID updater job"); + } break; default: // no-op break; } } - - @ConsumeEvent(value = Credential.CREDENTIALS_STORED, blocking = true) - @Transactional - void updateCredential(Credential credential) { - Target.stream("#Target.unconnected") - .forEach( - t -> { - try { - if (matchExpressionEvaluator.applies( - credential.matchExpression, t)) { - updateTargetJvmId(t, credential); - } - } catch (ScriptException e) { - logger.error(e); - } catch (Exception e) { - logger.warn(e); - } - }); - } - - private void updateTargetJvmId(Target t, Credential credential) { - try { - t.jvmId = - connectionManager - .executeDirect( - t, - Optional.ofNullable(credential), - JFRConnection::getJvmIdentifier) - .map(JvmIdentifier::getHash) - .await() - .atMost(connectionTimeout); - t.persist(); - } catch (Exception e) { - logger.error(e); - } - } } From d33e8cc330b05b28acf90543aa6779e4a992a678 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 11:13:08 -0400 Subject: [PATCH 23/38] remove unused case --- .../java/io/cryostat/targets/TargetJvmIdUpdateService.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index dc3f0ac05..848b48b5f 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -76,10 +76,6 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY) void onMessage(TargetDiscovery event) { switch (event.kind()) { - case LOST: - // this should already be handled by the cascading deletion of the Target - // TODO verify this - break; case MODIFIED: // fall-through case FOUND: From a8ce783d9fae0cadc4530771baf0377488bee650 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 11:15:07 -0400 Subject: [PATCH 24/38] remove unnecessary job identity --- .../java/io/cryostat/targets/TargetJvmIdUpdateService.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index 848b48b5f..8155d0305 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -79,10 +79,7 @@ void onMessage(TargetDiscovery event) { case MODIFIED: // fall-through case FOUND: - JobDetail jobDetail = - JobBuilder.newJob(TargetJvmIdUpdateJob.class) - .withIdentity(event.kind().name(), event.serviceRef().id.toString()) - .build(); + JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); Map data = jobDetail.getJobDataMap(); data.put("targetId", event.serviceRef().id); From 94844e399333cacb73c7a30f6450f6aa2a081a72 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 13:21:10 -0400 Subject: [PATCH 25/38] handle nullable input data --- src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java index 22d176221..06e620bd7 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -48,8 +48,8 @@ public class TargetJvmIdUpdateJob implements Job { @Transactional public void execute(JobExecutionContext context) throws JobExecutionException { List targets; - long targetId = (long) context.getJobDetail().getJobDataMap().get("targetId"); - if (targetId > 0) { + Long targetId = (Long) context.getJobDetail().getJobDataMap().get("targetId"); + if (targetId != null) { targets = List.of(Target.getTargetById(targetId)); } else { targets = Target.find("#Target.unconnected").list(); From c2f5fd0951bba1adde2d5b92ed1fc271e1065ce0 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 13:21:33 -0400 Subject: [PATCH 26/38] slower initial delay, periodic delay based on connection timeout --- .../io/cryostat/targets/TargetJvmIdUpdateService.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index 8155d0305..13cfca4e0 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -57,10 +57,15 @@ void onStart(@Observes StartupEvent evt) { TriggerBuilder.newTrigger() .withSchedule( SimpleScheduleBuilder.simpleSchedule() - .withIntervalInSeconds(30) + .withIntervalInSeconds( + (int) (connectionTimeout.toSeconds() * 2)) .repeatForever() .withMisfireHandlingInstructionNowWithExistingCount()) - .startAt(Date.from(Instant.now().plusSeconds(30))) + .startAt( + Date.from( + Instant.now() + .plusSeconds( + (int) (connectionTimeout.toSeconds() * 2)))) .build(); try { scheduler.scheduleJob(jobDetail, trigger); @@ -85,7 +90,7 @@ void onMessage(TargetDiscovery event) { Trigger trigger = TriggerBuilder.newTrigger() - .startAt(Date.from(Instant.now().plusSeconds(1))) + .startAt(Date.from(Instant.now().plusSeconds(3))) .usingJobData(jobDetail.getJobDataMap()) .build(); try { From 95151f52330960eed2799e7fa695319b69e75ea2 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 13:21:45 -0400 Subject: [PATCH 27/38] unwrap exception handling so transactions can be rolled back --- .../targets/TargetJvmIdUpdateJob.java | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java index 06e620bd7..75b7b101b 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -57,14 +57,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException { targets.forEach( t -> { - executor.submit( - () -> { - try { - updateTarget(t.id); - } catch (Exception e) { - logger.warn(e); - } - }); + executor.submit(() -> updateTarget(t.id)); }); } @@ -72,20 +65,16 @@ private void updateTarget(long id) { QuarkusTransaction.requiringNew() .run( () -> { - try { - Target target = Target.getTargetById(id); - target.jvmId = - connectionManager - .executeConnectedTaskUni( - target, JFRConnection::getJvmIdentifier) - .map(JvmIdentifier::getHash) - .await() - .atMost(connectionTimeout); - recordingHelper.listActiveRecordings(target); - target.persist(); - } catch (Exception e) { - logger.error(e); - } + Target target = Target.getTargetById(id); + target.jvmId = + connectionManager + .executeConnectedTaskUni( + target, JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + recordingHelper.listActiveRecordings(target); + target.persist(); }); } } From 2cec4835d7b313230db7403d86470daf5c917a94 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 13:57:55 -0400 Subject: [PATCH 28/38] cleanup --- .../cryostat/targets/TargetJvmIdUpdateJob.java | 5 +---- .../targets/TargetJvmIdUpdateService.java | 16 ++++------------ 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java index 75b7b101b..6bc60587e 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -55,10 +55,7 @@ public void execute(JobExecutionContext context) throws JobExecutionException { targets = Target.find("#Target.unconnected").list(); } - targets.forEach( - t -> { - executor.submit(() -> updateTarget(t.id)); - }); + targets.forEach(t -> executor.submit(() -> updateTarget(t.id))); } private void updateTarget(long id) { diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index 13cfca4e0..ed7161380 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -48,7 +48,7 @@ public class TargetJvmIdUpdateService { @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration connectionTimeout; - void onStart(@Observes StartupEvent evt) { + void onStart(@Observes StartupEvent evt) throws SchedulerException { logger.tracev("{0} started", getClass().getName()); JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); @@ -67,11 +67,7 @@ void onStart(@Observes StartupEvent evt) { .plusSeconds( (int) (connectionTimeout.toSeconds() * 2)))) .build(); - try { - scheduler.scheduleJob(jobDetail, trigger); - } catch (SchedulerException e) { - logger.errorv(e, "Failed to schedule JVM ID updater job"); - } + scheduler.scheduleJob(jobDetail, trigger); } void onStop(@Observes ShutdownEvent evt) throws SchedulerException { @@ -79,7 +75,7 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { } @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY) - void onMessage(TargetDiscovery event) { + void onMessage(TargetDiscovery event) throws SchedulerException { switch (event.kind()) { case MODIFIED: // fall-through @@ -93,11 +89,7 @@ void onMessage(TargetDiscovery event) { .startAt(Date.from(Instant.now().plusSeconds(3))) .usingJobData(jobDetail.getJobDataMap()) .build(); - try { - scheduler.scheduleJob(jobDetail, trigger); - } catch (SchedulerException e) { - logger.errorv(e, "Failed to schedule JVM ID updater job"); - } + scheduler.scheduleJob(jobDetail, trigger); break; default: // no-op From c14f1ab64dd14c8b130fff683395ebbdbd6f5473 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 14:09:17 -0400 Subject: [PATCH 29/38] handle single-target updates within existing transaction --- .../targets/TargetJvmIdUpdateJob.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java index 6bc60587e..173680820 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -55,23 +55,25 @@ public void execute(JobExecutionContext context) throws JobExecutionException { targets = Target.find("#Target.unconnected").list(); } - targets.forEach(t -> executor.submit(() -> updateTarget(t.id))); + if (targets.size() == 1) { + updateTarget(targets.get(0)); + } else { + targets.forEach(t -> executor.submit(() -> updateTargetTx(t.id))); + } + } + + private void updateTargetTx(long id) { + QuarkusTransaction.requiringNew().run(() -> updateTarget(Target.getTargetById(id))); } - private void updateTarget(long id) { - QuarkusTransaction.requiringNew() - .run( - () -> { - Target target = Target.getTargetById(id); - target.jvmId = - connectionManager - .executeConnectedTaskUni( - target, JFRConnection::getJvmIdentifier) - .map(JvmIdentifier::getHash) - .await() - .atMost(connectionTimeout); - recordingHelper.listActiveRecordings(target); - target.persist(); - }); + private void updateTarget(Target target) { + target.jvmId = + connectionManager + .executeConnectedTaskUni(target, JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + recordingHelper.listActiveRecordings(target); + target.persist(); } } From 730ddcece247525d05184d35670081fdb2e0d501 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 14:26:56 -0400 Subject: [PATCH 30/38] reduce delay --- src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index ed7161380..60254f803 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -86,7 +86,7 @@ void onMessage(TargetDiscovery event) throws SchedulerException { Trigger trigger = TriggerBuilder.newTrigger() - .startAt(Date.from(Instant.now().plusSeconds(3))) + .startAt(Date.from(Instant.now().plusSeconds(1))) .usingJobData(jobDetail.getJobDataMap()) .build(); scheduler.scheduleJob(jobDetail, trigger); From 35b17bc88b7e9b14de46a7c3ec6cd26844b91d17 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Wed, 23 Oct 2024 16:43:36 -0400 Subject: [PATCH 31/38] skip update if jvmId already known --- .../java/io/cryostat/targets/TargetJvmIdUpdateService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index 60254f803..d214efd28 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -29,6 +29,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import org.apache.commons.lang3.StringUtils; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import org.quartz.JobBuilder; @@ -80,6 +81,9 @@ void onMessage(TargetDiscovery event) throws SchedulerException { case MODIFIED: // fall-through case FOUND: + if (StringUtils.isNotBlank(event.serviceRef().jvmId)) { + break; + } JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); Map data = jobDetail.getJobDataMap(); data.put("targetId", event.serviceRef().id); From c1262c3003986fecb0057858bb9f7bbed31c0367 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 24 Oct 2024 11:31:06 -0400 Subject: [PATCH 32/38] updates should continue even if JVM ID is already known, so that active recordings can be updated as well --- src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java | 2 +- .../java/io/cryostat/targets/TargetJvmIdUpdateService.java | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java index 173680820..8b8ad0517 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java @@ -73,7 +73,7 @@ private void updateTarget(Target target) { .map(JvmIdentifier::getHash) .await() .atMost(connectionTimeout); - recordingHelper.listActiveRecordings(target); + target.activeRecordings = recordingHelper.listActiveRecordings(target); target.persist(); } } diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java index d214efd28..f3b2ee876 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java @@ -29,7 +29,6 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; -import org.apache.commons.lang3.StringUtils; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import org.quartz.JobBuilder; @@ -81,13 +80,9 @@ void onMessage(TargetDiscovery event) throws SchedulerException { case MODIFIED: // fall-through case FOUND: - if (StringUtils.isNotBlank(event.serviceRef().jvmId)) { - break; - } JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); Map data = jobDetail.getJobDataMap(); data.put("targetId", event.serviceRef().id); - Trigger trigger = TriggerBuilder.newTrigger() .startAt(Date.from(Instant.now().plusSeconds(1))) From 9be3035c953eece1e4a1eec9c716fea06b2a91d5 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 24 Oct 2024 14:52:00 -0400 Subject: [PATCH 33/38] rename --- .../{TargetJvmIdUpdateJob.java => TargetUpdateJob.java} | 2 +- ...rgetJvmIdUpdateService.java => TargetUpdateService.java} | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) rename src/main/java/io/cryostat/targets/{TargetJvmIdUpdateJob.java => TargetUpdateJob.java} (98%) rename src/main/java/io/cryostat/targets/{TargetJvmIdUpdateService.java => TargetUpdateService.java} (94%) diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java b/src/main/java/io/cryostat/targets/TargetUpdateJob.java similarity index 98% rename from src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java rename to src/main/java/io/cryostat/targets/TargetUpdateJob.java index 8b8ad0517..9cf066166 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetUpdateJob.java @@ -34,7 +34,7 @@ import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; -public class TargetJvmIdUpdateJob implements Job { +public class TargetUpdateJob implements Job { @Inject Logger logger; @Inject TargetConnectionManager connectionManager; diff --git a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java b/src/main/java/io/cryostat/targets/TargetUpdateService.java similarity index 94% rename from src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java rename to src/main/java/io/cryostat/targets/TargetUpdateService.java index f3b2ee876..a91a44070 100644 --- a/src/main/java/io/cryostat/targets/TargetJvmIdUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetUpdateService.java @@ -40,7 +40,7 @@ import org.quartz.TriggerBuilder; @ApplicationScoped -public class TargetJvmIdUpdateService { +public class TargetUpdateService { @Inject Logger logger; @Inject Scheduler scheduler; @@ -51,7 +51,7 @@ public class TargetJvmIdUpdateService { void onStart(@Observes StartupEvent evt) throws SchedulerException { logger.tracev("{0} started", getClass().getName()); - JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); + JobDetail jobDetail = JobBuilder.newJob(TargetUpdateJob.class).build(); Trigger trigger = TriggerBuilder.newTrigger() @@ -80,7 +80,7 @@ void onMessage(TargetDiscovery event) throws SchedulerException { case MODIFIED: // fall-through case FOUND: - JobDetail jobDetail = JobBuilder.newJob(TargetJvmIdUpdateJob.class).build(); + JobDetail jobDetail = JobBuilder.newJob(TargetUpdateJob.class).build(); Map data = jobDetail.getJobDataMap(); data.put("targetId", event.serviceRef().id); Trigger trigger = From 26993002a391cad961167dd9d058ab7f32021143 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 24 Oct 2024 14:57:22 -0400 Subject: [PATCH 34/38] rules ignore target discovery when JVM ID is still blank, act later after JVM ID is determined --- src/main/java/io/cryostat/rules/RuleService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index d080fe5f7..58df86a9f 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -49,6 +49,7 @@ import jakarta.inject.Inject; import jakarta.persistence.EntityManager; import jakarta.transaction.Transactional; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; @@ -94,7 +95,12 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { + case MODIFIED: + // fall-through case FOUND: + if (StringUtils.isBlank(event.serviceRef().jvmId)) { + break; + } applyRulesToTarget(event.serviceRef()); break; case LOST: From 1d372f37d48ed99c60bb9158ba95dd63c029df7a Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 24 Oct 2024 15:06:55 -0400 Subject: [PATCH 35/38] handle updating JVM ID on credential change, null out JVM ID if update attempt fails --- .../io/cryostat/targets/TargetUpdateJob.java | 18 ++++--- .../cryostat/targets/TargetUpdateService.java | 51 +++++++++++++++---- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetUpdateJob.java b/src/main/java/io/cryostat/targets/TargetUpdateJob.java index 9cf066166..cda6e64f1 100644 --- a/src/main/java/io/cryostat/targets/TargetUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetUpdateJob.java @@ -67,12 +67,18 @@ private void updateTargetTx(long id) { } private void updateTarget(Target target) { - target.jvmId = - connectionManager - .executeConnectedTaskUni(target, JFRConnection::getJvmIdentifier) - .map(JvmIdentifier::getHash) - .await() - .atMost(connectionTimeout); + try { + target.jvmId = + connectionManager + .executeConnectedTaskUni(target, JFRConnection::getJvmIdentifier) + .map(JvmIdentifier::getHash) + .await() + .atMost(connectionTimeout); + } catch (Exception e) { + target.jvmId = null; + target.persist(); + throw e; + } target.activeRecordings = recordingHelper.listActiveRecordings(target); target.persist(); } diff --git a/src/main/java/io/cryostat/targets/TargetUpdateService.java b/src/main/java/io/cryostat/targets/TargetUpdateService.java index a91a44070..54a6ac268 100644 --- a/src/main/java/io/cryostat/targets/TargetUpdateService.java +++ b/src/main/java/io/cryostat/targets/TargetUpdateService.java @@ -21,6 +21,8 @@ import java.util.Map; import io.cryostat.ConfigProperties; +import io.cryostat.credentials.Credential; +import io.cryostat.expressions.MatchExpressionEvaluator; import io.cryostat.targets.Target.TargetDiscovery; import io.quarkus.runtime.ShutdownEvent; @@ -44,6 +46,7 @@ public class TargetUpdateService { @Inject Logger logger; @Inject Scheduler scheduler; + @Inject MatchExpressionEvaluator matchExpressionEvaluator; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration connectionTimeout; @@ -74,25 +77,55 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { scheduler.shutdown(); } + @ConsumeEvent(Credential.CREDENTIALS_STORED) + void onCredentialsStored(Credential credential) { + updateTargetsForExpression(credential); + } + + @ConsumeEvent(Credential.CREDENTIALS_UPDATED) + void onCredentialsUpdated(Credential credential) { + updateTargetsForExpression(credential); + } + + @ConsumeEvent(Credential.CREDENTIALS_DELETED) + void onCredentialsDeleted(Credential credential) { + updateTargetsForExpression(credential); + } + + private void updateTargetsForExpression(Credential credential) { + for (Target target : + matchExpressionEvaluator.getMatchedTargets(credential.matchExpression)) { + try { + fireTargetUpdate(target); + } catch (SchedulerException se) { + logger.warn(se); + } + } + } + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY) void onMessage(TargetDiscovery event) throws SchedulerException { switch (event.kind()) { case MODIFIED: // fall-through case FOUND: - JobDetail jobDetail = JobBuilder.newJob(TargetUpdateJob.class).build(); - Map data = jobDetail.getJobDataMap(); - data.put("targetId", event.serviceRef().id); - Trigger trigger = - TriggerBuilder.newTrigger() - .startAt(Date.from(Instant.now().plusSeconds(1))) - .usingJobData(jobDetail.getJobDataMap()) - .build(); - scheduler.scheduleJob(jobDetail, trigger); + fireTargetUpdate(event.serviceRef()); break; default: // no-op break; } } + + private void fireTargetUpdate(Target target) throws SchedulerException { + JobDetail jobDetail = JobBuilder.newJob(TargetUpdateJob.class).build(); + Map data = jobDetail.getJobDataMap(); + data.put("targetId", target.id); + Trigger trigger = + TriggerBuilder.newTrigger() + .startAt(Date.from(Instant.now().plusSeconds(1))) + .usingJobData(jobDetail.getJobDataMap()) + .build(); + scheduler.scheduleJob(jobDetail, trigger); + } } From 9caa89a9e64cc388d4459a0a2e182b4b0cf8565e Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 24 Oct 2024 16:45:50 -0400 Subject: [PATCH 36/38] handle events in ordered serial fashion --- src/main/java/io/cryostat/rules/RuleService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 58df86a9f..7b2014705 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -92,7 +92,7 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { quartz.shutdown(); } - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true, ordered = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { case MODIFIED: @@ -123,7 +123,7 @@ void onMessage(TargetDiscovery event) { } } - @ConsumeEvent(value = Rule.RULE_ADDRESS, blocking = true) + @ConsumeEvent(value = Rule.RULE_ADDRESS, blocking = true, ordered = true) @Transactional public void handleRuleModification(RuleEvent event) { Rule rule = event.rule(); @@ -148,7 +148,7 @@ public void handleRuleModification(RuleEvent event) { } } - @ConsumeEvent(value = Rule.RULE_ADDRESS + "?clean", blocking = true) + @ConsumeEvent(value = Rule.RULE_ADDRESS + "?clean", blocking = true, ordered = true) @Transactional public void handleRuleRecordingCleanup(Rule rule) { cancelTasksForRule(rule); From 64b2320eb2b156573423e7d5f0ebc85bc232a410 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 24 Oct 2024 16:46:05 -0400 Subject: [PATCH 37/38] use infrastructure pool instead of forkjoin --- src/main/java/io/cryostat/targets/TargetUpdateJob.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/cryostat/targets/TargetUpdateJob.java b/src/main/java/io/cryostat/targets/TargetUpdateJob.java index cda6e64f1..7a67cd15a 100644 --- a/src/main/java/io/cryostat/targets/TargetUpdateJob.java +++ b/src/main/java/io/cryostat/targets/TargetUpdateJob.java @@ -17,8 +17,6 @@ import java.time.Duration; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; import io.cryostat.ConfigProperties; import io.cryostat.core.net.JFRConnection; @@ -26,6 +24,7 @@ import io.cryostat.recordings.RecordingHelper; import io.quarkus.narayana.jta.QuarkusTransaction; +import io.smallrye.mutiny.infrastructure.Infrastructure; import jakarta.inject.Inject; import jakarta.transaction.Transactional; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -39,7 +38,6 @@ public class TargetUpdateJob implements Job { @Inject Logger logger; @Inject TargetConnectionManager connectionManager; @Inject RecordingHelper recordingHelper; - ExecutorService executor = ForkJoinPool.commonPool(); @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration connectionTimeout; @@ -58,7 +56,8 @@ public void execute(JobExecutionContext context) throws JobExecutionException { if (targets.size() == 1) { updateTarget(targets.get(0)); } else { - targets.forEach(t -> executor.submit(() -> updateTargetTx(t.id))); + targets.forEach( + t -> Infrastructure.getDefaultExecutor().execute(() -> updateTargetTx(t.id))); } } From d9ea05b7c6c8dfa6fa4e28e7909196394831cefe Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 24 Oct 2024 16:58:09 -0400 Subject: [PATCH 38/38] Revert "handle events in ordered serial fashion" This reverts commit c2b8d3fc9577105cd16d2694ae3289e64931820a. --- src/main/java/io/cryostat/rules/RuleService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 7b2014705..58df86a9f 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -92,7 +92,7 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { quartz.shutdown(); } - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true, ordered = true) + @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { case MODIFIED: @@ -123,7 +123,7 @@ void onMessage(TargetDiscovery event) { } } - @ConsumeEvent(value = Rule.RULE_ADDRESS, blocking = true, ordered = true) + @ConsumeEvent(value = Rule.RULE_ADDRESS, blocking = true) @Transactional public void handleRuleModification(RuleEvent event) { Rule rule = event.rule(); @@ -148,7 +148,7 @@ public void handleRuleModification(RuleEvent event) { } } - @ConsumeEvent(value = Rule.RULE_ADDRESS + "?clean", blocking = true, ordered = true) + @ConsumeEvent(value = Rule.RULE_ADDRESS + "?clean", blocking = true) @Transactional public void handleRuleRecordingCleanup(Rule rule) { cancelTasksForRule(rule);