diff --git a/compose/cryostat.yml b/compose/cryostat.yml index 69cf914b9..285b14734 100644 --- a/compose/cryostat.yml +++ b/compose/cryostat.yml @@ -26,7 +26,7 @@ services: io.cryostat.jmxPort: "0" io.cryostat.jmxUrl: "service:jmx:rmi:///jndi/rmi://localhost:0/jmxrmi" environment: - QUARKUS_LOG_LEVEL: TRACE + QUARKUS_LOG_LEVEL: ALL QUARKUS_HTTP_HOST: "cryostat" QUARKUS_HTTP_PORT: ${CRYOSTAT_HTTP_PORT} QUARKUS_HIBERNATE_ORM_LOG_SQL: "true" diff --git a/compose/db-viewer.yml b/compose/db-viewer.yml index 1b1efd5a6..c2f20c273 100644 --- a/compose/db-viewer.yml +++ b/compose/db-viewer.yml @@ -4,7 +4,7 @@ services: depends_on: db: condition: service_healthy - image: ${DB_VIEWER_IMAGE:-docker.io/dpage/pgadmin4:7} + image: ${DB_VIEWER_IMAGE:-docker.io/dpage/pgadmin4:8} hostname: db-viewer ports: - "8989:8989" diff --git a/compose/sample-apps.yml b/compose/sample-apps.yml index 0138b8ca3..e47c2ca7a 100644 --- a/compose/sample-apps.yml +++ b/compose/sample-apps.yml @@ -103,7 +103,18 @@ services: expose: - "9977" environment: - JAVA_OPTS_APPEND: "-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -javaagent:/deployments/app/cryostat-agent.jar" + JAVA_OPTS_APPEND: >- + -Dquarkus.http.host=0.0.0.0 + -Djava.util.logging.manager=org.jboss.logmanager.LogManager + -javaagent:/deployments/app/cryostat-agent.jar + -Dcom.sun.management.jmxremote.autodiscovery=false + -Dcom.sun.management.jmxremote + -Dcom.sun.management.jmxremote.port=22222 + -Dcom.sun.management.jmxremote.rmi.port=22222 + -Djava.rmi.server.hostname=quarkus-test-agent + -Dcom.sun.management.jmxremote.authenticate=false + -Dcom.sun.management.jmxremote.ssl=false + -Dcom.sun.management.jmxremote.local.only=false QUARKUS_HTTP_PORT: 10010 ORG_ACME_CRYOSTATSERVICE_ENABLED: "false" CRYOSTAT_AGENT_APP_NAME: quarkus-test-agent diff --git a/src/main/java/io/cryostat/credentials/Credential.java b/src/main/java/io/cryostat/credentials/Credential.java index 5f24dc8cd..324efaa9d 100644 --- a/src/main/java/io/cryostat/credentials/Credential.java +++ b/src/main/java/io/cryostat/credentials/Credential.java @@ -15,13 +15,16 @@ */ package io.cryostat.credentials; +import io.cryostat.discovery.DiscoveryPlugin; import io.cryostat.expressions.MatchExpression; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import io.quarkus.hibernate.orm.panache.PanacheEntity; import io.vertx.mutiny.core.eventbus.EventBus; +import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.persistence.CascadeType; @@ -68,6 +71,16 @@ public class Credential extends PanacheEntity { @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) public String password; + @OneToOne( + optional = true, + fetch = FetchType.LAZY, + mappedBy = "credential", + cascade = CascadeType.REMOVE) + @JoinColumn(name = "discoveryPlugin_id") + @JsonIgnore + @Nullable + public DiscoveryPlugin discoveryPlugin; + @ApplicationScoped static class Listener { @Inject EventBus bus; diff --git a/src/main/java/io/cryostat/credentials/Credentials.java b/src/main/java/io/cryostat/credentials/Credentials.java index 2feffcf45..7f840be92 100644 --- a/src/main/java/io/cryostat/credentials/Credentials.java +++ b/src/main/java/io/cryostat/credentials/Credentials.java @@ -98,8 +98,7 @@ public RestResponse create( @RolesAllowed("write") @Path("/{id}") public void delete(@RestPath long id) { - Credential credential = Credential.find("id", id).singleResult(); - credential.delete(); + Credential.find("id", id).singleResult().delete(); } static Map notificationResult(Credential credential) throws ScriptException { diff --git a/src/main/java/io/cryostat/discovery/CustomDiscovery.java b/src/main/java/io/cryostat/discovery/CustomDiscovery.java index 2f7055cdb..c70f76d07 100644 --- a/src/main/java/io/cryostat/discovery/CustomDiscovery.java +++ b/src/main/java/io/cryostat/discovery/CustomDiscovery.java @@ -34,8 +34,8 @@ import io.cryostat.targets.Target.Annotations; import io.cryostat.targets.TargetConnectionManager; +import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.StartupEvent; -import io.smallrye.common.annotation.Blocking; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.context.ApplicationScoped; @@ -128,13 +128,15 @@ public Response create( return doV2Create(target, Optional.ofNullable(credential), dryrun, storeCredentials); } - @Transactional - @Blocking Response doV2Create( Target target, Optional credential, boolean dryrun, boolean storeCredentials) { + var beginTx = !QuarkusTransaction.isActive(); + if (beginTx) { + QuarkusTransaction.begin(); + } try { target.connectUrl = sanitizeConnectUrl(target.connectUrl.toString()); @@ -178,10 +180,17 @@ Response doV2Create( node.persist(); realm.persist(); + if (beginTx) { + QuarkusTransaction.commit(); + } + return Response.created(URI.create("/api/v3/targets/" + target.id)) .entity(V2Response.json(Response.Status.CREATED, target)) .build(); } catch (Exception e) { + // roll back regardless of whether we initiated this database transaction or a caller + // did + QuarkusTransaction.rollback(); if (ExceptionUtils.indexOfType(e, ConstraintViolationException.class) >= 0) { logger.warn("Invalid target definition", e); return Response.status(Response.Status.BAD_REQUEST.getStatusCode()) diff --git a/src/main/java/io/cryostat/discovery/Discovery.java b/src/main/java/io/cryostat/discovery/Discovery.java index fc6025e9c..d0f3bb431 100644 --- a/src/main/java/io/cryostat/discovery/Discovery.java +++ b/src/main/java/io/cryostat/discovery/Discovery.java @@ -30,11 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.UUID; -import io.cryostat.credentials.Credential; import io.cryostat.discovery.DiscoveryPlugin.PluginCallback; import io.cryostat.targets.TargetConnectionManager; @@ -43,6 +41,7 @@ import com.nimbusds.jose.JOSEException; import com.nimbusds.jwt.proc.BadJWTException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.vertx.core.json.JsonObject; @@ -64,6 +63,7 @@ import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import org.apache.commons.lang3.StringUtils; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; @@ -180,6 +180,7 @@ public RestResponse checkRegistration( @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @RolesAllowed("write") + @SuppressFBWarnings("DLS_DEAD_LOCAL_STORE") public Response register(@Context RoutingContext ctx, JsonObject body) throws URISyntaxException, JOSEException, @@ -192,6 +193,7 @@ public Response register(@Context RoutingContext ctx, JsonObject body) String priorToken = body.getString("token"); String realmName = body.getString("realm"); URI callbackUri = new URI(body.getString("callback")); + URI unauthCallback = UriBuilder.fromUri(callbackUri).userInfo(null).build(); // TODO apply URI range validation to the remote address InetAddress remoteAddress = getRemoteAddress(ctx); @@ -205,12 +207,33 @@ public Response register(@Context RoutingContext ctx, JsonObject body) if (!Objects.equals(plugin.realm.name, realmName)) { throw new ForbiddenException(); } - if (!Objects.equals(plugin.callback, callbackUri)) { + if (!Objects.equals(plugin.callback, unauthCallback)) { throw new BadRequestException(); } location = jwtFactory.getPluginLocation(plugin); jwtFactory.parseDiscoveryPluginJwt(plugin, priorToken, location, remoteAddress, false); } else { + // check if a plugin record with the same callback already exists. If it does, ping it: + // if it's still there reject this request as a duplicate, otherwise delete the previous + // record and accept this new one as a replacement + DiscoveryPlugin.find("callback", unauthCallback) + .singleResultOptional() + .ifPresent( + p -> { + try { + var cb = PluginCallback.create(p); + cb.ping(); + throw new IllegalArgumentException( + String.format( + "Plugin with callback %s already exists and is" + + " still reachable", + unauthCallback)); + } catch (Exception e) { + logger.error(e); + p.delete(); + } + }); + // new plugin registration plugin = new DiscoveryPlugin(); plugin.callback = callbackUri; @@ -218,9 +241,12 @@ public Response register(@Context RoutingContext ctx, JsonObject body) DiscoveryNode.environment( requireNonBlank(realmName, "realm"), BaseNodeType.REALM); plugin.builtin = false; - plugin.persist(); - DiscoveryNode.getUniverse().children.add(plugin.realm); + var universe = DiscoveryNode.getUniverse(); + plugin.realm.parent = universe; + plugin.persist(); + universe.children.add(plugin.realm); + universe.persist(); location = jwtFactory.getPluginLocation(plugin); @@ -293,15 +319,15 @@ public Map> publish( DiscoveryPlugin plugin = DiscoveryPlugin.find("id", id).singleResult(); jwtValidator.validateJwt(ctx, plugin, token, true); plugin.realm.children.clear(); - plugin.persist(); plugin.realm.children.addAll(body); - body.forEach( - b -> { - if (b.target != null) { - b.target.discoveryNode = b; - } - b.persist(); - }); + for (var b : body) { + if (b.target != null) { + b.target.discoveryNode = b; + b.target.discoveryNode.parent = plugin.realm; + b.parent = plugin.realm; + } + b.persist(); + } plugin.persist(); return Map.of( @@ -338,10 +364,7 @@ public Map> deregister( scheduler.deleteJob(key); } - plugin.realm.delete(); plugin.delete(); - getStoredCredential(plugin).ifPresent(Credential::delete); - DiscoveryNode.getUniverse().children.remove(plugin.realm); return Map.of( "meta", Map.of( @@ -374,17 +397,11 @@ public DiscoveryPlugin getPlugin(@RestPath UUID id) throws JsonProcessingExcepti return DiscoveryPlugin.find("id", id).singleResult(); } - Optional getStoredCredential(DiscoveryPlugin plugin) { - return new DiscoveryPlugin.PluginCallback.DiscoveryPluginAuthorizationHeaderFactory(plugin) - .getCredential(); - } - + @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") static class RefreshPluginJob implements Job { @Inject Logger logger; @Override - @Transactional - @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") public void execute(JobExecutionContext context) throws JobExecutionException { DiscoveryPlugin plugin = null; try { @@ -407,14 +424,12 @@ public void execute(JobExecutionContext context) throws JobExecutionException { if (plugin != null) { logger.debugv( e, "Pruned discovery plugin: {0} @ {1}", plugin.realm, plugin.callback); - plugin.realm.delete(); - plugin.delete(); - new DiscoveryPlugin.PluginCallback.DiscoveryPluginAuthorizationHeaderFactory( - plugin) - .getCredential() - .ifPresent(Credential::delete); + QuarkusTransaction.requiringNew().run(plugin::delete); + } else { + var ex = new JobExecutionException(e); + ex.setUnscheduleFiringTrigger(true); + throw ex; } - throw new JobExecutionException(e); } } } diff --git a/src/main/java/io/cryostat/discovery/DiscoveryNode.java b/src/main/java/io/cryostat/discovery/DiscoveryNode.java index 828eae17e..4d06143e0 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryNode.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryNode.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonView; import io.quarkus.hibernate.orm.panache.PanacheEntity; +import io.quarkus.narayana.jta.QuarkusTransaction; import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; @@ -130,25 +131,33 @@ public static List findAllByNodeType(NodeType nodeType) { } public static DiscoveryNode environment(String name, NodeType nodeType) { - DiscoveryNode node = new DiscoveryNode(); - node.name = name; - node.nodeType = nodeType.getKind(); - node.labels = new HashMap<>(); - node.children = new ArrayList<>(); - node.target = null; - node.persist(); - return node; + return QuarkusTransaction.joiningExisting() + .call( + () -> { + DiscoveryNode node = new DiscoveryNode(); + node.name = name; + node.nodeType = nodeType.getKind(); + node.labels = new HashMap<>(); + node.children = new ArrayList<>(); + node.target = null; + node.persist(); + return node; + }); } public static DiscoveryNode target(Target target, NodeType nodeType) { - DiscoveryNode node = new DiscoveryNode(); - node.name = target.connectUrl.toString(); - node.nodeType = nodeType.getKind(); - node.labels = new HashMap<>(target.labels); - node.children = null; - node.target = target; - node.persist(); - return node; + return QuarkusTransaction.joiningExisting() + .call( + () -> { + DiscoveryNode node = new DiscoveryNode(); + node.name = target.connectUrl.toString(); + node.nodeType = nodeType.getKind(); + node.labels = new HashMap<>(target.labels); + node.children = null; + node.target = target; + node.persist(); + return node; + }); } @Override diff --git a/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java b/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java index 81a03bf07..ef484ee39 100644 --- a/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java +++ b/src/main/java/io/cryostat/discovery/DiscoveryPlugin.java @@ -19,15 +19,16 @@ import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.Base64; -import java.util.Optional; import java.util.UUID; import io.cryostat.credentials.Credential; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.hibernate.orm.panache.PanacheEntityBase; import io.quarkus.rest.client.reactive.QuarkusRestClientBuilder; +import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.persistence.CascadeType; @@ -40,6 +41,7 @@ import jakarta.persistence.Id; import jakarta.persistence.OneToOne; import jakarta.persistence.PrePersist; +import jakarta.transaction.Transactional; import jakarta.validation.constraints.NotNull; import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; @@ -47,6 +49,7 @@ import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MultivaluedHashMap; import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.UriBuilder; import org.apache.commons.lang3.StringUtils; import org.eclipse.microprofile.rest.client.ext.ClientHeadersFactory; import org.hibernate.annotations.GenericGenerator; @@ -71,10 +74,18 @@ public class DiscoveryPlugin extends PanacheEntityBase { @NotNull public DiscoveryNode realm; - @Column(unique = true, updatable = false) + @Column(nullable = true, unique = true, updatable = false) @Convert(converter = UriConverter.class) public URI callback; + @OneToOne( + optional = true, // only nullable for builtins + fetch = FetchType.LAZY, + cascade = CascadeType.REMOVE) + @JsonIgnore + @Nullable + public Credential credential; + @JsonProperty(access = JsonProperty.Access.READ_ONLY) public boolean builtin; @@ -84,31 +95,63 @@ static class Listener { @Inject Logger logger; @PrePersist + @Transactional public void prePersist(DiscoveryPlugin plugin) { if (plugin.builtin) { return; } if (plugin.callback == null) { - plugin.realm.delete(); plugin.delete(); throw new IllegalArgumentException(); } + if (plugin.credential == null) { + var credential = getCredential(plugin); + plugin.credential = credential; + plugin.callback = UriBuilder.fromUri(plugin.callback).userInfo(null).build(); + } try { PluginCallback.create(plugin).ping(); logger.infov( "Registered discovery plugin: {0} @ {1}", plugin.realm.name, plugin.callback); } catch (URISyntaxException e) { - plugin.realm.delete(); plugin.delete(); throw new IllegalArgumentException(e); } catch (Exception e) { - plugin.realm.delete(); plugin.delete(); logger.error("Discovery Plugin ping failed", e); throw e; } } + + private Credential getCredential(DiscoveryPlugin plugin) { + String userInfo = plugin.callback.getUserInfo(); + if (StringUtils.isBlank(userInfo)) { + throw new IllegalArgumentException("No stored credentials specified"); + } + + if (!userInfo.contains(":")) { + throw new IllegalArgumentException( + String.format( + "Unexpected non-basic credential format, found: %s", userInfo)); + } + + String[] parts = userInfo.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException( + String.format("Unexpected basic credential format, found: %s", userInfo)); + } + + if (!"storedcredentials".equals(parts[0])) { + throw new IllegalArgumentException( + String.format( + "Unexpected credential format, expected \"storedcredentials\" but" + + " found: %s", + parts[0])); + } + + return Credential.find("id", Long.parseLong(parts[1])).singleResult(); + } } @Path("") @@ -123,12 +166,6 @@ interface PluginCallback { public void refresh(); public static PluginCallback create(DiscoveryPlugin plugin) throws URISyntaxException { - if (StringUtils.isBlank(plugin.callback.getUserInfo())) { - logger.warnv( - "Plugin with id:{0} realm:{1} callback:{2} did not supply userinfo", - plugin.id, plugin.realm, plugin.callback); - } - PluginCallback client = QuarkusRestClientBuilder.newBuilder() .baseUri(plugin.callback) @@ -148,50 +185,16 @@ public DiscoveryPluginAuthorizationHeaderFactory(DiscoveryPlugin plugin) { this.plugin = plugin; } - public Optional getCredential() { - String userInfo = plugin.callback.getUserInfo(); - if (StringUtils.isBlank(userInfo)) { - logger.error("No stored credentials specified"); - return Optional.empty(); - } - - if (!userInfo.contains(":")) { - logger.errorv("Unexpected non-basic credential format, found: {0}", userInfo); - return Optional.empty(); - } - - String[] parts = userInfo.split(":"); - if (parts.length != 2) { - logger.errorv("Unexpected basic credential format, found: {0}", userInfo); - return Optional.empty(); - } - - if (!"storedcredentials".equals(parts[0])) { - logger.errorv( - "Unexpected credential format, expected \"storedcredentials\" but" - + " found: {0}", - parts[0]); - return Optional.empty(); - } - - return Credential.find("id", Long.parseLong(parts[1])).singleResultOptional(); - } - @Override public MultivaluedMap update( MultivaluedMap incomingHeaders, MultivaluedMap clientOutgoingHeaders) { var result = new MultivaluedHashMap(); - Optional opt = getCredential(); - opt.ifPresent( - credential -> { - String basicAuth = credential.username + ":" + credential.password; - byte[] authBytes = basicAuth.getBytes(StandardCharsets.UTF_8); - String base64Auth = Base64.getEncoder().encodeToString(authBytes); - result.add( - HttpHeaders.AUTHORIZATION, - String.format("Basic %s", base64Auth)); - }); + var credential = plugin.credential; + String basicAuth = String.format("%s:%s", credential.username, credential.password); + byte[] authBytes = basicAuth.getBytes(StandardCharsets.UTF_8); + String base64Auth = Base64.getEncoder().encodeToString(authBytes); + result.add(HttpHeaders.AUTHORIZATION, String.format("Basic %s", base64Auth)); return result; } } diff --git a/src/main/java/io/cryostat/discovery/JDPDiscovery.java b/src/main/java/io/cryostat/discovery/JDPDiscovery.java index 4f057d959..bd88871aa 100644 --- a/src/main/java/io/cryostat/discovery/JDPDiscovery.java +++ b/src/main/java/io/cryostat/discovery/JDPDiscovery.java @@ -33,13 +33,15 @@ import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; -import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.quarkus.vertx.ConsumeEvent; import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; import jakarta.transaction.Transactional; +import jakarta.transaction.Transactional.TxType; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; @@ -57,6 +59,7 @@ static JvmDiscoveryClient produceJvmDiscoveryClient() { @Inject Logger logger; @Inject JvmDiscoveryClient jdp; @Inject Vertx vertx; + @Inject EventBus eventBus; @ConfigProperty(name = "cryostat.discovery.jdp.enabled") boolean enabled; @@ -99,11 +102,12 @@ void onStop(@Observes ShutdownEvent evt) { @Override public void accept(JvmDiscoveryEvent evt) { - Infrastructure.getDefaultWorkerPool().execute(() -> this.handleJdpEvent(evt)); + eventBus.publish(getClass().getName(), evt); } - @Transactional - public synchronized void handleJdpEvent(JvmDiscoveryEvent evt) { + @ConsumeEvent(blocking = true, ordered = true) + @Transactional(TxType.REQUIRES_NEW) + void handleJdpEvent(JvmDiscoveryEvent evt) { logger.infov( "JDP Discovery Event {0} {1}", evt.getEventKind(), evt.getJvmDescriptor().getMainClass()); diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index dcac7c2f1..d6b52a711 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -154,7 +154,7 @@ void onStart(@Observes @Priority(1) StartupEvent evt) { @Transactional void onAfterStart(@Observes StartupEvent evt) { - if (!(enabled() && available())) { + if (!enabled() || !available()) { return; } safeGetInformers(); diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index cf9885eca..a534be4cb 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -76,6 +76,7 @@ import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; +import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.StartupEvent; import io.smallrye.mutiny.Uni; import io.vertx.ext.web.handler.HttpException; @@ -87,7 +88,7 @@ import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import jakarta.inject.Named; -import jakarta.transaction.Transactional; +import jakarta.persistence.PersistenceException; import jakarta.ws.rs.BadRequestException; import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.ServerErrorException; @@ -217,6 +218,10 @@ void onStart(@Observes StartupEvent evt) { // to the target and update our database when remote recording events occur, rather than doing a // full sync when this method is called. public List listActiveRecordings(Target target) { + return QuarkusTransaction.joiningExisting().call(() -> listActiveRecordingsImpl(target)); + } + + private List listActiveRecordingsImpl(Target target) { target = Target.find("id", target.id).singleResult(); try { var previousRecordings = target.activeRecordings; @@ -263,7 +268,11 @@ public List listActiveRecordings(Target target) { break; } if (updated) { - recording.persist(); + try { + recording.persist(); + } catch (PersistenceException e) { + logger.warn(e); + } } continue; } @@ -305,6 +314,16 @@ public Uni startRecording( RecordingOptions options, Map rawLabels) throws QuantityConversionException { + return QuarkusTransaction.joiningExisting() + .call(() -> startRecordingImpl(target, replace, template, options, rawLabels)); + } + + private Uni startRecordingImpl( + Target target, + RecordingReplace replace, + Template template, + RecordingOptions options, + Map rawLabels) { String recordingName = options.name(); RecordingState previousState = @@ -506,11 +525,15 @@ public Uni stopRecording(ActiveRecording recording, boolean arc recording.state = RecordingState.STOPPED; return recording; }); - out.persist(); - if (archive) { - archiveRecording(out, null, null); - } - return Uni.createFrom().item(out); + return QuarkusTransaction.joiningExisting() + .call( + () -> { + out.persist(); + if (archive) { + archiveRecording(out, null, null); + } + return Uni.createFrom().item(out); + }); } public Uni stopRecording(ActiveRecording recording) throws Exception { @@ -529,10 +552,14 @@ public Uni deleteRecording(ActiveRecording recording) { conn.getService().close(desc.get()); return recording; }); - closed.target.activeRecordings.remove(recording); - closed.target.persist(); - closed.delete(); - return Uni.createFrom().item(closed); + return QuarkusTransaction.joiningExisting() + .call( + () -> { + closed.target.activeRecordings.remove(recording); + closed.target.persist(); + closed.delete(); + return Uni.createFrom().item(closed); + }); } public LinkedRecordingDescriptor toExternalForm(ActiveRecording recording) { @@ -1079,19 +1106,24 @@ private Metadata taggingToMetadata(List tagSet) { public ActiveRecording updateRecordingMetadata( long recordingId, Map newLabels) { - ActiveRecording recording = ActiveRecording.find("id", recordingId).singleResult(); - - if (!recording.metadata.labels().equals(newLabels)) { - Metadata updatedMetadata = new Metadata(newLabels); - recording.setMetadata(updatedMetadata); - recording.persist(); - - notify( - new ActiveRecordingEvent( - Recordings.RecordingEventCategory.METADATA_UPDATED, - ActiveRecordingEvent.Payload.of(this, recording))); - } - return recording; + return QuarkusTransaction.joiningExisting() + .call( + () -> { + ActiveRecording recording = + ActiveRecording.find("id", recordingId).singleResult(); + + if (!recording.metadata.labels().equals(newLabels)) { + Metadata updatedMetadata = new Metadata(newLabels); + recording.setMetadata(updatedMetadata); + recording.persist(); + + notify( + new ActiveRecordingEvent( + Recordings.RecordingEventCategory.METADATA_UPDATED, + ActiveRecordingEvent.Payload.of(this, recording))); + } + return recording; + }); } private void notify(ActiveRecordingEvent event) { @@ -1275,7 +1307,6 @@ static class StopRecordingJob implements Job { @Inject Logger logger; @Override - @Transactional public void execute(JobExecutionContext ctx) throws JobExecutionException { var jobDataMap = ctx.getJobDetail().getJobDataMap(); try { diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 983fe0057..6fd262dca 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -73,6 +73,7 @@ public class RuleService { private final Map> ruleRecordingMap = new ConcurrentHashMap<>(); + @Transactional void onStart(@Observes StartupEvent ev) { logger.trace("RuleService started"); try (Stream rules = Rule.streamAll()) { @@ -86,6 +87,7 @@ void onStart(@Observes StartupEvent ev) { } @ConsumeEvent(value = Rule.RULE_ADDRESS, blocking = true) + @Transactional public void handleRuleModification(RuleEvent event) { Rule rule = event.rule(); var relatedRecordings = @@ -130,8 +132,7 @@ public void handleRuleRecordingCleanup(Rule rule) { }); } - @Transactional - public void activate(Rule rule, Target target) throws Exception { + void activate(Rule rule, Target target) throws Exception { var options = createRecordingOptions(rule); Pair pair = recordingHelper.parseEventSpecifier(rule.eventSpecifier); @@ -168,7 +169,6 @@ private RecordingOptions createRecordingOptions(Rule rule) { Optional.ofNullable((long) rule.maxAgeSeconds)); } - @Transactional void applyRuleToMatchingTargets(Rule rule) { try (Stream targets = Target.streamAll()) { targets.filter( @@ -208,9 +208,9 @@ private void scheduleArchival(Rule rule, Target target, ActiveRecording recordin } Map data = jobDetail.getJobDataMap(); - data.put("rule", rule); - data.put("target", target); - data.put("recording", recording); + data.put("rule", rule.id); + data.put("target", target.id); + data.put("recording", recording.id); Trigger trigger = TriggerBuilder.newTrigger() diff --git a/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java b/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java index a0e4f1fae..e1261a836 100644 --- a/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java +++ b/src/main/java/io/cryostat/rules/ScheduledArchiveJob.java @@ -47,27 +47,29 @@ class ScheduledArchiveJob implements Job { String archiveBucket; @Override + @Transactional public void execute(JobExecutionContext ctx) throws JobExecutionException { - try { - var rule = (Rule) ctx.getJobDetail().getJobDataMap().get("rule"); - var target = (Target) ctx.getJobDetail().getJobDataMap().get("target"); - var recording = (ActiveRecording) ctx.getJobDetail().getJobDataMap().get("recording"); + long ruleId = (long) ctx.getJobDetail().getJobDataMap().get("rule"); + Rule rule = Rule.find("id", ruleId).singleResult(); + long targetId = (long) ctx.getJobDetail().getJobDataMap().get("target"); + Target target = Target.find("id", targetId).singleResult(); + long recordingId = (long) ctx.getJobDetail().getJobDataMap().get("recording"); + ActiveRecording recording = ActiveRecording.find("id", recordingId).singleResult(); - Queue previousRecordings = new ArrayDeque<>(rule.preservedArchives); + Queue previousRecordings = new ArrayDeque<>(rule.preservedArchives); - initPreviousRecordings(target, rule, previousRecordings); + initPreviousRecordings(target, rule, previousRecordings); + while (previousRecordings.size() >= rule.preservedArchives) { + pruneArchive(target, previousRecordings, previousRecordings.remove()); + } - while (previousRecordings.size() >= rule.preservedArchives) { - pruneArchive(target, previousRecordings, previousRecordings.remove()); - } - performArchival(recording, previousRecordings); + try { + previousRecordings.add(recordingHelper.archiveRecording(recording, null, null).name()); } catch (Exception e) { - logger.error(e); - // TODO: Handle JMX/SSL errors + throw new JobExecutionException(e); } } - @Transactional void initPreviousRecordings(Target target, Rule rule, Queue previousRecordings) { recordingHelper.listArchivedRecordingObjects().stream() .sorted((a, b) -> a.lastModified().compareTo(b.lastModified())) @@ -89,16 +91,7 @@ void initPreviousRecordings(Target target, Rule rule, Queue previousReco }); } - @Transactional - void performArchival(ActiveRecording recording, Queue previousRecordings) - throws Exception { - String filename = recordingHelper.archiveRecording(recording, null, null).name(); - previousRecordings.add(filename); - } - - @Transactional - void pruneArchive(Target target, Queue previousRecordings, String filename) - throws Exception { + void pruneArchive(Target target, Queue previousRecordings, String filename) { recordingHelper.deleteArchivedRecording(target.jvmId, filename); previousRecordings.remove(filename); } diff --git a/src/main/java/io/cryostat/targets/AgentClient.java b/src/main/java/io/cryostat/targets/AgentClient.java index fd77d8f94..ceff7ba13 100644 --- a/src/main/java/io/cryostat/targets/AgentClient.java +++ b/src/main/java/io/cryostat/targets/AgentClient.java @@ -40,6 +40,7 @@ import io.cryostat.core.serialization.SerializableRecordingDescriptor; import io.cryostat.credentials.Credential; import io.cryostat.credentials.CredentialsFinder; +import io.cryostat.discovery.DiscoveryPlugin; import io.cryostat.targets.AgentJFRService.StartRecordingRequest; import io.cryostat.util.HttpStatusCodeIdentifier; @@ -394,21 +395,22 @@ private Uni> invoke(HttpMethod mtd, String path, BodyCodec Uni> invoke( HttpMethod mtd, String path, Buffer payload, BodyCodec codec) { logger.debugv("{0} {1} {2}", mtd, getUri(), path); + + Credential credential = + DiscoveryPlugin.find("callback", getUri()) + .singleResult() + .credential; + HttpRequest req = webClient .request(mtd, getUri().getPort(), getUri().getHost(), path) .ssl("https".equals(getUri().getScheme())) .timeout(httpTimeout.toMillis()) .followRedirects(true) - .as(codec); - Credential credential = - credentialsFinder.getCredentialsForConnectUrl(getUri()).orElse(null); - if (credential == null || credential.username == null || credential.password == null) { - throw new IllegalStateException(NULL_CREDENTIALS + " " + getUri()); - } - req = - req.authentication( - new UsernamePasswordCredentials(credential.username, credential.password)); + .as(codec) + .authentication( + new UsernamePasswordCredentials( + credential.username, credential.password)); Uni> uni; if (payload != null) { diff --git a/src/main/java/io/cryostat/targets/TargetConnectionManager.java b/src/main/java/io/cryostat/targets/TargetConnectionManager.java index 5bc3a1469..0d5969a98 100644 --- a/src/main/java/io/cryostat/targets/TargetConnectionManager.java +++ b/src/main/java/io/cryostat/targets/TargetConnectionManager.java @@ -52,13 +52,13 @@ import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.Scheduler; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.unchecked.Unchecked; import io.vertx.ext.web.handler.HttpException; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import jakarta.transaction.Transactional; import jdk.jfr.Category; import jdk.jfr.Event; import jdk.jfr.FlightRecorder; @@ -298,10 +298,14 @@ private void closeConnection(URI connectUrl, JFRConnection connection, RemovalCa } } - @Transactional JFRConnection connect(URI connectUrl) throws Exception { - var credentials = credentialsFinder.getCredentialsForConnectUrl(connectUrl); - return connect(connectUrl, credentials); + return QuarkusTransaction.joiningExisting() + .call( + () -> { + var credentials = + credentialsFinder.getCredentialsForConnectUrl(connectUrl); + return connect(connectUrl, credentials); + }); } JFRConnection connect(URI connectUrl, Optional credentials) throws Exception {