Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter: load JSON definitions and policies from file #7

Merged
merged 2 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions kroxylicious-filter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@
<groupId>io.strimzi</groupId>
<artifactId>kms-test</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>kms-vault</artifactId>
<version>${project.version}</version>
robobario marked this conversation as resolved.
Show resolved Hide resolved
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>kms-keyprotect</artifactId>
<version>${project.version}</version>
robobario marked this conversation as resolved.
Show resolved Hide resolved
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.kroxylicious</groupId>
Expand Down Expand Up @@ -57,20 +70,29 @@
<version>0.94.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>vault</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.version}</version>
<version>3.5.0</version>
SamBarker marked this conversation as resolved.
Show resolved Hide resolved
<executions>
<execution>
<configuration>
<configuration combine.self="override">
robobario marked this conversation as resolved.
Show resolved Hide resolved
<createDependencyReducedPom>false</createDependencyReducedPom>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>fat</shadedClassifierName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<artifactSet>
<excludes>
<exclude>org.slf4j:*</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
import io.kroxylicious.proxy.filter.KrpcFilterContext;
import io.kroxylicious.proxy.filter.MetadataResponseFilter;
import io.strimzi.kafka.topicenc.EncryptionModule;
import io.strimzi.kafka.topicenc.kms.KmsDefinition;
import io.strimzi.kafka.topicenc.kms.test.TestKms;
import io.strimzi.kafka.topicenc.policy.InMemoryPolicyRepository;
import io.strimzi.kafka.topicenc.policy.TopicPolicy;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
Expand All @@ -22,21 +18,25 @@
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;

import static io.strimzi.kafka.topicenc.common.Strings.isNullOrEmpty;
import static java.util.stream.Collectors.toSet;

