Skip to content
This repository has been archived by the owner on May 16, 2023. It is now read-only.

Fix: Delete Search Portal Entries #269

Merged
merged 5 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public class Cancellation {
@JsonIgnore
private String dataExportError;

@Column(name = "search_portal_deleted")
private ZonedDateTime searchPortalDeleted;

@Transient()
private ZonedDateTime finalDeletion;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public interface CancellationRepository extends JpaRepository<Cancellation, Stri
List<Cancellation> findByMovedToLongtermArchiveIsNullAndCancellationDateBefore(
ZonedDateTime expiryDate, Pageable pageable);

List<Cancellation> findBySearchPortalDeletedIsNullAndCancellationDateBefore(
ZonedDateTime expiryDate, Pageable pageable);

List<Cancellation> findByMovedToLongtermArchiveNotNullAndCsvCreatedIsNull(Pageable pageable);

List<Cancellation> findByCancellationDateBeforeAndDataDeletedIsNull(ZonedDateTime expiryDate);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
package app.coronawarn.quicktest.service;

import app.coronawarn.quicktest.archive.domain.ArchiveCipherDtoV1;
import app.coronawarn.quicktest.config.CsvUploadConfig;
import app.coronawarn.quicktest.domain.Cancellation;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.opencsv.CSVWriter;
import com.opencsv.bean.StatefulBeanToCsv;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.ZonedDateTime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.crypto.codec.Hex;
import org.springframework.stereotype.Service;

@Slf4j
Expand All @@ -28,10 +12,6 @@
public class ArchiveSchedulingService {

private final ArchiveService archiveService;
private final CancellationService cancellationService;

private final CsvUploadConfig s3Config;
private final AmazonS3 s3Client;

/**
* Scheduler used for moving quicktests from qt archive to longterm.
Expand All @@ -44,97 +24,4 @@ public void moveToArchiveJob() {
archiveService.moveToArchive();
log.info("Completed Job: moveToArchiveJob");
}

/**
* Scheduler used for moving quicktests from qt archive to longterm when a cancellation was triggered.
*/
@Scheduled(cron = "${archive.cancellationArchiveJob.cron}")
@SchedulerLock(name = "CancellationArchiveJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationArchiveJob.locklimit}")
public void cancellationArchiveJob() {
log.info("Starting Job: cancellationArchiveJob");
processCancellationArchiveBatchRecursion(cancellationService.getReadyToArchiveBatch());
log.info("Completed Job: cancellationArchiveJob");
}

private void processCancellationArchiveBatchRecursion(List<Cancellation> cancellations) {
log.info("Process Cancellation Archive Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
String partnerId = cancellation.getPartnerId();
archiveService.moveToArchiveByTenantId(partnerId);
cancellationService.updateMovedToLongterm(cancellation, ZonedDateTime.now());
}

List<Cancellation> nextBatch = cancellationService.getReadyToArchiveBatch();
if (!nextBatch.isEmpty()) {
processCancellationArchiveBatchRecursion(nextBatch);
}
}

/**
* Scheduler used for moving longterm archives to bucket as a csv.
*/
@Scheduled(cron = "${archive.csvUploadJob.cron}")
@SchedulerLock(name = "CsvUploadJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.csvUploadJob.locklimit}")
public void csvUploadJob() {
log.info("Starting Job: csvUploadJob");
processCsvUploadBatchRecursion(cancellationService.getReadyToUploadBatch());
log.info("Completed Job: csvUploadJob");
}

private void processCsvUploadBatchRecursion(List<Cancellation> cancellations) {
log.info("Process CSV Upload Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
try {
List<ArchiveCipherDtoV1> quicktests =
archiveService.getQuicktestsFromLongtermByTenantId(cancellation.getPartnerId());

StringWriter stringWriter = new StringWriter();
CSVWriter csvWriter =
new CSVWriter(stringWriter, '\t', CSVWriter.NO_QUOTE_CHARACTER,
CSVWriter.DEFAULT_ESCAPE_CHARACTER, CSVWriter.DEFAULT_LINE_END);
StatefulBeanToCsv<ArchiveCipherDtoV1> beanToCsv =
new StatefulBeanToCsvBuilder<ArchiveCipherDtoV1>(csvWriter)
.build();
beanToCsv.write(quicktests);
byte[] csvBytes = stringWriter.toString().getBytes(StandardCharsets.UTF_8);

String objectId = cancellation.getPartnerId() + ".csv";

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(csvBytes.length);

s3Client.putObject(
s3Config.getBucketName(),
objectId,
new ByteArrayInputStream(csvBytes), metadata);

log.info("File stored to S3 with id {}", objectId);

cancellationService.updateCsvCreated(cancellation, ZonedDateTime.now(), objectId,
getHash(csvBytes), quicktests.size(), csvBytes.length);
} catch (Exception e) {
String errorMessage = e.getClass().getName() + ": " + e.getMessage();

log.error("Could not convert Quicktest to CSV: " + errorMessage);
cancellationService.updateDataExportError(cancellation, errorMessage);
}
}

List<Cancellation> nextBatch = cancellationService.getReadyToUploadBatch();
if (!nextBatch.isEmpty()) {
processCsvUploadBatchRecursion(nextBatch);
}
}

private String getHash(byte[] bytes) {
try {
MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = sha256.digest(bytes);
return String.valueOf(Hex.encode(hashBytes));
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to load SHA-256 Message Digest");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*-
* ---license-start
* Corona-Warn-App / cwa-quick-test-backend
* ---
* Copyright (C) 2021 T-Systems International GmbH and all other contributors
* ---
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ---license-end
*/

package app.coronawarn.quicktest.service;

import app.coronawarn.quicktest.archive.domain.ArchiveCipherDtoV1;
import app.coronawarn.quicktest.config.CsvUploadConfig;
import app.coronawarn.quicktest.domain.Cancellation;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.opencsv.CSVWriter;
import com.opencsv.bean.StatefulBeanToCsv;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.ZonedDateTime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.keycloak.representations.idm.GroupRepresentation;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.crypto.codec.Hex;
import org.springframework.stereotype.Service;

@Slf4j
@RequiredArgsConstructor
@Service
public class CancellationSchedulingService {

private final ArchiveService archiveService;
private final CancellationService cancellationService;

private final KeycloakService keycloakService;

private final CsvUploadConfig s3Config;
private final AmazonS3 s3Client;

/**
* Scheduler used for moving quicktests from qt archive to longterm when a cancellation was triggered.
*/
@Scheduled(cron = "${archive.cancellationArchiveJob.cron}")
@SchedulerLock(name = "CancellationArchiveJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationArchiveJob.locklimit}")
public void cancellationArchiveJob() {
log.info("Starting Job: cancellationArchiveJob");
processCancellationArchiveBatchRecursion(cancellationService.getReadyToArchiveBatch());
log.info("Completed Job: cancellationArchiveJob");
}

private void processCancellationArchiveBatchRecursion(List<Cancellation> cancellations) {
log.info("Process Cancellation Archive Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
String partnerId = cancellation.getPartnerId();
archiveService.moveToArchiveByTenantId(partnerId);
cancellationService.updateMovedToLongterm(cancellation, ZonedDateTime.now());
}

List<Cancellation> nextBatch = cancellationService.getReadyToArchiveBatch();
if (!nextBatch.isEmpty()) {
processCancellationArchiveBatchRecursion(nextBatch);
}
}

/**
* Scheduler used for moving longterm archives to bucket as a csv.
*/
@Scheduled(cron = "${archive.csvUploadJob.cron}")
@SchedulerLock(name = "CsvUploadJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.csvUploadJob.locklimit}")
public void csvUploadJob() {
log.info("Starting Job: csvUploadJob");
processCsvUploadBatchRecursion(cancellationService.getReadyToUploadBatch());
log.info("Completed Job: csvUploadJob");
}

private void processCsvUploadBatchRecursion(List<Cancellation> cancellations) {
log.info("Process CSV Upload Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
try {
List<ArchiveCipherDtoV1> quicktests =
archiveService.getQuicktestsFromLongtermByTenantId(cancellation.getPartnerId());

StringWriter stringWriter = new StringWriter();
CSVWriter csvWriter =
new CSVWriter(stringWriter, '\t', CSVWriter.NO_QUOTE_CHARACTER,
CSVWriter.DEFAULT_ESCAPE_CHARACTER, CSVWriter.DEFAULT_LINE_END);
StatefulBeanToCsv<ArchiveCipherDtoV1> beanToCsv =
new StatefulBeanToCsvBuilder<ArchiveCipherDtoV1>(csvWriter)
.build();
beanToCsv.write(quicktests);
byte[] csvBytes = stringWriter.toString().getBytes(StandardCharsets.UTF_8);

String objectId = cancellation.getPartnerId() + ".csv";

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(csvBytes.length);

s3Client.putObject(
s3Config.getBucketName(),
objectId,
new ByteArrayInputStream(csvBytes), metadata);

log.info("File stored to S3 with id {}", objectId);

cancellationService.updateCsvCreated(cancellation, ZonedDateTime.now(), objectId,
getHash(csvBytes), quicktests.size(), csvBytes.length);
} catch (Exception e) {
String errorMessage = e.getClass().getName() + ": " + e.getMessage();

log.error("Could not convert Quicktest to CSV: " + errorMessage);
cancellationService.updateDataExportError(cancellation, errorMessage);
}
}

List<Cancellation> nextBatch = cancellationService.getReadyToUploadBatch();
if (!nextBatch.isEmpty()) {
processCsvUploadBatchRecursion(nextBatch);
}
}

/**
* Scheduler used for deleting SearchPortal entries.
*/
@Scheduled(cron = "${archive.cancellationSearchPortalDeleteJob.cron}")
@SchedulerLock(name = "CancellationSearchPortalDeleteJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationSearchPortalDeleteJob.locklimit}")
public void cancellationSearchPortalDeleteJob() {
log.info("Starting Job: cancellationSearchPortalDeleteJob");
processCancellationDeleteSearchPortalBatch(cancellationService.getReadyToDeleteSearchPortal());
log.info("Completed Job: cancellationSearchPortalDeleteJob");
}

private void processCancellationDeleteSearchPortalBatch(List<Cancellation> cancellations) {
log.info("Process Cancellation DeleteSearchPortal Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
GroupRepresentation rootGroup = keycloakService.getRootGroupByName(cancellation.getPartnerId());

if (rootGroup == null) {
log.error("Could not find RootGroup for Partner {}", cancellation.getPartnerId());
} else {
keycloakService.deleteSubGroupsFromMapService(rootGroup);
}

cancellationService.updateSearchPortalDeleted(cancellation, ZonedDateTime.now());
}

List<Cancellation> nextBatch = cancellationService.getReadyToArchiveBatch();
if (!nextBatch.isEmpty()) {
processCancellationArchiveBatchRecursion(nextBatch);
}
}

private String getHash(byte[] bytes) {
try {
MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = sha256.digest(bytes);
return String.valueOf(Hex.encode(hashBytes));
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to load SHA-256 Message Digest");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void updateCsvCreated(Cancellation cancellation, ZonedDateTime csvCreated
* @param requester Username of the user who requested the download link
*/
public void updateDownloadLinkRequested(
Cancellation cancellation, ZonedDateTime downloadLinkRequested, String requester) {
Cancellation cancellation, ZonedDateTime downloadLinkRequested, String requester) {
cancellation.setDownloadLinkRequested(downloadLinkRequested);
cancellation.setDownloadLinkRequestedBy(requester);
cancellationRepository.save(cancellation);
Expand Down Expand Up @@ -170,6 +170,17 @@ public void updateDataExportError(Cancellation cancellation, String errorMessage
cancellationRepository.save(cancellation);
}

/**
* Set SearchPortalDeleted Flag/Timestamp and persist entity.
*
* @param cancellation Cancellation Entity
* @param dataDeleted timestamp of search portal deletion
*/
public void updateSearchPortalDeleted(Cancellation cancellation, ZonedDateTime dataDeleted) {
cancellation.setSearchPortalDeleted(dataDeleted);
cancellationRepository.save(cancellation);
}

/**
* Searches in the DB for an existing cancellation entity which download request is older than 48h and not
* moved_to_longterm_archive.
Expand All @@ -185,6 +196,20 @@ public List<Cancellation> getReadyToArchiveBatch() {
ldt, PageRequest.of(0, archiveProperties.getCancellationArchiveJob().getChunkSize()));
}

/**
* Searches in the DB for an existing cancellation entity with searchPortalDeleted null and
* cancellation_date in past.
* Returns only one batch of entities. Batch Size depends on configuration.
*
* @return List holding all entities found.
*/
public List<Cancellation> getReadyToDeleteSearchPortal() {
ZonedDateTime ldt = ZonedDateTime.now();

return cancellationRepository.findBySearchPortalDeletedIsNullAndCancellationDateBefore(
ldt, PageRequest.of(0, archiveProperties.getCancellationArchiveJob().getChunkSize()));
}

/**
* Searches in the DB for an existing cancellation entity which moved_to_longterm_archive is not null but
* csv_created is null.
Expand Down
Loading