Skip to content

Commit

Permalink
JAMES-3763 Changes BlobStore API to enable deterministic BlobId generation
Browse files Browse the repository at this point in the history

Of the 4 implementations of the BlobStore interface, 2 are delegating
wrappers, leaving only 2 actual implementations:
- PassThroughBlobStore
- DeDuplicationBlobStore

Before this change, PassThroughBlobStore used to implement save by generating
a random BlobId, while DeDuplicationBlobStore implemented it by hashing the
content of the data to be saved.

This approach broke down when trying to use the BlobStore as a backend for a
MailRepository. The mail repository already has a natural key (the MailKey),
which makes it possible to compute a deterministic BlobId.
Since mailets are allowed to mutate the mail, the MailRepository#save method
can be called several times with the same mail, and each call is expected to
overwrite the mail currently stored.

Co-Authored-By: Matthieu Baechler <[email protected]>
  • Loading branch information
jeantil and mbaechler committed Jul 31, 2024
1 parent 39842a8 commit d55db91
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,13 @@

package org.apache.james.blob.api;

import java.util.UUID;

import com.google.common.io.ByteSource;

public interface BlobId {

/**
 * Creates {@link BlobId} instances.
 */
interface Factory {
/**
 * Returns a BlobId for the given string identifier.
 */
BlobId of(String id);

/**
 * Returns a BlobId for the given payload bytes.
 * NOTE(review): presumably derived from the content (e.g. a hash) - confirm against implementations.
 */
BlobId forPayload(byte[] payload);

/**
 * Overload of {@link #forPayload(byte[])} taking the payload as a {@link ByteSource}.
 */
BlobId forPayload(ByteSource payload);

// FIXME (in a later commit): rename to parse or parseFrom; this should only be used for deserialization.
BlobId from(String id);

/**
 * Returns a BlobId built by passing a freshly generated random UUID string to {@link #from(String)}.
 */
default BlobId randomId() {
return from(UUID.randomUUID().toString());
}
}

String asString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import com.google.common.io.ByteSource;

import reactor.util.function.Tuple2;

public interface BlobStore {
String DEFAULT_BUCKET_NAME_QUALIFIER = "defaultBucket";

Expand All @@ -34,12 +36,23 @@ enum StoragePolicy {
HIGH_PERFORMANCE
}

/**
 * Strategy that chooses the {@link BlobId} associated with a stream of data.
 *
 * NOTE(review): the result pairs the id with an InputStream, presumably the stream the store should
 * read the content from (a provider may have to consume its input to compute the id) - confirm
 * against the implementations.
 */
@FunctionalInterface
interface BlobIdProvider {
/**
 * @param stream the input data
 * @return a publisher of the chosen BlobId together with an InputStream
 */
Publisher<Tuple2<BlobId, InputStream>> apply(InputStream stream);
}

// Overloads without a BlobIdProvider: the store implementation decides which BlobId the data gets.
Publisher<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);

Publisher<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);

Publisher<BlobId> save(BucketName bucketName, ByteSource data, StoragePolicy storagePolicy);

// Overloads with a BlobIdProvider: the caller supplies the strategy that picks the BlobId.
Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);

Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);

Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);

/**
 * Saves a string by encoding it as UTF-8 and delegating to the {@code byte[]} overload.
 */
