From d55db912a305da812be3a701a1b963563cb4d0af Mon Sep 17 00:00:00 2001
From: Jean Helou
Date: Thu, 18 Jul 2024 22:45:32 +0200
Subject: [PATCH] JAMES-3763 Changes BlobStore API to enable deterministic BlobId generation

Of the 4 implementations of the BlobStore interface, 2 are delegating
decorators, leaving only 2 actual implementations:
- PassThroughBlobStore
- DeDuplicationBlobStore

Before this change, PassThroughBlobStore implemented save by generating
a random BlobId, while DeDuplicationBlobStore implemented it by hashing
the content of the data to be saved.

This approach broke down when trying to use the BlobStore as a backend
for a MailRepository. The mail repository already has a natural key
(the MailKey), which makes it possible to compute a deterministic
blobId. Since mailets are allowed to mutate the mail, the
MailRepository#save method can be called several times with the same
mail and is expected to overwrite the mail currently stored.

Co-Authored-By: Matthieu Baechler
---
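Reviewer note (not part of the commit message): the new save overloads take a
BlobStore.BlobIdProvider, which receives the input stream and chooses the
BlobId to store under. A minimal caller-side sketch of how a
MailRepository-style backend could derive a deterministic BlobId from its
natural key; the class and method names below are illustrative only, not part
of this patch:

    import java.io.InputStream;

    import org.apache.james.blob.api.BlobId;
    import org.apache.james.blob.api.BlobStore;
    import org.apache.james.blob.api.BucketName;

    import reactor.core.publisher.Mono;
    import reactor.util.function.Tuples;

    public class DeterministicSaveSketch {

        // Saving twice under the same key yields the same BlobId, so the
        // second save overwrites the first blob instead of orphaning it.
        public static Mono<BlobId> saveMail(BlobStore blobStore,
                                            BlobId.Factory blobIdFactory,
                                            BucketName bucketName,
                                            String mailKey,
                                            InputStream mailContent) {
            // The provider ignores the content and derives the id from the
            // key, handing the untouched stream back alongside it.
            BlobStore.BlobIdProvider deterministicId =
                stream -> Mono.just(Tuples.of(blobIdFactory.of("mail-" + mailKey), stream));

            return Mono.from(blobStore.save(bucketName, mailContent, deterministicId,
                BlobStore.StoragePolicy.SIZE_BASED));
        }
    }

Also note that the expected blobId in DeduplicationBlobStoreContract changes
from "31f7a65e..." to "MfemXjFVhqwZi9eYtmKc5JA9CJlHbVdBqfMuLlIbamY=" because
the InputStream save path previously rendered the SHA-256 via
HashCode#toString (lowercase hex) and now encodes the same digest with the
HashBlobId.Factory default, base64Url; the underlying hash is unchanged.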
 .../org/apache/james/blob/api/BlobId.java     |  13 +--
 .../org/apache/james/blob/api/BlobStore.java  |  13 +++
 .../org/apache/james/blob/api/HashBlobId.java |  56 +--------
 .../james/blob/api/MetricableBlobStore.java   |  17 ++-
 .../api/DeduplicationBlobStoreContract.java   |   6 +-
 .../apache/james/blob/api/HashBlobIdTest.java |  70 -----------
 .../org/apache/james/blob/api/TestBlobId.java |  14 ---
 .../blob/cassandra/cache/CachedBlobStore.java |  79 +++++++++++--
 .../CassandraBlobStoreClOneTest.java          |  11 +-
 .../cassandra/CassandraBlobStoreTest.java     |  10 +-
 .../aws/S3DeDuplicationBlobStoreTest.java     |   1 +
 .../aws/S3PrefixAndNamespaceTest.java         |   1 +
 .../deduplication/GenerationAwareBlobId.java  |  16 ---
 .../DeDuplicationBlobStore.scala              | 110 +++++++++++++-----
 .../deduplication/PassThroughBlobStore.scala  |  69 +++++++----
 15 files changed, 250 insertions(+), 236 deletions(-)

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobId.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobId.java
index 6cefc21a06f..220bd206f56 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobId.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobId.java
@@ -19,24 +19,13 @@
 
 package org.apache.james.blob.api;
 
-import java.util.UUID;
-
-import com.google.common.io.ByteSource;
-
 public interface BlobId {
 
     interface Factory {
         BlobId of(String id);
 
-        BlobId forPayload(byte[] payload);
-
-        BlobId forPayload(ByteSource payload);
-
+        // FIXME (in later commit): rename to parse or parseFrom; this should only be used for deserialization
         BlobId from(String id);
-
-        default BlobId randomId() {
-            return from(UUID.randomUUID().toString());
-        }
     }
 
     String asString();
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 7c4dd0e4d74..22f08167e04 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -25,6 +25,8 @@
 
 import com.google.common.io.ByteSource;
 
+import reactor.util.function.Tuple2;
+
 public interface BlobStore {
     String DEFAULT_BUCKET_NAME_QUALIFIER = "defaultBucket";
 
@@ -34,12 +36,23 @@ enum StoragePolicy {
         HIGH_PERFORMANCE
     }
 
+    @FunctionalInterface
+    interface BlobIdProvider {
+        Publisher<Tuple2<BlobId, InputStream>> apply(InputStream stream);
+    }
+
     Publisher<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
 
     Publisher<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
 
     Publisher<BlobId> save(BucketName bucketName, ByteSource data, StoragePolicy storagePolicy);
 
+    Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+
+    Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+
+    Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+
     default Publisher<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
         return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
     }
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/HashBlobId.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/HashBlobId.java
index 0134802ce87..6d5245cabc7 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/HashBlobId.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/HashBlobId.java
@@ -19,51 +19,15 @@
 
 package org.apache.james.blob.api;
 
-import java.io.IOException;
-import java.util.Optional;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-import com.google.common.io.BaseEncoding;
-import com.google.common.io.ByteSource;
 
+// FIXME (in later commit): use a java record
 public class HashBlobId implements BlobId {
-    private static final String HASH_BLOB_ID_ENCODING_TYPE_PROPERTY = "james.blob.id.hash.encoding";
-    private static final BaseEncoding HASH_BLOB_ID_ENCODING_DEFAULT = BaseEncoding.base64Url();
-
     public static class Factory implements BlobId.Factory {
-        private final BaseEncoding baseEncoding;
-
-        public Factory() {
-            this.baseEncoding = Optional.ofNullable(System.getProperty(HASH_BLOB_ID_ENCODING_TYPE_PROPERTY))
-                .map(Factory::baseEncodingFrom)
-                .orElse(HASH_BLOB_ID_ENCODING_DEFAULT);
-        }
-
-        @Override
-        public HashBlobId forPayload(byte[] payload) {
-            Preconditions.checkArgument(payload != null);
-            return base64(Hashing.sha256().hashBytes(payload));
-        }
-
-        @Override
-        public BlobId forPayload(ByteSource payload) {
-            try {
-                return base64(payload.hash(Hashing.sha256()));
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        private HashBlobId base64(HashCode hashCode) {
-            byte[] bytes = hashCode.asBytes();
-            return new HashBlobId(baseEncoding.encode(bytes));
-        }
 
         @Override
         public HashBlobId of(String id) {
@@ -75,24 +39,6 @@ public HashBlobId of(String id) {
         public HashBlobId from(String id) {
             return of(id);
         }
-
-        private static BaseEncoding baseEncodingFrom(String encodingType) {
-            switch (encodingType) {
-                case "base16":
-                case "hex":
-                    return BaseEncoding.base16();
-                case "base64":
-                    return BaseEncoding.base64();
-                case "base64Url":
-                    return BaseEncoding.base64Url();
-                case "base32":
-                    return BaseEncoding.base32();
-                case "base32Hex":
-                    return BaseEncoding.base32Hex();
-                default:
-                    throw new IllegalArgumentException("Unknown encoding type: " + encodingType);
-            }
-        }
     }
 
     private final String id;
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index 702d0042dd1..e5abf56c28c 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -62,7 +62,22 @@ public Publisher<BlobId> save(BucketName bucketName, InputStream data, StoragePo
 
     @Override
     public Publisher<BlobId> save(BucketName bucketName, ByteSource data, StoragePolicy storagePolicy) {
-        return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy));
+        return metricFactory.decoratePublisherWithTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy));
+    }
+
+    @Override
+    public Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+        return metricFactory.decoratePublisherWithTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
+    }
+
+    @Override
+    public Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+        return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
+    }
+
+    @Override
+    public Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+        return metricFactory.decoratePublisherWithTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
     }
 
     @Override
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeduplicationBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeduplicationBlobStoreContract.java
index 86c2121971f..a133833a301 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeduplicationBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeduplicationBlobStoreContract.java
@@ -77,7 +77,9 @@ default void saveShouldReturnBlobIdOfInputStream(BlobStore.StoragePolicy storage
         BucketName defaultBucketName = store.getDefaultBucketName();
 
         BlobId blobId = Mono.from(store.save(defaultBucketName, new ByteArrayInputStream(SHORT_BYTEARRAY), storagePolicy)).block();
-
-        assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
+        // This fix is OK because it only affects deduplication: after this change the same content might be assigned
+        // a different blobId and thus might be duplicated in the store. No data can be lost, since no API allowed
+        // externally deterministic blobId construction before this change.
+        assertThat(blobId).isEqualTo(blobIdFactory().of("MfemXjFVhqwZi9eYtmKc5JA9CJlHbVdBqfMuLlIbamY="));
     }
 }
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/HashBlobIdTest.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/HashBlobIdTest.java
index 428f331ee16..045f92230c8 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/HashBlobIdTest.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/HashBlobIdTest.java
@@ -22,15 +22,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import java.nio.charset.StandardCharsets;
-import java.util.stream.Stream;
-
-import org.apache.james.util.ClassLoaderUtils;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
 
@@ -38,11 +30,6 @@ class HashBlobIdTest {
 
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
 
-    @BeforeEach
-    void beforeEach() {
-        System.clearProperty("james.blob.id.hash.encoding");
-    }
-
     @Test
     void shouldRespectBeanContract() {
         EqualsVerifier.forClass(HashBlobId.class).verify();
@@ -66,61 +53,4 @@ void fromShouldThrowOnEmpty() {
         assertThatThrownBy(() -> BLOB_ID_FACTORY.from(""))
             .isInstanceOf(IllegalArgumentException.class);
     }
-
-    @Test
-    void forPayloadShouldThrowOnNull() {
-        assertThatThrownBy(() -> BLOB_ID_FACTORY.forPayload((byte[]) null))
-            .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    void forPayloadShouldHashEmptyArray() {
-        BlobId blobId = BLOB_ID_FACTORY.forPayload(new byte[0]);
-
-        assertThat(blobId.asString()).isEqualTo("47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU=");
-    }
-
-    @Test
-    void forPayloadShouldHashArray() {
-        BlobId blobId = BLOB_ID_FACTORY.forPayload("content".getBytes(StandardCharsets.UTF_8));
-
-        assertThat(blobId.asString()).isEqualTo("7XACtDnprIRfIjV9giusFERzD722AW0-yUMil7nsn3M=");
-    }
-
-
-    @ParameterizedTest
-    @MethodSource("encodingTypeAndExpectedHash")
-    void forPayloadShouldSupportEncodingWhenConfigured(String encoding, String expectedHash) {
-        System.setProperty("james.blob.id.hash.encoding", encoding);
-        BlobId blobId = new HashBlobId.Factory().forPayload("content".getBytes(StandardCharsets.UTF_8));
-        assertThat(blobId.asString()).isEqualTo(expectedHash);
-    }
-
-    static Stream<Arguments> encodingTypeAndExpectedHash() {
-        return Stream.of(
-            Arguments.of("base16", "ED7002B439E9AC845F22357D822BAC1444730FBDB6016D3EC9432297B9EC9F73"),
-            Arguments.of("hex", "ED7002B439E9AC845F22357D822BAC1444730FBDB6016D3EC9432297B9EC9F73"),
-            Arguments.of("base32", "5VYAFNBZ5GWIIXZCGV6YEK5MCRCHGD55WYAW2PWJIMRJPOPMT5ZQ===="),
-            Arguments.of("base64", "7XACtDnprIRfIjV9giusFERzD722AW0+yUMil7nsn3M="),
-            Arguments.of("base64Url", "7XACtDnprIRfIjV9giusFERzD722AW0-yUMil7nsn3M="),
-            Arguments.of("base32", "5VYAFNBZ5GWIIXZCGV6YEK5MCRCHGD55WYAW2PWJIMRJPOPMT5ZQ===="),
-            Arguments.of("base32Hex", "TLO05D1PT6M88NP26LUO4ATC2H2763TTMO0MQFM98CH9FEFCJTPG===="));
-    }
-
-    @Test
-    void newFactoryShouldFailWhenInvalidEncoding() {
-        System.setProperty("james.blob.id.hash.encoding", "invalid");
-        assertThatThrownBy(HashBlobId.Factory::new)
-            .isInstanceOf(IllegalArgumentException.class)
-            .hasMessage("Unknown encoding type: invalid");
-    }
-
-    @Test
-    void forPayloadShouldCalculateDifferentHashesWhenCraftedSha1Collision() throws Exception {
-        byte[] payload1 = ClassLoaderUtils.getSystemResourceAsByteArray("shattered-1.pdf");
-        byte[] payload2 = ClassLoaderUtils.getSystemResourceAsByteArray("shattered-2.pdf");
-        BlobId blobId1 = BLOB_ID_FACTORY.forPayload(payload1);
-        BlobId blobId2 = BLOB_ID_FACTORY.forPayload(payload2);
-        assertThat(blobId1).isNotEqualTo(blobId2);
-    }
 }
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/TestBlobId.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/TestBlobId.java
index d7619065237..42a5ca6570f 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/TestBlobId.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/TestBlobId.java
@@ -21,23 +21,9 @@
 
 import java.util.Objects;
 
-import org.apache.commons.lang3.NotImplementedException;
-
-import com.google.common.io.ByteSource;
-
 public class TestBlobId implements BlobId {
 
     public static class Factory implements BlobId.Factory {
-        @Override
-        public BlobId forPayload(byte[] payload) {
-            throw new NotImplementedException("Use from(String) instead");
-        }
-
-        @Override
-        public BlobId forPayload(ByteSource payload) {
-            throw new NotImplementedException("Use from(String) instead");
-        }
-
         @Override
         public BlobId of(String id) {
             return new TestBlobId(id);
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 2156c99bf35..063c71fad1d 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -211,7 +211,7 @@ private Mono<byte[]> readBytesInDefaultBucket(BucketName bucketName, BlobId blob
     }
 
     @Override
-    public Mono<BlobId> save(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
+    public Publisher<BlobId> save(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
         return Mono.from(backend.save(bucketName, bytes, storagePolicy))
             .flatMap(blobId -> {
                 if (isAbleToCache(bucketName, bytes, storagePolicy)) {
@@ -226,7 +226,13 @@ public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, St
         Preconditions.checkNotNull(inputStream, "InputStream must not be null");
 
         if (isAbleToCache(bucketName, storagePolicy)) {
-            return saveInCache(bucketName, inputStream, storagePolicy);
+            return saveInCache(
+                bucketName,
+                inputStream,
+                storagePolicy,
+                readAhead ->
+                    Mono.from(backend.save(bucketName, readAhead.in, storagePolicy))
+            );
         }
 
         return backend.save(bucketName, inputStream, storagePolicy);
@@ -237,14 +243,64 @@ public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, Stor
         Preconditions.checkNotNull(byteSource, "ByteSource must not be null");
 
         if (isAbleToCache(bucketName, storagePolicy)) {
-            return saveInCache(bucketName, byteSource, storagePolicy);
+            return saveInCache(
+                byteSource,
+                () -> Mono.from(backend.save(bucketName, byteSource, storagePolicy))
+            );
         }
 
         return backend.save(bucketName, byteSource, storagePolicy);
     }
 
-    private Mono<BlobId> saveInCache(BucketName bucketName, ByteSource byteSource, StoragePolicy storagePolicy) {
-        return Mono.from(backend.save(bucketName, byteSource, storagePolicy))
+    @Override
+    public Mono<BlobId> save(BucketName bucketName, byte[] bytes, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+        return Mono.from(backend.save(bucketName, bytes, blobIdProvider, storagePolicy))
+            .flatMap(blobId -> {
+                if (isAbleToCache(bucketName, bytes, storagePolicy)) {
+                    return saveInCache(blobId, bytes).thenReturn(blobId);
+                }
+                return Mono.just(blobId);
+            });
+    }
+
+    @Override
+    public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+        Preconditions.checkNotNull(inputStream, "InputStream must not be null");
+
+        if (isAbleToCache(bucketName, storagePolicy)) {
+            return saveInCache(
+                bucketName,
+                inputStream,
+                storagePolicy,
+                readAhead -> Mono.from(backend.save(bucketName, readAhead.in, blobIdProvider, storagePolicy))
+            );
+        }
+
+        return backend.save(bucketName, inputStream, blobIdProvider, storagePolicy);
+    }
+
+    @Override
+    public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+        Preconditions.checkNotNull(byteSource, "ByteSource must not be null");
+
+        if (isAbleToCache(bucketName, storagePolicy)) {
+            return saveInCache(
+                byteSource,
+                () -> Mono.from(backend.save(bucketName, byteSource, blobIdProvider, storagePolicy))
+            );
+        }
+
+        return backend.save(bucketName, byteSource, blobIdProvider, storagePolicy);
+    }
+
+    @FunctionalInterface
+    private interface ByteSourceBackendSaver {
+        Mono<BlobId> save();
+    }
+
+
+    private Mono<BlobId> saveInCache(ByteSource byteSource, ByteSourceBackendSaver backendSaver) {
+        return Mono.from(backendSaver.save())
             .flatMap(Throwing.function(blobId -> ReadAheadInputStream.eager().of(byteSource.openStream())
                 .length(sizeThresholdInBytes)
@@ -274,17 +330,18 @@ public Publisher<Void> deleteBucket(BucketName bucketName) {
         return Mono.from(backend.deleteBucket(bucketName));
     }
 
-    private Mono<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
+    @FunctionalInterface
+    private interface InputStreamBackendSaver {
+        Mono<BlobId> save(ReadAheadInputStream readAheadInputStream);
+    }
+
+    private Mono<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy, InputStreamBackendSaver backendSaver) {
         return Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
-            .flatMap(readAhead -> saveToBackend(bucketName, storagePolicy, readAhead)
+            .flatMap(readAhead -> backendSaver.save(readAhead)
                 .flatMap(blobId -> putInCacheIfNeeded(bucketName, storagePolicy, readAhead, blobId)
                     .thenReturn(blobId)));
     }
 
-    private Mono<BlobId> saveToBackend(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead) {
-        return Mono.from(backend.save(bucketName, readAhead.in, storagePolicy));
-    }
-
     private Mono<Void> putInCacheIfNeeded(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead, BlobId blobId) {
         return readAhead.firstBytes
             .filter(bytes -> isAbleToCache(bucketName, readAhead, storagePolicy))
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
index a1bc456d170..772529a8cec 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
@@ -39,6 +39,7 @@
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DeduplicationBlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
 import org.apache.james.blob.api.ObjectStoreException;
@@ -61,9 +62,15 @@ class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
+        this.cassandra = cassandra;
+        this.testee = createBlobStore();
+    }
+
+    @Override
+    public MetricableBlobStore createBlobStore() {
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
-        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
-        defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf(), blobIdFactory));
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, this.cassandra.getConf());
+        defaultBucketDAO = spy(new CassandraDefaultBucketDAO(this.cassandra.getConf(), blobIdFactory));
         CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder()
             .blobPartSize(CHUNK_SIZE)
             .optimisticConsistencyLevel(true)
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index eaa6883fb15..2ca4489740b 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -44,9 +44,15 @@ public class CassandraBlobStoreTest implements CassandraBlobStoreContract, Dedup
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
+        this.cassandra = cassandra;
+        this.testee = createBlobStore();
+    }
+
+    @Override
+    public MetricableBlobStore createBlobStore() {
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
-        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
-        defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf(), blobIdFactory));
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, this.cassandra.getConf());
+        defaultBucketDAO = spy(new CassandraDefaultBucketDAO(this.cassandra.getConf(), blobIdFactory));
         CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder()
             .blobPartSize(CHUNK_SIZE)
             .build();
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
index 9f823a836c8..1d4c5d13390 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
@@ -22,6 +22,7 @@
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BlobStoreContract;
+import org.apache.james.blob.api.DeduplicationBlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java
index dbfaf0425b3..0928b64e5d6 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java
@@ -23,6 +23,7 @@
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BlobStoreContract;
 import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DeduplicationBlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/GenerationAwareBlobId.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/GenerationAwareBlobId.java
index 1bf60419c8f..0b435442eb1 100644
--- a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/GenerationAwareBlobId.java
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/GenerationAwareBlobId.java
@@ -32,7 +32,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.io.ByteSource;
 
 public class GenerationAwareBlobId implements BlobId {
 
@@ -122,16 +121,6 @@ public Factory(Clock clock, BlobId.Factory delegate, Configuration configuration
             this.configuration = configuration;
         }
 
-        @Override
-        public GenerationAwareBlobId forPayload(byte[] payload) {
-            return decorate(delegate.forPayload(payload));
-        }
-
-        @Override
-        public GenerationAwareBlobId forPayload(ByteSource payload) {
-            return decorate(delegate.forPayload(payload));
-        }
-
         @Override
         public GenerationAwareBlobId of(String id) {
             return decorate(delegate.of(id));
@@ -154,11 +143,6 @@ public GenerationAwareBlobId from(String id) {
             return new GenerationAwareBlobId(generation, family, wrapped);
         }
 
-        @Override
-        public GenerationAwareBlobId randomId() {
-            return decorate(delegate.randomId());
-        }
-
         private GenerationAwareBlobId decorateWithoutGeneration(String id) {
             return new GenerationAwareBlobId(NO_GENERATION, NO_FAMILY, delegate.from(id));
         }
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index d4200a1bddb..3ca823f45b5 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -19,76 +19,124 @@
 
 package org.apache.james.server.blob.deduplication
 
-import java.io.InputStream
-import java.util.concurrent.Callable
-
 import com.google.common.base.Preconditions
-import com.google.common.hash.{Hashing, HashingInputStream}
-import com.google.common.io.{ByteSource, FileBackedOutputStream}
+import com.google.common.hash.{HashCode, Hashing, HashingInputStream}
+import com.google.common.io.{BaseEncoding, ByteSource, FileBackedOutputStream}
 import jakarta.inject.{Inject, Named}
 import org.apache.commons.io.IOUtils
+import org.apache.james.blob.api.BlobStore.BlobIdProvider
 import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName}
 import org.reactivestreams.Publisher
 import reactor.core.publisher.{Flux, Mono}
-import reactor.core.scala.publisher.SMono
+import reactor.core.scala.publisher.{SMono, tupleTwo2ScalaTuple2}
 import reactor.core.scheduler.Schedulers
 import reactor.util.function.{Tuple2, Tuples}
 
+import java.io.{ByteArrayInputStream, InputStream}
+import java.util.concurrent.Callable
 import scala.compat.java8.FunctionConverters._
 
 object DeDuplicationBlobStore {
   val LAZY_RESOURCE_CLEANUP = false
   val FILE_THRESHOLD = 10000
+
+  private def baseEncodingFrom(encodingType: String): BaseEncoding = encodingType match {
+    case "base16" =>
+      BaseEncoding.base16
+    case "hex" =>
+      BaseEncoding.base16
+    case "base64" =>
+      BaseEncoding.base64
+    case "base64Url" =>
+      BaseEncoding.base64Url
+    case "base32" =>
+      BaseEncoding.base32
+    case "base32Hex" =>
+      BaseEncoding.base32Hex
+    case _ =>
+      throw new IllegalArgumentException("Unknown encoding type: " + encodingType)
+  }
 }
 
 class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
                                        @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) defaultBucketName: BucketName,
                                        blobIdFactory: BlobId.Factory) extends BlobStore {
 
-  override def save(bucketName: BucketName, data: Array[Byte], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
-    Preconditions.checkNotNull(bucketName)
-    Preconditions.checkNotNull(data)
+  private val HASH_BLOB_ID_ENCODING_TYPE_PROPERTY = "james.blob.id.hash.encoding"
+  private val HASH_BLOB_ID_ENCODING_DEFAULT = BaseEncoding.base64Url
+  private val baseEncoding = Option(System.getProperty(HASH_BLOB_ID_ENCODING_TYPE_PROPERTY)).map(DeDuplicationBlobStore.baseEncodingFrom).getOrElse(HASH_BLOB_ID_ENCODING_DEFAULT)
 
-    val blobId = blobIdFactory.forPayload(data)
+  override def save(bucketName: BucketName, data: Array[Byte], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+    save(bucketName, data, withBlobId, storagePolicy)
+  }
 
-    SMono(blobStoreDAO.save(bucketName, blobId, data))
-      .`then`(SMono.just(blobId))
+  override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+    save(bucketName, data, withBlobId, storagePolicy)
   }
 
   override def save(bucketName: BucketName, data: ByteSource, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+    save(bucketName, data, withBlobId, storagePolicy)
+  }
+
+  override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
-
-    SMono.fromCallable(() => blobIdFactory.forPayload(data))
-      .subscribeOn(Schedulers.boundedElastic())
-      .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
-        .`then`(SMono.just(blobId)))
+    save(bucketName, new ByteArrayInputStream(data), blobIdProvider, storagePolicy)
   }
 
-  override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+  override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
+
+    SMono.fromCallable(() => data.openStream())
+      .using(
+        use = stream => SMono(blobIdProvider.apply(stream))
+          .subscribeOn(Schedulers.boundedElastic())
+          .map(tupleTwo2ScalaTuple2)
+          .flatMap { case (blobId, inputStream) =>
+            SMono(blobStoreDAO.save(bucketName, blobId, inputStream))
+              .`then`(SMono.just(blobId))
+          })(
+        release = _.close())
+  }
 
+  private def withBlobId(data: InputStream): Publisher[Tuple2[BlobId, InputStream]] = {
     val hashingInputStream = new HashingInputStream(Hashing.sha256, data)
-    val sourceSupplier: FileBackedOutputStream => Mono[BlobId] = (fileBackedOutputStream: FileBackedOutputStream) => saveAndGenerateBlobId(bucketName, hashingInputStream, fileBackedOutputStream).asJava()
     val ressourceSupplier: Callable[FileBackedOutputStream] = () => new FileBackedOutputStream(DeDuplicationBlobStore.FILE_THRESHOLD)
-
-    Mono.using(
+    val sourceSupplier: FileBackedOutputStream => Mono[(BlobId, InputStream)] =
+      (fileBackedOutputStream: FileBackedOutputStream) =>
+        SMono.fromCallable(() => {
+          IOUtils.copy(hashingInputStream, fileBackedOutputStream)
+          (blobIdFactory.of(base64(hashingInputStream.hash)), fileBackedOutputStream.asByteSource.openStream())
+        }).asJava()
+
+    Mono.using[(BlobId, InputStream), FileBackedOutputStream](
       ressourceSupplier,
       sourceSupplier.asJava,
       ((fileBackedOutputStream: FileBackedOutputStream) => fileBackedOutputStream.reset()).asJava,
-      DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP)
-      .subscribeOn(Schedulers.boundedElastic())
+      DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP
+    )
      .subscribeOn(Schedulers.boundedElastic())
+      .map { case (blobId, data) => Tuples.of(blobId, data) }
   }
 
-  private def saveAndGenerateBlobId(bucketName: BucketName, hashingInputStream: HashingInputStream, fileBackedOutputStream: FileBackedOutputStream): SMono[BlobId] =
-    SMono.fromCallable(() => {
-      IOUtils.copy(hashingInputStream, fileBackedOutputStream)
-      Tuples.of(blobIdFactory.from(hashingInputStream.hash.toString), fileBackedOutputStream.asByteSource)
-    }).subscribeOn(Schedulers.boundedElastic())
-      .flatMap((tuple: Tuple2[BlobId, ByteSource]) =>
-        SMono(blobStoreDAO.save(bucketName, tuple.getT1, tuple.getT2))
-          .`then`(SMono.just(tuple.getT1)))
+  private def base64(hashCode: HashCode) = {
+    val bytes = hashCode.asBytes
+    baseEncoding.encode(bytes)
+  }
+
+  override def save(bucketName: BucketName,
+                    data: InputStream,
+                    blobIdProvider: BlobIdProvider,
+                    storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+    Preconditions.checkNotNull(bucketName)
+    Preconditions.checkNotNull(data)
+    Mono.from(blobIdProvider(data)).subscribeOn(Schedulers.boundedElastic())
+      .flatMap { tuple =>
+        SMono(blobStoreDAO.save(bucketName, tuple.getT1, tuple.getT2))
+          .`then`(SMono.just(tuple.getT1)).asJava()
+      }
+  }
 
   override def readBytes(bucketName: BucketName, blobId: BlobId): Publisher[Array[Byte]] = {
     Preconditions.checkNotNull(bucketName)
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
index 48fbea9531b..ec451d75dc2 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
@@ -1,4 +1,4 @@
-/*****************************************************************
+/** ***************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,62 +6,91 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- *                                                              *
+ * *
  *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
+ * *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- *****************************************************************/
+ * *************************************************************** */
 
 package org.apache.james.server.blob.deduplication
 
-import java.io.InputStream
-
 import com.google.common.base.Preconditions
 import com.google.common.io.ByteSource
 import jakarta.inject.{Inject, Named}
+import org.apache.james.blob.api.BlobStore.BlobIdProvider
 import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName}
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
-import reactor.core.scala.publisher.SMono
+import reactor.core.scala.publisher.{SMono, tupleTwo2ScalaTuple2}
+import reactor.core.scheduler.Schedulers
+import reactor.util.function.{Tuple2, Tuples}
 
+import java.io.{ByteArrayInputStream, InputStream}
+import java.util.UUID
 
 class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
                                      @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) defaultBucketName: BucketName,
                                      blobIdFactory: BlobId.Factory) extends BlobStore {
 
   override def save(bucketName: BucketName, data: Array[Byte], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
-    Preconditions.checkNotNull(bucketName)
-    Preconditions.checkNotNull(data)
+    save(bucketName, data, withBlobId, storagePolicy)
+  }
 
-    val blobId = blobIdFactory.randomId()
+  override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+    save(bucketName, data, withBlobId, storagePolicy)
+  }
 
-    SMono(blobStoreDAO.save(bucketName, blobId, data))
-      .`then`(SMono.just(blobId))
+  override def save(bucketName: BucketName, data: ByteSource, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+    save(bucketName, data, withBlobId, storagePolicy)
   }
 
-  override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+  override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+    Preconditions.checkNotNull(bucketName)
+    Preconditions.checkNotNull(data)
+    save(bucketName, new ByteArrayInputStream(data), blobIdProvider, storagePolicy)
+  }
+
+  override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
 
-    val blobId = blobIdFactory.randomId()
-    SMono(blobStoreDAO.save(bucketName, blobId, data))
-      .`then`(SMono.just(blobId))
+    SMono.fromCallable(() => data.openStream())
+      .using(
+        use = stream => SMono(blobIdProvider.apply(stream))
+          .subscribeOn(Schedulers.boundedElastic())
+          .map(tupleTwo2ScalaTuple2)
+          .flatMap { case (blobId, inputStream) =>
+            SMono(blobStoreDAO.save(bucketName, blobId, inputStream))
+              .`then`(SMono.just(blobId))
+          })(
+        release = _.close())
   }
 
-  override def save(bucketName: BucketName, data: ByteSource, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+  override def save(
+    bucketName: BucketName,
+    data: InputStream,
+    blobIdProvider: BlobIdProvider,
+    storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
-    val blobId = blobIdFactory.randomId()
-    SMono(blobStoreDAO.save(bucketName, blobId, data))
-      .`then`(SMono.just(blobId))
+    SMono(blobIdProvider(data))
+      .subscribeOn(Schedulers.boundedElastic())
+      .flatMap { tuple =>
+        SMono(blobStoreDAO.save(bucketName, tuple.getT1, tuple.getT2))
+          .`then`(SMono.just(tuple.getT1))
+      }
   }
 
+  private def withBlobId(data: InputStream): Publisher[Tuple2[BlobId, InputStream]] =
+    SMono.just(Tuples.of(blobIdFactory.of(UUID.randomUUID.toString), data))
+
   override def readBytes(bucketName: BucketName, blobId: BlobId): Publisher[Array[Byte]] = {
     Preconditions.checkNotNull(bucketName)