Skip to content

Commit

Permalink
feat: Add primaryToSecondary mapper for Kafka
Browse files Browse the repository at this point in the history
* Add primaryToSecondary mapper for
Kafka and rename Mapper class.

Signed-off-by: Katherine Stanley <[email protected]>
  • Loading branch information
katheris committed Aug 23, 2023
1 parent c953248 commit 1444fac
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 79 deletions.
23 changes: 12 additions & 11 deletions src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaUser;
import io.strimzi.kafka.access.internal.KafkaAccessParser;
import io.strimzi.kafka.access.internal.KafkaAccessMapper;
import io.strimzi.kafka.access.model.BindingStatus;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessStatus;
Expand Down Expand Up @@ -56,7 +56,7 @@ public class KafkaAccessReconciler implements Reconciler<KafkaAccess>, EventSour
public KafkaAccessReconciler(final KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
secretDependentResource = new SecretDependentResource();
commonSecretLabels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE);
commonSecretLabels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE);
}

/**
Expand Down Expand Up @@ -141,31 +141,32 @@ public Map<String, EventSource> prepareEventSources(final EventSourceContext<Kaf
LOGGER.info("Preparing event sources");
InformerEventSource<Kafka, KafkaAccess> kafkaEventSource = new InformerEventSource<>(
InformerConfiguration.from(Kafka.class, context)
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessParser.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<KafkaUser, KafkaAccess> kafkaUserEventSource = new InformerEventSource<>(
InformerConfiguration.from(KafkaUser.class, context)
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessParser.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessParser.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_CLUSTER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziKafkaUserSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_USER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
kafkaAccessSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessParser.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(
Expand Down
18 changes: 10 additions & 8 deletions src/main/java/io/strimzi/kafka/access/SecretDependentResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Class to represent the data in the Secret created by the operator
Expand Down Expand Up @@ -61,26 +62,23 @@ public Map<String, String> desired(final KafkaAccessSpec spec, final String name
final KafkaReference kafkaReference = spec.getKafka();
final String kafkaClusterName = kafkaReference.getName();
final String kafkaClusterNamespace = Optional.ofNullable(kafkaReference.getNamespace()).orElse(namespace);
final InformerEventSource<Kafka, KafkaAccess> kafkaEventSource = (InformerEventSource<Kafka, KafkaAccess>) context.eventSourceRetriever().getResourceEventSourceFor(Kafka.class);
final Kafka kafka = kafkaEventSource.get(new ResourceID(kafkaClusterName, kafkaClusterNamespace))
.orElseThrow(() -> new IllegalStateException(String.format("Kafka %s/%s missing", kafkaClusterNamespace, kafkaClusterName)));
final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());
final Kafka kafka = context.getSecondaryResource(Kafka.class).orElseThrow(illegalStateException("Kafka", kafkaClusterNamespace, kafkaClusterName));
final Map<String, String> data = new HashMap<>(commonSecretData);
final Optional<KafkaUserReference> kafkaUserReference = Optional.ofNullable(spec.getUser());
if (kafkaUserReference.isPresent()) {
if (!KafkaUser.RESOURCE_KIND.equals(kafkaUserReference.get().getKind()) || !KafkaUser.RESOURCE_GROUP.equals(kafkaUserReference.get().getApiGroup())) {
throw new CustomResourceParseException(String.format("User kind must be %s and apiGroup must be %s", KafkaUser.RESOURCE_KIND, KafkaUser.RESOURCE_GROUP));
}
final String kafkaUserName = kafkaUserReference.get().getName();
final String kafkaUserNamespace = Optional.ofNullable(kafkaUserReference.get().getNamespace()).orElse(namespace);
final KafkaUser kafkaUser = context.getSecondaryResource(KafkaUser.class)
.orElseThrow(() -> new IllegalStateException(String.format("KafkaUser %s/%s missing", kafkaUserNamespace, kafkaUserName)));
final KafkaUser kafkaUser = context.getSecondaryResource(KafkaUser.class).orElseThrow(illegalStateException("KafkaUser", kafkaUserNamespace, kafkaUserName));
final String userSecretName = Optional.ofNullable(kafkaUser.getStatus())
.map(KafkaUserStatus::getSecret)
.orElseThrow(() -> new IllegalStateException(String.format("Secret for KafkaUser %s/%s missing", kafkaUserNamespace, kafkaUserName)));
.orElseThrow(illegalStateException("Secret in KafkaUser status", kafkaUserNamespace, kafkaUserName));
final InformerEventSource<Secret, KafkaAccess> kafkaUserSecretEventSource = (InformerEventSource<Secret, KafkaAccess>) context.eventSourceRetriever()
.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE);
final Secret kafkaUserSecret = kafkaUserSecretEventSource.get(new ResourceID(userSecretName, kafkaUserNamespace))
.orElseThrow(() -> new IllegalStateException(String.format("Secret for KafkaUser %s/%s missing", kafkaUserNamespace, kafkaUserName)));
.orElseThrow(illegalStateException(String.format("Secret %s for KafkaUser", userSecretName), kafkaUserNamespace, kafkaUserName));
data.putAll(secretDataWithUser(spec, kafka, kafkaUser, new KafkaUserData(kafkaUser).withSecret(kafkaUserSecret)));
} else {
data.putAll(secretData(spec, kafka));
Expand Down Expand Up @@ -116,4 +114,8 @@ protected Map<String, String> secretDataWithUser(final KafkaAccessSpec spec, fin
data.putAll(listener.getConnectionSecretData());
return data;
}

private static Supplier<IllegalStateException> illegalStateException(String type, String namespace, String name) {
return () -> new IllegalStateException(String.format("%s %s/%s missing", type, namespace, name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import java.util.stream.Stream;

/**
* Representation of a KafkaAccess parser that collects the resource IDs from different sources
* Maps Strimzi and Kuberentes resources to and from KafkaAccess resources
*/
public class KafkaAccessParser {
public class KafkaAccessMapper {

/**
* The constant for managed-by label
Expand Down Expand Up @@ -63,7 +63,7 @@ public class KafkaAccessParser {
*/
public static final String KAFKA_ACCESS_LABEL_VALUE = "kafka-access-operator";

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessParser.class);
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessMapper.class);

