Skip to content
This repository has been archived by the owner on May 16, 2023. It is now read-only.

Commit

Permalink
Feature/skip large exports (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-burwig authored Jun 5, 2020
1 parent eb68e11 commit a0f322d
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import app.coronawarn.server.services.distribution.config.DistributionServiceConfig;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.Temporal;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -35,12 +36,16 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An instance of this class contains a collection of {@link DiagnosisKey DiagnosisKeys}.
*/
public abstract class DiagnosisKeyBundler {

private static final Logger logger = LoggerFactory.getLogger(DiagnosisKeyBundler.class);

/**
* The submission timestamp is counted in 1 hour intervals since epoch.
*/
Expand All @@ -51,8 +56,9 @@ public abstract class DiagnosisKeyBundler {
*/
public static final long TEN_MINUTES_INTERVAL_SECONDS = TimeUnit.MINUTES.toSeconds(10);

protected final int minNumberOfKeysPerBundle;
protected final long expiryPolicyMinutes;
protected final int minNumberOfKeysPerBundle;
private final int maxNumberOfKeysPerBundle;

// The hour at which the distribution runs. This field is needed to prevent the run from distributing any keys that
// have already been submitted but may only be distributed in the future (e.g. because they are not expired yet).
Expand All @@ -61,9 +67,13 @@ public abstract class DiagnosisKeyBundler {
// A map containing diagnosis keys, grouped by the LocalDateTime on which they may be distributed
protected final Map<LocalDateTime, List<DiagnosisKey>> distributableDiagnosisKeys = new HashMap<>();

/**
* Constructs a DiagnosisKeyBundler based on the specified service configuration.
*/
public DiagnosisKeyBundler(DistributionServiceConfig distributionServiceConfig) {
this.minNumberOfKeysPerBundle = distributionServiceConfig.getShiftingPolicyThreshold();
this.expiryPolicyMinutes = distributionServiceConfig.getExpiryPolicyMinutes();
this.minNumberOfKeysPerBundle = distributionServiceConfig.getShiftingPolicyThreshold();
this.maxNumberOfKeysPerBundle = distributionServiceConfig.getMaximumNumberOfKeysPerBundle();
}

/**
Expand Down Expand Up @@ -106,19 +116,38 @@ public List<DiagnosisKey> getAllDiagnosisKeys() {
public Set<LocalDate> getDatesWithDistributableDiagnosisKeys() {
return this.distributableDiagnosisKeys.keySet().stream()
.map(LocalDateTime::toLocalDate)
.filter(this::numberOfKeysForDateBelowMaximum)
.collect(Collectors.toSet());
}

public boolean numberOfKeysForDateBelowMaximum(LocalDate date) {
return numberOfKeysBelowMaximum(getDiagnosisKeysForDate(date).size(), date);
}

/**
* Returns a set of all {@link LocalDateTime hours} of a specified {@link LocalDate date} during which {@link
* DiagnosisKey diagnosis keys} shall be distributed.
*/
public Set<LocalDateTime> getHoursWithDistributableDiagnosisKeys(LocalDate currentDate) {
return this.distributableDiagnosisKeys.keySet().stream()
.filter(dateTime -> dateTime.toLocalDate().equals(currentDate))
.filter(this::numberOfKeysForHourBelowMaximum)
.collect(Collectors.toSet());
}

private boolean numberOfKeysForHourBelowMaximum(LocalDateTime hour) {
return numberOfKeysBelowMaximum(getDiagnosisKeysForHour(hour).size(), hour);
}

private boolean numberOfKeysBelowMaximum(int numberOfKeys, Temporal time) {
if (numberOfKeys > maxNumberOfKeysPerBundle) {
logger.error("Number of diagnosis keys ({}) for {} exceeds the configured maximum.", numberOfKeys, time);
return false;
} else {
return true;
}
}

/**
* Returns the submission timestamp of a {@link DiagnosisKey} as a {@link LocalDateTime}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public void prepare(ImmutableStack<Object> indices) {

private IndexDirectory<LocalDate, WritableOnDisk> decorateDateDirectory(DiagnosisKeysDateDirectory dateDirectory) {
return new DateAggregatingDecorator(new DateIndexingDecorator(dateDirectory, distributionServiceConfig),
cryptoProvider, distributionServiceConfig);
cryptoProvider, distributionServiceConfig, diagnosisKeyBundler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import app.coronawarn.server.common.protocols.external.exposurenotification.TemporaryExposureKey;
import app.coronawarn.server.common.protocols.external.exposurenotification.TemporaryExposureKeyExport;
import app.coronawarn.server.services.distribution.assembly.component.CryptoProvider;
import app.coronawarn.server.services.distribution.assembly.diagnosiskeys.DiagnosisKeyBundler;
import app.coronawarn.server.services.distribution.assembly.diagnosiskeys.structure.archive.decorator.singing.DiagnosisKeySigningDecorator;
import app.coronawarn.server.services.distribution.assembly.diagnosiskeys.structure.file.TemporaryExposureKeyExportFile;
import app.coronawarn.server.services.distribution.assembly.structure.Writable;
Expand Down Expand Up @@ -54,15 +55,17 @@ public class DateAggregatingDecorator extends IndexDirectoryDecorator<LocalDate,

private final CryptoProvider cryptoProvider;
private final DistributionServiceConfig distributionServiceConfig;
private final DiagnosisKeyBundler diagnosisKeyBundler;

/**
* Creates a new DateAggregatingDecorator.
*/
public DateAggregatingDecorator(IndexDirectory<LocalDate, WritableOnDisk> directory, CryptoProvider cryptoProvider,
DistributionServiceConfig distributionServiceConfig) {
DistributionServiceConfig distributionServiceConfig, DiagnosisKeyBundler diagnosisKeyBundler) {
super(directory);
this.cryptoProvider = cryptoProvider;
this.distributionServiceConfig = distributionServiceConfig;
this.diagnosisKeyBundler = diagnosisKeyBundler;
}

@Override
Expand All @@ -77,6 +80,7 @@ public void prepare(ImmutableStack<Object> indices) {
}

Set<String> dates = this.getIndex(indices).stream()
.filter(diagnosisKeyBundler::numberOfKeysForDateBelowMaximum)
.map(this.getIndexFormatter())
.map(Object::toString)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
*/
public class IO {

/**
* The maximum acceptable file size in bytes.
*/
public static final int MAXIMUM_FILE_SIZE = 16000000;

private IO() {
}

Expand All @@ -51,12 +56,20 @@ public static void makeNewFile(File root, String name) {
}

/**
* Writes bytes into a file.
* Writes bytes into a file. If the resulting file would exceed the specified maximum file size, it is not written but
* removed instead.
*
* @param bytes The content to write
* @param outputFile The file to write the content into.
*/
public static void writeBytesToFile(byte[] bytes, File outputFile) {
if (bytes.length > MAXIMUM_FILE_SIZE) {
String fileName = outputFile.getName();
throw new UncheckedIOException(
new IOException(
"File size of " + bytes.length + " bytes exceeds the maximum file size. Deleting" + fileName));
}

try (FileOutputStream outputFileStream = new FileOutputStream(outputFile)) {
outputFileStream.write(bytes);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class DistributionServiceConfig {
private Integer retentionDays;
private Integer expiryPolicyMinutes;
private Integer shiftingPolicyThreshold;
private Integer maximumNumberOfKeysPerBundle;
private String outputFileName;
private Boolean includeIncompleteDays;
private Boolean includeIncompleteHours;
Expand Down Expand Up @@ -81,6 +82,14 @@ public void setShiftingPolicyThreshold(Integer shiftingPolicyThreshold) {
this.shiftingPolicyThreshold = shiftingPolicyThreshold;
}

public Integer getMaximumNumberOfKeysPerBundle() {
return this.maximumNumberOfKeysPerBundle;
}

public void setMaximumNumberOfKeysPerBundle(Integer maximumNumberOfKeysPerBundle) {
this.maximumNumberOfKeysPerBundle = maximumNumberOfKeysPerBundle;
}

public String getOutputFileName() {
return outputFileName;
}
Expand Down
1 change: 1 addition & 0 deletions services/distribution/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
retention-days: 14
expiry-policy-minutes: 120
shifting-policy-threshold: 140
maximum-number-of-keys-per-bundle: 600000
include-incomplete-days: false
include-incomplete-hours: false
paths:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import static app.coronawarn.server.services.distribution.common.Helpers.buildDiagnosisKeys;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import app.coronawarn.server.common.persistence.domain.DiagnosisKey;
import app.coronawarn.server.services.distribution.assembly.component.CryptoProvider;
Expand Down Expand Up @@ -96,6 +98,30 @@ void excludesCurrentDateFromIndex() {
assertThat(index).doesNotContain(LocalDate.of(1970, 1, 5));
}

@Test
void excludesDatesThatExceedTheMaximumNumberOfKeys() {
List<DiagnosisKey> diagnosisKeys = Stream
.of(buildDiagnosisKeys(6, LocalDateTime.of(1970, 1, 3, 6, 0), 1),
buildDiagnosisKeys(6, LocalDateTime.of(1970, 1, 3, 10, 0), 1))
.flatMap(List::stream)
.collect(Collectors.toList());

DistributionServiceConfig svcConfig = mock(DistributionServiceConfig.class);
when(svcConfig.getExpiryPolicyMinutes()).thenReturn(120);
when(svcConfig.getShiftingPolicyThreshold()).thenReturn(1);
when(svcConfig.getMaximumNumberOfKeysPerBundle()).thenReturn(1);

DiagnosisKeyBundler diagnosisKeyBundler = new ProdDiagnosisKeyBundler(svcConfig);
diagnosisKeyBundler.setDiagnosisKeys(diagnosisKeys, LocalDateTime.of(1970, 1, 5, 0, 0));

DateIndexingDecorator decorator = makeDecoratedDateDirectory(diagnosisKeyBundler);

decorator.prepare(new ImmutableStack<>().push("DE"));

Set<LocalDate> index = decorator.getIndex(new ImmutableStack<>());
assertThat(index).isEmpty();
}

private DateIndexingDecorator makeDecoratedDateDirectory(DiagnosisKeyBundler diagnosisKeyBundler) {
return new DateIndexingDecorator(
new DiagnosisKeysDateDirectory(diagnosisKeyBundler, cryptoProvider, distributionServiceConfig),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import static app.coronawarn.server.services.distribution.common.Helpers.buildDiagnosisKeys;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import app.coronawarn.server.common.persistence.domain.DiagnosisKey;
import app.coronawarn.server.services.distribution.assembly.component.CryptoProvider;
Expand All @@ -34,8 +36,6 @@
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -65,26 +65,27 @@ void setup() {
}

@Test
void excludesEmptyHoursFromIndex() {
List<DiagnosisKey> diagnosisKeys = Stream
.of(buildDiagnosisKeys(6, LocalDateTime.of(1970, 1, 3, 4, 0), 5),
buildDiagnosisKeys(6, LocalDateTime.of(1970, 1, 3, 5, 0), 0),
buildDiagnosisKeys(6, LocalDateTime.of(1970, 1, 3, 6, 0), 5))
.flatMap(List::stream)
.collect(Collectors.toList());
void excludesHoursThatExceedTheMaximumNumberOfKeys() {
List<DiagnosisKey> diagnosisKeys = buildDiagnosisKeys(6, LocalDateTime.of(1970, 1, 3, 4, 0), 2);

DistributionServiceConfig svcConfig = mock(DistributionServiceConfig.class);
when(svcConfig.getExpiryPolicyMinutes()).thenReturn(120);
when(svcConfig.getShiftingPolicyThreshold()).thenReturn(1);
when(svcConfig.getMaximumNumberOfKeysPerBundle()).thenReturn(1);

DiagnosisKeyBundler diagnosisKeyBundler = new ProdDiagnosisKeyBundler(svcConfig);
diagnosisKeyBundler.setDiagnosisKeys(diagnosisKeys, LocalDateTime.of(1970, 1, 5, 0, 0));

HourIndexingDecorator decorator = makeDecoratedHourDirectory(diagnosisKeyBundler);
decorator.prepare(new ImmutableStack<>().push("DE").push(LocalDate.of(1970, 1, 3)));

decorator.prepare(new ImmutableStack<>().push("DE").push(LocalDate.of(1970, 1, 3)));
Set<LocalDateTime> index = decorator.getIndex(new ImmutableStack<>().push(LocalDate.of(1970, 1, 3)));

assertThat(index).contains(LocalDateTime.of(1970, 1, 3, 4, 0));
assertThat(index).doesNotContain(LocalDateTime.of(1970, 1, 3, 5, 0));
assertThat(index).contains(LocalDateTime.of(1970, 1, 3, 6, 0));
assertThat(index).isEmpty();
}

@Test
void excludesCurrentHourFromIndex() {
void excludesEmptyHoursFromIndex() {
List<DiagnosisKey> diagnosisKeys = buildDiagnosisKeys(6, LocalDateTime.of(1970, 1, 5, 0, 0), 5);
diagnosisKeyBundler.setDiagnosisKeys(diagnosisKeys, LocalDateTime.of(1970, 1, 5, 1, 0));
HourIndexingDecorator decorator = makeDecoratedHourDirectory(diagnosisKeyBundler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*-
* ---license-start
* Corona-Warn-App
* ---
* Copyright (C) 2020 SAP SE and all other contributors
* ---
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ---license-end
*/

package app.coronawarn.server.services.distribution.assembly.io;


import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.io.UncheckedIOException;
import org.junit.jupiter.api.Test;

class IOTest {

@Test
void doesNotWriteIfMaximumFileSize() {
File file = mock(File.class);
assertThatExceptionOfType(UncheckedIOException.class)
.isThrownBy(() -> IO.writeBytesToFile(new byte[IO.MAXIMUM_FILE_SIZE + 1], file));
verify(file, never()).getPath();
}
}
1 change: 1 addition & 0 deletions services/distribution/src/test/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ services:
retention-days: 14
expiry-policy-minutes: 120
shifting-policy-threshold: 5
maximum-number-of-keys-per-bundle: 600000
include-incomplete-days: false
include-incomplete-hours: false
paths:
Expand Down

0 comments on commit a0f322d

Please sign in to comment.