default Publisher<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
    byte[] utf8Bytes = data.getBytes(StandardCharsets.UTF_8);
    return save(bucketName, utf8Bytes, storagePolicy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,15 @@

package org.apache.james.blob.api;

import java.io.IOException;
import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.io.ByteSource;

// FIXME (in later commit) use java record
public class HashBlobId implements BlobId {
private static final String HASH_BLOB_ID_ENCODING_TYPE_PROPERTY = "james.blob.id.hash.encoding";
private static final BaseEncoding HASH_BLOB_ID_ENCODING_DEFAULT = BaseEncoding.base64Url();

public static class Factory implements BlobId.Factory {
private final BaseEncoding baseEncoding;

/**
 * Picks the encoding used for hash-based ids: the one named by the
 * {@code james.blob.id.hash.encoding} system property when it is set, the default otherwise.
 *
 * @throws IllegalArgumentException if the property names an unknown encoding
 */
public Factory() {
    String configuredEncoding = System.getProperty(HASH_BLOB_ID_ENCODING_TYPE_PROPERTY);
    if (configuredEncoding == null) {
        this.baseEncoding = HASH_BLOB_ID_ENCODING_DEFAULT;
    } else {
        this.baseEncoding = baseEncodingFrom(configuredEncoding);
    }
}

/**
 * Builds the id of a payload from the SHA-256 hash of its bytes.
 *
 * @throws IllegalArgumentException if {@code payload} is null
 */
@Override
public HashBlobId forPayload(byte[] payload) {
    Preconditions.checkArgument(payload != null);
    HashCode digest = Hashing.sha256().hashBytes(payload);
    return base64(digest);
}

/**
 * Builds the id of a payload from the SHA-256 hash of its content.
 *
 * @param payload source of the bytes to hash; must not be null
 * @return the id built from the encoded SHA-256 hash of the payload
 * @throws IllegalArgumentException if {@code payload} is null
 * @throws java.io.UncheckedIOException if the payload cannot be read
 */
@Override
public BlobId forPayload(ByteSource payload) {
    // Same precondition as the byte[] overload, rather than failing later with a NullPointerException.
    Preconditions.checkArgument(payload != null);
    try {
        return base64(payload.hash(Hashing.sha256()));
    } catch (IOException e) {
        // Keeps the I/O nature of the failure visible; still a RuntimeException for existing callers.
        throw new java.io.UncheckedIOException(e);
    }
}

/**
 * Encodes the hash bytes with the factory's configured encoding and wraps the result in a HashBlobId.
 */
private HashBlobId base64(HashCode hashCode) {
    return new HashBlobId(baseEncoding.encode(hashCode.asBytes()));
}

@Override
public HashBlobId of(String id) {
Expand All @@ -75,24 +39,6 @@ public HashBlobId of(String id) {
// Delegates to of(String): both entry points build the id the same way.
public HashBlobId from(String id) {
return of(id);
}

/**
 * Maps an encoding name to the matching {@link BaseEncoding}.
 * Accepted names: base16 (alias hex), base64, base64Url, base32, base32Hex.
 *
 * @throws IllegalArgumentException if the name is not one of the accepted ones
 */
private static BaseEncoding baseEncodingFrom(String encodingType) {
    if (encodingType.equals("base16") || encodingType.equals("hex")) {
        return BaseEncoding.base16();
    }
    if (encodingType.equals("base64")) {
        return BaseEncoding.base64();
    }
    if (encodingType.equals("base64Url")) {
        return BaseEncoding.base64Url();
    }
    if (encodingType.equals("base32")) {
        return BaseEncoding.base32();
    }
    if (encodingType.equals("base32Hex")) {
        return BaseEncoding.base32Hex();
    }
    throw new IllegalArgumentException("Unknown encoding type: " + encodingType);
}
}

private final String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,22 @@ public Publisher<BlobId> save(BucketName bucketName, InputStream data, StoragePo

/**
 * Saves a ByteSource through the wrapped store and times the operation.
 */
@Override
public Publisher<BlobId> save(BucketName bucketName, ByteSource data, StoragePolicy storagePolicy) {
    // Single return statement: a second return after this one could never execute.
    // The input-stream timer matches the ByteSource overload that takes a BlobIdProvider.
    return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy));
}

/**
 * Saves a byte array with a caller-supplied BlobIdProvider through the wrapped store,
 * recording the call under the bytes timer.
 */
@Override
public Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
    Publisher<BlobId> delegated = blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy);
    return metricFactory.decoratePublisherWithTimerMetric(SAVE_BYTES_TIMER_NAME, delegated);
}

/**
 * Saves an InputStream with a caller-supplied BlobIdProvider through the wrapped store,
 * recording the call under the input-stream timer.
 */
@Override
public Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
    Publisher<BlobId> delegated = blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy);
    return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, delegated);
}

/**
 * Saves a ByteSource with a caller-supplied BlobIdProvider through the wrapped store,
 * recording the call under the input-stream timer.
 */
