diff --git a/aws/buckets.sh b/aws/buckets.sh new file mode 100755 index 000000000..f83bb26e1 --- /dev/null +++ b/aws/buckets.sh @@ -0,0 +1,3 @@ +#!/usr/bin/sh + +awslocal s3 mb s3://archivedrecordings diff --git a/compose.yml b/compose.yml index 6d13e3cbb..a4f47ef98 100644 --- a/compose.yml +++ b/compose.yml @@ -1,25 +1,36 @@ version: "3" services: - # TODO evaluate this downstream image. This allows the backend to assume that the storage buckets - # already exist in minio, which may or may not be a fair assumption for users in a production environment. - # would users prefer that assumption be made and they have to manually set up the buckets? the bucket names - # are configurable for the cryostat application so there is no forced name collision, so perhaps - # automatic creation of the buckets on startup is OK. - # minio: - # image: docker.io/bitnami/minio:latest - # hostname: minio + # s3: + # image: docker.io/localstack/localstack:1.4.0 + # hostname: s3 # ports: - # - "9000:9000" - # - "9001:9001" + # - "4566:4566" + # - "4577:4577" # environment: - # MINIO_ROOT_USER: minioroot - # MINIO_ROOT_PASSWORD: minioroot - # MINIO_DEFAULT_BUCKETS: archivedrecordings + # SERVICES: s3 + # START_WEB: 1 + # AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-aws_access_key_id} + # AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-aws_secret_access_key} + # DEFAULT_REGION: us-east-1 + # PORT_WEB_UI: 4577 # volumes: - # - minio_data:/data - # - minio_certs:/certs - minio: + # - ./aws:/etc/localstack/init/ready.d:z + # # this buckets.sh hook script isn't working for some reason. In the meantime, after spinning up the localstack instance, do: + # # $ podman exec -it cryostat3_s3_1 /bin/bash + # # $ awslocal s3 mb s3://archivedrecordings + # labels: + # kompose.service.expose: "s3" + # restart: unless-stopped + # healthcheck: + # test: curl --fail http://localhost:4566 || exit 1 + # interval: 10s + # retries: 3 + # start_period: 10s + # timeout: 5s + # + + s3: image: docker.io/minio/minio:latest hostname: minio ports: @@ -27,8 +38,9 @@ services: - "9000:9000" command: server /data --console-address ":9001" environment: - MINIO_ROOT_USER: minioroot - MINIO_ROOT_PASSWORD: minioroot + MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioroot} + MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-minioroot} + MINIO_DEFAULT_BUCKETS: archivedrecordings volumes: - minio_data:/data - minio_certs:/certs @@ -90,7 +102,7 @@ services: cryostat: depends_on: - db - - minio + - s3 image: quay.io/cryostat/cryostat3:dev hostname: cryostat3 expose: @@ -105,12 +117,13 @@ services: QUARKUS_DATASOURCE_USERNAME: cryostat3 QUARKUS_DATASOURCE_PASSWORD: cryostat3 QUARKUS_DATASOURCE_JDBC_URL: jdbc:postgresql://db:5432/cryostat3 - QUARKUS_MINIO_URL: http://minio:9000 - # create access key in minio console by visiting http://localhost:9001 - # `podman-compose up minio` may be useful to start minio first to create - # these secrets, then `export MINIO_ACCESS=abcd ; export MINIO_SECRET=1234 ; podman-compose up` - QUARKUS_MINIO_ACCESS_KEY: ${MINIO_ACCESS:-minioroot} - QUARKUS_MINIO_SECRET_KEY: ${MINIO_SECRET:-minioroot} + STORAGE_BUCKETS_ARCHIVES_NAME: archivedrecordings + QUARKUS_S3_ENDPOINT_OVERRIDE: http://s3:9000 + QUARKUS_S3_PATH_STYLE_ACCESS: "true" # needed for Minio, but if the specific S3 impl supports DNS subdomain style access then that should be preferred + QUARKUS_S3_AWS_REGION: us-east-1 + QUARKUS_S3_AWS_CREDENTIALS_TYPE: default + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-minioroot} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-minioroot} CRYOSTAT_JDP_ENABLED: "true" JAVA_OPTS_APPEND: "-XX:+FlightRecorder -XX:StartFlightRecording=name=onstart,settings=default,disk=true,maxage=5m -Dcom.sun.management.jmxremote.autodiscovery=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9091 -Dcom.sun.management.jmxremote.rmi.port=9091 -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false" restart: unless-stopped diff --git a/pom.xml b/pom.xml index a883cbecf..495cd8a97 100644 --- a/pom.xml +++ b/pom.xml @@ -31,7 +31,6 @@ UTF-8 2.21.0 - 3.1.0.Final 1.15 2.11.0 4.5.14 @@ -112,15 +111,14 @@ io.quarkus quarkus-jdbc-postgresql - - io.quarkiverse.minio - quarkus-minio - ${io.quarkiverse.minio.version} - io.quarkiverse.amazonservices quarkus-amazon-s3 + + software.amazon.awssdk + url-connection-client + org.apache.commons commons-lang3 diff --git a/src/main/java/io/cryostat/recordings/ActiveRecording.java b/src/main/java/io/cryostat/recordings/ActiveRecording.java index 4a669e47d..1ed87ec1f 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecording.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecording.java @@ -189,13 +189,11 @@ public void preUpdate(ActiveRecording activeRecording) throws Exception { connectionManager.executeConnectedTask( activeRecording.target, conn -> { - conn.getService().getAvailableRecordings().stream() - .filter(rec -> rec.getId() == activeRecording.remoteId) - .findFirst() + Recordings.getDescriptorById(conn, activeRecording.remoteId) .ifPresent( d -> { try { - conn.getService().close(d); + conn.getService().stop(d); } catch (FlightRecorderException | IOException | ServiceNotAvailableException e) { diff --git a/src/main/java/io/cryostat/recordings/Recordings.java b/src/main/java/io/cryostat/recordings/Recordings.java index 55f532222..6ceddca87 100644 --- a/src/main/java/io/cryostat/recordings/Recordings.java +++ b/src/main/java/io/cryostat/recordings/Recordings.java @@ -37,12 +37,10 @@ */ package io.cryostat.recordings; -import java.io.BufferedInputStream; -import java.io.IOException; import java.net.URI; -import java.nio.file.Files; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; @@ -53,9 +51,9 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.openjdk.jmc.common.unit.IConstrainedMap; import org.openjdk.jmc.common.unit.IOptionDescriptor; @@ -67,7 +65,6 @@ import org.openjdk.jmc.rjmx.services.jfr.IFlightRecorderService; import org.openjdk.jmc.rjmx.services.jfr.IRecordingDescriptor; -import io.cryostat.ProgressInputStream; import io.cryostat.core.net.JFRConnection; import io.cryostat.core.sys.Clock; import io.cryostat.core.templates.Template; @@ -75,27 +72,11 @@ import io.cryostat.recordings.ActiveRecording.Listener.RecordingEvent; import io.cryostat.targets.Target; import io.cryostat.targets.TargetConnectionManager; +import io.cryostat.util.HttpStatusCodeIdentifier; import io.cryostat.ws.MessagingServer; import io.cryostat.ws.Notification; -import io.minio.BucketExistsArgs; -import io.minio.GetObjectTagsArgs; -import io.minio.ListObjectsArgs; -import io.minio.MakeBucketArgs; -import io.minio.MinioClient; -import io.minio.PutObjectArgs; -import io.minio.RemoveObjectArgs; -import io.minio.RemoveObjectsArgs; -import io.minio.Result; -import io.minio.StatObjectArgs; -import io.minio.errors.ErrorResponseException; -import io.minio.errors.InsufficientDataException; -import io.minio.errors.InternalException; -import io.minio.errors.InvalidResponseException; -import io.minio.errors.ServerException; -import io.minio.errors.XmlParserException; -import io.minio.messages.DeleteObject; -import io.minio.messages.Item; +import com.fasterxml.jackson.databind.ObjectMapper; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; import io.vertx.core.eventbus.EventBus; @@ -114,6 +95,7 @@ import jakarta.ws.rs.ServerErrorException; import jakarta.ws.rs.core.Response; import jdk.jfr.RecordingState; +import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -122,6 +104,26 @@ import org.jboss.resteasy.reactive.RestPath; import org.jboss.resteasy.reactive.RestResponse; import org.jboss.resteasy.reactive.multipart.FileUpload; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.Tag; +import software.amazon.awssdk.services.s3.model.Tagging; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; @Path("") public class Recordings { @@ -136,20 +138,35 @@ public class Recordings { @Inject RecordingOptionsBuilderFactory recordingOptionsBuilderFactory; @Inject EventOptionsBuilder.Factory eventOptionsBuilderFactory; @Inject Clock clock; - @Inject MinioClient minio; + @Inject S3Client storage; @Inject RemoteRecordingInputStreamFactory remoteRecordingStreamFactory; @Inject ScheduledExecutorService scheduler; + @Inject ObjectMapper mapper; + private final Base64 base64Url = new Base64(true); - @ConfigProperty(name = "minio.buckets.archives.name") + @ConfigProperty(name = "storage.buckets.archives.name") String archiveBucket; void onStart(@Observes StartupEvent evt) { + boolean exists = false; try { - if (!minio.bucketExists(BucketExistsArgs.builder().bucket(archiveBucket).build())) { - minio.makeBucket(MakeBucketArgs.builder().bucket(archiveBucket).build()); - } + exists = + HttpStatusCodeIdentifier.isSuccessCode( + storage.headBucket( + HeadBucketRequest.builder() + .bucket(archiveBucket) + .build()) + .sdkHttpResponse() + .statusCode()); } catch (Exception e) { - logger.error(e); + logger.info(e); + } + if (!exists) { + try { + storage.createBucket(CreateBucketRequest.builder().bucket(archiveBucket).build()); + } catch (Exception e) { + logger.error(e); + } } } @@ -158,43 +175,23 @@ void onStart(@Observes StartupEvent evt) { @RolesAllowed("read") public List listArchivesV1() { var result = new ArrayList(); - minio.listObjects(ListObjectsArgs.builder().bucket(archiveBucket).recursive(true).build()) + storage.listObjectsV2(ListObjectsV2Request.builder().bucket(archiveBucket).build()) + .contents() .forEach( - r -> { - try { - Item item = r.get(); - String path = item.objectName(); - String[] parts = path.split("/"); - String jvmId = parts[0]; - String filename = parts[1]; - Metadata metadata = - new Metadata( - minio.getObjectTags( - GetObjectTagsArgs.builder() - .bucket(archiveBucket) - .object(path) - .build()) - .get()); - result.add( - new ArchivedRecording( - filename, - "TODO", - "TODO", - metadata, - item.size(), - item.lastModified().toEpochSecond())); - } catch (InvalidKeyException - | ErrorResponseException - | IllegalArgumentException - | InsufficientDataException - | InternalException - | InvalidResponseException - | NoSuchAlgorithmException - | ServerException - | XmlParserException - | IOException e) { - logger.error("Could not retrieve list of archived recordings", e); - } + item -> { + String path = item.key(); + String[] parts = path.split("/"); + String jvmId = parts[0]; + String filename = parts[1]; + Metadata metadata = getArchivedRecordingMetadata(jvmId, filename); + result.add( + new ArchivedRecording( + filename, + "TODO", + "TODO", + metadata, + item.size(), + item.lastModified().getEpochSecond())); }); return result; } @@ -234,39 +231,16 @@ public void agentPush( logger.infov( "recording:{0}, labels:{1}, maxFiles:{2}", recording.fileName(), labels, maxFiles); doUpload(recording, metadata, jvmId); - var objs = new ArrayList>(); - minio.listObjects( - ListObjectsArgs.builder() - .bucket(archiveBucket) - .prefix(jvmId) - .recursive(true) - .build()) + var objs = new ArrayList(); + storage.listObjectsV2( + ListObjectsV2Request.builder().bucket(archiveBucket).prefix(jvmId).build()) + .contents() .iterator() .forEachRemaining(objs::add); var toRemove = objs.stream() - .map( - r -> { - try { - return r.get(); - } catch (IllegalArgumentException - | InvalidKeyException - | ErrorResponseException - | InsufficientDataException - | InternalException - | InvalidResponseException - | NoSuchAlgorithmException - | ServerException - | XmlParserException - | IOException e) { - logger.warn("Could not retrieve file modification time", e); - return null; - } - }) - .filter(Objects::nonNull) .sorted((a, b) -> b.lastModified().compareTo(a.lastModified())) .skip(max) - .map(r -> r.objectName()) .toList(); if (toRemove.isEmpty()) { return; @@ -285,29 +259,28 @@ public void agentPush( metadata, 0 /*filesize*/, clock.getMonotonicTime())))); - minio.removeObjects( - RemoveObjectsArgs.builder() + storage.deleteObjects( + DeleteObjectsRequest.builder() .bucket(archiveBucket) - .objects(toRemove.stream().map(DeleteObject::new).toList()) + .delete( + Delete.builder() + .objects( + toRemove.stream() + .map(S3Object::key) + .map( + k -> + ObjectIdentifier + .builder() + .key(k) + .build()) + .toList()) + .build()) .build()) + .errors() .forEach( err -> { - try { - logger.errorv( - "Deletion failure: {0} due to {1}", - err.get().objectName(), err.get().message()); - } catch (InvalidKeyException - | ErrorResponseException - | IllegalArgumentException - | InsufficientDataException - | InternalException - | InvalidResponseException - | NoSuchAlgorithmException - | ServerException - | XmlParserException - | IOException e) { - logger.error("Failed to retrieve remote error", e); - } + logger.errorv( + "Deletion failure: {0} due to {1}", err.key(), err.message()); }); } @@ -317,46 +290,22 @@ public void agentPush( @RolesAllowed("read") public List agentGet(@RestPath String jvmId) { var result = new ArrayList(); - minio.listObjects( - ListObjectsArgs.builder() - .bucket(archiveBucket) - .prefix(jvmId) - .recursive(true) - .build()) + storage.listObjectsV2( + ListObjectsV2Request.builder().bucket(archiveBucket).prefix(jvmId).build()) + .contents() .forEach( - r -> { - try { - Item item = r.get(); - String objectName = item.objectName(); - String filename = objectName.split("/")[1]; - Metadata metadata = - new Metadata( - minio.getObjectTags( - GetObjectTagsArgs.builder() - .bucket(archiveBucket) - .object(objectName) - .build()) - .get()); - result.add( - new ArchivedRecording( - filename, - "TODO", - "TODO", - metadata, - item.size(), - item.lastModified().toEpochSecond())); - } catch (InvalidKeyException - | ErrorResponseException - | IllegalArgumentException - | InsufficientDataException - | InternalException - | InvalidResponseException - | NoSuchAlgorithmException - | ServerException - | XmlParserException - | IOException e) { - logger.error("Could not retrieve list of archived recordings", e); - } + item -> { + String objectName = item.key(); + String filename = objectName.split("/")[1]; + Metadata metadata = getArchivedRecordingMetadata(jvmId, filename); + result.add( + new ArchivedRecording( + filename, + "TODO", + "TODO", + metadata, + item.size(), + item.lastModified().getEpochSecond())); }); return result; } @@ -370,18 +319,15 @@ public void agentDelete( @RestForm("recording") FileUpload recording, @RestForm("labels") JsonObject rawLabels) throws Exception { - minio.removeObject( - RemoveObjectArgs.builder() + storage.deleteObject( + DeleteObjectRequest.builder() .bucket(archiveBucket) - .object(String.format("%s/%s", jvmId, filename)) + .key(String.format("%s/%s", jvmId, filename)) .build()); } @Blocking - Map doUpload(FileUpload recording, Metadata metadata, String jvmId) - throws InvalidKeyException, ErrorResponseException, InsufficientDataException, - InternalException, InvalidResponseException, NoSuchAlgorithmException, - ServerException, XmlParserException, IllegalArgumentException, IOException { + Map doUpload(FileUpload recording, Metadata metadata, String jvmId) { logger.infov( "Upload: {0} {1} {2} {3}", recording.name(), recording.fileName(), recording.filePath(), metadata.labels); @@ -393,33 +339,14 @@ Map doUpload(FileUpload recording, Metadata metadata, String jvm filename = filename + ".jfr"; } long filesize = recording.size(); - logger.infov("Uploading {0} ({1} bytes) to Minio storage...", filename, filesize); - try (var stream = - new ProgressInputStream( - new BufferedInputStream(Files.newInputStream(recording.filePath())), - new Consumer() { - long total; - - @Override - public void accept(Integer n) { - total += n; - logger.infov( - "Minio upload: {0}/{1} bytes ({2}%)", - total, filesize, (total * 100.0d) / filesize); - } - })) { - // TODO attach other metadata than labels somehow - long mib = 1024 * 1024; - minio.putObject( - PutObjectArgs.builder() - .bucket(archiveBucket) - .object(String.format("%s/%s", jvmId, filename)) - .contentType(JFR_MIME) - .stream(stream, -1 /*filesize*/, 5 * mib) - // FIXME invalid tag error from Minio? - // .tags(metadata.labels) - .build()); - } + storage.putObject( + PutObjectRequest.builder() + .bucket(archiveBucket) + .key(String.format("%s/%s", jvmId, filename)) + .contentType(JFR_MIME) + .tagging(createMetadataTagging(metadata)) + .build(), + RequestBody.fromFile(recording.filePath())); logger.info("Upload complete"); bus.publish( MessagingServer.class.getName(), @@ -443,10 +370,10 @@ public void accept(Integer n) { @RolesAllowed("write") @Blocking public void delete(@RestPath String filename) throws Exception { - minio.removeObject( - RemoveObjectArgs.builder() + storage.deleteObject( + DeleteObjectRequest.builder() .bucket(archiveBucket) - .object(String.format("%s/%s", "uploads", filename)) + .key(String.format("%s/%s", "uploads", filename)) .build()); } @@ -455,62 +382,34 @@ public void delete(@RestPath String filename) throws Exception { @RolesAllowed("read") public Collection listFsArchives() { var map = new HashMap(); - minio.listObjects( - ListObjectsArgs.builder() - .bucket(archiveBucket) - .recursive(true) - .includeUserMetadata(true) - .build()) + storage.listObjectsV2(ListObjectsV2Request.builder().bucket(archiveBucket).build()) + .contents() .forEach( - r -> { - try { - Item item = r.get(); - String path = item.objectName(); - String[] parts = path.split("/"); - String jvmId = parts[0]; - String connectUrl; - if (item.userMetadata().containsKey("connectUrl")) { - // FIXME this is broken somehow - the userMetadata map is always - // empty - connectUrl = item.userMetadata().get("connectUrl"); - } else { - connectUrl = "lost-" + jvmId; - } - var dir = - map.computeIfAbsent( - jvmId, - id -> - new ArchivedRecordingDirectory( - connectUrl, id, new ArrayList<>())); - String filename = parts[1]; - Metadata metadata = - new Metadata( - minio.getObjectTags( - GetObjectTagsArgs.builder() - .bucket(archiveBucket) - .object(path) - .build()) - .get()); - dir.recordings.add( - new ArchivedRecording( - filename, - "TODO", - "TODO", - metadata, - item.size(), - item.lastModified().toEpochSecond())); - } catch (InvalidKeyException - | ErrorResponseException - | IllegalArgumentException - | InsufficientDataException - | InternalException - | InvalidResponseException - | NoSuchAlgorithmException - | ServerException - | XmlParserException - | IOException e) { - logger.error("Could not retrieve list of archived recordings", e); - } + item -> { + String path = item.key(); + String[] parts = path.split("/"); + String jvmId = parts[0]; + String filename = parts[1]; + + Metadata metadata = getArchivedRecordingMetadata(jvmId, filename); + + String connectUrl = + metadata.labels.computeIfAbsent( + "connectUrl", k -> "lost-" + jvmId); + var dir = + map.computeIfAbsent( + jvmId, + id -> + new ArchivedRecordingDirectory( + connectUrl, id, new ArrayList<>())); + dir.recordings.add( + new ArchivedRecording( + filename, + "TODO", + "TODO", + metadata, + item.size(), + item.lastModified().getEpochSecond())); }); return map.values(); } @@ -561,32 +460,92 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod } @Blocking - String saveRecording(Target target, ActiveRecording activeRecording) - throws ErrorResponseException, InsufficientDataException, InternalException, - InvalidKeyException, InvalidResponseException, IOException, - NoSuchAlgorithmException, ServerException, XmlParserException, Exception { + String saveRecording(Target target, ActiveRecording activeRecording) throws Exception { String timestamp = clock.now().truncatedTo(ChronoUnit.SECONDS).toString().replaceAll("[-:]+", ""); String filename = String.format("%s_%s_%s.jfr", target.alias, activeRecording.name, timestamp); - long mib = 1024 * 1024; - try (var stream = remoteRecordingStreamFactory.open(target, activeRecording)) { - // TODO attach other metadata than labels somehow - minio.putObject( - PutObjectArgs.builder() - .bucket(archiveBucket) - .object(String.format("%s/%s", target.jvmId, filename)) - .contentType(JFR_MIME) - .stream(stream, -1, 5 * mib) - .tags(activeRecording.metadata.labels()) - .userMetadata( - Map.of( - "connectUrl", - target.connectUrl.toString(), - "recordingName", - activeRecording.name)) - .build()); + int mib = 1024 * 1024; + String key = String.format("%s/%s", target.jvmId, filename); + String multipartId = null; + List> parts = new ArrayList<>(); + try (var stream = remoteRecordingStreamFactory.open(target, activeRecording); + var ch = Channels.newChannel(stream)) { + ByteBuffer buf = ByteBuffer.allocate(20 * mib); + multipartId = + storage.createMultipartUpload( + CreateMultipartUploadRequest.builder() + .bucket(archiveBucket) + .key(key) + .contentType(JFR_MIME) + .tagging( + createMetadataTagging(activeRecording.metadata)) + .build()) + .uploadId(); + int read = 0; + long accum = 0; + for (int i = 1; i <= 10_000; i++) { + read = ch.read(buf); + accum += read; + if (read == -1) { + logger.infov("Completed upload of {0} chunks ({1} bytes)", i - 1, accum); + break; + } + logger.infov("Writing chunk {0} of {1} bytes", i, read); + String eTag = + storage.uploadPart( + UploadPartRequest.builder() + .bucket(archiveBucket) + .key(key) + .uploadId(multipartId) + .partNumber(i) + .build(), + RequestBody.fromByteBuffer(buf)) + .eTag(); + parts.add(Pair.of(i, eTag)); + // S3 API limit + if (i == 10_000) { + throw new IndexOutOfBoundsException("Exceeded S3 maximum part count"); + } + } + } catch (Exception e) { + logger.error("Could not upload recording to S3 storage", e); + try { + if (multipartId != null) { + storage.abortMultipartUpload( + AbortMultipartUploadRequest.builder() + .bucket(archiveBucket) + .key(key) + .uploadId(multipartId) + .build()); + } + } catch (Exception e2) { + logger.error("Could not abort S3 multipart upload", e2); + } + throw e; } + storage.completeMultipartUpload( + CompleteMultipartUploadRequest.builder() + .bucket(archiveBucket) + .key(key) + .uploadId(multipartId) + .multipartUpload( + CompletedMultipartUpload.builder() + .parts( + parts.stream() + .map( + part -> + CompletedPart.builder() + .partNumber( + part + .getLeft()) + .eTag( + part + .getRight()) + .build()) + .toList()) + .build()) + .build()); bus.publish( MessagingServer.class.getName(), new Notification( @@ -634,7 +593,7 @@ public LinkedRecordingDescriptor createRecording( @RestForm Optional toDisk, @RestForm Optional maxAge, @RestForm Optional maxSize, - @RestForm Optional metadata, + @RestForm("metadata") Optional rawMetadata, @RestForm Optional archiveOnStop) throws Exception { if (StringUtils.isBlank(recordingName)) { @@ -674,12 +633,11 @@ public LinkedRecordingDescriptor createRecording( if (maxSize.isPresent()) { optionsBuilder.maxSize(maxSize.get()); } - // if (attrs.contains("metadata")) { - // metadata = - // gson.fromJson( - // attrs.get("metadata"), - // new TypeToken() {}.getType()); - // } + Map labels = new HashMap<>(); + if (rawMetadata.isPresent()) { + labels.putAll( + mapper.readValue(rawMetadata.get(), Metadata.class).labels); + } IConstrainedMap recordingOptions = optionsBuilder.build(); Pair template = @@ -700,13 +658,9 @@ public LinkedRecordingDescriptor createRecording( templateName, preferredTemplateType)); - Metadata meta = - new Metadata( - Map.of( - "template.name", - templateName, - "template.type", - preferredTemplateType.name())); + labels.put("template.name", templateName); + labels.put("template.type", preferredTemplateType.name()); + Metadata meta = new Metadata(labels); return new LinkedRecordingDescriptor( desc.getId(), mapState(desc), @@ -729,7 +683,7 @@ public LinkedRecordingDescriptor createRecording( if (recording.duration > 0) { scheduler.schedule( - () -> stopRecording(target.id, recording.id, archiveOnStop.orElse(false)), + () -> stopRecording(target.id, recording.remoteId, archiveOnStop.orElse(false)), recording.duration, TimeUnit.MILLISECONDS); } @@ -782,7 +736,7 @@ static Optional getDescriptorByName( @POST @Path("/api/v1/targets/{connectUrl}/recordings") @RolesAllowed("write") - public LinkedRecordingDescriptor createRecordingV1( + public Response createRecordingV1( @RestPath URI connectUrl, @RestForm String recordingName, @RestForm String events, @@ -793,17 +747,13 @@ public LinkedRecordingDescriptor createRecordingV1( @RestForm Optional metadata, @RestForm Optional archiveOnStop) throws Exception { - Target target = Target.getTargetByConnectUrl(connectUrl); - return createRecording( - target.id, - recordingName, - events, - duration, - archiveOnStop, - maxAge, - maxSize, - metadata, - archiveOnStop); + return Response.status(RestResponse.Status.PERMANENT_REDIRECT) + .location( + URI.create( + String.format( + "/api/v3/targets/%d/recordings", + Target.getTargetByConnectUrl(connectUrl).id))) + .build(); } @Transactional @@ -853,22 +803,13 @@ public void deleteRecording(@RestPath long targetId, @RestPath long remoteId) th public void deleteArchivedRecording(@RestPath String jvmId, @RestPath String filename) throws Exception { - var item = - minio.statObject( - StatObjectArgs.builder() - .bucket(archiveBucket) - .object(String.format("%s/%s", jvmId, filename)) - .build()); + var metadata = getArchivedRecordingMetadata(jvmId, filename); - String connectUrl = "lost-" + jvmId; - if (item.userMetadata().containsKey("connectUrl")) { - // FIXME this is broken somehow - the userMetadata map is always empty - connectUrl = item.userMetadata().get("connectUrl"); - } - minio.removeObject( - RemoveObjectArgs.builder() + String connectUrl = metadata.labels.computeIfAbsent("connectUrl", k -> "lost-" + jvmId); + storage.deleteObject( + DeleteObjectRequest.builder() .bucket(archiveBucket) - .object(String.format("%s/%s", jvmId, filename)) + .key(String.format("%s/%s", jvmId, filename)) .build()); bus.publish( MessagingServer.class.getName(), @@ -912,6 +853,61 @@ public Map getRecordingOptions(@RestPath long id) throws Excepti }); } + private Tagging createMetadataTagging(Metadata metadata) { + // TODO attach other metadata than labels somehow. Prefixed keys to create partitioning? + return Tagging.builder() + .tagSet( + metadata.labels.entrySet().stream() + .map( + e -> + Tag.builder() + .key( + base64Url + .encodeAsString( + e.getKey() + .getBytes()) + .trim()) + // e.getKey()) + .value( + base64Url + .encodeAsString( + e.getValue() + .getBytes()) + .trim()) + .build()) + .toList()) + .build(); + } + + private Metadata taggingToMetadata(List tagSet) { + // TODO parse out other metadata than labels + return new Metadata( + tagSet.stream() + .map( + tag -> + Pair.of( + new String( + base64Url.decode(tag.key()), + StandardCharsets.UTF_8) + .trim(), + // tag.key(), + new String( + base64Url.decode(tag.value()), + StandardCharsets.UTF_8) + .trim())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue))); + } + + private Metadata getArchivedRecordingMetadata(String jvmId, String filename) { + return taggingToMetadata( + storage.getObjectTagging( + GetObjectTaggingRequest.builder() + .bucket(archiveBucket) + .key(String.format("%s/%s", jvmId, filename)) + .build()) + .tagSet()); + } + private static Map getRecordingOptions( IFlightRecorderService service, RecordingOptionsBuilder builder) throws Exception { IConstrainedMap recordingOptions = builder.build(); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 63e3eda59..377b99130 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -3,10 +3,6 @@ cryostat.jdp.enabled=false quarkus.test.integration-test-profile=test -# minio devservice login credentials: minioaccess:miniosecret -# the console URL and access credentials can also be found here while in dev mode: http://localhost:8181/q/dev/ -minio.buckets.archives.name=archivedrecordings - quarkus.http.auth.proactive=false quarkus.http.host=0.0.0.0 quarkus.http.port=8181 @@ -55,18 +51,21 @@ quarkus.datasource.db-kind=postgresql quarkus.datasource.db-version=13.0 quarkus.datasource.devservices.enabled=true quarkus.datasource.devservices.image-name=quay.io/cryostat/cryostat3-db + # !!! prod databases must set this configuration parameter some other way via a secret !!! quarkus.datasource.devservices.command=postgres -c encrypt.key=REPLACEME # !!! -testcontainers.reuse.enable=true -quarkus.minio.devservices.enabled=true + +storage.buckets.archives.name=archivedrecordings +quarkus.s3.devservices.enabled=true +quarkus.s3.devservices.buckets=archivedrecordings quarkus.quinoa.build-dir=dist quarkus.quinoa.frozen-lockfile=true quarkus.quinoa.dev-server.port=9000 quarkus.quinoa.enable-spa-routing=true quarkus.quinoa.package-manager-install=true -quarkus.quinoa.package-manager-install.node-version=16.18.1 +quarkus.quinoa.package-manager-install.node-version=18.16.0 quarkus.quinoa.package-manager-command.build=yarn yarn:frzinstall && yarn build:notests quarkus.quinoa.package-manager-command.test=yarn test:ci quarkus.quinoa.package-manager-command.dev=yarn start:dev