From 706bf6cc06dbe6f18924da42387a110d4f4d3d37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matti=20Pitk=C3=A4m=C3=A4ki?= Date: Tue, 6 Feb 2024 12:02:20 +0200 Subject: [PATCH] OK-424: Added retry logic to document saving --- .../dokumenttipalvelu/Dokumenttipalvelu.java | 175 +++++++++++++----- .../SiirtotiedostoPalvelu.java | 154 +++++++-------- .../DokumenttipalveluTest.java | 116 +++++++++++- 3 files changed, 316 insertions(+), 129 deletions(-) diff --git a/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/Dokumenttipalvelu.java b/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/Dokumenttipalvelu.java index 89ab139..0305bbc 100644 --- a/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/Dokumenttipalvelu.java +++ b/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/Dokumenttipalvelu.java @@ -16,7 +16,9 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode; +import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody; import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; @@ -228,6 +230,34 @@ public URL getDownloadUrl(final String key, final Duration expirationDuration) { .url(); } + /** + * Fetches the object attributes of the given document object + * + * @param id Document id + * @param key Document key + * @return Metadata describing the document + */ + public CompletableFuture getObjectAttributesASync(final String id, final String key) { + return getClient() + .getObjectAttributes( + GetObjectAttributesRequest.builder() + .bucket(bucketName) + .key(key) + .objectAttributes(ObjectAttributes.E_TAG, ObjectAttributes.OBJECT_SIZE) + .build()) + .thenApply( + attributesResponse -> + new ObjectMetadata( + key, + id, + extractTags(key), + attributesResponse.lastModified(), + attributesResponse.objectSize(), + attributesResponse.eTag())); + + } + + /** * Saves document. * @@ -236,6 +266,7 @@ public URL getDownloadUrl(final String key, final Duration expirationDuration) { * @param expirationDate Date when the document will be removed * @param tags Collection of tags that the document can be searched with * @param contentType Document's content type + * @param fetchDetailedAttributes Fetch detailed object attributes for saved document * @param data Document's data input stream * @return Metadata describing the document. If an existing document exists with same document id, * will return a failed future with DocumentIdAlreadyExistsException. @@ -246,6 +277,7 @@ public CompletableFuture saveAsync( final Date expirationDate, final Collection tags, final String contentType, + final boolean fetchDetailedAttributes, final InputStream data) { final String id = documentId != null ? documentId : UUID.randomUUID().toString(); final String key = composeKey(tags, id); @@ -264,39 +296,32 @@ public CompletableFuture saveAsync( } catch (final IOException e) { throw new RuntimeException("Error reading input data", e); } + PutObjectRequest putObjectRequest = + PutObjectRequest.builder() + .bucket(bucketName) + .key(key) + .expires(expirationDate.toInstant()) + .contentType(contentType) + .metadata(Collections.singletonMap(METADATA_FILENAME, fileName)) + .build(); + return findAsync(Collections.singleton(documentId)) .thenCompose( existing -> { if (existing.isEmpty()) { - return getClient() - .putObject( - PutObjectRequest.builder() - .bucket(bucketName) - .key(key) - .expires(expirationDate.toInstant()) - .contentType(contentType) - .metadata(Collections.singletonMap(METADATA_FILENAME, fileName)) - .build(), - body) - .thenCompose( - putObjectResponse -> - getClient() - .getObjectAttributes( - GetObjectAttributesRequest.builder() - .bucket(bucketName) - .key(key) - .objectAttributes( - ObjectAttributes.E_TAG, ObjectAttributes.OBJECT_SIZE) - .build()) - .thenApply( - attributesResponse -> - new ObjectMetadata( - key, - id, - extractTags(key), - attributesResponse.lastModified(), - attributesResponse.objectSize(), - attributesResponse.eTag()))); + if (fetchDetailedAttributes) { + return getClient() + .putObject(putObjectRequest, body) + .thenCompose( + putObjectResponse -> getObjectAttributesASync(id, key)); + } else { + return getClient().putObject(putObjectRequest, body) + .thenApply(putObjectResponse -> + new ObjectMetadata( + key, + id, + extractTags(key), null, null, null)); + } } else { throw new CompletionException( new DocumentIdAlreadyExistsException( @@ -305,33 +330,87 @@ public CompletableFuture saveAsync( }); } - /** - * Saves document. - * - * @param documentId Document's id, can be left null, then it will be generated as a new UUID - * @param fileName File name, will be saved as part of document's metadata - * @param expirationDate Date when the document will be removed - * @param tags Collection of tags that the document can be searched with - * @param contentType Document's content type - * @param data Document's data input stream - * @return Metadata describing the document - */ + public CompletableFuture saveAsync( + final String documentId, + final String fileName, + final Date expirationDate, + final Collection tags, + final String contentType, + final InputStream data) { + return saveAsync(documentId, fileName, expirationDate, tags, contentType, true, data); + } + + private boolean retryable(RuntimeException exp) { + Throwable checkedException = exp.getCause() != null ? exp.getCause() : exp; + if (checkedException instanceof SdkException) { + SdkException sdkException = (SdkException) checkedException; + return sdkException.retryable(); + } + return false; + } + + /** + * Saves document. + * + * @param documentId Document's id, can be left null, then it will be generated as a new UUID + * @param fileName File name, will be saved as part of document's metadata + * @param expirationDate Date when the document will be removed + * @param tags Collection of tags that the document can be searched with + * @param contentType Document's content type + * @param data Document's data input stream + * @param retryCount Number of retries performed in case save operation failed, and failure was caused by some + * temporary / recoverable error + * @return Metadata describing the document + */ public ObjectMetadata save( final String documentId, final String fileName, final Date expirationDate, final Collection tags, final String contentType, - final InputStream data) { - return saveAsync(documentId, fileName, expirationDate, tags, contentType, data).join(); + final InputStream data, + final Integer retryCount + ) { + if (retryCount == 0) { + return saveAsync(documentId, fileName, expirationDate, tags, contentType, data).join(); + } + + int retryNumber = 0; + ObjectMetadata metadata = null; + + while(retryNumber++ < retryCount) { + try { + if (metadata == null) { + metadata = saveAsync(documentId, fileName, expirationDate, tags, contentType, false, data).join(); + retryNumber = 1; + } + metadata = getObjectAttributesASync(metadata.documentId, metadata.key).join(); + break; + } catch (RuntimeException exp) { + if (!retryable(exp) || retryNumber == retryCount) { + throw exp; + } + } + } + return metadata; } - /** - * Renames a document to a new file name. - * - * @param key Existing document's key - * @param fileName New file name - */ + public ObjectMetadata save( + final String documentId, + final String fileName, + final Date expirationDate, + final Collection tags, + final String contentType, + final InputStream data + ) { + return save(documentId, fileName, expirationDate, tags, contentType, data, 0); + } + /** + * Renames a document to a new file name. + * + * @param key Existing document's key + * @param fileName New file name + */ public CompletableFuture renameAsync(final String key, final String fileName) { LOGGER.info("renameAsync: key={}, fileName={}", key, fileName); return getClient() diff --git a/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/SiirtotiedostoPalvelu.java b/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/SiirtotiedostoPalvelu.java index 10007df..47ea748 100644 --- a/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/SiirtotiedostoPalvelu.java +++ b/src/main/java/fi/vm/sade/valinta/dokumenttipalvelu/SiirtotiedostoPalvelu.java @@ -1,8 +1,6 @@ package fi.vm.sade.valinta.dokumenttipalvelu; import fi.vm.sade.valinta.dokumenttipalvelu.dto.ObjectMetadata; -import software.amazon.awssdk.utils.StringUtils; - import java.io.InputStream; import java.time.Duration; import java.time.Instant; @@ -15,87 +13,93 @@ import java.util.Date; import java.util.Optional; import java.util.UUID; - +import software.amazon.awssdk.utils.StringUtils; public class SiirtotiedostoPalvelu extends Dokumenttipalvelu { - final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder() - .appendPattern("[yyyy-MM-dd'T'HH:mm:ss]") - .appendPattern("[dd.MM.yyyy HH:mm]") - .appendPattern("[yyyy-MM-dd HH:mm:ss.SSSSSS XXX]") - .toFormatter(); + final DateTimeFormatter dateTimeFormatter = + new DateTimeFormatterBuilder() + .appendPattern("[yyyy-MM-dd'T'HH:mm:ss]") + .appendPattern("[dd.MM.yyyy HH:mm]") + .appendPattern("[yyyy-MM-dd HH:mm:ss.SSSSSS XXX]") + .toFormatter(); - public SiirtotiedostoPalvelu(String awsRegion, String bucketName) { - super(awsRegion, bucketName); - } + public SiirtotiedostoPalvelu(String awsRegion, String bucketName) { + super(awsRegion, bucketName); + } - private String timeRangeString( - final Optional timeRangeStart, final Optional timeRangeEnd) { - final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy_HH.mm.ss"); - if (timeRangeStart.isPresent() && timeRangeEnd.isPresent()) { - return String.format( - "%s-%s_", formatter.format(timeRangeStart.get()), formatter.format(timeRangeEnd.get())); - } - if (timeRangeStart.isPresent()) { - return String.format("%s-_", formatter.format(timeRangeStart.get())); - } - if (timeRangeEnd.isPresent()) { - return String.format("-%s_", formatter.format(timeRangeEnd.get())); - } - return ""; + private String timeRangeString( + final Optional timeRangeStart, final Optional timeRangeEnd) { + final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy_HH.mm.ss"); + if (timeRangeStart.isPresent() && timeRangeEnd.isPresent()) { + return String.format( + "%s-%s_", formatter.format(timeRangeStart.get()), formatter.format(timeRangeEnd.get())); } - - private Collection tags(String sourceSystem, String subCategory) { - return StringUtils.isEmpty(subCategory) - ? Arrays.asList(sourceSystem) - : Arrays.asList(sourceSystem, subCategory); + if (timeRangeStart.isPresent()) { + return String.format("%s-_", formatter.format(timeRangeStart.get())); } - - @Override - String composeKey(final Collection tags, final String documentId) { - return String.format("%s/%s", tags.stream().findFirst().orElse("unknown"), documentId); + if (timeRangeEnd.isPresent()) { + return String.format("-%s_", formatter.format(timeRangeEnd.get())); } + return ""; + } + + private Collection tags(String sourceSystem, String subCategory) { + return StringUtils.isEmpty(subCategory) + ? Arrays.asList(sourceSystem) + : Arrays.asList(sourceSystem, subCategory); + } - /** - * Saves siirtotiedosto document. - * - * @param timeRangeStart Startpoint of the data timerange contained by this siirtotiedosto, - * allowed formats: '2024-01-24T10:15:30', '24.01.2024 10.15.30'. - * Value is optional, can be null or empty. - * @param timeRangeEnd Endpoint of the data timerange contained by this siirtotiedosto - * allowed formats: '2024-01-24T10:15:30', '24.01.2024 10.15.30'. - * Value is optional, can be null or empty. - * @param sourceSystem Source system of this siirtotiedosto, e.g. kouta, ataru, etc. Value is mandatory. - * @param subCategory More detailed description of the contents - * Value is optional, can be null or empty. - * @param data Document's data input stream. Value is mandatory. - * @return Metadata describing the document. - */ - public ObjectMetadata saveSiirtotiedosto( - final String timeRangeStart, - final String timeRangeEnd, - final String sourceSystem, - final String subCategory, - final InputStream data) { + @Override + String composeKey(final Collection tags, final String documentId) { + return String.format("%s/%s", tags.stream().findFirst().orElse("unknown"), documentId); + } - if (StringUtils.isEmpty(sourceSystem)) { - throw new IllegalArgumentException("Source system cannot be empty"); - } - if (data == null) { - throw new IllegalArgumentException("Data cannot be null"); - } - Optional timeRangeStartVal = StringUtils.isEmpty(timeRangeStart) ? - Optional.empty() : Optional.of(LocalDateTime.parse(timeRangeStart, dateTimeFormatter)); - Optional timeRangeEndVal = StringUtils.isEmpty(timeRangeEnd) ? - Optional.empty() : Optional.of(LocalDateTime.parse(timeRangeEnd, dateTimeFormatter)); - final String categoryStr = - StringUtils.isEmpty(subCategory) ? "" : String.format("%s_%s_", sourceSystem, subCategory); - final String documentId = - String.format( - "%s%s%s.json", - categoryStr, timeRangeString(timeRangeStartVal, timeRangeEndVal), UUID.randomUUID()); - final Collection tags = tags(sourceSystem, subCategory); - // TODO Poista expirationDate siinä vaiheessa kun se poistuu save -metodista - final Date expirationDate = Date.from(Instant.now().plus(Duration.of(5, ChronoUnit.DAYS))); - return save(documentId, documentId, expirationDate, tags, "json", data); + /** + * Saves siirtotiedosto document. + * + * @param timeRangeStart Startpoint of the data timerange contained by this siirtotiedosto, + * allowed formats: '2024-01-24T10:15:30', '24.01.2024 10.15.30'. Value is optional, can be + * null or empty. + * @param timeRangeEnd Endpoint of the data timerange contained by this siirtotiedosto allowed + * formats: '2024-01-24T10:15:30', '24.01.2024 10.15.30'. Value is optional, can be null or + * empty. + * @param sourceSystem Source system of this siirtotiedosto, e.g. kouta, ataru, etc. Value is + * mandatory. + * @param subCategory More detailed description of the contents Value is optional, can be null or + * empty. + * @param data Document's data input stream. Value is mandatory. + * @return Metadata describing the document. + */ + public ObjectMetadata saveSiirtotiedosto( + final String timeRangeStart, + final String timeRangeEnd, + final String sourceSystem, + final String subCategory, + final InputStream data) { + + if (StringUtils.isEmpty(sourceSystem)) { + throw new IllegalArgumentException("Source system cannot be empty"); + } + if (data == null) { + throw new IllegalArgumentException("Data cannot be null"); } + Optional timeRangeStartVal = + StringUtils.isEmpty(timeRangeStart) + ? Optional.empty() + : Optional.of(LocalDateTime.parse(timeRangeStart, dateTimeFormatter)); + Optional timeRangeEndVal = + StringUtils.isEmpty(timeRangeEnd) + ? Optional.empty() + : Optional.of(LocalDateTime.parse(timeRangeEnd, dateTimeFormatter)); + final String categoryStr = + StringUtils.isEmpty(subCategory) ? "" : String.format("%s_%s_", sourceSystem, subCategory); + final String documentId = + String.format( + "%s%s%s.json", + categoryStr, timeRangeString(timeRangeStartVal, timeRangeEndVal), UUID.randomUUID()); + final Collection tags = tags(sourceSystem, subCategory); + // TODO Poista expirationDate siinä vaiheessa kun se poistuu save -metodista + final Date expirationDate = Date.from(Instant.now().plus(Duration.of(5, ChronoUnit.DAYS))); + return save(documentId, documentId, expirationDate, tags, "json", data); + } } diff --git a/src/test/java/fi/vm/sade/valinta/dokumenttipalvelu/DokumenttipalveluTest.java b/src/test/java/fi/vm/sade/valinta/dokumenttipalvelu/DokumenttipalveluTest.java index e788561..09d80e7 100644 --- a/src/test/java/fi/vm/sade/valinta/dokumenttipalvelu/DokumenttipalveluTest.java +++ b/src/test/java/fi/vm/sade/valinta/dokumenttipalvelu/DokumenttipalveluTest.java @@ -10,11 +10,15 @@ import java.nio.file.Paths; import java.time.Instant; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.stubbing.OngoingStubbing; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.*; import software.amazon.awssdk.services.s3.presigner.S3Presigner; @@ -43,6 +47,17 @@ public S3Presigner getPresigner() { private final Dokumenttipalvelu dokumenttipalvelu = new MockDokumenttipalvelu("eu-west-1", bucketName); + class RetryableException extends SdkException { + + protected RetryableException(Builder builder) { + super(builder); + } + + @Override + public boolean retryable() { + return true; + } + } @BeforeEach public void beforeEach() { reset(client); @@ -146,14 +161,30 @@ public void testConvert() { .build())); } + private void mockSequenceForSave(int nbrOfRecoverablePutFailures, int nbrOfRecoverableGetAttributesFailures) { + RetryableException exception = new RetryableException(SdkException.builder()); + + OngoingStubbing> putCall = + when(client.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))); + for (int retryCount = 0; retryCount < nbrOfRecoverablePutFailures; retryCount++) { + putCall = putCall.thenThrow(exception); + } + putCall.thenReturn(completedFuture(PutObjectResponse.builder().build())); + + OngoingStubbing> getAttributesCall = + when(client.getObjectAttributes(any(GetObjectAttributesRequest.class))); + for (int retryCount = 0; retryCount < nbrOfRecoverableGetAttributesFailures; retryCount++) { + getAttributesCall = getAttributesCall.thenThrow(exception); + } + getAttributesCall.thenReturn(completedFuture(GetObjectAttributesResponse.builder().build())); + + when(client.listObjectsV2(any(ListObjectsV2Request.class))) + .thenReturn(completedFuture(ListObjectsV2Response.builder().isTruncated(false).build())); + } + @Test public void testSaveGeneratesIdWhenNotProvided() throws IOException { - when(client.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) - .thenReturn(completedFuture(PutObjectResponse.builder().build())); - when(client.getObjectAttributes(any(GetObjectAttributesRequest.class))) - .thenReturn(completedFuture(GetObjectAttributesResponse.builder().build())); - when(client.listObjectsV2(any(ListObjectsV2Request.class))) - .thenReturn(completedFuture(ListObjectsV2Response.builder().isTruncated(false).build())); + mockSequenceForSave(0, 0); final ObjectMetadata metadata = dokumenttipalvelu.save( null, @@ -164,4 +195,77 @@ public void testSaveGeneratesIdWhenNotProvided() throws IOException { Files.newInputStream(Paths.get("src/test/resources/testfile.txt"))); assertNotNull(UUID.fromString(metadata.documentId)); } + + @Test + public void testSaveSucceedsAfterRetry() throws IOException { + mockSequenceForSave(2, 2); + final ObjectMetadata metadata = + dokumenttipalvelu.save( + UUID.randomUUID().toString(), + "testfile.txt", + new Date(), + Collections.emptySet(), + "text/plain", + Files.newInputStream(Paths.get("src/test/resources/testfile.txt")), + 3 + ); + assertNotNull(metadata); + } + + @Test + public void testSaveFailsAfterPutObjectRetries() throws IOException { + mockSequenceForSave(4, 2); + try { + dokumenttipalvelu.save( + UUID.randomUUID().toString(), + "testfile.txt", + new Date(), + Collections.emptySet(), + "text/plain", + Files.newInputStream(Paths.get("src/test/resources/testfile.txt")), + 3 + ); + fail("Expected exception not thrown"); + } catch(CompletionException | RetryableException ignored) {} + } + + @Test + public void testSaveFailsAfterGetAttributesRetries() throws IOException { + mockSequenceForSave(0, 4); + try { + dokumenttipalvelu.save( + UUID.randomUUID().toString(), + "testfile.txt", + new Date(), + Collections.emptySet(), + "text/plain", + Files.newInputStream(Paths.get("src/test/resources/testfile.txt")), + 3 + ); + fail("Expected exception not thrown"); + } catch(CompletionException | RetryableException ignored) {} + } + + @Test + public void testSaveFailsWhenNonRetryableError() throws IOException { + when(client.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) + .thenReturn(completedFuture(PutObjectResponse.builder().build())); + when(client.getObjectAttributes(any(GetObjectAttributesRequest.class))) + .thenThrow(new RuntimeException()); + when(client.listObjectsV2(any(ListObjectsV2Request.class))) + .thenReturn(completedFuture(ListObjectsV2Response.builder().isTruncated(false).build())); + try { + dokumenttipalvelu.save( + UUID.randomUUID().toString(), + "testfile.txt", + new Date(), + Collections.emptySet(), + "text/plain", + Files.newInputStream(Paths.get("src/test/resources/testfile.txt")), + 3 + ); + fail("Expected exception not thrown"); + } catch(RuntimeException ignored) { + } + } }