@Override
public Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
    Publisher<BlobId> delegated = blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy);
    return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, delegated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ default void saveShouldReturnBlobIdOfInputStream(BlobStore.StoragePolicy storage
BucketName defaultBucketName = store.getDefaultBucketName();

BlobId blobId = Mono.from(store.save(defaultBucketName, new ByteArrayInputStream(SHORT_BYTEARRAY), storagePolicy)).block();

assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
// This fix is OK because it only affects deduplication: after this change, the same content might be assigned a different BlobId
// and thus might be duplicated in the store. No data can be lost, since no API allowed externally deterministic BlobId construction
// before this change.
assertThat(blobId).isEqualTo(blobIdFactory().of("MfemXjFVhqwZi9eYtmKc5JA9CJlHbVdBqfMuLlIbamY="));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,14 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

import org.apache.james.util.ClassLoaderUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import nl.jqno.equalsverifier.EqualsVerifier;

class HashBlobIdTest {

private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();

// Clears the encoding override before each test; some tests in this class set this system property.
@BeforeEach
void beforeEach() {
System.clearProperty("james.blob.id.hash.encoding");
}

@Test
void shouldRespectBeanContract() {
EqualsVerifier.forClass(HashBlobId.class).verify();
Expand All @@ -66,61 +53,4 @@ void fromShouldThrowOnEmpty() {
assertThatThrownBy(() -> BLOB_ID_FACTORY.from(""))
.isInstanceOf(IllegalArgumentException.class);
}

// A null byte[] payload must be rejected with an IllegalArgumentException.
@Test
void forPayloadShouldThrowOnNull() {
    byte[] nullPayload = null;

    assertThatThrownBy(() -> BLOB_ID_FACTORY.forPayload(nullPayload))
        .isInstanceOf(IllegalArgumentException.class);
}

// An empty payload still yields an id: the encoded hash of zero bytes.
@Test
void forPayloadShouldHashEmptyArray() {
    byte[] emptyPayload = new byte[0];

    assertThat(BLOB_ID_FACTORY.forPayload(emptyPayload).asString())
        .isEqualTo("47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU=");
}

// Pins the id computed for the UTF-8 bytes of "content" with the default encoding.
@Test
void forPayloadShouldHashArray() {
    byte[] payload = "content".getBytes(StandardCharsets.UTF_8);

    assertThat(BLOB_ID_FACTORY.forPayload(payload).asString())
        .isEqualTo("7XACtDnprIRfIjV9giusFERzD722AW0-yUMil7nsn3M=");
}


// For each supported encoding name, a factory created while the property is set must
// produce the expected id for the "content" payload.
@ParameterizedTest
@MethodSource("encodingTypeAndExpectedHash")
void forPayloadShouldSupportEncodingWhenConfigured(String encoding, String expectedHash) {
    System.setProperty("james.blob.id.hash.encoding", encoding);
    // The factory is created only after the property is set.
    HashBlobId.Factory configuredFactory = new HashBlobId.Factory();
    byte[] payload = "content".getBytes(StandardCharsets.UTF_8);

    String actualId = configuredFactory.forPayload(payload).asString();

    assertThat(actualId).isEqualTo(expectedHash);
}

/**
 * Supplies (encoding name, expected id) pairs for the "content" payload.
 * Each supported encoding name appears exactly once, so no case runs twice.
 */
static Stream<Arguments> encodingTypeAndExpectedHash() {
    return Stream.of(
        Arguments.of("base16", "ED7002B439E9AC845F22357D822BAC1444730FBDB6016D3EC9432297B9EC9F73"),
        Arguments.of("hex", "ED7002B439E9AC845F22357D822BAC1444730FBDB6016D3EC9432297B9EC9F73"),
        Arguments.of("base32", "5VYAFNBZ5GWIIXZCGV6YEK5MCRCHGD55WYAW2PWJIMRJPOPMT5ZQ===="),
        Arguments.of("base64", "7XACtDnprIRfIjV9giusFERzD722AW0+yUMil7nsn3M="),
        Arguments.of("base64Url", "7XACtDnprIRfIjV9giusFERzD722AW0-yUMil7nsn3M="),
        Arguments.of("base32Hex", "TLO05D1PT6M88NP26LUO4ATC2H2763TTMO0MQFM98CH9FEFCJTPG===="));
}

// An unknown encoding name in the system property must make factory construction fail.
@Test
void newFactoryShouldFailWhenInvalidEncoding() {
    System.setProperty("james.blob.id.hash.encoding", "invalid");

    assertThatThrownBy(() -> new HashBlobId.Factory())
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessage("Unknown encoding type: invalid");
}

// The two "shattered" PDF resources must still get distinct blob ids.
@Test
void forPayloadShouldCalculateDifferentHashesWhenCraftedSha1Collision() throws Exception {
    BlobId firstId = BLOB_ID_FACTORY.forPayload(ClassLoaderUtils.getSystemResourceAsByteArray("shattered-1.pdf"));
    BlobId secondId = BLOB_ID_FACTORY.forPayload(ClassLoaderUtils.getSystemResourceAsByteArray("shattered-2.pdf"));

    assertThat(firstId).isNotEqualTo(secondId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,9 @@

import java.util.Objects;

import org.apache.commons.lang3.NotImplementedException;

import com.google.common.io.ByteSource;

public class TestBlobId implements BlobId {

public static class Factory implements BlobId.Factory {
// Not supported by this test factory: always throws, pointing callers to from(String).
@Override
public BlobId forPayload(byte[] payload) {
throw new NotImplementedException("Use from(String) instead");
}

// Not supported by this test factory: always throws, pointing callers to from(String).
@Override
public BlobId forPayload(ByteSource payload) {
throw new NotImplementedException("Use from(String) instead");
}

@Override
public BlobId of(String id) {
return new TestBlobId(id);
Expand Down
Loading

0 comments on commit d55db91

Please sign in to comment.