/**
* Filters the stream of KafkaAccess objects to find only the ones that reference the provided Kafka resource.
Expand Down Expand Up @@ -186,7 +186,7 @@ public static Set<ResourceID> secretSecondaryToPrimaryMapper(final Stream<KafkaA
.withNamespace(secretNamespace.get())
.endMetadata()
.build();
resourceIDS.addAll(KafkaAccessParser.kafkaSecondaryToPrimaryMapper(kafkaAccessList, kafka));
resourceIDS.addAll(KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(kafkaAccessList, kafka));
}
}

Expand Down Expand Up @@ -216,4 +216,28 @@ public static Set<ResourceID> kafkaUserPrimaryToSecondaryMapper(final KafkaAcces
});
return resourceIDS;
}

/**
* Finds the Kafka that is referenced by this KafkaAccess object.
*
* @param kafkaAccess KafkaAccess object to parse
*
* @return Set of ResourceIDs containing the Kafka that is referenced by the KafkaAccess
*/
public static Set<ResourceID> kafkaPrimaryToSecondaryMapper(final KafkaAccess kafkaAccess) {
final Set<ResourceID> resourceIDS = new HashSet<>();
Optional.ofNullable(kafkaAccess.getSpec())
.map(KafkaAccessSpec::getKafka)
.ifPresent(kafkaReference -> {
String name = kafkaReference.getName();
String namespace = Optional.ofNullable(kafkaReference.getNamespace())
.orElseGet(() -> Optional.ofNullable(kafkaAccess.getMetadata()).map(ObjectMeta::getNamespace).orElse(null));
if (name == null || namespace == null) {
LOGGER.error("Found Kafka for KafkaAccess instance, but metadata is missing.");
} else {
resourceIDS.add(new ResourceID(name, namespace));
}
});
return resourceIDS;
}
}
12 changes: 6 additions & 6 deletions src/test/java/io/strimzi/kafka/access/ResourceProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.strimzi.api.kafka.model.status.ListenerAddressBuilder;
import io.strimzi.api.kafka.model.status.ListenerStatus;
import io.strimzi.api.kafka.model.status.ListenerStatusBuilder;
import io.strimzi.kafka.access.internal.KafkaAccessParser;
import io.strimzi.kafka.access.internal.KafkaAccessMapper;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessSpec;
import io.strimzi.kafka.access.model.KafkaReference;
Expand Down Expand Up @@ -69,7 +69,7 @@ public static KafkaAccess getKafkaAccess(final String kafkaAccessName, final Str

public static Secret getEmptyKafkaAccessSecret(String secretName, String secretNamespace, String kafkaAccessName) {
final Map<String, String> labels = new HashMap<>();
labels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.KAFKA_ACCESS_LABEL_VALUE);
labels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE);
final OwnerReference ownerReference = new OwnerReference();
ownerReference.setName(kafkaAccessName);
ownerReference.setKind(KafkaAccess.KIND);
Expand All @@ -85,15 +85,15 @@ public static Secret getEmptyKafkaAccessSecret(String secretName, String secretN

public static Secret getStrimziSecret(final String secretName, final String secretNamespace, final String kafkaInstanceName) {
final Map<String, String> labels = new HashMap<>();
labels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_CLUSTER_LABEL_VALUE);
labels.put(KafkaAccessParser.INSTANCE_LABEL_KEY, kafkaInstanceName);
labels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE);
labels.put(KafkaAccessMapper.INSTANCE_LABEL_KEY, kafkaInstanceName);
return buildSecret(secretName, secretNamespace, labels);
}

public static Secret getStrimziUserSecret(final String secretName, final String secretNamespace, final String kafkaInstanceName) {
final Map<String, String> labels = new HashMap<>();
labels.put(KafkaAccessParser.MANAGED_BY_LABEL_KEY, KafkaAccessParser.STRIMZI_USER_LABEL_VALUE);
labels.put(KafkaAccessParser.STRIMZI_CLUSTER_LABEL_KEY, kafkaInstanceName);
labels.put(KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE);
labels.put(KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_KEY, kafkaInstanceName);
return buildSecret(secretName, secretNamespace, labels);
}

Expand Down
Loading

0 comments on commit 1444fac

Please sign in to comment.