Skip to content

Commit

Permalink
fix(rules): rule archiving and re-activation bugfixes (backport #518) (
Browse files Browse the repository at this point in the history
…#519)

fix(rules): rule archiving and re-activation bugfixes (#518)

* fix(rules): rule archiving and re-activation bugfixes

* refactor

* blocking

* activate concurrently

* cancel job when target disappears

* fixup! cancel job when target disappears

* apply existing rules to newly appearing targets

(cherry picked from commit 233df2a)

Co-authored-by: Andrew Azores <[email protected]>
  • Loading branch information
mergify[bot] and andrewazores authored Jun 13, 2024
1 parent 24e130a commit 97dac34
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 93 deletions.
42 changes: 28 additions & 14 deletions src/main/java/io/cryostat/expressions/MatchExpressionEvaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package io.cryostat.expressions;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.cryostat.expressions.MatchExpression.ExpressionEvent;
import io.cryostat.targets.Target;
Expand Down Expand Up @@ -166,20 +166,34 @@ public boolean applies(MatchExpression matchExpression, Target target) throws Sc
}

public List<Target> getMatchedTargets(MatchExpression matchExpression) {
try (Stream<Target> targets = Target.streamAll()) {
return targets.filter(
target -> {
try {
return applies(matchExpression, target);
} catch (ScriptException e) {
logger.error(
"Error while processing expression: " + matchExpression,
e);
return false;
}
})
.collect(Collectors.toList());
var targets =
Target.<Target>listAll().stream()
.filter(
target -> {
try {
return applies(matchExpression, target);
} catch (ScriptException e) {
logger.error(
"Error while processing expression: "
+ matchExpression,
e);
return false;
}
})
.collect(Collectors.toList());

var ids = new HashSet<>();
var it = targets.iterator();
while (it.hasNext()) {
var t = it.next();
if (ids.contains(t.jvmId)) {
it.remove();
continue;
}
ids.add(t.jvmId);
}

return targets;
}

@Name("io.cryostat.rules.MatchExpressionEvaluator.MatchExpressionApplies")
Expand Down
22 changes: 14 additions & 8 deletions src/main/java/io/cryostat/recordings/RecordingHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -223,6 +224,15 @@ public List<ActiveRecording> listActiveRecordings(Target target) {
return QuarkusTransaction.joiningExisting().call(() -> listActiveRecordingsImpl(target));
}

public Optional<ActiveRecording> getActiveRecording(
Target target, Predicate<ActiveRecording> fn) {
return listActiveRecordings(target).stream().filter(fn).findFirst();
}

public Optional<ActiveRecording> getActiveRecording(Target target, long remoteId) {
return getActiveRecording(target, r -> r.remoteId == remoteId);
}

private List<ActiveRecording> listActiveRecordingsImpl(Target target) {
target = Target.find("id", target.id).singleResult();
try {
Expand Down Expand Up @@ -335,15 +345,12 @@ private Uni<ActiveRecording> startRecordingImpl(
getDescriptorByName(conn, recordingName)
.map(this::mapState)
.orElse(null));
boolean restart =
previousState == null
|| shouldRestartRecording(replace, previousState, recordingName);
boolean restart = previousState == null || shouldRestartRecording(replace, previousState);
if (!restart) {
throw new EntityExistsException("Recording", recordingName);
}
listActiveRecordings(target).stream()
.filter(r -> r.name.equals(recordingName))
.forEach(this::deleteRecording);
getActiveRecording(target, r -> r.name.equals(recordingName))
.ifPresent(r -> this.deleteRecording(r).await().atMost(connectionFailedTimeout));
var desc =
connectionManager.executeConnectedTask(
target,
Expand Down Expand Up @@ -503,8 +510,7 @@ private boolean snapshotIsReadable(Target target, InputStream snapshot) throws I
}
}

private boolean shouldRestartRecording(
RecordingReplace replace, RecordingState state, String recordingName)
private boolean shouldRestartRecording(RecordingReplace replace, RecordingState state)
throws BadRequestException {
switch (replace) {
case ALWAYS:
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/io/cryostat/recordings/Recordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public class Recordings {
@ConfigProperty(name = ConfigProperties.STORAGE_EXT_URL)
Optional<String> externalStorageUrl;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Duration connectionFailedTimeout;

void onStart(@Observes StartupEvent evt) {
storageBuckets.createIfNecessary(bucket);
}
Expand Down Expand Up @@ -481,7 +484,10 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod
ActiveRecording activeRecording = recording.get();
switch (body.toLowerCase()) {
case "stop":
recordingHelper.stopRecording(activeRecording).await().indefinitely();
recordingHelper
.stopRecording(activeRecording)
.await()
.atMost(connectionFailedTimeout);
return null;
case "save":
try {
Expand Down Expand Up @@ -698,7 +704,7 @@ public void deleteRecording(@RestPath long targetId, @RestPath long remoteId) th
if (recording == null) {
throw new NotFoundException();
}
recordingHelper.deleteRecording(recording);
recordingHelper.deleteRecording(recording).await().atMost(connectionFailedTimeout);
}

@DELETE
Expand Down
Loading

0 comments on commit 97dac34

Please sign in to comment.