Skip to content

Commit

Permalink
OK-424: Added retry logic to document saving
Browse files Browse the repository at this point in the history
  • Loading branch information
pitkamak committed Feb 6, 2024
1 parent e5812f1 commit 706bf6c
Show file tree
Hide file tree
Showing 3 changed files with 316 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
Expand Down Expand Up @@ -228,6 +230,34 @@ public URL getDownloadUrl(final String key, final Duration expirationDuration) {
.url();
}

/**
* Fetches the object attributes of the given document object
*
* @param id Document id
* @param key Document key
* @return Metadata describing the document
*/
public CompletableFuture<ObjectMetadata> getObjectAttributesASync(final String id, final String key) {
return getClient()
.getObjectAttributes(
GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(key)
.objectAttributes(ObjectAttributes.E_TAG, ObjectAttributes.OBJECT_SIZE)
.build())
.thenApply(
attributesResponse ->
new ObjectMetadata(
key,
id,
extractTags(key),
attributesResponse.lastModified(),
attributesResponse.objectSize(),
attributesResponse.eTag()));

}


/**
* Saves document.
*
Expand All @@ -236,6 +266,7 @@ public URL getDownloadUrl(final String key, final Duration expirationDuration) {
* @param expirationDate Date when the document will be removed
* @param tags Collection of tags that the document can be searched with
* @param contentType Document's content type
* @param fetchDetailedAttributes Fetch detailed object attributes for saved document
* @param data Document's data input stream
* @return Metadata describing the document. If an existing document exists with same document id,
* will return a failed future with DocumentIdAlreadyExistsException.
Expand All @@ -246,6 +277,7 @@ public CompletableFuture<ObjectMetadata> saveAsync(
final Date expirationDate,
final Collection<String> tags,
final String contentType,
final boolean fetchDetailedAttributes,
final InputStream data) {
final String id = documentId != null ? documentId : UUID.randomUUID().toString();
final String key = composeKey(tags, id);
Expand All @@ -264,39 +296,32 @@ public CompletableFuture<ObjectMetadata> saveAsync(
} catch (final IOException e) {
throw new RuntimeException("Error reading input data", e);
}
PutObjectRequest putObjectRequest =
PutObjectRequest.builder()
.bucket(bucketName)
.key(key)
.expires(expirationDate.toInstant())
.contentType(contentType)
.metadata(Collections.singletonMap(METADATA_FILENAME, fileName))
.build();

return findAsync(Collections.singleton(documentId))
.thenCompose(
existing -> {
if (existing.isEmpty()) {
return getClient()
.putObject(
PutObjectRequest.builder()
.bucket(bucketName)
.key(key)
.expires(expirationDate.toInstant())
.contentType(contentType)
.metadata(Collections.singletonMap(METADATA_FILENAME, fileName))
.build(),
body)
.thenCompose(
putObjectResponse ->
getClient()
.getObjectAttributes(
GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(key)
.objectAttributes(
ObjectAttributes.E_TAG, ObjectAttributes.OBJECT_SIZE)
.build())
.thenApply(
attributesResponse ->
new ObjectMetadata(
key,
id,
extractTags(key),
attributesResponse.lastModified(),
attributesResponse.objectSize(),
attributesResponse.eTag())));
if (fetchDetailedAttributes) {
return getClient()
.putObject(putObjectRequest, body)
.thenCompose(
putObjectResponse -> getObjectAttributesASync(id, key));
} else {
return getClient().putObject(putObjectRequest, body)
.thenApply(putObjectResponse ->
new ObjectMetadata(
key,
id,
extractTags(key), null, null, null));
}
} else {
throw new CompletionException(
new DocumentIdAlreadyExistsException(
Expand All @@ -305,33 +330,87 @@ public CompletableFuture<ObjectMetadata> saveAsync(
});
}

/**
* Saves document.
*
* @param documentId Document's id, can be left null, then it will be generated as a new UUID
* @param fileName File name, will be saved as part of document's metadata
* @param expirationDate Date when the document will be removed
* @param tags Collection of tags that the document can be searched with
* @param contentType Document's content type
* @param data Document's data input stream
* @return Metadata describing the document
*/
public CompletableFuture<ObjectMetadata> saveAsync(
final String documentId,
final String fileName,
final Date expirationDate,
final Collection<String> tags,
final String contentType,
final InputStream data) {
return saveAsync(documentId, fileName, expirationDate, tags, contentType, true, data);
}

private boolean retryable(RuntimeException exp) {
Throwable checkedException = exp.getCause() != null ? exp.getCause() : exp;
if (checkedException instanceof SdkException) {
SdkException sdkException = (SdkException) checkedException;
return sdkException.retryable();
}
return false;
}

/**
* Saves document.
*
* @param documentId Document's id, can be left null, then it will be generated as a new UUID
* @param fileName File name, will be saved as part of document's metadata
* @param expirationDate Date when the document will be removed
* @param tags Collection of tags that the document can be searched with
* @param contentType Document's content type
* @param data Document's data input stream
* @param retryCount Number of retries performed in case save operation failed, and failure was caused by some
* temporary / recoverable error
* @return Metadata describing the document
*/
public ObjectMetadata save(
final String documentId,
final String fileName,
final Date expirationDate,
final Collection<String> tags,
final String contentType,
final InputStream data) {
return saveAsync(documentId, fileName, expirationDate, tags, contentType, data).join();
final InputStream data,
final Integer retryCount
) {
if (retryCount == 0) {
return saveAsync(documentId, fileName, expirationDate, tags, contentType, data).join();
}

int retryNumber = 0;
ObjectMetadata metadata = null;

while(retryNumber++ < retryCount) {
try {
if (metadata == null) {
metadata = saveAsync(documentId, fileName, expirationDate, tags, contentType, false, data).join();
retryNumber = 1;
}
metadata = getObjectAttributesASync(metadata.documentId, metadata.key).join();
break;
} catch (RuntimeException exp) {
if (!retryable(exp) || retryNumber == retryCount) {
throw exp;
}
}
}
return metadata;
}

/**
* Renames a document to a new file name.
*
* @param key Existing document's key
* @param fileName New file name
*/
public ObjectMetadata save(
final String documentId,
final String fileName,
final Date expirationDate,
final Collection<String> tags,
final String contentType,
final InputStream data
) {
return save(documentId, fileName, expirationDate, tags, contentType, data, 0);
}
/**
* Renames a document to a new file name.
*
* @param key Existing document's key
* @param fileName New file name
*/
public CompletableFuture<Void> renameAsync(final String key, final String fileName) {
LOGGER.info("renameAsync: key={}, fileName={}", key, fileName);
return getClient()
Expand Down
Loading

0 comments on commit 706bf6c

Please sign in to comment.