From fb9abaebfefdc1be0d2a490a30f4e681b2c3c8ea Mon Sep 17 00:00:00 2001 From: Robert Young Date: Thu, 29 Jun 2023 16:42:15 +1200 Subject: [PATCH 1/2] Fix bug with requesting topic metadata Why: At apiVersion 10, the MetadataRequest is only allowed to take null topic ids. So it responds with an error when we set the id. We need to be at apiVersion 12. We were also missing a guard to prevent caching in this error condition which allowed an empty topic name to be put in the cache on error. Resulting in no decryption being applied. Signed-off-by: Robert Young --- .../topicenc/kroxylicious/FetchDecryptFilter.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java index 83a294a..229af62 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java @@ -33,7 +33,7 @@ public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilter, MetadataResponseFilter { private static final Logger log = LoggerFactory.getLogger(FetchDecryptFilter.class); - public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 10; + public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12; private final Map topicUuidToName = new HashMap<>(); private final EncryptionModule module = new EncryptionModule(new InMemoryPolicyRepository(List.of(new TopicPolicy().setTopic(TopicPolicy.ALL_TOPICS).setKms(new TestKms(new KmsDefinition()))))); @@ -134,6 +134,16 @@ private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) if (log.isTraceEnabled()) { log.trace("received metadata response: {}", MetadataResponseDataJsonConverter.write(response, apiVersion)); } - 
response.topics().forEach(topic -> topicUuidToName.put(topic.topicId(), topic.name())); + response.topics().forEach(topic -> { + if (topic.errorCode() == 0) { + if (topic.topicId() != null && !Strings.isNullOrBlank(topic.name())) { + topicUuidToName.put(topic.topicId(), topic.name()); + } else { + log.warn("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name()); + } + } else { + log.warn("error {} on metadata request for topic id {}, topic name {}", Errors.forCode(topic.errorCode()), topic.topicId(), topic.name()); + } + }); } } From 264af3c135d467e3b81cfd7ce2255f5081c96566 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Thu, 22 Jun 2023 16:01:03 +1200 Subject: [PATCH 2/2] Filter: load JSON definitions and policies from file Why: We want users to have access to the full Smorgasbord of configuration including different encryption policies per-topic and support for multiple KMS implementations. With this change the Kroxylicious Filter will require configuration to point to the two JSON files. This gives us the closest integration with the encryption module because `JsonPolicyLoader.loadTopicPolicies` works in terms of File objects and does some useful work like validating and associating policies and KMS implementations. In future, if we want to be able to embed the KMS definitions and topic policies in the Filter configuration it might be nice if `JsonPolicyLoader` had another method like: `List loadTopicPolicies(List topicPolicies, Map kmsDefs)` that did the same work without the dependency on loading JSON from File. 
Signed-off-by: Robert Young --- kroxylicious-filter/pom.xml | 26 ++- .../kroxylicious/FetchDecryptFilter.java | 22 +- .../InMemoryPolicyRepositoryConfig.java | 41 ++++ .../kroxylicious/ProduceEncryptFilter.java | 13 +- .../kafka/topicenc/kroxylicious/Strings.java | 7 - .../kroxylicious/TopicEncryptionConfig.java | 25 +++ .../TopicEncryptionContributor.java | 4 +- ...tionTest.java => FetchDecryptionTest.java} | 194 +++++++++--------- .../kroxylicious/KafkaAssertions.java | 45 ++++ .../kroxylicious/ProduceEncryptionTest.java | 111 ++++++++++ .../topicenc/kroxylicious/TestCrypter.java | 47 +++++ .../kafka/topicenc/kroxylicious/Vault.java | 29 +++ .../VaultFetchDecryptionTest.java | 105 ++++++++++ .../VaultProduceEncryptionTest.java | 110 ++++++++++ .../config/EncryptionModuleConfigurer.java | 73 +++++++ .../topicenc/kroxylicious/kms/MockKms.java | 32 +++ .../kroxylicious/kms/MockKmsFactory.java | 20 ++ .../io.strimzi.kafka.topicenc.kms.KmsFactory | 1 + 18 files changed, 783 insertions(+), 122 deletions(-) create mode 100644 kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/InMemoryPolicyRepositoryConfig.java delete mode 100644 kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/Strings.java create mode 100644 kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java rename kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/{TopicEncryptionTest.java => FetchDecryptionTest.java} (53%) create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/KafkaAssertions.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptionTest.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TestCrypter.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/Vault.java create mode 100644 
kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultFetchDecryptionTest.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultProduceEncryptionTest.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/config/EncryptionModuleConfigurer.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKms.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKmsFactory.java create mode 100644 kroxylicious-filter/src/test/resources/META-INF/services/io.strimzi.kafka.topicenc.kms.KmsFactory diff --git a/kroxylicious-filter/pom.xml b/kroxylicious-filter/pom.xml index 6c8aa90..f599cef 100644 --- a/kroxylicious-filter/pom.xml +++ b/kroxylicious-filter/pom.xml @@ -21,6 +21,19 @@ io.strimzi kms-test ${project.version} + true + + + io.strimzi + kms-vault + ${project.version} + true + + + io.strimzi + kms-keyprotect + ${project.version} + true io.kroxylicious @@ -57,6 +70,12 @@ 0.94.0 test + + org.testcontainers + vault + 1.18.3 + test + @@ -64,13 +83,16 @@ org.apache.maven.plugins maven-shade-plugin - ${maven.shade.version} + 3.5.0 - + false true fat + + + org.slf4j:* diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java index 229af62..7d391e2 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java @@ -5,10 +5,6 @@ import io.kroxylicious.proxy.filter.KrpcFilterContext; import io.kroxylicious.proxy.filter.MetadataResponseFilter; import io.strimzi.kafka.topicenc.EncryptionModule; -import io.strimzi.kafka.topicenc.kms.KmsDefinition; -import 
io.strimzi.kafka.topicenc.kms.test.TestKms; -import io.strimzi.kafka.topicenc.policy.InMemoryPolicyRepository; -import io.strimzi.kafka.topicenc.policy.TopicPolicy; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; @@ -22,12 +18,12 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.function.Predicate; +import static io.strimzi.kafka.topicenc.common.Strings.isNullOrEmpty; import static java.util.stream.Collectors.toSet; public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilter, MetadataResponseFilter { @@ -36,7 +32,11 @@ public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilt public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12; private final Map topicUuidToName = new HashMap<>(); - private final EncryptionModule module = new EncryptionModule(new InMemoryPolicyRepository(List.of(new TopicPolicy().setTopic(TopicPolicy.ALL_TOPICS).setKms(new TestKms(new KmsDefinition()))))); + private final EncryptionModule module; + + public FetchDecryptFilter(TopicEncryptionConfig config) { + module = new EncryptionModule(config.getPolicyRepository()); + } @Override public void onFetchRequest(short apiVersion, RequestHeaderData header, FetchRequestData request, KrpcFilterContext context) { @@ -99,7 +99,7 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r for (FetchResponseData.FetchableTopicResponse fetchResponse : response.responses()) { Uuid originalUuid = fetchResponse.topicId(); String originalName = fetchResponse.topic(); - if (Strings.isNullOrBlank(originalName)) { + if (isNullOrEmpty(originalName)) { fetchResponse.setTopic(topicUuidToName.get(originalUuid)); fetchResponse.setTopicId(null); } @@ -117,11 +117,11 @@ private void 
decryptFetchResponse(ResponseHeaderData header, FetchResponseData r private boolean isResolvable(FetchResponseData.FetchableTopicResponse fetchableTopicResponse) { - return !Strings.isNullOrBlank(fetchableTopicResponse.topic()) || topicUuidToName.containsKey(fetchableTopicResponse.topicId()); + return !isNullOrEmpty(fetchableTopicResponse.topic()) || topicUuidToName.containsKey(fetchableTopicResponse.topicId()); } private boolean isResolvable(FetchRequestData.FetchTopic fetchTopic) { - return !Strings.isNullOrBlank(fetchTopic.topic()) || topicUuidToName.containsKey(fetchTopic.topicId()); + return !isNullOrEmpty(fetchTopic.topic()) || topicUuidToName.containsKey(fetchTopic.topicId()); } @Override @@ -136,10 +136,10 @@ private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) } response.topics().forEach(topic -> { if (topic.errorCode() == 0) { - if (topic.topicId() != null && !Strings.isNullOrBlank(topic.name())) { + if (topic.topicId() != null && !isNullOrEmpty(topic.name())) { topicUuidToName.put(topic.topicId(), topic.name()); } else { - log.warn("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name()); + log.info("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name()); } } else { log.warn("error {} on metadata request for topic id {}, topic name {}", Errors.forCode(topic.errorCode()), topic.topicId(), topic.name()); diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/InMemoryPolicyRepositoryConfig.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/InMemoryPolicyRepositoryConfig.java new file mode 100644 index 0000000..a1df6cf --- /dev/null +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/InMemoryPolicyRepositoryConfig.java @@ -0,0 +1,41 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import 
com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.kroxylicious.proxy.config.BaseConfig; +import io.strimzi.kafka.topicenc.policy.InMemoryPolicyRepository; +import io.strimzi.kafka.topicenc.policy.JsonPolicyLoader; +import io.strimzi.kafka.topicenc.policy.PolicyRepository; +import io.strimzi.kafka.topicenc.policy.TopicPolicy; + +import java.io.File; +import java.util.List; + +public class InMemoryPolicyRepositoryConfig extends BaseConfig { + + public static final String KMS_DEFINITIONS_FILE_PROP_NAME = "kmsDefinitionsFile"; + public static final String TOPIC_POLICIES_FILE_PROP_NAME = "topicPoliciesFile"; + private final PolicyRepository policyRepository; + + @JsonCreator + public InMemoryPolicyRepositoryConfig(@JsonProperty(KMS_DEFINITIONS_FILE_PROP_NAME) String kmsDefinitionsFile, @JsonProperty(TOPIC_POLICIES_FILE_PROP_NAME) String topicPoliciesFile) { + File kmsDefsFile = new File(kmsDefinitionsFile); + if (!kmsDefsFile.exists()) { + throw new IllegalArgumentException(KMS_DEFINITIONS_FILE_PROP_NAME + " " + kmsDefinitionsFile + " does not exist"); + } + File policyFile = new File(topicPoliciesFile); + if (!policyFile.exists()) { + throw new IllegalArgumentException(TOPIC_POLICIES_FILE_PROP_NAME + " " + policyFile + " does not exist"); + } + try { + List topicPolicies = JsonPolicyLoader.loadTopicPolicies(kmsDefsFile, policyFile); + policyRepository = new InMemoryPolicyRepository(topicPolicies); + } catch (Exception e) { + throw new RuntimeException("Failed to create topic policies", e); + } + } + + public PolicyRepository getPolicyRepository() { + return policyRepository; + } +} diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptFilter.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptFilter.java index f91f694..0e46436 100644 --- 
a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptFilter.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptFilter.java @@ -3,10 +3,6 @@ import io.kroxylicious.proxy.filter.KrpcFilterContext; import io.kroxylicious.proxy.filter.ProduceRequestFilter; import io.strimzi.kafka.topicenc.EncryptionModule; -import io.strimzi.kafka.topicenc.kms.KmsDefinition; -import io.strimzi.kafka.topicenc.kms.test.TestKms; -import io.strimzi.kafka.topicenc.policy.InMemoryPolicyRepository; -import io.strimzi.kafka.topicenc.policy.TopicPolicy; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.message.RequestHeaderData; @@ -14,13 +10,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - public class ProduceEncryptFilter implements ProduceRequestFilter { + private static final Logger log = LoggerFactory.getLogger(ProduceEncryptFilter.class); - private final EncryptionModule module = new EncryptionModule(new InMemoryPolicyRepository(List.of(new TopicPolicy().setTopic(TopicPolicy.ALL_TOPICS).setKms(new TestKms(new KmsDefinition()))))); + private final EncryptionModule module; + + public ProduceEncryptFilter(TopicEncryptionConfig config) { + module = new EncryptionModule(config.getPolicyRepository()); + } @Override public void onProduceRequest(short apiVersion, RequestHeaderData header, ProduceRequestData request, KrpcFilterContext context) { diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/Strings.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/Strings.java deleted file mode 100644 index 27afa2d..0000000 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/Strings.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.strimzi.kafka.topicenc.kroxylicious; - -public class Strings { - public 
static boolean isNullOrBlank(String originalName) { - return originalName == null || originalName.strip().equals(""); - } -} diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java new file mode 100644 index 0000000..43e1cbc --- /dev/null +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java @@ -0,0 +1,25 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.kroxylicious.proxy.config.BaseConfig; +import io.strimzi.kafka.topicenc.policy.PolicyRepository; + +import java.util.Objects; + +public class TopicEncryptionConfig extends BaseConfig { + + public static final String IN_MEMORY_POLICY_REPOSITORY_PROP_NAME = "inMemoryPolicyRepository"; + private final InMemoryPolicyRepositoryConfig inMemoryPolicyRepository; + + @JsonCreator + public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_PROP_NAME) InMemoryPolicyRepositoryConfig inMemoryPolicyRepository) { + this.inMemoryPolicyRepository = inMemoryPolicyRepository; + Objects.requireNonNull(inMemoryPolicyRepository, "Currently " + IN_MEMORY_POLICY_REPOSITORY_PROP_NAME + + " configuration is required as it is the only PolicyRepository implementation"); + } + + public PolicyRepository getPolicyRepository() { + return inMemoryPolicyRepository.getPolicyRepository(); + } +} diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionContributor.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionContributor.java index f768071..5cc2fbb 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionContributor.java +++ 
b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionContributor.java @@ -9,8 +9,8 @@ public class TopicEncryptionContributor extends BaseContributor impl public static final String DECRYPT_FETCH = "TopicEncryption::DecryptFetch"; public static final String ENCRYPT_PRODUCE = "TopicEncryption::EncryptProduce"; public static final BaseContributorBuilder FILTERS = BaseContributor.builder() - .add(DECRYPT_FETCH, FetchDecryptFilter::new) - .add(ENCRYPT_PRODUCE, ProduceEncryptFilter::new); + .add(DECRYPT_FETCH, TopicEncryptionConfig.class, FetchDecryptFilter::new) + .add(ENCRYPT_PRODUCE, TopicEncryptionConfig.class, ProduceEncryptFilter::new); public TopicEncryptionContributor() { super(FILTERS); diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionTest.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptionTest.java similarity index 53% rename from kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionTest.java rename to kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptionTest.java index c05c8d4..cfd4837 100644 --- a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionTest.java +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptionTest.java @@ -6,14 +6,13 @@ import io.kroxylicious.test.tester.KroxyliciousTester; import io.kroxylicious.testing.kafka.api.KafkaCluster; import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension; +import io.strimzi.kafka.topicenc.kms.KmsDefinition; +import io.strimzi.kafka.topicenc.policy.TopicPolicy; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import 
org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Uuid; @@ -22,49 +21,99 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; -import java.nio.charset.StandardCharsets; -import java.time.Duration; +import javax.crypto.SecretKey; +import java.io.File; import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy; import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.withDefaultFilters; import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester; +import static io.strimzi.kafka.topicenc.kroxylicious.KafkaAssertions.assertSingletonRecordEquals; import static io.strimzi.kafka.topicenc.kroxylicious.TopicEncryptionContributor.DECRYPT_FETCH; -import static io.strimzi.kafka.topicenc.kroxylicious.TopicEncryptionContributor.ENCRYPT_PRODUCE; +import static io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.getConfiguration; +import static 
io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.mockKmsDefinition; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.fail; @ExtendWith(KafkaClusterExtension.class) -class TopicEncryptionTest { +class FetchDecryptionTest { + public static final String TOPIC_NAME_A = "apple"; + public static final String TOPIC_NAME_B = "banana"; + public static final String UNENCRYPTED_TOPIC = "unencryptedTopic"; + public static final String UNENCRYPTED_VALUE = "unencryptedValue"; + public static final String KMS_NAME = "test"; private static final short PRE_TOPIC_ID_SCHEMA = (short) 12; private static final short POST_TOPIC_ID_SCHEMA = (short) 13; - public static final String TOPIC_NAME = "example"; - public static final String UNENCRYPTED_VALUE = "unencryptedValue"; - + private final Map secretKeys = TestCrypter.uniqueKeyPerKeyReference(Set.of(TOPIC_NAME_A, TOPIC_NAME_B)); + private final byte[] ENCRYPTED_VALUE_TOPIC_A = TestCrypter.toEncryptedRecordValue(UNENCRYPTED_VALUE.getBytes(UTF_8), secretKeys.get(TOPIC_NAME_A)); + private final byte[] ENCRYPTED_VALUE_TOPIC_B = TestCrypter.toEncryptedRecordValue(UNENCRYPTED_VALUE.getBytes(UTF_8), secretKeys.get(TOPIC_NAME_B)); KafkaCluster cluster; private KroxyliciousTester tester; + @NotNull + private static FetchRequestData fetchRequestWith(java.util.function.Consumer func) { + FetchRequestData message = new FetchRequestData(); + message.setReplicaId(-1); + message.setMaxWaitMs(5000); + message.setMinBytes(1); + message.setMaxBytes(1024); + message.setIsolationLevel((byte) 0); + message.setSessionId(0); + message.setSessionEpoch(0); + FetchRequestData.FetchTopic topic = new FetchRequestData.FetchTopic(); + func.accept(topic); + FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition(); + 
fetchPartition.setPartition(0); + topic.setPartitions(List.of(fetchPartition)); + message.setTopics(List.of(topic)); + return message; + } + + @NotNull + private static String getOnlyRecordValueFromResponse(java.util.function.Consumer responseConsumer, Response responseCompletableFuture) { + FetchResponseData response = (FetchResponseData) responseCompletableFuture.message(); + FetchResponseData.FetchableTopicResponse fetchableTopicResponse = response.responses().get(0); + responseConsumer.accept(fetchableTopicResponse); + FetchResponseData.PartitionData partitionData = fetchableTopicResponse.partitions().get(0); + assertEquals(0, partitionData.partitionIndex()); + MemoryRecords records = (MemoryRecords) partitionData.records(); + Record record = records.records().iterator().next(); + byte[] valueBuffer = new byte[record.valueSize()]; + record.value().get(valueBuffer); + return new String(valueBuffer, UTF_8); + } + + private static void createTopics(Admin admin) throws InterruptedException, ExecutionException, TimeoutException { + admin.createTopics(List.of(new NewTopic(TOPIC_NAME_A, 1, (short) 1), + new NewTopic(TOPIC_NAME_B, 1, (short) 1), + new NewTopic(UNENCRYPTED_TOPIC, 1, (short) 1))).all().get(10, TimeUnit.SECONDS); + } + @BeforeEach - public void setup() { + public void setup(@TempDir File tempDir) { + assertFalse(Arrays.equals(ENCRYPTED_VALUE_TOPIC_A, ENCRYPTED_VALUE_TOPIC_B), "value should be encrypted differently for each topic"); + List definitions = List.of(mockKmsDefinition(KMS_NAME, secretKeys)); + List policies = List.of(new TopicPolicy().setTopic(TOPIC_NAME_A).setKeyReference(TOPIC_NAME_A).setKmsName(KMS_NAME), + new TopicPolicy().setTopic(TOPIC_NAME_B).setKeyReference(TOPIC_NAME_B).setKmsName(KMS_NAME)); + Map topicEncryptionConfig = getConfiguration(tempDir, definitions, policies); tester = kroxyliciousTester(withDefaultFilters(proxy(cluster)) - .addNewFilter().withType(DECRYPT_FETCH).endFilter() - 
.addNewFilter().withType(ENCRYPT_PRODUCE).endFilter() + .addNewFilter().withType(DECRYPT_FETCH).withConfig(topicEncryptionConfig).endFilter() ); } @@ -74,33 +123,46 @@ public void teardown() { } @Test - public void testEncryptionRoundtrip(Admin admin) { - try (Producer producer = tester.producer(); - Consumer kafkaClusterConsumer = getConsumer(cluster); - Consumer proxyConsumer = tester.consumer(Serdes.String(), Serdes.ByteArray(), Map.of(ConsumerConfig.GROUP_ID_CONFIG, "another-group-id", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")) + public void testFetchDecryption(Admin admin) { + try ( + Producer producer = tester.producer(Serdes.String(), Serdes.ByteArray(), Map.of()); + Consumer proxyConsumer = tester.consumer(Serdes.String(), Serdes.String(), Map.of(ConsumerConfig.GROUP_ID_CONFIG, "another-group-id", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")) + ) { + createTopics(admin); + producer.send(new ProducerRecord<>(TOPIC_NAME_A, ENCRYPTED_VALUE_TOPIC_A)).get(10, TimeUnit.SECONDS); + producer.send(new ProducerRecord<>(TOPIC_NAME_B, ENCRYPTED_VALUE_TOPIC_B)).get(10, TimeUnit.SECONDS); + assertSingletonRecordEquals(proxyConsumer, TOPIC_NAME_A, UNENCRYPTED_VALUE); + assertSingletonRecordEquals(proxyConsumer, TOPIC_NAME_B, UNENCRYPTED_VALUE); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testTopicWithoutPolicyIsNotDecrypted(Admin admin) { + try ( + Producer producer = tester.producer(Serdes.String(), Serdes.ByteArray(), Map.of()); + Consumer proxyConsumer = tester.consumer(Serdes.String(), Serdes.ByteArray(), Map.of(ConsumerConfig.GROUP_ID_CONFIG, "another-group-id", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")) ) { - admin.createTopics(List.of(new NewTopic(TOPIC_NAME, 1, (short) 1))).all().get(10, TimeUnit.SECONDS); - producer.send(new ProducerRecord<>(TOPIC_NAME, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); - ConsumerRecord clusterRecord = getOnlyRecord(kafkaClusterConsumer, TOPIC_NAME); - 
ConsumerRecord proxiedRecord = getOnlyRecord(proxyConsumer, TOPIC_NAME); - assertFalse(Arrays.equals(clusterRecord.value(), proxiedRecord.value())); // todo check encryption? - assertEquals(UNENCRYPTED_VALUE, new String(proxiedRecord.value(), StandardCharsets.UTF_8)); + createTopics(admin); + producer.send(new ProducerRecord<>(UNENCRYPTED_TOPIC, ENCRYPTED_VALUE_TOPIC_A)).get(10, TimeUnit.SECONDS); + KafkaAssertions.assertSingletonRecordEquals(proxyConsumer, UNENCRYPTED_TOPIC, ENCRYPTED_VALUE_TOPIC_A); } catch (Exception e) { throw new RuntimeException(e); } } @Test - public void testEncryptionRoundtripWithPreTopicIdFetchRequest(Admin admin) { - try (Producer producer = tester.producer(); + public void testFetchDecryptionWithPreTopicIdFetchRequest(Admin admin) { + try (Producer producer = tester.producer(Serdes.String(), Serdes.ByteArray(), Map.of()); KafkaClient client = tester.singleRequestClient() ) { - admin.createTopics(List.of(new NewTopic(TOPIC_NAME, 1, (short) 1))).all().get(10, TimeUnit.SECONDS); - producer.send(new ProducerRecord<>(TOPIC_NAME, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); - FetchRequestData message = fetchRequestWith(fetchTopic -> fetchTopic.setTopic(TOPIC_NAME)); + createTopics(admin); + producer.send(new ProducerRecord<>(TOPIC_NAME_A, ENCRYPTED_VALUE_TOPIC_A)).get(10, TimeUnit.SECONDS); + FetchRequestData message = fetchRequestWith(fetchTopic -> fetchTopic.setTopic(TOPIC_NAME_A)); Response responseCompletableFuture = client.getSync(new Request(ApiKeys.FETCH, PRE_TOPIC_ID_SCHEMA, "clientId", message)); String valueString = getOnlyRecordValueFromResponse( - fetchableTopicResponse -> assertEquals(TOPIC_NAME, fetchableTopicResponse.topic()) + fetchableTopicResponse -> assertEquals(TOPIC_NAME_A, fetchableTopicResponse.topic()) , responseCompletableFuture); assertEquals(UNENCRYPTED_VALUE, valueString); } catch (Exception e) { @@ -109,13 +171,13 @@ public void testEncryptionRoundtripWithPreTopicIdFetchRequest(Admin admin) { } @Test - public 
void testEncryptionRoundtripWithPostTopicIdFetchRequest(Admin admin) { - try (Producer producer = tester.producer(); + public void testFetchDecryptionWithTopicIdFetchRequest(Admin admin) { + try (Producer producer = tester.producer(Serdes.String(), Serdes.ByteArray(), Map.of()); KafkaClient client = tester.singleRequestClient() ) { - CreateTopicsResult result = admin.createTopics(List.of(new NewTopic(TOPIC_NAME, 1, (short) 1))); - Uuid topicUuid = result.topicId(TOPIC_NAME).get(10, TimeUnit.SECONDS); - producer.send(new ProducerRecord<>(TOPIC_NAME, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); + CreateTopicsResult result = admin.createTopics(List.of(new NewTopic(TOPIC_NAME_A, 1, (short) 1))); + Uuid topicUuid = result.topicId(TOPIC_NAME_A).get(10, TimeUnit.SECONDS); + producer.send(new ProducerRecord<>(TOPIC_NAME_A, ENCRYPTED_VALUE_TOPIC_A)).get(10, TimeUnit.SECONDS); FetchRequestData message = fetchRequestWith(fetchTopic -> fetchTopic.setTopicId(topicUuid)); Response responseCompletableFuture = client.getSync(new Request(ApiKeys.FETCH, POST_TOPIC_ID_SCHEMA, "clientId", message)); String valueString = getOnlyRecordValueFromResponse( @@ -127,58 +189,4 @@ public void testEncryptionRoundtripWithPostTopicIdFetchRequest(Admin admin) { } } - @NotNull - private static FetchRequestData fetchRequestWith(java.util.function.Consumer func) { - FetchRequestData message = new FetchRequestData(); - message.setReplicaId(-1); - message.setMaxWaitMs(5000); - message.setMinBytes(1); - message.setMaxBytes(1024); - message.setIsolationLevel((byte) 0); - message.setSessionId(0); - message.setSessionEpoch(0); - FetchRequestData.FetchTopic topic = new FetchRequestData.FetchTopic(); - func.accept(topic); - FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition(); - fetchPartition.setPartition(0); - topic.setPartitions(List.of(fetchPartition)); - message.setTopics(List.of(topic)); - return message; - } - - @NotNull - private static String 
getOnlyRecordValueFromResponse(java.util.function.Consumer responseConsumer, Response responseCompletableFuture) { - FetchResponseData response = (FetchResponseData) responseCompletableFuture.message(); - FetchResponseData.FetchableTopicResponse fetchableTopicResponse = response.responses().get(0); - responseConsumer.accept(fetchableTopicResponse); - FetchResponseData.PartitionData partitionData = fetchableTopicResponse.partitions().get(0); - assertEquals(0, partitionData.partitionIndex()); - MemoryRecords records = (MemoryRecords) partitionData.records(); - Record record = records.records().iterator().next(); - byte[] valueBuffer = new byte[record.valueSize()]; - record.value().get(valueBuffer); - return new String(valueBuffer, StandardCharsets.UTF_8); - } - - private static ConsumerRecord getOnlyRecord(Consumer kafkaClusterConsumer, String topic) { - kafkaClusterConsumer.subscribe(List.of(topic)); - ConsumerRecords poll = kafkaClusterConsumer.poll(Duration.ofSeconds(10)); - if (poll.count() != 1) { - fail("expected to poll exactly one record from Kafka, received " + poll.count()); - } - Iterable> records = poll.records(topic); - Iterator> iterator = records.iterator(); - return iterator.next(); - } - - @NotNull - private static KafkaConsumer getConsumer(KafkaCluster cluster) { - HashMap config = new HashMap<>(cluster.getKafkaClientConfiguration()); - config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); - config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return new KafkaConsumer<>(config); - } - } \ No newline at end of file diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/KafkaAssertions.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/KafkaAssertions.java new file mode 100644 index 0000000..0e6b0ad 
--- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/KafkaAssertions.java @@ -0,0 +1,45 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import java.time.Duration; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class KafkaAssertions { + + public static void assertSingletonRecordEquals(Consumer kafkaClusterConsumer, String topic, byte[] expected) { + ConsumerRecord onlyRecord = getSingletonRecord(kafkaClusterConsumer, topic); + assertArrayEquals(expected, onlyRecord.value(), "only record value in " + topic + " did not match expectation"); + } + + public static void assertSingletonRecordEquals(Consumer kafkaClusterConsumer, String topic, Function transform, String expected) { + ConsumerRecord onlyRecord = getSingletonRecord(kafkaClusterConsumer, topic); + String transformed = transform.apply(onlyRecord.value()); + assertEquals(expected, transformed, "only record value in " + topic + " did not match expectation"); + } + + public static void assertSingletonRecordEquals(Consumer kafkaClusterConsumer, String topic, String expected) { + ConsumerRecord onlyRecord = getSingletonRecord(kafkaClusterConsumer, topic); + assertEquals(expected, onlyRecord.value(), "only record value in " + topic + " did not match expectation"); + } + + private static ConsumerRecord getSingletonRecord(Consumer kafkaClusterConsumer, String topic) { + kafkaClusterConsumer.subscribe(List.of(topic)); + ConsumerRecords poll = kafkaClusterConsumer.poll(Duration.ofSeconds(10)); + if (poll.count() != 1) { + fail("expected to poll exactly one record from Kafka, 
received " + poll.count()); + } + Iterable> records = poll.records(topic); + Iterator> iterator = records.iterator(); + return iterator.next(); + } + +} diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptionTest.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptionTest.java new file mode 100644 index 0000000..5bcc7db --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/ProduceEncryptionTest.java @@ -0,0 +1,111 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import io.kroxylicious.test.tester.KroxyliciousTester; +import io.kroxylicious.testing.kafka.api.KafkaCluster; +import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension; +import io.strimzi.kafka.topicenc.kms.KmsDefinition; +import io.strimzi.kafka.topicenc.policy.TopicPolicy; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serdes; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import javax.crypto.SecretKey; +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy; +import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.withDefaultFilters; +import static 
io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester; +import static io.strimzi.kafka.topicenc.kroxylicious.KafkaAssertions.assertSingletonRecordEquals; +import static io.strimzi.kafka.topicenc.kroxylicious.TopicEncryptionContributor.ENCRYPT_PRODUCE; +import static io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.getConfiguration; +import static io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.mockKmsDefinition; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(KafkaClusterExtension.class) +class ProduceEncryptionTest { + public static final String TOPIC_NAME_A = "apple"; + public static final String TOPIC_NAME_B = "banana"; + public static final String UNENCRYPTED_TOPIC = "unencryptedTopic"; + public static final String UNENCRYPTED_VALUE = "unencryptedValue"; + public static final String KMS_NAME = "test"; + private final Map secretKeys = TestCrypter.uniqueKeyPerKeyReference(Set.of(TOPIC_NAME_A, TOPIC_NAME_B)); + KafkaCluster cluster; + private KroxyliciousTester tester; + + private static void createTopics(Admin admin) throws InterruptedException, ExecutionException, TimeoutException { + admin.createTopics(List.of(new NewTopic(TOPIC_NAME_A, 1, (short) 1), + new NewTopic(TOPIC_NAME_B, 1, (short) 1), + new NewTopic(UNENCRYPTED_TOPIC, 1, (short) 1))).all().get(10, TimeUnit.SECONDS); + } + + @BeforeEach + public void setup(@TempDir File tempDir) { + assertFalse(Arrays.equals(secretKeys.get(TOPIC_NAME_A).getEncoded(), secretKeys.get(TOPIC_NAME_B).getEncoded()), "value should be encrypted differently for each topic"); + List definitions = List.of(mockKmsDefinition(KMS_NAME, secretKeys)); + List policies = List.of(new TopicPolicy().setTopic(TOPIC_NAME_A).setKeyReference(TOPIC_NAME_A).setKmsName(KMS_NAME), + new TopicPolicy().setTopic(TOPIC_NAME_B).setKeyReference(TOPIC_NAME_B).setKmsName(KMS_NAME)); + Map topicEncryptionConfig = 
getConfiguration(tempDir, definitions, policies); + tester = kroxyliciousTester(withDefaultFilters(proxy(cluster)) + .addNewFilter().withType(ENCRYPT_PRODUCE).withConfig(topicEncryptionConfig).endFilter() + ); + } + + @AfterEach + public void teardown() { + tester.close(); + } + + @Test + public void testProduceEncryption(Admin admin) { + try ( + Producer producer = tester.producer(); + Consumer proxyConsumer = tester.consumer(Serdes.String(), Serdes.ByteArray(), Map.of(ConsumerConfig.GROUP_ID_CONFIG, "another-group-id", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")) + ) { + createTopics(admin); + producer.send(new ProducerRecord<>(TOPIC_NAME_A, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); + producer.send(new ProducerRecord<>(TOPIC_NAME_B, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); + assertSingletonRecordEquals(proxyConsumer, TOPIC_NAME_A, (s) -> decrypt(s, TOPIC_NAME_A), UNENCRYPTED_VALUE); + assertSingletonRecordEquals(proxyConsumer, TOPIC_NAME_B, (s) -> decrypt(s, TOPIC_NAME_B), UNENCRYPTED_VALUE); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @NotNull + private String decrypt(byte[] s, String topicName) { + return new String(TestCrypter.toDecryptedRecordValue(s, secretKeys.get(topicName)), UTF_8); + } + + @Test + public void testTopicWithoutPolicyIsNotEncrypted(Admin admin) { + try ( + Producer producer = tester.producer(); + Consumer proxyConsumer = tester.consumer() + ) { + createTopics(admin); + producer.send(new ProducerRecord<>(UNENCRYPTED_TOPIC, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); + assertSingletonRecordEquals(proxyConsumer, UNENCRYPTED_TOPIC, UNENCRYPTED_VALUE); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TestCrypter.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TestCrypter.java new file mode 100644 index 0000000..2bf1b97 --- /dev/null 
+++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TestCrypter.java @@ -0,0 +1,47 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import io.strimzi.kafka.topicenc.common.EncUtils; +import io.strimzi.kafka.topicenc.enc.AesGcmEncrypter; +import io.strimzi.kafka.topicenc.enc.EncData; +import io.strimzi.kafka.topicenc.ser.AesGcmV1SerDer; + +import javax.crypto.SecretKey; +import java.security.NoSuchAlgorithmException; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class TestCrypter { + private static final AesGcmV1SerDer aesGcmV1SerDer = new AesGcmV1SerDer(); + + public static SecretKey randomAesKey() { + try { + return EncUtils.generateAesKey(256); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + + public static byte[] toEncryptedRecordValue(byte[] unencryptedBytes, SecretKey key) { + try { + AesGcmEncrypter crypterForKey = new AesGcmEncrypter(key); + return aesGcmV1SerDer.serialize(crypterForKey.encrypt(unencryptedBytes)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static byte[] toDecryptedRecordValue(byte[] encryptedRecordValue, SecretKey key) { + try { + EncData encData = aesGcmV1SerDer.deserialize(encryptedRecordValue); + AesGcmEncrypter crypterForKey = new AesGcmEncrypter(key); + return crypterForKey.decrypt(encData); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static Map uniqueKeyPerKeyReference(Set keyReferences) { + return keyReferences.stream().collect(Collectors.toMap(n -> n, n -> randomAesKey())); + } +} diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/Vault.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/Vault.java new file mode 100644 index 0000000..0a91cbf --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/Vault.java @@ -0,0 +1,29 @@ +package 
io.strimzi.kafka.topicenc.kroxylicious; + +import org.testcontainers.vault.VaultContainer; + +import java.io.Closeable; +import java.util.UUID; +import java.util.function.Consumer; + +public class Vault { + private static final String LATEST_VAULT_IMAGE = "hashicorp/vault:1.13"; + + public record VaultTester(VaultContainer vault, String token) implements Closeable { + + @Override + public void close() { + vault.close(); + } + } + + public static VaultTester startVaultContainer(Consumer> withConfiguration) { + String vaultToken = UUID.randomUUID().toString(); + VaultContainer vaultContainer = new VaultContainer<>(LATEST_VAULT_IMAGE) + .withVaultToken(vaultToken) + .withInitCommand("secrets enable transit", "write -f transit/keys/my-key"); + withConfiguration.accept(vaultContainer); + vaultContainer.start(); + return new VaultTester(vaultContainer, vaultToken); + } +} diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultFetchDecryptionTest.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultFetchDecryptionTest.java new file mode 100644 index 0000000..ef29fe3 --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultFetchDecryptionTest.java @@ -0,0 +1,105 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import io.kroxylicious.test.tester.KroxyliciousTester; +import io.kroxylicious.testing.kafka.api.KafkaCluster; +import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension; +import io.strimzi.kafka.topicenc.kms.KmsDefinition; +import io.strimzi.kafka.topicenc.policy.TopicPolicy; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serdes; 
+import org.jose4j.base64url.Base64; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import javax.crypto.SecretKey; +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy; +import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.withDefaultFilters; +import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester; +import static io.strimzi.kafka.topicenc.kroxylicious.KafkaAssertions.assertSingletonRecordEquals; +import static io.strimzi.kafka.topicenc.kroxylicious.TopicEncryptionContributor.DECRYPT_FETCH; +import static io.strimzi.kafka.topicenc.kroxylicious.Vault.startVaultContainer; +import static io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.getConfiguration; +import static io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.vaultKmsDefinition; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(KafkaClusterExtension.class) +class VaultFetchDecryptionTest { + public static final String TOPIC_NAME_A = "circle"; + public static final String TOPIC_NAME_B = "square"; + public static final String UNENCRYPTED_VALUE = "unencryptedValue"; + public static final String KMS_NAME = "vault"; + public static final String SECRET_NAMESPACE = "secret"; + private static final Map secretKeys = TestCrypter.uniqueKeyPerKeyReference(Set.of(TOPIC_NAME_A, TOPIC_NAME_B)); + private static final byte[] ENCRYPTED_VALUE_TOPIC_A = TestCrypter.toEncryptedRecordValue(UNENCRYPTED_VALUE.getBytes(UTF_8), 
secretKeys.get(TOPIC_NAME_A)); + private static final byte[] ENCRYPTED_VALUE_TOPIC_B = TestCrypter.toEncryptedRecordValue(UNENCRYPTED_VALUE.getBytes(UTF_8), secretKeys.get(TOPIC_NAME_B)); + public static Vault.VaultTester vaultTester; + + KafkaCluster cluster; + private KroxyliciousTester tester; + + private static String base64KeyFor(String topicName) { + return Base64.encode(secretKeys.get(topicName).getEncoded()); + } + + @BeforeAll + public static void beforeAll() { + vaultTester = startVaultContainer(vault -> { + vault.withSecretInVault(SECRET_NAMESPACE + "/" + TOPIC_NAME_A, TOPIC_NAME_A + "=" + base64KeyFor(TOPIC_NAME_A)) + .withSecretInVault(SECRET_NAMESPACE + "/" + TOPIC_NAME_B, TOPIC_NAME_B + "=" + base64KeyFor(TOPIC_NAME_B)); + }); + } + + @AfterAll + public static void afterAll() { + vaultTester.close(); + } + + @BeforeEach + public void setup(@TempDir File tempDir) { + assertFalse(Arrays.equals(secretKeys.get(TOPIC_NAME_A).getEncoded(), secretKeys.get(TOPIC_NAME_B).getEncoded()), "value should be encrypted differently for each topic"); + List definitions = List.of(vaultKmsDefinition(KMS_NAME, vaultTester, vaultTester.token(), SECRET_NAMESPACE)); + List policies = List.of(new TopicPolicy().setTopic(TOPIC_NAME_A).setKeyReference(TOPIC_NAME_A).setKmsName(KMS_NAME), new TopicPolicy().setTopic(TOPIC_NAME_B).setKeyReference(TOPIC_NAME_B).setKmsName(KMS_NAME)); + Map topicEncryptionConfig = getConfiguration(tempDir, definitions, policies); + tester = kroxyliciousTester(withDefaultFilters(proxy(cluster)) + .addNewFilter().withType(DECRYPT_FETCH).withConfig(topicEncryptionConfig).endFilter() + ); + } + + @AfterEach + public void teardown() { + tester.close(); + } + + @Test + public void testFetchDecryption(Admin admin) { + try (Producer producer = tester.producer(Serdes.String(), Serdes.ByteArray(), Map.of()); + Consumer consumer = tester.consumer(Serdes.String(), Serdes.String(), Map.of(ConsumerConfig.GROUP_ID_CONFIG, "another-group-id", 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")) + ) { + admin.createTopics(List.of(new NewTopic(TOPIC_NAME_A, 1, (short) 1), new NewTopic(TOPIC_NAME_B, 1, (short) 1))).all().get(10, TimeUnit.SECONDS); + producer.send(new ProducerRecord<>(TOPIC_NAME_A, ENCRYPTED_VALUE_TOPIC_A)).get(10, TimeUnit.SECONDS); + producer.send(new ProducerRecord<>(TOPIC_NAME_B, ENCRYPTED_VALUE_TOPIC_B)).get(10, TimeUnit.SECONDS); + assertSingletonRecordEquals(consumer, TOPIC_NAME_A, UNENCRYPTED_VALUE); + assertSingletonRecordEquals(consumer, TOPIC_NAME_B, UNENCRYPTED_VALUE); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultProduceEncryptionTest.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultProduceEncryptionTest.java new file mode 100644 index 0000000..6cb5cc2 --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/VaultProduceEncryptionTest.java @@ -0,0 +1,110 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import io.kroxylicious.test.tester.KroxyliciousTester; +import io.kroxylicious.testing.kafka.api.KafkaCluster; +import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension; +import io.strimzi.kafka.topicenc.kms.KmsDefinition; +import io.strimzi.kafka.topicenc.policy.TopicPolicy; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serdes; +import org.jose4j.base64url.Base64; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import javax.crypto.SecretKey; +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy; +import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.withDefaultFilters; +import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester; +import static io.strimzi.kafka.topicenc.kroxylicious.KafkaAssertions.assertSingletonRecordEquals; +import static io.strimzi.kafka.topicenc.kroxylicious.TopicEncryptionContributor.ENCRYPT_PRODUCE; +import static io.strimzi.kafka.topicenc.kroxylicious.Vault.startVaultContainer; +import static io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.getConfiguration; +import static io.strimzi.kafka.topicenc.kroxylicious.config.EncryptionModuleConfigurer.vaultKmsDefinition; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(KafkaClusterExtension.class) +class VaultProduceEncryptionTest { + public static final String TOPIC_NAME_A = "circle"; + public static final String TOPIC_NAME_B = "square"; + public static final String UNENCRYPTED_VALUE = "unencryptedValue"; + public static final String KMS_NAME = "vault"; + public static final String SECRET_NAMESPACE = "secret"; + private static final Map secretKeys = TestCrypter.uniqueKeyPerKeyReference(Set.of(TOPIC_NAME_A, TOPIC_NAME_B)); + public static Vault.VaultTester vaultTester; + + KafkaCluster cluster; + private KroxyliciousTester tester; + + private static String base64KeyFor(String topicName) { + return Base64.encode(secretKeys.get(topicName).getEncoded()); + } + + + @BeforeAll + public static void beforeAll() { + vaultTester = startVaultContainer(vault -> { + 
vault.withSecretInVault(SECRET_NAMESPACE + "/" + TOPIC_NAME_A, TOPIC_NAME_A + "=" + base64KeyFor(TOPIC_NAME_A)) + .withSecretInVault(SECRET_NAMESPACE + "/" + TOPIC_NAME_B, TOPIC_NAME_B + "=" + base64KeyFor(TOPIC_NAME_B)); + }); + } + + @AfterAll + public static void afterAll() { + vaultTester.close(); + } + + @BeforeEach + public void setup(@TempDir File tempDir) { + assertFalse(Arrays.equals(secretKeys.get(TOPIC_NAME_A).getEncoded(), secretKeys.get(TOPIC_NAME_B).getEncoded()), "value should be encrypted differently for each topic"); + List definitions = List.of(vaultKmsDefinition(KMS_NAME, vaultTester, vaultTester.token(), SECRET_NAMESPACE)); + List policies = List.of(new TopicPolicy().setTopic(TOPIC_NAME_A).setKeyReference(TOPIC_NAME_A).setKmsName(KMS_NAME), new TopicPolicy().setTopic(TOPIC_NAME_B).setKeyReference(TOPIC_NAME_B).setKmsName(KMS_NAME)); + Map topicEncryptionConfig = getConfiguration(tempDir, definitions, policies); + tester = kroxyliciousTester(withDefaultFilters(proxy(cluster)) + .addNewFilter().withType(ENCRYPT_PRODUCE).withConfig(topicEncryptionConfig).endFilter() + ); + } + + @AfterEach + public void teardown() { + tester.close(); + } + + @Test + public void testProduceEncryption(Admin admin) { + try (Producer producer = tester.producer(Serdes.String(), Serdes.String(), Map.of()); + Consumer consumer = tester.consumer(Serdes.String(), Serdes.ByteArray(), Map.of(ConsumerConfig.GROUP_ID_CONFIG, "another-group-id", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")) + ) { + admin.createTopics(List.of(new NewTopic(TOPIC_NAME_A, 1, (short) 1), new NewTopic(TOPIC_NAME_B, 1, (short) 1))).all().get(10, TimeUnit.SECONDS); + producer.send(new ProducerRecord<>(TOPIC_NAME_A, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); + producer.send(new ProducerRecord<>(TOPIC_NAME_B, UNENCRYPTED_VALUE)).get(10, TimeUnit.SECONDS); + assertSingletonRecordEquals(consumer, TOPIC_NAME_A, (s) -> decrypt(s, TOPIC_NAME_A), UNENCRYPTED_VALUE); + 
assertSingletonRecordEquals(consumer, TOPIC_NAME_B, (s) -> decrypt(s, TOPIC_NAME_B), UNENCRYPTED_VALUE); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private String decrypt(byte[] s, String topicName) { + return new String(TestCrypter.toDecryptedRecordValue(s, secretKeys.get(topicName)), UTF_8); + } + + +} \ No newline at end of file diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/config/EncryptionModuleConfigurer.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/config/EncryptionModuleConfigurer.java new file mode 100644 index 0000000..587aa0b --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/config/EncryptionModuleConfigurer.java @@ -0,0 +1,73 @@ +package io.strimzi.kafka.topicenc.kroxylicious.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.strimzi.kafka.topicenc.kms.KmsDefinition; +import io.strimzi.kafka.topicenc.kroxylicious.Vault; +import io.strimzi.kafka.topicenc.kroxylicious.kms.MockKms; +import io.strimzi.kafka.topicenc.policy.TopicPolicy; + +import javax.crypto.SecretKey; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static io.strimzi.kafka.topicenc.kroxylicious.InMemoryPolicyRepositoryConfig.KMS_DEFINITIONS_FILE_PROP_NAME; +import static io.strimzi.kafka.topicenc.kroxylicious.InMemoryPolicyRepositoryConfig.TOPIC_POLICIES_FILE_PROP_NAME; +import static io.strimzi.kafka.topicenc.kroxylicious.TopicEncryptionConfig.IN_MEMORY_POLICY_REPOSITORY_PROP_NAME; +import static io.strimzi.kafka.topicenc.kroxylicious.kms.MockKmsFactory.MOCK_KMS_NAME; +import static java.nio.charset.StandardCharsets.UTF_8; + +public class EncryptionModuleConfigurer { + + private static final ObjectMapper MAPPER = new ObjectMapper(); 
+ + record TestTopicPolicy(String topic, String keyReference, String kmsName) { + + } + + public static Map getConfiguration(File tempDir, List kmsDefinitions, List policies) { + File topicPolicies = writeTopicPolicyFile(tempDir, policies); + File kmsDefinitionFile = writeKmsDefinitionFile(tempDir, kmsDefinitions); + Map configFiles = Map.of(KMS_DEFINITIONS_FILE_PROP_NAME, kmsDefinitionFile.getPath(), TOPIC_POLICIES_FILE_PROP_NAME, topicPolicies.getPath()); + return Map.of(IN_MEMORY_POLICY_REPOSITORY_PROP_NAME, configFiles); + } + + + private static File writeTopicPolicyFile(File tempDir, List policies) { + try { + File file = new File(tempDir, "topicPolicies-" + UUID.randomUUID() + ".json"); + // cannot serialize the real TopicPolicy class because it serializes some unexpected fields like isWildcard + // that cannot be deserialized + Set policiesToWrite = policies.stream().map(topicPolicy -> new TestTopicPolicy(topicPolicy.getTopic(), topicPolicy.getKeyReference(), topicPolicy.getKmsName())).collect(Collectors.toSet()); + Files.writeString(file.toPath(), MAPPER.writeValueAsString(policiesToWrite), UTF_8); + return file; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static File writeKmsDefinitionFile(File tempDir, List definitions) { + try { + File file = new File(tempDir, "kmsDef-" + UUID.randomUUID() + ".json"); + Files.writeString(file.toPath(), MAPPER.writeValueAsString(definitions), UTF_8); + return file; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static KmsDefinition vaultKmsDefinition(String kmsName, Vault.VaultTester tester, String vaultToken, String namespace) { + return new KmsDefinition().setName(kmsName).setType("io.strimzi.kafka.topicenc.kms.vault.VaultKmsFactory").setCredential(vaultToken).setUri(URI.create(tester.vault().getHttpHostAddress() + "/v1/" + namespace + "/data")); + } + + public static KmsDefinition mockKmsDefinition(String kmsName, Map keyReferenceToSecret) { + return 
new KmsDefinition().setName(kmsName).setType(MOCK_KMS_NAME).setCredential(MockKms.encodeCredentialString(keyReferenceToSecret)); + } + +} diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKms.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKms.java new file mode 100644 index 0000000..6c1252c --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKms.java @@ -0,0 +1,32 @@ +package io.strimzi.kafka.topicenc.kroxylicious.kms; + +import io.strimzi.kafka.topicenc.common.EncUtils; +import io.strimzi.kafka.topicenc.kms.KeyMgtSystem; + +import javax.crypto.SecretKey; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public class MockKms implements KeyMgtSystem { + private final Map secrets; + + // hacky but with the current API KMS only has a few string values available to it, we are abusing the credential field + // to pass in a map from key reference to key. 
+ public MockKms(String keys) { + secrets = Arrays.stream(keys.split(",")).map(s -> s.split(":")) + .collect(Collectors.toMap(strings -> strings[0], strings -> EncUtils.base64Decode(strings[1]))); + } + + @Override + public SecretKey getKey(String keyReference) { + if(!secrets.containsKey(keyReference)){ + throw new IllegalArgumentException("expect all key references to have a secret"); + } + return secrets.get(keyReference); + } + + public static String encodeCredentialString(Map keyReferenceToSecret){ + return keyReferenceToSecret.entrySet().stream().map(stringSecretKeyEntry -> stringSecretKeyEntry.getKey() + ":" + EncUtils.base64Encode(stringSecretKeyEntry.getValue())).collect(Collectors.joining(",")); + } +} diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKmsFactory.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKmsFactory.java new file mode 100644 index 0000000..380a635 --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/kms/MockKmsFactory.java @@ -0,0 +1,20 @@ +package io.strimzi.kafka.topicenc.kroxylicious.kms; + +import io.strimzi.kafka.topicenc.kms.KeyMgtSystem; +import io.strimzi.kafka.topicenc.kms.KmsDefinition; +import io.strimzi.kafka.topicenc.kms.KmsFactory; + +public class MockKmsFactory implements KmsFactory { + + public static final String MOCK_KMS_NAME = "mock-kms"; + + @Override + public String getName() { + return MOCK_KMS_NAME; + } + + @Override + public KeyMgtSystem createKms(KmsDefinition kmsDef) { + return new MockKms(kmsDef.getCredential()); + } +} diff --git a/kroxylicious-filter/src/test/resources/META-INF/services/io.strimzi.kafka.topicenc.kms.KmsFactory b/kroxylicious-filter/src/test/resources/META-INF/services/io.strimzi.kafka.topicenc.kms.KmsFactory new file mode 100644 index 0000000..ff977dc --- /dev/null +++ b/kroxylicious-filter/src/test/resources/META-INF/services/io.strimzi.kafka.topicenc.kms.KmsFactory 
@@ -0,0 +1 @@ +io.strimzi.kafka.topicenc.kroxylicious.kms.MockKmsFactory \ No newline at end of file