public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilter, MetadataResponseFilter {

private static final Logger log = LoggerFactory.getLogger(FetchDecryptFilter.class);
public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 10;
public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12;
private final Map<Uuid, String> topicUuidToName = new HashMap<>();

private final EncryptionModule module = new EncryptionModule(new InMemoryPolicyRepository(List.of(new TopicPolicy().setTopic(TopicPolicy.ALL_TOPICS).setKms(new TestKms(new KmsDefinition())))));
private final EncryptionModule module;

public FetchDecryptFilter(TopicEncryptionConfig config) {
module = new EncryptionModule(config.getPolicyRepository());
}

@Override
public void onFetchRequest(short apiVersion, RequestHeaderData header, FetchRequestData request, KrpcFilterContext context) {
Expand Down Expand Up @@ -99,7 +99,7 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r
for (FetchResponseData.FetchableTopicResponse fetchResponse : response.responses()) {
Uuid originalUuid = fetchResponse.topicId();
String originalName = fetchResponse.topic();
if (Strings.isNullOrBlank(originalName)) {
if (isNullOrEmpty(originalName)) {
SamBarker marked this conversation as resolved.
Show resolved Hide resolved
fetchResponse.setTopic(topicUuidToName.get(originalUuid));
fetchResponse.setTopicId(null);
}
Expand All @@ -117,11 +117,11 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r


private boolean isResolvable(FetchResponseData.FetchableTopicResponse fetchableTopicResponse) {
return !Strings.isNullOrBlank(fetchableTopicResponse.topic()) || topicUuidToName.containsKey(fetchableTopicResponse.topicId());
return !isNullOrEmpty(fetchableTopicResponse.topic()) || topicUuidToName.containsKey(fetchableTopicResponse.topicId());
}

private boolean isResolvable(FetchRequestData.FetchTopic fetchTopic) {
return !Strings.isNullOrBlank(fetchTopic.topic()) || topicUuidToName.containsKey(fetchTopic.topicId());
return !isNullOrEmpty(fetchTopic.topic()) || topicUuidToName.containsKey(fetchTopic.topicId());
}

@Override
Expand All @@ -134,6 +134,16 @@ private void cacheTopicIdToName(MetadataResponseData response, short apiVersion)
if (log.isTraceEnabled()) {
log.trace("received metadata response: {}", MetadataResponseDataJsonConverter.write(response, apiVersion));
}
response.topics().forEach(topic -> topicUuidToName.put(topic.topicId(), topic.name()));
response.topics().forEach(topic -> {
if (topic.errorCode() == 0) {
if (topic.topicId() != null && !isNullOrEmpty(topic.name())) {
topicUuidToName.put(topic.topicId(), topic.name());
} else {
log.info("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name());
}
} else {
log.warn("error {} on metadata request for topic id {}, topic name {}", Errors.forCode(topic.errorCode()), topic.topicId(), topic.name());
robobario marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kroxylicious.proxy.config.BaseConfig;
import io.strimzi.kafka.topicenc.policy.InMemoryPolicyRepository;
import io.strimzi.kafka.topicenc.policy.JsonPolicyLoader;
import io.strimzi.kafka.topicenc.policy.PolicyRepository;
import io.strimzi.kafka.topicenc.policy.TopicPolicy;

import java.io.File;
import java.util.List;

public class InMemoryPolicyRepositoryConfig extends BaseConfig {

public static final String KMS_DEFINITIONS_FILE_PROP_NAME = "kmsDefinitionsFile";
public static final String TOPIC_POLICIES_FILE_PROP_NAME = "topicPoliciesFile";
private final PolicyRepository policyRepository;

@JsonCreator
public InMemoryPolicyRepositoryConfig(@JsonProperty(KMS_DEFINITIONS_FILE_PROP_NAME) String kmsDefinitionsFile, @JsonProperty(TOPIC_POLICIES_FILE_PROP_NAME) String topicPoliciesFile) {
File kmsDefsFile = new File(kmsDefinitionsFile);
if (!kmsDefsFile.exists()) {
throw new IllegalArgumentException(KMS_DEFINITIONS_FILE_PROP_NAME + " " + kmsDefinitionsFile + " does not exist");
}
File policyFile = new File(topicPoliciesFile);
if (!policyFile.exists()) {
throw new IllegalArgumentException(TOPIC_POLICIES_FILE_PROP_NAME + " " + policyFile + " does not exist");
}
try {
List<TopicPolicy> topicPolicies = JsonPolicyLoader.loadTopicPolicies(kmsDefsFile, policyFile);
policyRepository = new InMemoryPolicyRepository(topicPolicies);
} catch (Exception e) {
throw new RuntimeException("Failed to create topic policies", e);
}
}

public PolicyRepository getPolicyRepository() {
return policyRepository;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@
import io.kroxylicious.proxy.filter.KrpcFilterContext;
import io.kroxylicious.proxy.filter.ProduceRequestFilter;
import io.strimzi.kafka.topicenc.EncryptionModule;
import io.strimzi.kafka.topicenc.kms.KmsDefinition;
import io.strimzi.kafka.topicenc.kms.test.TestKms;
import io.strimzi.kafka.topicenc.policy.InMemoryPolicyRepository;
import io.strimzi.kafka.topicenc.policy.TopicPolicy;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ProduceEncryptFilter implements ProduceRequestFilter {


private static final Logger log = LoggerFactory.getLogger(ProduceEncryptFilter.class);

private final EncryptionModule module = new EncryptionModule(new InMemoryPolicyRepository(List.of(new TopicPolicy().setTopic(TopicPolicy.ALL_TOPICS).setKms(new TestKms(new KmsDefinition())))));
private final EncryptionModule module;

public ProduceEncryptFilter(TopicEncryptionConfig config) {
module = new EncryptionModule(config.getPolicyRepository());
}

@Override
public void onProduceRequest(short apiVersion, RequestHeaderData header, ProduceRequestData request, KrpcFilterContext context) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kroxylicious.proxy.config.BaseConfig;
import io.strimzi.kafka.topicenc.policy.PolicyRepository;

import java.util.Objects;

public class TopicEncryptionConfig extends BaseConfig {

public static final String IN_MEMORY_POLICY_REPOSITORY_PROP_NAME = "inMemoryPolicyRepository";
private final InMemoryPolicyRepositoryConfig inMemoryPolicyRepository;

@JsonCreator
public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_PROP_NAME) InMemoryPolicyRepositoryConfig inMemoryPolicyRepository) {
this.inMemoryPolicyRepository = inMemoryPolicyRepository;
Objects.requireNonNull(inMemoryPolicyRepository, "Currently " + IN_MEMORY_POLICY_REPOSITORY_PROP_NAME
+ " configuration is required as it is the only PolicyRepository implementation");
}

public PolicyRepository getPolicyRepository() {
return inMemoryPolicyRepository.getPolicyRepository();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ public class TopicEncryptionContributor extends BaseContributor<KrpcFilter> impl
public static final String DECRYPT_FETCH = "TopicEncryption::DecryptFetch";
public static final String ENCRYPT_PRODUCE = "TopicEncryption::EncryptProduce";
public static final BaseContributorBuilder<KrpcFilter> FILTERS = BaseContributor.<KrpcFilter>builder()
.add(DECRYPT_FETCH, FetchDecryptFilter::new)
.add(ENCRYPT_PRODUCE, ProduceEncryptFilter::new);
.add(DECRYPT_FETCH, TopicEncryptionConfig.class, FetchDecryptFilter::new)
.add(ENCRYPT_PRODUCE, TopicEncryptionConfig.class, ProduceEncryptFilter::new);

public TopicEncryptionContributor() {
super(FILTERS);
Expand Down
Loading