From bc724fe33df20bc306c0a8620874fb3c2d1bf487 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Mon, 8 Jan 2024 11:51:20 -0500 Subject: [PATCH] refactor, do not cache generation failures --- .../cryostat/reports/ReportsServiceImpl.java | 114 ++++++++++-------- 1 file changed, 67 insertions(+), 47 deletions(-) diff --git a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java index 61c5faca9..2084387f5 100644 --- a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java +++ b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.function.Predicate; import org.openjdk.jmc.flightrecorder.rules.IRule; @@ -60,63 +61,76 @@ class ReportsServiceImpl implements ReportsService { @Override public Uni> reportFor( ActiveRecording recording, Predicate predicate) { - return sidecarUri - .map( - uri -> { - logger.tracev( - "sidecar reportFor active recording {0} {1}", - recording.target.jvmId, recording.remoteId); - try { - return fireRequest(uri, helper.getActiveInputStream(recording)); - } catch (Exception e) { - return Uni.createFrom().>failure(e); - } - }) - .orElseGet( - () -> { - logger.tracev( - "inprocess reportFor active recording {0} {1}", - recording.target.jvmId, recording.remoteId); - try { - return process(helper.getActiveInputStream(recording), predicate); - } catch (Exception e) { - return Uni.createFrom().>failure(e); - } - }); + Future> future = + sidecarUri + .map( + uri -> { + logger.tracev( + "sidecar reportFor active recording {0} {1}", + recording.target.jvmId, recording.remoteId); + try { + return fireRequest( + uri, helper.getActiveInputStream(recording)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .orElseGet( + () -> { + logger.tracev( + "inprocess reportFor active recording {0} {1}", + recording.target.jvmId, recording.remoteId); + try { + return process( + helper.getActiveInputStream(recording), predicate); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return Uni.createFrom() + .future(future) + .onFailure() + .transform(ReportGenerationException::new); } @Blocking @Override public Uni> reportFor( String jvmId, String filename, Predicate predicate) { - return sidecarUri - .map( - uri -> { - logger.tracev( - "sidecar reportFor archived recording {0} {1}", - jvmId, filename); - return fireRequest( - uri, helper.getArchivedRecordingStream(jvmId, filename)); - }) - .orElseGet( - () -> { - logger.tracev( - "inprocess reportFor archived recording {0} {1}", - jvmId, filename); - return process( - helper.getArchivedRecordingStream(jvmId, filename), predicate); - }); + Future> future = + sidecarUri + .map( + uri -> { + logger.tracev( + "sidecar reportFor archived recording {0} {1}", + jvmId, filename); + return fireRequest( + uri, + helper.getArchivedRecordingStream(jvmId, filename)); + }) + .orElseGet( + () -> { + logger.tracev( + "inprocess reportFor archived recording {0} {1}", + jvmId, filename); + return process( + helper.getArchivedRecordingStream(jvmId, filename), + predicate); + }); + + return Uni.createFrom() + .future(future) + .onFailure() + .transform(ReportGenerationException::new); } - private Uni> process( + private Future> process( InputStream stream, Predicate predicate) { - return Uni.createFrom() - .future( - reportGenerator.generateEvalMapInterruptibly( - new BufferedInputStream(stream), predicate)); + return reportGenerator.generateEvalMapInterruptibly( + new BufferedInputStream(stream), predicate); } - private Uni> fireRequest(URI uri, InputStream stream) { + private Future> fireRequest(URI uri, InputStream stream) { var cf = new CompletableFuture>(); try (var http = HttpClients.createDefault(); stream) { @@ -143,6 +157,12 @@ private Uni> fireRequest(URI uri, InputStream stream } catch (Exception e) { cf.completeExceptionally(e); } - return Uni.createFrom().future(cf); + return cf; + } + + public static class ReportGenerationException extends RuntimeException { + public ReportGenerationException(Throwable cause) { + super(cause); + } } }